Reactive Engine: Line by Line

A complete walkthrough of reactive.py — every data structure, every method, every edge case.

The entire reactive engine is one file: src/signalpy/kernel/reactive.py (~350 lines). This tutorial walks through it top to bottom, then traces three scenarios through the data structures to show exactly what happens at runtime.

The Five Data Structures

Everything in the reactive engine is built from five pieces of global state and three classes:

┌─────────────────────────────────────────────────────────────┐
│  Global State                                               │
│                                                             │
│  _active_consumer: ContextVar    "who is reading right now" │
│  _batch_depth:     int           "are we inside a batch?"   │
│  _batch_queue:     list[Effect]  "effects waiting to run"   │
│  _lock:            Lock          "thread safety"            │
│  _creation_counter: int          "ordering effects"         │
└─────────────────────────────────────────────────────────────┘

┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐
│  Signal           │  │  Computed         │  │  Effect           │
│                   │  │  (is a _Consumer) │  │  (is a _Consumer) │
│  _value: T        │  │  _fn: callable    │  │  _fn: callable    │
│  _version: int    │  │  _value: T        │  │  _is_async: bool  │
│  _subscribers:    │  │  _dirty: bool     │  │  _disposed: bool  │
│    set[_Consumer] │  │  _version: int    │  │  _running: bool   │
│                   │  │  _subscribers:    │  │                   │
│                   │  │    set[_Consumer] │  │  (from _Consumer) │
│                   │  │  _disposed: bool  │  │  _deps: set[Sig]  │
│                   │  │                   │  │  _id: int          │
│                   │  │  (from _Consumer) │  │                   │
│                   │  │  _deps: set[Sig]  │  └──────────────────┘
│                   │  │  _id: int          │
└──────────────────┘  └──────────────────┘

That’s it. Three classes, five globals.

The double-link pattern: when a consumer reads a Signal, the dependency is recorded on BOTH sides:

consumer._deps.add(signal)          "I depend on this signal"
signal._subscribers.add(consumer)   "this consumer reads me"

Both are sets, so adding the same link twice is a no-op. The double link exists so that either side can initiate: signals notify subscribers (push), and consumers untrack their deps (pull, during cleanup).
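A minimal sketch of the double link, assuming two stand-in classes. TinySignal, TinyConsumer and link() are hypothetical names for illustration, not the classes from reactive.py; they model only the two sets.

```python
class TinySignal:
    """Hypothetical stand-in for Signal: only the _subscribers side."""
    def __init__(self):
        self._subscribers = set()


class TinyConsumer:
    """Hypothetical stand-in for _Consumer: only the _deps side."""
    def __init__(self):
        self._deps = set()

    def untrack_all(self):
        # Consumer-initiated cleanup: walk my deps and unsubscribe myself.
        for sig in self._deps:
            sig._subscribers.discard(self)
        self._deps.clear()


def link(consumer, signal):
    # Record the dependency on BOTH sides.
    consumer._deps.add(signal)
    signal._subscribers.add(consumer)


sig, con = TinySignal(), TinyConsumer()
link(con, sig)
link(con, sig)                      # second link is a no-op: both are sets
links_after_two_adds = (len(con._deps), len(sig._subscribers))

con.untrack_all()                   # removes the link from both sides
links_after_cleanup = (len(con._deps), len(sig._subscribers))
```

Without the consumer-side set, cleanup would have to scan every signal to find the ones that still reference the consumer.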

Global State: _active_consumer

_active_consumer: ContextVar[_Consumer | None] = ContextVar(
    "_active_consumer", default=None
)

This is the trick that makes automatic tracking work. It answers one question: “Is someone currently executing who wants to track dependencies?”

  • None → nobody is tracking. Signal.get() just returns the value.
  • some_effect → an Effect is running. Signal.get() records the dependency.
  • some_computed → a Computed is recomputing. Same thing.

It’s a ContextVar, not a plain module-level global, because several consumers can be running at once and each must see only itself:

Task A (running Effect 1):   _active_consumer = Effect1
Task B (running Effect 2):   _active_consumer = Effect2

Each async task runs in its own copy of the context, so the two values don’t interfere.

Thread 1:  _active_consumer = Effect1   ← one context
Thread 2:  _active_consumer = Effect2   ← different context

Threads get separate contexts as well. ContextVar handles both cases automatically.
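The per-task isolation can be checked with the standard library alone. In this sketch the variable named active is a hypothetical stand-in for _active_consumer, and plain strings take the place of Effect objects.

```python
import asyncio
from contextvars import ContextVar

# Hypothetical stand-in for _active_consumer.
active = ContextVar("active", default=None)


async def run_as(name, seen):
    active.set(name)             # like Effect.run() installing itself
    await asyncio.sleep(0)       # yield so the other task runs in between
    seen[name] = active.get()    # still sees its own value


async def main():
    seen = {}
    # gather() wraps each coroutine in a task; each task gets a context copy.
    await asyncio.gather(run_as("Effect1", seen), run_as("Effect2", seen))
    seen["outer"] = active.get() # the caller's context was never modified
    return seen


seen = asyncio.run(main())
```

With a plain global, the second task's write would overwrite the first task's value while it was suspended at the await.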

Signal: The Source of Truth

Signal.__init__

def __init__(self, value: T) -> None:
    self._value: T = value            # the actual data
    self._version: int = 0            # bumps on each change (for optimization)
    self._subscribers: set[_Consumer] = set()  # who reads me

State after name = Signal("Alice"):

name:
  _value = "Alice"
  _version = 0
  _subscribers = {}         ← empty, nobody has read it yet

Signal.get — the tracking read

def get(self) -> T:
    consumer = _active_consumer.get()       # line 105: who's asking?
    if consumer is not None:                # line 106: someone IS asking
        consumer._track(self)               # line 107: consumer records "I depend on this"
        self._subscribers.add(consumer)     # line 108: signal records "this consumer reads me"
    return self._value                      # line 109: return the value either way

If nobody is asking (_active_consumer is None): lines 106-108 are skipped. It’s a plain value read, and the only extra cost is the ContextVar lookup.

If an Effect is running: both sides record the link. This is the double-link.

Signal.set — the notifying write

def set(self, value: T) -> None:
    if value is self._value:        # line 123: identity check, not ==
        return                      # same object → skip entirely
    self._value = value             # line 125: store new value
    self._version += 1              # line 126: bump version
    self._notify_subscribers()      # line 127: tell everyone who reads me

Identity comparison (is) is critical. If you mutate a dict in place:

data = signal.peek()
data["key"] = "new"
signal.set(data)     # data IS data → identity match → SKIPPED!

You must create a new object:

data = dict(signal.peek())   # copy → new object
data["key"] = "new"
signal.set(data)              # new dict IS NOT old dict → notifies

This is why ConfigProvider.set() copies the dict.
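The pitfall is easy to reproduce in isolation. The sketch below assumes a stripped-down stand-in, MiniSignal (a hypothetical name), that keeps only the identity check from Signal.set and counts notifications instead of sending them.

```python
class MiniSignal:
    """Hypothetical stand-in: only the identity check from Signal.set."""
    def __init__(self, value):
        self._value = value
        self.notifications = 0      # how often subscribers would be told

    def peek(self):
        return self._value

    def set(self, value):
        if value is self._value:    # same object: skip entirely
            return
        self._value = value
        self.notifications += 1


config = MiniSignal({"key": "old"})

same = config.peek()
same["key"] = "mutated"             # in-place mutation of the stored dict
config.set(same)                    # identity match: nothing is notified
after_mutation = config.notifications

fresh = dict(config.peek())         # shallow copy: a new object
fresh["key"] = "new"
config.set(fresh)                   # different object: notifies
after_copy = config.notifications

config.set(dict(fresh))             # equal content, distinct object: notifies
after_equal_copy = config.notifications
```

The last call shows the flip side of the identity check: an equal but distinct object still counts as a change, because == is never consulted.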

Signal._notify_subscribers — the fan-out

def _notify_subscribers(self) -> None:
    with _lock:                                          # thread-safe
        self._subscribers = {                            # clean up dead ones
            c for c in self._subscribers
            if not getattr(c, '_disposed', False)
        }
        snapshot = list(self._subscribers)               # snapshot under lock

    for consumer in snapshot:                            # iterate snapshot
        consumer._notify()                               # each consumer decides what to do

The snapshot prevents “set modified during iteration” errors. The disposed cleanup prevents memory leaks from deactivated components. The lock makes this safe across threads.
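The need for the snapshot can be shown with a bare set. In this sketch the strings are stand-ins for consumers, and discard() plays the role of a consumer untracking itself while it is being notified.

```python
subscribers = {"a", "b", "c"}

# Iterating the live set while it shrinks raises RuntimeError.
failed = False
try:
    for s in subscribers:
        subscribers.discard(s)      # mutation during iteration
except RuntimeError:                # "Set changed size during iteration"
    failed = True

# Iterating a snapshot is safe even though the live set shrinks.
subscribers = {"a", "b", "c"}
visited = []
for s in list(subscribers):
    subscribers.discard(s)
    visited.append(s)
```

This matters in practice because Effect.run() starts with _untrack_all(), which removes the effect from the very set that is being iterated.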

Each consumer type handles _notify() differently:

  • Effect._notify(): if batching → enqueue; else → run() immediately
  • Computed._notify(): mark dirty, propagate to own subscribers

Effect: The Side-Effect Runner

Effect.__init__

def __init__(self, fn: Callable, *, lazy: bool = False) -> None:
    super().__init__()                  # _Consumer.__init__: _deps={}, _id=N
    self._fn = fn                       # the function to run
    self._is_async = inspect.iscoroutinefunction(fn)
    self._disposed = False              # can be stopped
    self._running = False               # re-entrancy guard
    if not lazy:
        self.run()                      # run immediately on creation!

lazy=False (default) means the effect runs at creation time. This is how @effect methods work — the kernel creates the Effect during component activation, and it runs immediately, tracking its initial dependencies.

Effect.run — the core execution loop

def run(self) -> None:
    if self._disposed or self._running:      # guards
        return
    self._running = True                      # re-entrancy guard ON
    self._untrack_all()                       # ① clear old deps

    token = _active_consumer.set(self)        # ② "I'm asking now"
    try:
        self._fn()                            # ③ execute — Signal reads track me
    except Exception:
        log.exception("Effect execution error")
    finally:
        _active_consumer.reset(token)         # ④ "I'm done asking"
        self._running = False                 # re-entrancy guard OFF

Four steps:

① Clear old deps: _untrack_all() removes this effect from all its old signals’ subscriber lists and clears self._deps. Why? Because the function might take a different code path this time and read different signals. Stale deps must be removed.

Before _untrack_all:
  effect._deps = {signal_A, signal_B}
  signal_A._subscribers = {effect, ...}
  signal_B._subscribers = {effect, ...}

After _untrack_all:
  effect._deps = {}
  signal_A._subscribers = {...}        ← effect removed
  signal_B._subscribers = {...}        ← effect removed

② Set _active_consumer — now Signal.get() calls during self._fn() will see this effect as the active consumer and track the dependency.

③ Execute — the function runs. Every signal.get() inside it creates a link. After execution, effect._deps contains all signals that were read.

④ Reset — clean up. The finally block ensures reset even on exception.

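Re-entrancy guard: if the function writes to a signal it also reads, that signal notifies this same effect, and _notify() calls run() while the first run is still in progress. The self._running check makes the nested call return immediately, which prevents infinite recursion. The write is not lost: the signal holds the new value, but it doesn’t trigger a synchronous re-run. The effect sees the new value the next time one of its deps notifies it.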
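Why stale deps must be cleared is easiest to see with a branching effect. The following is a minimal sketch, assuming simplified stand-ins (MiniSignal, MiniEffect and _active are hypothetical names) with no batching, locking or disposal.

```python
from contextvars import ContextVar

_active = ContextVar("_active", default=None)   # stand-in for _active_consumer


class MiniSignal:
    def __init__(self, value):
        self._value = value
        self._subscribers = set()

    def get(self):
        consumer = _active.get()
        if consumer is not None:            # double link, as in Signal.get
            consumer._deps.add(self)
            self._subscribers.add(consumer)
        return self._value

    def set(self, value):
        if value is self._value:
            return
        self._value = value
        for consumer in list(self._subscribers):   # snapshot, then notify
            consumer.run()


class MiniEffect:
    def __init__(self, fn):
        self._fn = fn
        self._deps = set()
        self._running = False
        self.run()

    def run(self):
        if self._running:                   # re-entrancy guard
            return
        self._running = True
        for sig in self._deps:              # step 1: clear old deps
            sig._subscribers.discard(self)
        self._deps.clear()
        token = _active.set(self)           # step 2: start tracking
        try:
            self._fn()                      # step 3: reads re-create the links
        finally:
            _active.reset(token)            # step 4: stop tracking
            self._running = False


use_nickname = MiniSignal(False)
name = MiniSignal("Alice")
nickname = MiniSignal("Al")
log = []
MiniEffect(lambda: log.append(nickname.get() if use_nickname.get() else name.get()))

use_nickname.set(True)   # effect switches branch: now reads nickname, not name
name.set("Bob")          # stale dep was dropped, so nothing re-runs
nickname.set("Ally")     # current dep, so the effect re-runs
```

After the branch switch, name has no subscribers left, which is why name.set("Bob") adds nothing to the log.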

Effect._notify — what happens when a dep changes

def _notify(self) -> None:
    if self._disposed:                    # dead? ignore
        return
    with _lock:
        in_batch = _batch_depth > 0       # check batch state (thread-safe)
    if in_batch:
        with _lock:
            if self not in _batch_queue:   # dedup: don't enqueue twice
                _batch_queue.append(self)
    else:
        self.run()                         # not batching → run immediately

Two modes:

  • Not batching: run() immediately. The effect re-executes, re-tracks deps.
  • Batching: enqueue for later. _flush_batch() will run it when the batch ends.

Computed: The Cached Derived Value

Computed is both a consumer (it reads signals) and a signal-like (others can depend on it). This dual role is what makes reactive chains work.

Signal ← Computed ← Computed ← Effect
  │         │           │          │
  │     consumer     consumer   consumer
  │       AND          AND
  │     signal-like  signal-like

Computed.get — lazy evaluation

def get(self) -> T:
    if self._disposed:
        return self._value
    if self._dirty:                              # need recomputation?
        self._recompute()                        # yes → run fn, cache result
    consumer = _active_consumer.get()            # who's reading ME?
    if consumer is not None:
        consumer._track(self)                    # track: consumer depends on this computed
        self._subscribers.add(consumer)
    return self._value                           # return cached value

Lazy: doesn’t recompute until someone reads it. If nobody reads it after a dep change, the function never runs. This is the pull side of push-pull.

Cached: if _dirty is False, returns the cached value without running the function. If you read a computed 10 times without its deps changing, the function runs once.
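The lazy and cached behaviour can be sketched without any dependency tracking. LazyCached below is a hypothetical stand-in for Computed: invalidate() plays the role of Computed._notify, and the sketch assumes the value starts out dirty.

```python
class LazyCached:
    """Hypothetical stand-in for Computed with tracking removed."""
    def __init__(self, fn):
        self._fn = fn
        self._value = None
        self._dirty = True          # assumption: nothing is computed up front
        self.calls = 0              # how many times fn actually ran

    def invalidate(self):
        self._dirty = True          # push side: only flips the flag

    def get(self):
        if self._dirty:             # pull side: recompute on demand
            self._value = self._fn()
            self._dirty = False
            self.calls += 1
        return self._value


source = {"n": 2}
squared = LazyCached(lambda: source["n"] ** 2)

calls_before_first_read = squared.calls     # fn has not run yet
reads = [squared.get() for _ in range(10)]  # ten reads, one computation
calls_after_ten_reads = squared.calls

source["n"] = 3
squared.invalidate()
squared.invalidate()                        # second flip changes nothing
calls_after_invalidate = squared.calls      # still no recomputation
value_after_change = squared.get()          # recomputes here
```

The function runs twice in total: once for the first read and once for the first read after invalidation.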

Computed._recompute — the actual computation

def _recompute(self) -> None:
    self._untrack_all()                          # clear old deps (same as Effect)
    token = _active_consumer.set(self)           # "I'm asking now"
    try:
        old = self._value
        self._value = self._fn()                 # execute, track deps
        self._dirty = False
        if self._value is not old:               # value actually changed?
            self._version += 1
            self._propagate()                    # yes → tell MY subscribers
    finally:
        _active_consumer.reset(token)

The optimization: if self._fn() returns the same object (identity check), _propagate() is skipped. Downstream effects don’t re-run. This prevents unnecessary cascading through computed chains.

Computed._notify — a dep changed upstream

def _notify(self) -> None:
    if self._disposed:
        return
    was_dirty = self._dirty
    self._dirty = True                           # mark: I need recomputation
    if not was_dirty:                            # was I clean before?
        for consumer in list(self._subscribers): # yes → tell my subscribers
            consumer._notify()                   # (they might be effects that need to re-run)

Dirty dedup: if the computed was already dirty (e.g., two of its deps changed in the same batch), the second notification doesn’t propagate. Subscribers were already told.

Batch: Grouping Changes

@contextmanager
def batch():
    global _batch_depth
    with _lock:
        _batch_depth += 1              # enter batch mode
    try:
        yield
    finally:
        with _lock:
            _batch_depth -= 1          # leave batch mode
            should_flush = _batch_depth == 0
        if should_flush:
            _flush_batch()             # run all pending effects

def _flush_batch() -> None:
    global _batch_queue
    iterations = 0
    while True:
        with _lock:
            if not _batch_queue:
                break
            _batch_queue.sort(key=lambda e: e._id)   # creation order
            pending = list(_batch_queue)
            _batch_queue.clear()

        for effect in pending:
            if not effect._disposed:
                effect.run()                          # may enqueue more effects
        iterations += 1

Nesting: batch() inside batch() increments depth. Only the outermost batch triggers the flush. This prevents partial flushes.

Ordering: effects are sorted by _id (creation counter). Parent effects (created first) run before child effects. Deterministic.

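Iteration cap: effects running may trigger new effects. The while loop handles this by draining the queue again, and it caps at 100 iterations to prevent infinite loops. The excerpt above shows the iterations counter but omits the check against that cap.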
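Nesting, dedup and ordering can be exercised together in a single-threaded sketch. Job, _depth, _queue and MAX_ITERATIONS are hypothetical names, locking is left out, and raising RuntimeError at the cap is an assumption; the text only says that a cap of 100 exists.

```python
from contextlib import contextmanager

_depth = 0
_queue = []             # pending jobs, deduplicated
MAX_ITERATIONS = 100    # cap named in the text; behaviour at the cap is assumed


class Job:
    """Hypothetical stand-in for an Effect: a creation id plus a callable."""
    _next_id = 0

    def __init__(self, fn):
        self.fn = fn
        self.id = Job._next_id      # creation order
        Job._next_id += 1

    def notify(self):
        if _depth > 0:
            if self not in _queue:  # dedup: enqueue at most once
                _queue.append(self)
        else:
            self.fn()               # not batching: run immediately


def _flush():
    iterations = 0
    while _queue:
        if iterations >= MAX_ITERATIONS:    # assumption: give up loudly
            raise RuntimeError("batch flush did not settle")
        pending = sorted(_queue, key=lambda j: j.id)
        _queue.clear()
        for job in pending:
            job.fn()                # may enqueue more jobs
        iterations += 1


@contextmanager
def batch():
    global _depth
    _depth += 1
    try:
        yield
    finally:
        _depth -= 1
        if _depth == 0:             # only the outermost batch flushes
            _flush()


ran = []
parent = Job(lambda: ran.append("parent"))
child = Job(lambda: ran.append("child"))

with batch():
    with batch():
        child.notify()
        parent.notify()
        child.notify()              # already queued
    ran_after_inner = list(ran)     # inner exit did not flush
```

Although child was notified first, parent runs first at flush time because it was created first.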

Scenario 1: Simple Effect Tracking

name = Signal("Alice")
greeting = Signal("Hello")
log = []
e = Effect(lambda: log.append(f"{greeting.get()}, {name.get()}!"))

Step 1: Effect created, run() called (not lazy)

_active_consumer = e                        # Effect sets itself

greeting.get():                             # Signal.get checks consumer
  e._deps.add(greeting)                    # effect records dep
  greeting._subscribers.add(e)             # signal records subscriber
  → returns "Hello"

name.get():
  e._deps.add(name)
  name._subscribers.add(e)
  → returns "Alice"

log = ["Hello, Alice!"]

_active_consumer = None                     # reset

State after construction:

  greeting._subscribers = {e}    name._subscribers = {e}
         ↑                              ↑
         └─── e._deps = {greeting, name} ───┘

  log = ["Hello, Alice!"]

Step 2: name.set("Bob")

name.set("Bob"):
  "Bob" is not "Alice"                      # identity check: different
  name._value = "Bob"
  name._version = 1
  name._notify_subscribers():
    snapshot = [e]
    e._notify():                            # not batching → run()
      e.run():
        e._untrack_all():                   # remove from both signals
          greeting._subscribers.discard(e)
          name._subscribers.discard(e)
          e._deps.clear()
        _active_consumer = e
        greeting.get() → re-tracks
        name.get() → re-tracks
        log.append("Hello, Bob!")
        _active_consumer = None

  log = ["Hello, Alice!", "Hello, Bob!"]

Scenario 2: Computed Chain

first = Signal("Ada")
last = Signal("Lovelace")
full = Computed(lambda: f"{first.get()} {last.get()}")
upper = Computed(lambda: full.get().upper())
log = []
e = Effect(lambda: log.append(upper.get()))

After construction:

first ──→ full ──────→ upper ─────→ e
last  ──↗
(Signal)  (Computed)   (Computed)   (Effect)

first._subscribers = {full}
last._subscribers = {full}
full._subscribers = {upper}
full._deps = {first, last}
upper._subscribers = {e}
upper._deps = {full}
e._deps = {upper}

log = ["ADA LOVELACE"]

Step by step, first.set("Grace"):

first.set("Grace"):
  first._notify_subscribers():
    full._notify():                         # Computed._notify
      full._dirty = True                    # mark dirty (was clean)
      propagate to subscribers:
        upper._notify():                    # Computed._notify
          upper._dirty = True
          propagate:
            e._notify():                    # Effect._notify
              e.run():                      # runs the effect
                upper.get():                # Computed.get: dirty → recompute
                  upper._recompute():
                    full.get():             # Computed.get: dirty → recompute
                      full._recompute():
                        first.get() → "Grace"
                        last.get() → "Lovelace"
                        full._value = "Grace Lovelace"
                        full._dirty = False
                        "Grace Lovelace" is not "Ada Lovelace" → propagate
                    → returns "Grace Lovelace"
                    upper._value = "GRACE LOVELACE"
                    upper._dirty = False
                → returns "GRACE LOVELACE"
                log.append("GRACE LOVELACE")

log = ["ADA LOVELACE", "GRACE LOVELACE"]

The push-pull pattern:

  1. Push: first.set() pushes dirty flags down the chain: full → upper → e
  2. Pull: e.run() pulls values back up: e reads upper, which reads full, which reads first and last

Push ensures effects know to re-run. Pull ensures values are fresh.
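The push-pull flow can be reproduced with a compact model. This is a minimal sketch under simplifying assumptions: Source, Consumer, MiniSignal, MiniComputed and MiniEffect are hypothetical names, and batching, locking, disposal and the propagate-on-change step inside _recompute are all left out.

```python
from contextvars import ContextVar

_active = ContextVar("_active", default=None)


class Source:
    """Subscriber bookkeeping shared by MiniSignal and MiniComputed."""
    def __init__(self):
        self._subscribers = set()

    def _link(self):
        consumer = _active.get()
        if consumer is not None:            # record the link on both sides
            consumer._deps.add(self)
            self._subscribers.add(consumer)


class Consumer:
    def __init__(self):
        self._deps = set()

    def _track_while(self, fn):
        for dep in self._deps:              # drop old links first
            dep._subscribers.discard(self)
        self._deps.clear()
        token = _active.set(self)
        try:
            return fn()
        finally:
            _active.reset(token)


class MiniSignal(Source):
    def __init__(self, value):
        super().__init__()
        self._value = value

    def get(self):
        self._link()
        return self._value

    def set(self, value):
        if value is not self._value:
            self._value = value
            for c in list(self._subscribers):
                c._notify()


class MiniComputed(Source, Consumer):
    def __init__(self, fn):
        Source.__init__(self)
        Consumer.__init__(self)
        self._fn, self._dirty, self._value, self.runs = fn, True, None, 0

    def get(self):
        if self._dirty:                     # pull: recompute on demand
            self._value = self._track_while(self._fn)
            self._dirty = False
            self.runs += 1
        self._link()
        return self._value

    def _notify(self):
        was_dirty, self._dirty = self._dirty, True
        if not was_dirty:                   # push: forward the flag once
            for c in list(self._subscribers):
                c._notify()


class MiniEffect(Consumer):
    def __init__(self, fn):
        super().__init__()
        self._fn = fn
        self._notify()                      # run once at creation

    def _notify(self):
        self._track_while(self._fn)


first, last = MiniSignal("Ada"), MiniSignal("Lovelace")
full = MiniComputed(lambda: f"{first.get()} {last.get()}")
upper = MiniComputed(lambda: full.get().upper())
log = []
MiniEffect(lambda: log.append(upper.get()))
runs_before_set = (full.runs, upper.runs)

first.set("Grace")      # push dirty flags down, then the effect pulls values up
runs_after_set = (full.runs, upper.runs)
```

Each computed runs exactly once per change: the push phase only flips flags, and the recomputation happens when the effect pulls.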

Scenario 3: Batch with Multiple Signals

x = Signal(0)
y = Signal(0)
log = []
e = Effect(lambda: log.append(f"{x.get()}+{y.get()}"))

# log = ["0+0"]

with batch():
    x.set(1)     # enqueues e (doesn't run)
    y.set(2)     # e already in queue (dedup)

# batch exits → flush → e.run() once
# log = ["0+0", "1+2"]

Step by step:

batch() entered: _batch_depth = 1

x.set(1):
  x._notify_subscribers():
    e._notify():
      _batch_depth > 0 → enqueue
      _batch_queue = [e]

y.set(2):
  y._notify_subscribers():
    e._notify():
      _batch_depth > 0 → e already in queue → skip
      _batch_queue = [e]       ← still just one

batch() exits: _batch_depth = 0 → _flush_batch()

_flush_batch():
  sort by _id (just one effect, trivial)
  pending = [e]
  _batch_queue = []
  e.run():
    reads x.get() → 1
    reads y.get() → 2
    log.append("1+2")

log = ["0+0", "1+2"]

Without batch, it would have been:

x.set(1) → e.run() → log.append("1+0")
y.set(2) → e.run() → log.append("1+2")
log = ["0+0", "1+0", "1+2"]      ← intermediate state "1+0" is visible

Batch prevents the intermediate state.

The _Consumer Base Class

class _Consumer:
    __slots__ = ("_deps", "_id")

    def __init__(self) -> None:
        self._deps: set[Signal] = set()    # signals I read
        self._id: int = _next_id()          # creation order

    def _track(self, signal: Signal) -> None:
        self._deps.add(signal)              # record dep (set → dedup)

    def _untrack_all(self) -> None:
        for sig in self._deps:
            sig._subscribers.discard(self)  # remove ME from each signal
        self._deps.clear()                  # clear MY dep list

_untrack_all is called at the start of every run() and _recompute(). This is why deps can change per run — the old links are cleared, and new links are created based on what the function actually reads this time.

_id is a monotonic counter. Effects created earlier have lower IDs. In batch flush, effects are sorted by _id, so parent components’ effects run before their children’s. This matches Vue 3’s behavior.

Thread Safety

Mutations of the batch state, and the subscriber cleanup that happens during notification, go through _lock:

_lock = threading.Lock()

# Subscriber cleanup and snapshot:
with _lock:
    self._subscribers = {c for c in self._subscribers if not ...}
    snapshot = list(self._subscribers)

# Batch state:
with _lock:
    _batch_depth += 1

# Queue operations:
with _lock:
    if self not in _batch_queue:
        _batch_queue.append(self)

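Signal.get() and the core tracking (consumer._track, signal._subscribers.add) do NOT take the lock. In CPython with the GIL, each individual set.add() is effectively atomic, although the two adds together are not a single atomic step. On the free-threaded build (available experimentally from Python 3.13), these paths would need to be re-examined; that’s a future fix.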

The lock primarily protects the batch queue (multi-writer) and subscriber set iteration (read-while-write).

Complete Class Hierarchy

object
  │
  ├── Signal[T]
  │     _value, _version, _subscribers
  │     get(), set(), peek(), update()
  │     _notify_subscribers()
  │
  └── _Consumer
        _deps, _id
        _track(), _untrack_all()
        │
        ├── Effect(_Consumer)
        │     _fn, _is_async, _disposed, _running
        │     run(), _notify(), dispose()
        │
        └── Computed(_Consumer, Generic[T])
              _fn, _value, _dirty, _version, _subscribers, _disposed
              get(), peek(), _recompute(), _propagate()
              _notify(), dispose()

Computed inherits from _Consumer (it tracks signals it reads) but also has _subscribers (other consumers depend on it). It’s both a consumer and a source in the reactive graph.

Line Count

Section               Lines     What
Globals + _Consumer   27-78     ContextVar, Lock, counter, base class
Signal                83-167    Value container, tracking, notification
Computed              172-278   Lazy cached value, push-pull, propagation
Effect                283-373   Side-effect runner, async support, re-entrancy
Batch                 378-428   Grouping, ordering, flush loop
Total                 ~350

That’s the entire reactive engine.

Signal.peek()

Signal.peek() — read the value without recording a subscription (won’t make the surrounding effect depend on this signal). Used internally by the runtime and useful in lifecycle callbacks where you need a value without creating a reactive dependency.
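The difference between get() and peek() comes down to whether the subscriber set is touched. A minimal sketch, assuming a hypothetical MiniSignal whose peek() simply returns the stored value, with a string standing in for the running effect:

```python
from contextvars import ContextVar

_active = ContextVar("_active", default=None)   # stand-in for _active_consumer


class MiniSignal:
    def __init__(self, value):
        self._value = value
        self._subscribers = set()

    def get(self):
        consumer = _active.get()
        if consumer is not None:
            self._subscribers.add(consumer)     # tracking read
        return self._value

    def peek(self):
        return self._value                      # no tracking at all


tracked = MiniSignal(1)
untracked = MiniSignal(2)

token = _active.set("some-effect")   # pretend an effect is running
try:
    total = tracked.get() + untracked.peek()
finally:
    _active.reset(token)
```

Both values are read, but only tracked gains a subscriber, so a later change to untracked would not re-run the effect.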