Reactive Engine Internals
The five data structures, the tracking mechanism, and three scenarios traced step by step.
Reading order: Reactivity by Example (the mental model) → You are here (how it actually works inside reactive.py) → Threading Model (concurrency, async supersede).
The entire reactive engine is one file: src/signalpy/kernel/reactive.py (~600 lines). This page explains how it works internally — the data structures, the tracking mechanism, and three worked scenarios.
You don’t need to read this to use the kernel. This is for when you want to understand why @effect auto-tracks, how batch() deduplicates, or what breaks if you mutate a dict in place.
The Five Data Structures
Everything is built from five things:
┌─────────────────────────────────────────────────────────┐
│ Global State                                            │
│                                                         │
│ _active_consumer: ContextVar  "who is reading now?"     │
│ _batch_depth: int             "are we in a batch?"      │
│ _batch_queue: list[Effect]    "effects waiting"         │
│ _lock: Lock                   "thread safety"           │
│ _creation_counter: int        "ordering effects"        │
└─────────────────────────────────────────────────────────┘
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ Signal           │ │ Computed         │ │ Effect           │
│                  │ │ (is a _Consumer) │ │ (is a _Consumer) │
│ _value: T        │ │ _fn: callable    │ │ _fn: callable    │
│ _version: int    │ │ _value: T        │ │ _is_async: bool  │
│ _subscribers:    │ │ _dirty: bool     │ │ _disposed: bool  │
│   set[_Consumer] │ │ _version: int    │ │ _running: bool   │
│                  │ │ _subscribers:    │ │                  │
│                  │ │   set[_Consumer] │ │ (from _Consumer) │
│                  │ │ _disposed: bool  │ │ _deps: set[Sig]  │
│                  │ │                  │ │ _id: int         │
│                  │ │ (from _Consumer) │ │                  │
│                  │ │ _deps: set[Sig]  │ └──────────────────┘
│                  │ │ _id: int         │
└──────────────────┘ └──────────────────┘
Three classes, five globals. That’s it.
The double-link pattern: when a consumer reads a Signal, the dependency is recorded on BOTH sides:
consumer._deps.add(signal) "I depend on this signal"
signal._subscribers.add(consumer) "this consumer reads me"
Why both? So either side can initiate: signals push notifications to their subscribers, and consumers remove themselves from their deps’ subscriber sets when they re-run or are disposed.
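The double link can be sketched in a few lines. This is a minimal illustration, not the code in reactive.py: `Node`, `Reader`, `link` and `unlink_all` are hypothetical stand-ins for Signal, _Consumer and their tracking helpers.

```python
from __future__ import annotations


class Node:
    """Hypothetical stand-in for a Signal: knows who reads it."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.subscribers: set[Reader] = set()


class Reader:
    """Hypothetical stand-in for a _Consumer: knows what it reads."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.deps: set[Node] = set()


def link(reader: Reader, node: Node) -> None:
    # Record the dependency on BOTH sides.
    reader.deps.add(node)
    node.subscribers.add(reader)


def unlink_all(reader: Reader) -> None:
    # The consumer side initiates cleanup by walking its own deps,
    # so no scan over every node in the program is needed.
    for node in reader.deps:
        node.subscribers.discard(reader)
    reader.deps.clear()


a, b = Node("a"), Node("b")
r = Reader("r")
link(r, a)
link(r, b)
linked_names = sorted(n.name for n in r.deps)
unlink_all(r)
print(linked_names, len(a.subscribers), len(b.subscribers))
```

Without the `deps` set on the reader, cleanup would have to search every node for the reader; without the `subscribers` set on the node, a write would have no one to notify.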
The Tracking Mechanism: _active_consumer
_active_consumer: ContextVar[_Consumer | None] = ContextVar(
"_active_consumer", default=None
)
This one variable makes auto-tracking work. It answers: “if a Signal is read right now, who should subscribe?”
- None → nobody is tracking. Signal.get() just returns the value, with no tracking work beyond the lookup itself.
- Some Effect → that Effect is running. Signal.get() records the dependency.
- Some Computed → that Computed is recomputing. Same thing.
Why one variable is enough for many effects
A common confusion: there are potentially hundreds of effects, but only one _active_consumer. How?
Because effects don’t overlap on a single execution flow. They take turns. The variable is like a return-address register on a CPU — there’s only one, but every function call uses it, because calls happen one at a time.
The ContextVar’s value at any moment is “the one effect whose body is currently on the call stack.” The rest of the time it’s None. The persistent graph lives elsewhere:
| State | Lives on | Lifetime | Answers |
|---|---|---|---|
| _active_consumer | ContextVar (per-flow) | nanoseconds | “Who is reading right now?” |
| signal._subscribers | each Signal | lifetime of subscribed effects | “When I change, who do I notify?” |
| effect._deps | each Effect | lifetime of the effect | “What signals do I read?” |
What about concurrency?
ContextVar gives each thread and each asyncio Task its own copy of the value automatically:
Main thread: _active_consumer = EffectA (then None, then EffectB, ...)
Worker thread: _active_consumer = EffectC (independent)
Asyncio task: _active_consumer = EffectD (snapshot from parent context)
Parallel effects don’t trample each other’s tracking. The _subscribers sets, however, are shared across flows; protecting them is the job of _lock.
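The isolation can be observed with the standard library alone. In this sketch, `active` is a hypothetical stand-in for _active_consumer, and plain strings stand in for effects; it only demonstrates ContextVar behaviour, not anything specific to reactive.py.

```python
from __future__ import annotations

import asyncio
import threading
from contextvars import ContextVar

# Hypothetical stand-in for _active_consumer; strings replace real effects.
active: ContextVar[str | None] = ContextVar("active", default=None)
seen: dict[str, str | None] = {}


def worker() -> None:
    # The thread has its own context: this write stays in the thread.
    active.set("EffectC")
    seen["thread_after"] = active.get()


async def child() -> None:
    # A Task runs in a copy of the context it was created in.
    seen["task_before"] = active.get()
    active.set("EffectD")  # changes only the task's copy


async def main() -> None:
    await asyncio.create_task(child())
    seen["parent_after_task"] = active.get()


active.set("EffectA")
t = threading.Thread(target=worker)
t.start()
t.join()
asyncio.run(main())
seen["main_after"] = active.get()
print(seen)
```

Neither the worker thread's write nor the child task's write is visible to the main flow afterwards, which is why concurrent effects cannot overwrite each other's tracking slot.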
What about nesting?
If effect A’s body triggers effect B synchronously (e.g., A writes a Signal that B subscribes to), the Token mechanism handles it like a stack:
A.run() starts → ACV: None → A (token_A saved)
A writes sig → notifies B → B.run() starts
B.run() → ACV: A → B (token_B saved)
B reads sig3 → registers B ✓
B.run() ends → ACV: B → A (reset token_B)
A continues
A reads sig4 → registers A ✓ (still correct!)
A.run() ends → ACV: A → None (reset token_A)
The try/finally: reset(token) in every run() is critical — even if the body raises, the slot is restored. Without it, a single exception would poison all subsequent tracking on that flow.
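The stack-like behaviour of tokens can be reproduced in isolation. The sketch below assumes only the standard `ContextVar.set()` / `reset(token)` API; `active`, `run` and the body functions are hypothetical names, and strings stand in for effects.

```python
from __future__ import annotations

from collections.abc import Callable
from contextvars import ContextVar

active: ContextVar[str | None] = ContextVar("active", default=None)
trace: list[tuple[str, str | None]] = []


def run(name: str, body: Callable[[], None]) -> None:
    token = active.set(name)  # push: the token remembers the previous value
    try:
        body()
    finally:
        active.reset(token)  # pop: restore it, even if body() raised


def body_b() -> None:
    trace.append(("inside B", active.get()))


def body_a() -> None:
    trace.append(("A before B", active.get()))
    run("B", body_b)  # nested run, as when A's write notifies B
    trace.append(("A after B", active.get()))


def failing_body() -> None:
    raise RuntimeError("boom")


run("A", body_a)
try:
    run("C", failing_body)
except RuntimeError:
    pass
trace.append(("after everything", active.get()))
print(trace)
```

After the failing run the slot is back to None, so a later read on the same flow is not attributed to an effect that already finished.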
Signal: The Source of Truth
Signal.get — the tracking read
def get(self) -> T:
    consumer = _active_consumer.get()      # who's asking?
    if consumer is not None:               # someone IS asking
        consumer._track(self)              # consumer records dep
        self._subscribers.add(consumer)    # signal records subscriber
    return self._value                     # return value either way
Outside an effect, the if block is skipped and get() is a plain value read. Inside an effect, both sides record the link. This is the double-link.
Signal.set — the notifying write
def set(self, value: T) -> None:
    if value is self._value:          # IDENTITY check, not ==
        return                        # same object → skip entirely
    self._value = value
    self._version += 1
    self._notify_subscribers()        # tell everyone who reads me
Why is and not ==?
Identity comparison means mutating a dict in place won’t trigger notifications:
data = signal.peek()
data["key"] = "new"
signal.set(data)    # data IS data → same object → SKIPPED!
You must create a new object:
data = {**signal.peek(), "key": "new"}    # new dict
signal.set(data)                          # different object → notifies!
This is why ConfigProvider.set() copies the dict internally.
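The effect of the identity check can be seen with a stripped-down signal. `TinySignal` is a hypothetical miniature that keeps only the `is` comparison from Signal.set; a counter stands in for _notify_subscribers().

```python
class TinySignal:
    """Hypothetical miniature of Signal, keeping only the identity check."""

    def __init__(self, value):
        self._value = value
        self.notifications = 0

    def peek(self):
        return self._value

    def set(self, value) -> None:
        if value is self._value:  # same object: skip, as in Signal.set
            return
        self._value = value
        self.notifications += 1  # stands in for _notify_subscribers()


config = TinySignal({"theme": "light"})

# In-place mutation: the stored dict changes, but set() receives the
# same object and skips the notification.
data = config.peek()
data["theme"] = "dark"
config.set(data)
after_mutation = config.notifications

# Copy-on-write: a new dict is a different object, so set() notifies.
config.set({**config.peek(), "font": "mono"})
after_copy = config.notifications
print(after_mutation, after_copy)
```

Note that the in-place mutation still changed the stored value; subscribers simply were never told, which is the failure mode to watch for.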
Effect: The Side-Effect Runner
Effect.run — the core execution loop
def run(self) -> None:
    if self._disposed or self._running:    # guards
        return
    self._running = True                   # re-entrancy guard ON
    self._untrack_all()                    # ① clear old deps
    token = _active_consumer.set(self)     # ② "I'm asking now"
    try:
        self._fn()                         # ③ execute: reads track me
    except Exception:
        log.exception("Effect execution error")
    finally:
        _active_consumer.reset(token)      # ④ "I'm done asking"
        self._running = False
Four steps:
① _untrack_all() — removes this effect from all its old signals’ subscriber lists and clears self._deps. Why? Because the function might take a different code path this time and read different signals. Stale deps must go.
② Set _active_consumer — now any Signal.get() during self._fn() will see this effect and create the link.
③ Execute — the function runs. Every signal read creates a dependency.
④ Reset — clean up, even on exception.
Re-entrancy guard: if the body writes to a signal it reads, _notify would call run() again. self._running prevents infinite recursion. The change isn’t lost — it queues a re-run after the current body finishes.
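Why step ① matters is easiest to see with a body that branches. The sketch below is a simplified model, not the real engine: `MiniSignal` and `MiniEffect` are hypothetical, there is no batching, locking or disposal, and a re-entrant run is simply skipped rather than queued.

```python
from __future__ import annotations

from contextvars import ContextVar

_active: ContextVar[MiniEffect | None] = ContextVar("_active", default=None)


class MiniSignal:
    def __init__(self, value) -> None:
        self._value = value
        self._subscribers: set[MiniEffect] = set()

    def get(self):
        consumer = _active.get()
        if consumer is not None:  # double-link on a tracked read
            consumer._deps.add(self)
            self._subscribers.add(consumer)
        return self._value

    def set(self, value) -> None:
        if value is self._value:
            return
        self._value = value
        for sub in list(self._subscribers):  # snapshot: run() edits the set
            sub.run()


class MiniEffect:
    def __init__(self, fn) -> None:
        self._fn = fn
        self._deps: set[MiniSignal] = set()
        self._running = False
        self.runs = 0
        self.run()

    def run(self) -> None:
        if self._running:
            return
        self._running = True
        for dep in self._deps:  # (1) drop stale links
            dep._subscribers.discard(self)
        self._deps.clear()
        token = _active.set(self)  # (2) start tracking
        try:
            self.runs += 1
            self._fn()  # (3) reads re-create the links
        finally:
            _active.reset(token)  # (4) stop tracking
            self._running = False


show_details = MiniSignal(True)
details = MiniSignal("v1")
summary = MiniSignal("s1")
out: list[str] = []
effect = MiniEffect(
    lambda: out.append(details.get() if show_details.get() else summary.get())
)

summary.set("s2")         # not read on the current path: no re-run
show_details.set(False)   # re-run; the body now reads summary instead
details.set("v2")         # stale dep was dropped: no re-run
summary.set("s3")         # current dep: re-run
print(out, effect.runs)
```

If the old links were kept, the write to `details` after the branch flipped would re-run the effect for a value it no longer reads.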
Effect._notify — what happens when a dep changes
def _notify(self) -> None:
    if self._disposed:
        return
    if _batch_depth > 0:
        _batch_queue.append(self)    # batching → defer
    else:
        self.run()                   # not batching → run now
Computed: The Cached Derived Value
Computed is both a consumer (reads signals) and a signal-like (others depend on it). This dual role makes reactive chains work:
Signal ← Computed    ← Computed    ← Effect
         both          both          consumer
         consumer      consumer      only
         AND           AND
         signal-like   signal-like
Computed.get — lazy evaluation
def get(self) -> T:
    if self._dirty:                        # need recomputation?
        self._recompute()                  # yes → run fn, cache result
    consumer = _active_consumer.get()
    if consumer is not None:
        consumer._track(self)              # whoever reads ME becomes my subscriber
        self._subscribers.add(consumer)
    return self._value                     # cached
Lazy: doesn’t recompute until someone reads it. Cached: if deps haven’t changed, returns the cached value without running fn.
The push-pull pattern
When a Signal changes:
- Push: dirty flags propagate down: Signal → Computed → Computed → Effect
- Pull: when the Effect runs, it reads Computeds back up the chain, triggering recomputation only where needed
Push ensures effects know to re-run. Pull ensures only necessary recomputation happens.
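The two phases can be separated in a small model that counts recomputations. `Cell` and `Lazy` are hypothetical miniatures of Signal and Computed; for brevity the sketch never untracks old dependencies and has no effects, so the pull is triggered by reading the last computed directly.

```python
from __future__ import annotations

from contextvars import ContextVar

_active: ContextVar[Lazy | None] = ContextVar("_active", default=None)


class Cell:
    """Hypothetical miniature of Signal."""

    def __init__(self, value) -> None:
        self._value = value
        self._subscribers: set[Lazy] = set()

    def get(self):
        consumer = _active.get()
        if consumer is not None:
            self._subscribers.add(consumer)
        return self._value

    def set(self, value) -> None:
        if value is self._value:
            return
        self._value = value
        for sub in list(self._subscribers):
            sub.mark_dirty()  # push: flags only, no recomputation


class Lazy:
    """Hypothetical miniature of Computed: dirty flag plus cached value."""

    def __init__(self, fn) -> None:
        self._fn = fn
        self._dirty = True
        self._value = None
        self._subscribers: set[Lazy] = set()
        self.recomputes = 0

    def mark_dirty(self) -> None:
        if self._dirty:
            return  # downstream is already flagged
        self._dirty = True
        for sub in list(self._subscribers):
            sub.mark_dirty()

    def get(self):
        if self._dirty:  # pull: recompute only when read
            token = _active.set(self)
            try:
                self._value = self._fn()
                self.recomputes += 1
            finally:
                _active.reset(token)
            self._dirty = False
        consumer = _active.get()
        if consumer is not None:
            self._subscribers.add(consumer)
        return self._value


first = Cell("Ada")
last = Cell("Lovelace")
full = Lazy(lambda: f"{first.get()} {last.get()}")
upper = Lazy(lambda: full.get().upper())

v1 = upper.get()        # pull: computes full, then upper
first.set("Grace")      # push: both flagged dirty, nothing recomputed
last.set("Hopper")      # push again: flags are already set
counts_before_read = (full.recomputes, upper.recomputes)
v2 = upper.get()        # pull: one recompute each, despite two writes
v3 = upper.get()        # cached: no recompute
print(v1, v2, counts_before_read, (full.recomputes, upper.recomputes))
```

Two writes cost two cheap flag passes and a single recomputation per computed, and only once something actually reads the result.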
Batch: Grouping Changes
@contextmanager
def batch():
    global _batch_depth
    _batch_depth += 1                 # enter batch mode
    try:
        yield
    finally:
        _batch_depth -= 1
        if _batch_depth == 0:
            _flush_batch()            # run all pending effects

def _flush_batch():
    while _batch_queue:
        _batch_queue.sort(key=lambda e: e._id)    # creation order
        pending = list(_batch_queue)
        _batch_queue.clear()
        for effect in pending:
            if not effect._disposed:
                effect.run()          # may enqueue more effects
- Nesting: an inner batch increments the depth; only the outermost exit triggers the flush.
- Ordering: effects are sorted by _id (creation order), so parent effects run before children. Deterministic.
- Convergence: running effects may trigger more effects. The loop continues until the queue is empty (capped at 100 iterations for runaway protection).
Scenario 1: Simple Effect Tracking
name = Signal("Alice")
greeting = Signal("Hello")
e = Effect(lambda: print(f"{greeting.get()}, {name.get()}!"))
# prints "Hello, Alice!"
After creation: greeting._subscribers = {e}, name._subscribers = {e}, e._deps = {greeting, name}.
name.set("Bob")
- name._notify_subscribers() → calls e._notify() → e.run()
- e._untrack_all() removes e from both signals’ subscriber sets
- _active_consumer = e
- Body runs: greeting.get() re-tracks, name.get() re-tracks
- Prints “Hello, Bob!”
Scenario 2: Computed Chain
first = Signal("Ada")
last = Signal("Lovelace")
full = Computed(lambda: f"{first.get()} {last.get()}")
upper = Computed(lambda: full.get().upper())
e = Effect(lambda: print(upper.get()))
# prints "ADA LOVELACE"
Graph: first/last → full → upper → e
first.set("Grace"):
- Push phase: first notifies full → full._dirty = True → full notifies upper → upper._dirty = True → upper notifies e → e.run()
- Pull phase: e reads upper.get() → dirty, so upper._recompute() → reads full.get() → dirty, so full._recompute() → reads first (“Grace”) and last (“Lovelace”) → full._value = "Grace Lovelace" → upper._value = "GRACE LOVELACE" → e prints it.
Scenario 3: Batch Deduplication
x = Signal(0)
y = Signal(0)
e = Effect(lambda: print(f"{x.get()}+{y.get()}"))
# prints "0+0"
with batch():
    x.set(1)    # enqueues e
    y.set(2)    # e already in queue (dedup)
# batch exits → flush → e.run() ONCE → prints "1+2"
Without batch: x.set(1) → prints “1+0”, then y.set(2) → prints “1+2”. With batch: the effect never observes the intermediate state “1+0”.
Thread Safety
Most mutations of shared state go through _lock (a threading.Lock):
- Subscriber set cleanup and snapshot
- Batch depth increment/decrement
- Queue append and dedup
Signal.get() and tracking (consumer._track, signal._subscribers.add) do NOT take the lock: they rely on set.add() being effectively atomic under CPython’s GIL, which is an implementation detail rather than a language guarantee.
The lock primarily protects the batch queue (multi-writer) and subscriber set iteration (read-while-write from another thread’s set() call).
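The snapshot pattern for subscriber iteration can be sketched as follows. `SharedSubscribers` is a hypothetical class, not part of reactive.py, and unlike the engine described above it also takes the lock in `add()`, which is a more conservative choice made here for simplicity.

```python
import threading


class SharedSubscribers:
    """Hypothetical holder for a subscriber set shared across threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._subscribers: set = set()

    def add(self, callback) -> None:
        with self._lock:
            self._subscribers.add(callback)

    def discard(self, callback) -> None:
        with self._lock:
            self._subscribers.discard(callback)

    def notify(self) -> int:
        # Snapshot under the lock, then call outside it. Iterating the
        # live set while it is modified can raise
        # "RuntimeError: Set changed size during iteration", and calling
        # callbacks while holding a non-reentrant Lock would deadlock as
        # soon as a callback subscribes or unsubscribes.
        with self._lock:
            snapshot = list(self._subscribers)
        for callback in snapshot:
            callback()
        return len(snapshot)


subs = SharedSubscribers()
calls: list[str] = []


def once() -> None:
    calls.append("once")
    subs.discard(once)  # takes the lock again: safe, notify() released it


def always() -> None:
    calls.append("always")


subs.add(once)
subs.add(always)
first_count = subs.notify()
second_count = subs.notify()
print(first_count, second_count, sorted(calls))
```

The lock is held only long enough to copy the set, so a slow callback never blocks other threads from subscribing.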
Class Hierarchy
object
├── Signal[T]        _value, _version, _subscribers
│                    get(), set(), peek(), update()
│
└── _Consumer        _deps, _id, _track(), _untrack_all()
    │
    ├── Effect       _fn, _is_async, _disposed, _running
    │                run(), _notify(), dispose()
    │
    └── Computed     _fn, _value, _dirty, _version, _subscribers
                     get(), peek(), _recompute(), _notify(), dispose()
Computed inherits from _Consumer (reads signals) but also has _subscribers (others depend on it). It’s both reader and writer in the reactive graph.
See also
- Reactivity by Example: one full cycle, consumer/provider/kernel side-by-side
- Threading Model: concurrency, async supersede, is_stale(), cancel_on_supersede
- Python contextvars docs