Reactive Engine: Line by Line
A complete walkthrough of reactive.py — every data structure, every method, every edge case.
The entire reactive engine is one file: src/signalpy/kernel/reactive.py (~350 lines). This tutorial walks through it top to bottom, then traces three scenarios through the data structures to show exactly what happens at runtime.
The Five Data Structures
Everything in the reactive engine is built from five things:
┌─────────────────────────────────────────────────────────────┐
│ Global State │
│ │
│ _active_consumer: ContextVar "who is reading right now" │
│ _batch_depth: int "are we inside a batch?" │
│ _batch_queue: list[Effect] "effects waiting to run" │
│ _lock: Lock "thread safety" │
│ _creation_counter: int "ordering effects" │
└─────────────────────────────────────────────────────────────┘
┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ Signal │ │ Computed │ │ Effect │
│ │ │ (is a _Consumer) │ │ (is a _Consumer) │
│ _value: T │ │ _fn: callable │ │ _fn: callable │
│ _version: int │ │ _value: T │ │ _is_async: bool │
│ _subscribers: │ │ _dirty: bool │ │ _disposed: bool │
│ set[_Consumer] │ │ _version: int │ │ _running: bool │
│ │ │ _subscribers: │ │ │
│ │ │ set[_Consumer] │ │ (from _Consumer) │
│ │ │ _disposed: bool │ │ _deps: set[Sig] │
│ │ │ │ │ _id: int │
│ │ │ (from _Consumer) │ │ │
│ │ │ _deps: set[Sig] │ └──────────────────┘
│ │ │ _id: int │
└──────────────────┘ └──────────────────┘
That’s it. Three core classes (plus the small _Consumer base they share), five globals.
The double-link pattern: when a consumer reads a Signal, the dependency is recorded on BOTH sides:
consumer._deps.add(signal) "I depend on this signal"
signal._subscribers.add(consumer) "this consumer reads me"
Both are sets, so adding the same link twice is a no-op. The double link exists so that either side can initiate: signals notify subscribers (push), and consumers untrack their deps (pull, during cleanup).
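The double-link pattern above can be sketched with two toy classes (illustrative names, not the real signalpy types): both sides record the edge, and because both containers are sets, re-adding an existing link is a no-op.

```python
# Toy sketch of the double-link pattern: each side of the edge is
# recorded, and sets make duplicate links a no-op.
class ToySignal:
    def __init__(self):
        self._subscribers = set()   # consumers that read me

class ToyConsumer:
    def __init__(self):
        self._deps = set()          # signals I read

    def _track(self, signal):
        self._deps.add(signal)          # "I depend on this signal"
        signal._subscribers.add(self)   # "this consumer reads me"

sig = ToySignal()
con = ToyConsumer()
con._track(sig)
con._track(sig)   # second add is a no-op: sets dedup

assert con._deps == {sig}
assert sig._subscribers == {con}
```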
Global State: _active_consumer
_active_consumer: ContextVar[_Consumer | None] = ContextVar(
"_active_consumer", default=None
)
This is the trick that makes automatic tracking work. It answers one question: “Is someone currently executing who wants to track dependencies?”
- None → nobody is tracking. Signal.get() just returns the value.
- some_effect → an Effect is running. Signal.get() records the dependency.
- some_computed → a Computed is recomputing. Same thing.
It’s a ContextVar, not a global variable, because:
Task A (running Effect 1): _active_consumer = Effect1
Task B (running Effect 2): _active_consumer = Effect2
Each async task has its own copy. They don’t interfere.
Thread 1: _active_consumer = Effect1 ← one context
Thread 2: _active_consumer = Effect2 ← different context
Each thread/task has its own copy. ContextVar handles this automatically.
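A stdlib-only demo of that isolation (the names `_active`, `run_effect`, and `seen` are illustrative, not from reactive.py): two concurrent asyncio tasks each set the ContextVar, yield to each other, and still see their own value.

```python
# Each asyncio task gets its own copy of the context, so two "effects"
# setting the same ContextVar concurrently never clobber each other.
import asyncio
from contextvars import ContextVar

_active = ContextVar("_active", default=None)
seen = {}

async def run_effect(name):
    token = _active.set(name)    # "I'm the active consumer now"
    await asyncio.sleep(0)       # yield: the other task runs here
    seen[name] = _active.get()   # still sees its OWN value
    _active.reset(token)

async def main():
    await asyncio.gather(run_effect("Effect1"), run_effect("Effect2"))

asyncio.run(main())
assert seen == {"Effect1": "Effect1", "Effect2": "Effect2"}
```

A plain global would fail this test: whichever task set it last would win, and the first task's dependency tracking would be attributed to the wrong effect.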
Signal: The Source of Truth
Signal.__init__
def __init__(self, value: T) -> None:
self._value: T = value # the actual data
self._version: int = 0 # bumps on each change (for optimization)
self._subscribers: set[_Consumer] = set() # who reads me
State after name = Signal("Alice"):
name:
_value = "Alice"
_version = 0
_subscribers = {} ← empty, nobody has read it yet
Signal.get — the tracking read
def get(self) -> T:
consumer = _active_consumer.get() # line 105: who's asking?
if consumer is not None: # line 106: someone IS asking
consumer._track(self) # line 107: consumer records "I depend on this"
self._subscribers.add(consumer) # line 108: signal records "this consumer reads me"
return self._value # line 109: return the value either way
If nobody is asking (_active_consumer is None): lines 106-108 are skipped. It’s a plain value read. Zero overhead.
If an Effect is running: both sides record the link. This is the double-link.
Signal.set — the notifying write
def set(self, value: T) -> None:
if value is self._value: # line 123: identity check, not ==
return # same object → skip entirely
self._value = value # line 125: store new value
self._version += 1 # line 126: bump version
self._notify_subscribers() # line 127: tell everyone who reads me
Identity comparison (is) is critical. If you mutate a dict in place:
data = signal.peek()
data["key"] = "new"
signal.set(data) # data IS data → identity match → SKIPPED!
You must create a new object:
data = dict(signal.peek()) # copy → new object
data["key"] = "new"
signal.set(data) # new dict IS NOT old dict → notifies
This is why ConfigProvider.set() copies the dict.
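The pitfall is easy to reproduce with a toy signal (assumed behavior mirroring the `set` shown above; `ToySignal` and its `notified` counter are mine, for illustration):

```python
# Mutating in place defeats the `is` comparison; copying first does not.
class ToySignal:
    def __init__(self, value):
        self._value = value
        self.notified = 0   # stands in for _notify_subscribers()

    def peek(self):
        return self._value

    def set(self, value):
        if value is self._value:   # identity check, not ==
            return
        self._value = value
        self.notified += 1

s = ToySignal({"key": "old"})

data = s.peek()
data["key"] = "new"
s.set(data)                 # same object → identity match → skipped
assert s.notified == 0

data = dict(s.peek())       # copy → new object
data["key"] = "newer"
s.set(data)                 # different object → notifies
assert s.notified == 1
```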
Signal._notify_subscribers — the fan-out
def _notify_subscribers(self) -> None:
with _lock: # thread-safe
self._subscribers = { # clean up dead ones
c for c in self._subscribers
if not getattr(c, '_disposed', False)
}
snapshot = list(self._subscribers) # snapshot under lock
for consumer in snapshot: # iterate snapshot
consumer._notify() # each consumer decides what to do
The snapshot prevents “set modified during iteration” errors. The disposed cleanup prevents memory leaks from deactivated components. The lock makes this safe across threads.
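A stdlib-only demo of the hazard the snapshot avoids: a consumer's `_notify()` can end up adding subscribers (an effect re-running re-tracks itself), and mutating a set while iterating it raises RuntimeError. Iterating a list copy is safe.

```python
# Without a snapshot: mutating the set during iteration raises.
subscribers = {"effect-1", "effect-2"}
caught = False
try:
    for s in subscribers:
        subscribers.add(s + "-child")   # a notify that adds a subscriber
except RuntimeError:
    caught = True
assert caught   # "Set changed size during iteration"

# With a snapshot: iterate the copy, mutate the set freely.
subscribers = {"effect-1", "effect-2"}
for s in list(subscribers):             # snapshot first
    subscribers.add(s + "-child")       # safe now
assert len(subscribers) == 4
```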
Each consumer type handles _notify() differently:
- Effect._notify(): if batching → enqueue; else → run() immediately
- Computed._notify(): mark dirty, propagate to own subscribers
Effect: The Side-Effect Runner
Effect.__init__
def __init__(self, fn: Callable, *, lazy: bool = False) -> None:
super().__init__() # _Consumer.__init__: _deps={}, _id=N
self._fn = fn # the function to run
self._is_async = inspect.iscoroutinefunction(fn)
self._disposed = False # can be stopped
self._running = False # re-entrancy guard
if not lazy:
self.run() # run immediately on creation!
lazy=False (default) means the effect runs at creation time. This is how @effect methods work — the kernel creates the Effect during component activation, and it runs immediately, tracking its initial dependencies.
Effect.run — the core execution loop
def run(self) -> None:
if self._disposed or self._running: # guards
return
self._running = True # re-entrancy guard ON
self._untrack_all() # ① clear old deps
token = _active_consumer.set(self) # ② "I'm asking now"
try:
self._fn() # ③ execute — Signal reads track me
except Exception:
log.exception("Effect execution error")
finally:
_active_consumer.reset(token) # ④ "I'm done asking"
self._running = False # re-entrancy guard OFF
Four steps:
① _untrack_all() — removes this effect from all its old signals’ subscriber lists and clears self._deps. Why? Because the function might take a different code path this time and read different signals. Stale deps must be removed.
Before _untrack_all:
effect._deps = {signal_A, signal_B}
signal_A._subscribers = {effect, ...}
signal_B._subscribers = {effect, ...}
After _untrack_all:
effect._deps = {}
signal_A._subscribers = {...} ← effect removed
signal_B._subscribers = {...} ← effect removed
② Set _active_consumer — now Signal.get() calls during self._fn() will see this effect as the active consumer and track the dependency.
③ Execute — the function runs. Every signal.get() inside it creates a link. After execution, effect._deps contains all signals that were read.
④ Reset — clean up. The finally block ensures reset even on exception.
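The clearing-and-re-tracking behavior of step ① is easiest to see with a branch. Here is a compact toy engine (a sketch with made-up names `Sig`/`Eff`, not the real reactive.py) where an effect reads different signals depending on a flag:

```python
# Deps are cleared before every run, so an effect's dependency set
# always reflects what the function read THIS time.
from contextvars import ContextVar

_active = ContextVar("_active", default=None)

class Sig:
    def __init__(self, v):
        self._value = v
        self._subscribers = set()

    def get(self):
        c = _active.get()
        if c is not None:               # someone is tracking
            c._deps.add(self)
            self._subscribers.add(c)
        return self._value

    def set(self, v):
        self._value = v
        for c in list(self._subscribers):   # snapshot, then notify
            c.run()

class Eff:
    def __init__(self, fn):
        self._deps = set()
        self._fn = fn
        self.run()

    def run(self):
        for s in self._deps:            # _untrack_all: clear old links
            s._subscribers.discard(self)
        self._deps.clear()
        token = _active.set(self)       # "I'm asking now"
        try:
            self._fn()
        finally:
            _active.reset(token)

flag = Sig(True)
a, b = Sig("a"), Sig("b")
e = Eff(lambda: (a if flag.get() else b).get())

assert e._deps == {flag, a}      # first run read flag and a
flag.set(False)                  # re-run takes the other branch
assert e._deps == {flag, b}      # a was untracked, b tracked
assert e not in a._subscribers   # stale link removed
```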
Re-entrancy guard: if the function writes to a signal it reads, _notify calls run() again. The self._running check prevents infinite recursion. The write is not lost — it just doesn’t trigger a synchronous re-run. The change will be picked up on the next notification.
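The guard itself is a two-line pattern. A toy sketch (assumed shape, mirroring the `_running` flag above) where the effect's function triggers its own re-run:

```python
# Without the _running flag, a function that writes to its own
# dependency would recurse until the stack overflows.
class ToyEffect:
    def __init__(self, fn):
        self._fn = fn
        self._running = False
        self.runs = 0

    def run(self):
        if self._running:      # re-entrant notify → ignore
            return
        self._running = True
        try:
            self.runs += 1
            self._fn(self)
        finally:
            self._running = False

def self_triggering(effect):
    effect.run()   # stands in for a signal write that notifies this effect

e = ToyEffect(self_triggering)
e.run()
assert e.runs == 1   # the nested run() was swallowed by the guard
```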
Effect._notify — what happens when a dep changes
def _notify(self) -> None:
    if self._disposed: # dead? ignore
        return
    with _lock: # check batch state and enqueue atomically
        if _batch_depth > 0: # batching → enqueue for later
            if self not in _batch_queue: # dedup: don't enqueue twice
                _batch_queue.append(self)
            return
    self.run() # not batching → run immediately
Note the single lock acquisition: checking _batch_depth and appending to _batch_queue happen atomically, so the batch cannot end between the check and the enqueue. Two modes:
- Not batching: run() immediately. The effect re-executes, re-tracks deps.
- Batching: enqueue for later. _flush_batch() will run it when the batch ends.
Computed: The Cached Derived Value
Computed is both a consumer (it reads signals) and a signal-like (others can depend on it). This dual role is what makes reactive chains work.
Signal ← Computed ← Computed ← Effect
│ │ │ │
│ consumer consumer consumer
│ AND AND
│ signal-like signal-like
Computed.get — lazy evaluation
def get(self) -> T:
if self._disposed:
return self._value
if self._dirty: # need recomputation?
self._recompute() # yes → run fn, cache result
consumer = _active_consumer.get() # who's reading ME?
if consumer is not None:
consumer._track(self) # track: consumer depends on this computed
self._subscribers.add(consumer)
return self._value # return cached value
Lazy: doesn’t recompute until someone reads it. If nobody reads it after a dep change, the function never runs. This is the pull side of push-pull.
Cached: if _dirty is False, returns the cached value without running the function. If you read a computed 10 times without its deps changing, the function runs once.
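The dirty-flag cache can be isolated into a few lines (a toy sketch, not the real Computed; `invalidate` stands in for the upstream `_notify`):

```python
# The function runs once per invalidation, no matter how many reads.
class ToyComputed:
    def __init__(self, fn):
        self._fn = fn
        self._dirty = True    # lazy: nothing computed yet
        self._value = None
        self.calls = 0

    def get(self):
        if self._dirty:
            self.calls += 1
            self._value = self._fn()
            self._dirty = False
        return self._value

    def invalidate(self):
        self._dirty = True    # what _notify() does when a dep changes

c = ToyComputed(lambda: 40 + 2)
for _ in range(10):
    assert c.get() == 42
assert c.calls == 1           # ten reads, one computation

c.invalidate()
assert c.get() == 42
assert c.calls == 2           # recomputed exactly once after invalidation
```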
Computed._recompute — the actual computation
def _recompute(self) -> None:
self._untrack_all() # clear old deps (same as Effect)
token = _active_consumer.set(self) # "I'm asking now"
try:
old = self._value
self._value = self._fn() # execute, track deps
self._dirty = False
if self._value is not old: # value actually changed?
self._version += 1
self._propagate() # yes → tell MY subscribers
finally:
_active_consumer.reset(token)
The optimization: if self._fn() returns the same object (identity check), _propagate() is skipped. Downstream effects don’t re-run. This prevents unnecessary cascading through computed chains.
Computed._notify — a dep changed upstream
def _notify(self) -> None:
if self._disposed:
return
was_dirty = self._dirty
self._dirty = True # mark: I need recomputation
if not was_dirty: # was I clean before?
for consumer in list(self._subscribers): # yes → tell my subscribers
consumer._notify() # (they might be effects that need to re-run)
Dirty dedup: if the computed was already dirty (e.g., two of its deps changed in the same batch), the second notification doesn’t propagate. Subscribers were already told.
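The dedup is just the was_dirty check. A toy sketch (the `propagations` counter is mine, standing in for notifying subscribers):

```python
# A second upstream notification while already dirty does not
# re-propagate: subscribers were told the first time.
class ToyComputed:
    def __init__(self):
        self._dirty = False
        self.propagations = 0

    def _notify(self):
        was_dirty = self._dirty
        self._dirty = True
        if not was_dirty:            # was I clean before?
            self.propagations += 1   # yes → tell my subscribers (once)

c = ToyComputed()
c._notify()   # first dep change: propagate
c._notify()   # second dep change in the same batch: already dirty, skip
assert c.propagations == 1
```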
Batch: Grouping Changes
@contextmanager
def batch():
global _batch_depth
with _lock:
_batch_depth += 1 # enter batch mode
try:
yield
finally:
with _lock:
_batch_depth -= 1 # leave batch mode
should_flush = _batch_depth == 0
if should_flush:
_flush_batch() # run all pending effects
def _flush_batch() -> None:
    global _batch_queue
    iterations = 0
    while iterations < 100: # cap: runaway effect chains stop here
        with _lock:
            if not _batch_queue:
                break
            _batch_queue.sort(key=lambda e: e._id) # creation order
            pending = list(_batch_queue)
            _batch_queue.clear()
        for effect in pending: # run OUTSIDE the lock: run() may call _notify
            if not effect._disposed:
                effect.run() # may enqueue more effects
        iterations += 1
Nesting: batch() inside batch() increments depth. Only the outermost batch triggers the flush. This prevents partial flushes.
Ordering: effects are sorted by _id (creation counter). Parent effects (created first) run before child effects. Deterministic.
Iteration cap: effects running may trigger new effects. The while loop handles this but caps at 100 iterations to prevent infinite loops.
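The sort-drain-repeat loop can be sketched standalone (toy names `ToyEffect`/`flush`, mirroring the _id sort and the pass cap of _flush_batch; no locking, since this sketch is single-threaded):

```python
# Drain the queue in creation order; effects enqueued while flushing
# are picked up on the next pass, capped to avoid infinite loops.
from itertools import count

_ids = count()

class ToyEffect:
    def __init__(self, fn):
        self._id = next(_ids)   # creation counter
        self._fn = fn

queue = []
order = []

def flush(max_passes: int = 100) -> None:
    for _ in range(max_passes):          # iteration cap
        if not queue:
            return
        queue.sort(key=lambda e: e._id)  # creation order: parents first
        pending = list(queue)
        queue.clear()
        for effect in pending:
            effect._fn()                 # may enqueue more effects

parent = ToyEffect(lambda: order.append("parent"))
child = ToyEffect(lambda: order.append("child"))
queue.extend([child, parent])   # enqueued out of creation order
flush()
assert order == ["parent", "child"]   # sort restored creation order

order.clear()
again = ToyEffect(lambda: order.append("again"))
trigger = ToyEffect(lambda: queue.append(again))  # enqueues mid-flush
queue.append(trigger)
flush()
assert order == ["again"]   # picked up on the next pass
```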
Scenario 1: Simple Effect Tracking
name = Signal("Alice")
greeting = Signal("Hello")
log = []
e = Effect(lambda: log.append(f"{greeting.get()}, {name.get()}!"))
Step 1: Effect created, run() called (not lazy)
_active_consumer = e # Effect sets itself
greeting.get(): # Signal.get checks consumer
e._deps.add(greeting) # effect records dep
greeting._subscribers.add(e) # signal records subscriber
→ returns "Hello"
name.get():
e._deps.add(name)
name._subscribers.add(e)
→ returns "Alice"
log = ["Hello, Alice!"]
_active_consumer = None # reset
State after construction:
greeting._subscribers = {e} name._subscribers = {e}
↑ ↑
└─── e._deps = {greeting, name} ───┘
log = ["Hello, Alice!"]
Step 2: name.set("Bob")
name.set("Bob"):
"Bob" is not "Alice" # identity check: different
name._value = "Bob"
name._version = 1
name._notify_subscribers():
snapshot = [e]
e._notify(): # not batching → run()
e.run():
e._untrack_all(): # remove from both signals
greeting._subscribers.discard(e)
name._subscribers.discard(e)
e._deps.clear()
_active_consumer = e
greeting.get() → re-tracks
name.get() → re-tracks
log.append("Hello, Bob!")
_active_consumer = None
log = ["Hello, Alice!", "Hello, Bob!"]
Scenario 2: Computed Chain
first = Signal("Ada")
last = Signal("Lovelace")
full = Computed(lambda: f"{first.get()} {last.get()}")
upper = Computed(lambda: full.get().upper())
log = []
e = Effect(lambda: log.append(upper.get()))
After construction:
first ──┐
        ├──→ full ──────→ upper ─────→ e
last ───┘  (Computed)   (Computed)  (Effect)
(Signals)
first._subscribers = {full}
last._subscribers = {full}
full._subscribers = {upper}
full._deps = {first, last}
upper._subscribers = {e}
upper._deps = {full}
e._deps = {upper}
log = ["ADA LOVELACE"]
first.set("Grace"):
first.set("Grace"):
first._notify_subscribers():
full._notify(): # Computed._notify
full._dirty = True # mark dirty (was clean)
propagate to subscribers:
upper._notify(): # Computed._notify
upper._dirty = True
propagate:
e._notify(): # Effect._notify
e.run(): # runs the effect
upper.get(): # Computed.get: dirty → recompute
upper._recompute():
full.get(): # Computed.get: dirty → recompute
full._recompute():
first.get() → "Grace"
last.get() → "Lovelace"
full._value = "Grace Lovelace"
full._dirty = False
"Grace Lovelace" is not "Ada Lovelace" → propagate
→ returns "Grace Lovelace"
upper._value = "GRACE LOVELACE"
upper._dirty = False
→ returns "GRACE LOVELACE"
log.append("GRACE LOVELACE")
log = ["ADA LOVELACE", "GRACE LOVELACE"]
The push-pull pattern:
- Push: first.set() pushes dirty flags down the chain: full → upper → e
- Pull: e.run() pulls values back up: e reads upper, which reads full, which reads first and last
Push ensures effects know to re-run. Pull ensures values are fresh.
Scenario 3: Batch with Multiple Signals
x = Signal(0)
y = Signal(0)
log = []
e = Effect(lambda: log.append(f"{x.get()}+{y.get()}"))
# log = ["0+0"]
with batch():
x.set(1) # enqueues e (doesn't run)
y.set(2) # e already in queue (dedup)
# batch exits → flush → e.run() once
# log = ["0+0", "1+2"]
Step by step:
batch() entered: _batch_depth = 1
x.set(1):
x._notify_subscribers():
e._notify():
_batch_depth > 0 → enqueue
_batch_queue = [e]
y.set(2):
y._notify_subscribers():
e._notify():
_batch_depth > 0 → e already in queue → skip
_batch_queue = [e] ← still just one
batch() exits: _batch_depth = 0 → _flush_batch()
_flush_batch():
sort by _id (just one effect, trivial)
pending = [e]
_batch_queue = []
e.run():
reads x.get() → 1
reads y.get() → 2
log.append("1+2")
log = ["0+0", "1+2"]
Without batch, it would have been:
x.set(1) → e.run() → log.append("1+0")
y.set(2) → e.run() → log.append("1+2")
log = ["0+0", "1+0", "1+2"] ← intermediate state "1+0" is visible
Batch prevents the intermediate state.
The _Consumer Base Class
class _Consumer:
__slots__ = ("_deps", "_id")
def __init__(self) -> None:
self._deps: set[Signal] = set() # signals I read
self._id: int = _next_id() # creation order
def _track(self, signal: Signal) -> None:
self._deps.add(signal) # record dep (set → dedup)
def _untrack_all(self) -> None:
for sig in self._deps:
sig._subscribers.discard(self) # remove ME from each signal
self._deps.clear() # clear MY dep list
_untrack_all is called at the start of every run() and _recompute(). This is why deps can change per run — the old links are cleared, and new links are created based on what the function actually reads this time.
_id is a monotonic counter. Effects created earlier have lower IDs. In batch flush, effects are sorted by _id, so parent components’ effects run before their children’s. This matches Vue 3’s behavior.
Thread Safety
Every mutation of shared state goes through _lock:
_lock = threading.Lock()
# Subscriber cleanup and snapshot:
with _lock:
self._subscribers = {c for c in self._subscribers if not ...}
snapshot = list(self._subscribers)
# Batch state:
with _lock:
_batch_depth += 1
# Queue operations:
with _lock:
if self not in _batch_queue:
_batch_queue.append(self)
Signal.get() and the core tracking (consumer._track, signal._subscribers.add) do NOT take the lock. In CPython, set.add() is atomic due to the GIL. For free-threaded Python 3.13+, these would need protection — that’s a future fix.
The lock primarily protects the batch queue (multi-writer) and subscriber set iteration (read-while-write).
Complete Class Hierarchy
object
│
├── Signal[T]
│ _value, _version, _subscribers
│ get(), set(), peek(), update()
│ _notify_subscribers()
│
└── _Consumer
_deps, _id
_track(), _untrack_all()
│
├── Effect(_Consumer)
│ _fn, _is_async, _disposed, _running
│ run(), _notify(), dispose()
│
└── Computed(_Consumer, Generic[T])
_fn, _value, _dirty, _version, _subscribers, _disposed
get(), peek(), _recompute(), _propagate()
_notify(), dispose()
Computed inherits from _Consumer (it tracks signals it reads) but also has _subscribers (other consumers depend on it). It’s both reader and writer in the reactive graph.
Line Count
| Section | Lines | What |
|---|---|---|
| Globals + _Consumer | 27-78 | ContextVar, Lock, counter, base class |
| Signal | 83-167 | Value container, tracking, notification |
| Computed | 172-278 | Lazy cached value, push-pull, propagation |
| Effect | 283-373 | Side-effect runner, async support, re-entrancy |
| Batch | 378-428 | Grouping, ordering, flush loop |
| Total | ~350 | |
That’s the entire reactive engine.
Signal.peek() — read the value without recording a subscription (won’t make the surrounding effect depend on this signal). Used internally by the runtime and useful in lifecycle callbacks where you need a value without creating a reactive dependency.
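The get()/peek() contrast in miniature (a toy sketch with assumed semantics matching the description above; the string "effect-1" stands in for a real consumer):

```python
# get() records the active consumer as a subscriber; peek() never does.
from contextvars import ContextVar

_active = ContextVar("_active", default=None)

class ToySignal:
    def __init__(self, value):
        self._value = value
        self._subscribers = set()

    def get(self):
        consumer = _active.get()
        if consumer is not None:         # tracking read
            self._subscribers.add(consumer)
        return self._value

    def peek(self):
        return self._value               # no tracking, ever

s = ToySignal(7)
token = _active.set("effect-1")          # pretend an effect is running
assert s.peek() == 7
assert s._subscribers == set()           # peek left no trace
assert s.get() == 7
assert s._subscribers == {"effect-1"}    # get recorded the dependency
_active.reset(token)
```

Inside an effect, this is the difference between "re-run me when this changes" (get) and "just give me the current value" (peek).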