# Reactivity by Example

Consumer, Provider, and Kernel — side by side, one cycle from first run to re-run.
The reactive engine is small (~450 lines) but the value of reading any one file in isolation is limited. The interesting question is: when a consumer calls a provider, what does the kernel do underneath, and how does that produce automatic re-runs on change?
This page walks one full cycle, with the three layers shown side by side and moments numbered so you can trace the same instant across all three columns.
Read this page first. It’s the shortest path to the mental model. When you want to dig deeper, follow the links at the end:
- Context, ContextVar, and the Active Consumer — why a single `_active_consumer` is enough
- Reactive Engine: Line by Line — every line of `reactive.py`
- Threading Model — what changes (and doesn’t) under threads
## The cast
Three real pieces of code, lifted from this codebase:
```python
# Consumer — your application code
@component("search")
@requires(cfg=IConfig)
class SearchService:
    @effect
    def on_url_change(self):
        self.url = self.rt.cfg.get("api.url")
        self.client = HTTPClient(self.url)
```

```python
# Provider — signalpy/providers/config.py (excerpt)
class ConfigProvider:
    def get(self, key, default=None):
        value = self._read_path(key)
        sig = self._key_signal(key, value)  # lazy per-key Signal
        sig.get()                           # reactive read
        return value if value is not None else default

    def set(self, key, value):
        self._set_in(self._state, key, value)
        with self._key_lock:
            if key in self._key_signals:
                self._key_signals[key].set(self._read_path(key))
        self._all_version.set(self._all_version.peek() + 1)
```

> **`Signal.peek()` vs `Signal.get()`** — `get()` reads the current value and records a subscription if it’s called inside an effect/computed — that’s how reactive tracking works. `peek()` reads the value without recording a subscription. Use `peek()` when you need the value but don’t want the surrounding effect to re-run when this signal changes (here, the `set` method is incrementing its own version, not consuming it reactively).
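A toy sketch of that difference (not the real engine — `ToySignal` and the module-global `_active` slot are simplified stand-ins for `Signal` and the ContextVar):

```python
# Simplified stand-in for the kernel: one global "who's asking" slot.
_active = None

class ToySignal:
    def __init__(self, value):
        self._value = value
        self._subscribers = set()

    def get(self):
        if _active is not None:           # tracked read: record subscription
            self._subscribers.add(_active)
        return self._value

    def peek(self):                       # untracked read: no subscription
        return self._value

url = ToySignal("http://api.local")
version = ToySignal(0)

_active = "effect_A"                      # pretend an effect is running
url.get()                                 # effect_A now depends on url...
version.peek()                            # ...but NOT on version
_active = None

assert url._subscribers == {"effect_A"}
assert version._subscribers == set()
```

Both methods return the same value; the only difference is the side effect on the subscriber set.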
```python
# Kernel — signalpy/kernel/reactive.py (excerpt)
_active_consumer: ContextVar = ContextVar("_active_consumer", default=None)

class Signal:
    def get(self):
        with _lock:
            c = _active_consumer.get()
            if c is not None:
                c._deps.add(self)
                self._subscribers.add(c)
            return self._value

    def set(self, value):
        with _lock:
            if value is self._value: return
            self._value = value
            self._version += 1
            subs = list(self._subscribers)
            for c in subs:
                c._notify()

class Effect:
    def run(self):
        with _lock:
            self._untrack_all()
            token = _active_consumer.set(self)
            try:
                self._fn()
            finally:
                _active_consumer.reset(token)
```

The walkthrough below traces exactly one cycle: boot → first run → mutation → re-run.
> **Why does the provider have so much machinery?** A typical provider has one piece of state, so it just does `self._state = Signal(value)` — one line. ConfigProvider has many logical pieces of state (one per dotted key) and consumers want fine-grained subscriptions, so it keeps a dict of Signals. The extra code is bookkeeping for that dict, not framework overhead. See *Why ConfigProvider looks more complicated* at the end of the page.
## How @effect becomes a runtime Effect
Before the cycle starts, something has to turn the user’s decorated method into an actual Effect object the engine can call. That happens in three stages — decoration attaches a marker, the kernel scans for markers at discovery time, and activation wraps each marker in a ReactiveEffect.
The walkthrough — including why the kernel uses a marker pattern instead of a closure wrapper, descriptor, or global registry — is in Architecture → Decoration → Activation. Read that first if you want to know what the decorator does; read this page for the runtime lifecycle of an effect once it’s been wired up.
## ① Boot: the Effect runs for the first time
After the kernel activates SearchService, it wraps on_url_change in an Effect and calls Effect.run(). The kernel installs the effect as the “current consumer” via the ContextVar, then invokes the body.
**Consumer**

```python
@effect
def on_url_change(self):
    # body about to run
    ...
```

**Provider**

```python
# not yet involved
```

**Kernel**

```python
def run(self):
    self._untrack_all()
    token = _active_consumer.set(self)
    # _active_consumer = effect_A
    try:
        self._fn()  # ← calls body
    finally:
        _active_consumer.reset(token)
```

**① State after** — `_active_consumer = effect_A`. No subscriptions yet. The body is about to execute.
## ② Consumer reads config — three layers track in one call
Inside the body, self.rt.cfg.get("api.url") is one Python expression. But underneath it, three layers cooperate. The provider walks the dict and asks the kernel for the per-key Signal. The kernel checks who’s currently running, and on both sides records the dependency.
**Consumer**

```python
def on_url_change(self):
    self.url = (
        self.rt.cfg.get("api.url")
    )
    # returns "http://api.local"
```

**Provider**

```python
def get(self, key, default=None):
    value = self._read_path(key)
    sig = self._key_signal(
        key, value
    )
    sig.get()  # ← reactive
    return value
```

**Kernel**

```python
def get(self):  # Signal.get
    with _lock:
        c = _active_consumer.get()
        # c is effect_A
        if c is not None:
            c._deps.add(self)
            self._subscribers.add(c)
        return self._value
```

**② State after** —

```python
effect_A._deps = {api_url_signal}
api_url_signal._subscribers = {effect_A}
```
These two sets are the persistent reactive graph. The ContextVar is just the temporary “who’s asking” badge that let the kernel write into the right effect’s dep set.
> The provider doesn’t know which consumer called it. It just calls `sig.get()`. The kernel reads `_active_consumer` to find out. That’s the entire trick — see *Context, ContextVar, and the Active Consumer* for why one ContextVar is enough.
## ③ Effect body returns — ContextVar resets, graph remains
The body finishes. The finally clause restores _active_consumer to whatever it was before (here, None). The dep/subscriber sets stay.
**Consumer**

```python
# body returned;
# self.url = "http://api.local"
# self.client built
```

**Provider**

```python
# idle; per-key signal
# _key_signals["api.url"]
# is alive and tracked
```

**Kernel**

```python
# Effect.run finally:
_active_consumer.reset(token)
# _active_consumer = None
# api_url_signal._subscribers
#   = {effect_A}
```

**③ State after** — engine is dormant. No effects running. The graph is wired and waiting.
## ④ Someone mutates the config
A REST call, an admin, or another component does cfg.set("api.url", new). The provider updates its dict, looks up the per-key Signal, and calls sig.set(new_value). The Signal compares identity, bumps the version, and notifies every subscriber.
**Consumer**

```python
# not running yet —
# notification arrives next
```

**Provider**

```python
def set(self, key, value):
    self._set_in(
        self._state, key, value
    )
    with self._key_lock:
        if key in self._key_signals:
            self._key_signals[
                key
            ].set(value)  # ← Signal.set
```

**Kernel**

```python
def set(self, value):  # Signal.set
    with _lock:
        if value is self._value:
            return
        self._value = value
        self._version += 1
        subs = list(self._subscribers)
        for c in subs:
            c._notify()  # ← effect_A
```

**④ State after** — `effect_A._notify()` enqueues a re-run into the implicit batch the engine opens around every `Signal.set` notification cascade. The effect actually fires when that batch flushes, in creation order, deduplicated against other signals in the same cascade.
> Because we have per-key signals, only consumers of `"api.url"` get notified. A change to `"db.url"` would notify a different signal — your effect doesn’t re-run unnecessarily. This is what `test_config_per_key.py` verifies.
## ⑤ Effect re-runs — same path, fresh value
effect_A._notify() calls effect_A.run(). The kernel clears the old _deps, sets the ContextVar again, runs the body. Re-tracking happens exactly like in ②. The body sees the new URL.
**Consumer**

```python
def on_url_change(self):
    self.url = (
        self.rt.cfg.get("api.url")
    )  # "http://prod.api.com"
    self.client = HTTPClient(self.url)
```

**Provider**

```python
def get(self, key, default=None):
    value = self._read_path(key)
    sig = self._key_signal(
        key, value
    )
    sig.get()  # re-tracks
    return value
```

**Kernel**

```python
def run(self):  # Effect.run
    self._untrack_all()
    token = _active_consumer.set(self)
    try:
        self._fn()
    finally:
        _active_consumer.reset(token)
```

**⑤ State after** — graph rewired (sets are idempotent, so re-adding the same subscription is a no-op). Consumer holds the new URL and a new HTTP client. No callbacks were registered. No glue code exists.
## What the cycle taught us
| Question | Answer |
|---|---|
| Who told the consumer to re-run? | The Signal it read. The Signal knew because the kernel registered the subscription during ②. |
| Where does the dependency live? | `effect._deps` and `signal._subscribers` — symmetric sets on the two ends. |
| Why doesn’t the consumer write callbacks? | Reading the signal is the subscription. There’s nothing to register. |
| Why doesn’t the provider know its consumers? | The provider calls `sig.get()` and `sig.set()`. The kernel handles routing through `_active_consumer`. The provider has zero coupling to the consumer. |
| Why one ContextVar for many effects? | Effects don’t overlap on a single flow. The slot is reused like a return-address register. See contextvar-and-tracking. |
| What about threads? | ContextVar is per-flow; subscriber sets are guarded by RLock. See threading-model. |
## Variations of the same pattern
Once you see this cycle, every other pattern is a small variation:
- `@computed` — like an `@effect`, but lazy and cached. It plays both roles: a consumer (it reads signals) and a signal (other effects can read it). When its inputs change it marks itself dirty; on the next read it recomputes.
- Service replacement (`hot_add`/`hot_remove`) — the kernel’s runtime exposes injected services as Signals. Replacing the service calls `Signal.set(new_service)` on the runtime — your effect re-runs because it read `self.rt.cfg`, which is the outer Signal.
- Component-local Signals — you can `Signal(initial)` inside a component (e.g., circuit-breaker state). Same mechanics. The reactive primitives don’t care whether the Signal lives in the kernel or in your code.
- `config.all()` — the provider keeps a separate `_all_version: Signal`. Every `set` bumps it. Effects that called `cfg.all()` subscribed to that one and re-run on any change. The per-key signals stay isolated.
The principle is always the same: read = subscribe, write = notify, the rest is bookkeeping.
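The whole cycle fits in a runnable toy — a minimal sketch of the read-subscribes / write-notifies mechanics, not the real engine (no lock, no versioning, no batching: here `set` re-runs subscribers synchronously instead of enqueueing through `_notify`):

```python
from contextvars import ContextVar

_active_consumer: ContextVar = ContextVar("_active_consumer", default=None)

class Signal:
    def __init__(self, value):
        self._value = value
        self._subscribers = set()

    def get(self):
        c = _active_consumer.get()
        if c is not None:                    # read = subscribe
            c._deps.add(self)
            self._subscribers.add(c)
        return self._value

    def set(self, value):
        if value is self._value:
            return
        self._value = value
        for c in list(self._subscribers):    # write = notify
            c.run()                          # (real engine batches via _notify)

class Effect:
    def __init__(self, fn):
        self._fn = fn
        self._deps = set()
        self.run()                           # ① boot: first run

    def run(self):
        for dep in self._deps:               # untrack stale subscriptions
            dep._subscribers.discard(self)
        self._deps.clear()
        token = _active_consumer.set(self)   # install "current consumer"
        try:
            self._fn()
        finally:
            _active_consumer.reset(token)    # ③ reset; graph remains

url = Signal("http://api.local")
seen = []
Effect(lambda: seen.append(url.get()))       # ② read inside body subscribes
url.set("http://prod.api.com")               # ④ write notifies, ⑤ effect re-runs
assert seen == ["http://api.local", "http://prod.api.com"]
```

Everything the real engine adds on top — RLock, version counters, batching and dedup — wraps these same three moves: install the consumer, track reads, notify on writes.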
## Why ConfigProvider looks more complicated
The provider in our example has more code than a typical one. Each piece earns its keep — but it’s worth seeing why, because the same questions come up every time someone reads it for the first time.
### Why isn’t it just `self._state = Signal(...)`?
Most providers are exactly that one line. ConfigProvider differs in its data model: many logical pieces of state (one per dotted key), and consumers expect that writing "server.port" won’t re-run effects that only read "db.url". So instead of one Signal wrapping a whole dict, it keeps a lazy dict of Signals — one per key, created on first read.
Pre-refactor it really was a single Signal[dict]. Simpler code, but every write to any key re-ran every effect that had read any key. The per-key design is a deliberate tradeoff: more bookkeeping, much better fan-out behavior. (Tested by test_config_per_key.py.)
### Why does `get` manually call `sig.get()` and discard the result?
The kernel tracks Signal reads, and only Signal reads. If get() just walked the plain dict and returned the leaf, the consumer would have no subscription — nothing for a later set() to notify.
```python
def get(self, key, default=None):
    value = self._read_path(key)        # plain dict walk — NOT reactive
    sig = self._key_signal(key, value)  # find/create the Signal for this key
    sig.get()                           # ← THIS line is the subscription
    return value if value is not None else default
```

The `sig.get()` call has nothing to do with retrieving the value — `_read_path` already did that. It exists purely for its side effect: it tells the kernel “the currently-running effect depends on this key.” The return value is discarded.
> **Mental model: reading IS subscribing.** A more honest method name would be `signal.subscribe_active_consumer_and_return_value()` — but `get` is what the kernel API exposes, because in normal use the side effect and the return value are both wanted.
### What does `_set_in(self._state, ...)` do?
Pure dict mutation — walks a dotted path and sets the leaf. Zero reactivity:
```python
@staticmethod
def _set_in(data, key, value):
    parts = key.split(".")
    cur = data
    for part in parts[:-1]:
        if part not in cur or not isinstance(cur[part], dict):
            cur[part] = {}
        cur = cur[part]
    cur[parts[-1]] = value
```

For `key="server.port"`, `value=9090` it produces `data["server"]["port"] = 9090`. The reactive notification is a separate step a few lines later in `set()`. Splitting them keeps the dict layer (the source of truth) and the signal layer (the subscription channel) decoupled.
### What does `_key_lock` guard?
The _key_signals dict — the lazy key → Signal map. It does not guard individual Signals; each Signal has its own RLock from the engine.
The race it prevents: two threads call get("brand-new.key") at the same moment. Both see no entry, both create a Signal, both insert into the dict — last writer wins. The first thread’s effect now points at a Signal that’s no longer in the dict; future set() calls notify the other Signal and the first effect never re-runs. The subscription is silently lost.
```python
def _key_signal(self, key, current_value):
    sig = self._key_signals.get(key)
    if sig is not None: return sig
    with self._key_lock:                   # ← serialize lazy-create
        sig = self._key_signals.get(key)   # double-check inside lock
        if sig is None:
            sig = Signal(current_value)
            self._key_signals[key] = sig
    return sig
```

Strictly about the dict layer. Two orthogonal pieces of state, two orthogonal locks — see Threading Model for why that split matters.
### What is `_all_version` for? Will it overflow?
It’s a Signal that consumers of cfg.all() subscribe to. Those consumers want to re-run on any config change, but they can’t subscribe to the dict itself (not a Signal), nor to “every per-key Signal” (don’t know which keys exist yet, and new ones can appear at any time). Solution: one shared counter Signal that every set() bumps.
```python
self._all_version.set(self._all_version.peek() + 1)
```

Will it overflow? No — Python ints are arbitrary precision. After a quintillion writes you’ve added a few bytes to a long int; nothing breaks.
Why an integer at all? The value is irrelevant. What matters is that `peek() + 1` produces a new int with a different Python identity, so `Signal.set()`’s `value is self._value` check fails and it notifies subscribers. `self._all_version.set(object())` would work just as well — the integer is a debugging convenience so you can tell at a glance how many writes have happened.
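The identity argument is plain Python, no engine needed — a two-line illustration:

```python
version = 300
assert (version + 1) is not version   # a new int is a new object: `is` fails, notify fires
assert object() is not object()       # a fresh sentinel would force the same outcome
```

Either way the `value is self._value` guard sees a new object and lets the notification through.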
## See also
- Context, ContextVar, and the Active Consumer
- Reactive Engine: Line by Line
- Threading Model
- Architecture — where these pieces live in the codebase