Reactive Engine Internals

The five data structures, the tracking mechanism, and three scenarios traced step by step.

Reading order: Reactivity by Example (the mental model) → You are here (how it actually works inside reactive.py) → Threading Model (concurrency, async supersede).

The entire reactive engine is one file: src/signalpy/kernel/reactive.py (~600 lines). This page explains how it works internally — the data structures, the tracking mechanism, and three worked scenarios.

You don’t need to read this to use the kernel. This is for when you want to understand why @effect auto-tracks, how batch() deduplicates, or what breaks if you mutate a dict in place.

The Five Data Structures

Everything is built from five module-level globals and three classes:

┌─────────────────────────────────────────────────────────┐
│  Global State                                           │
│                                                         │
│  _active_consumer: ContextVar    "who is reading now?"  │
│  _batch_depth:     int           "are we in a batch?"   │
│  _batch_queue:     list[Effect]  "effects waiting"      │
│  _lock:            Lock          "thread safety"        │
│  _creation_counter: int          "ordering effects"     │
└─────────────────────────────────────────────────────────┘

┌───────────────────┐  ┌───────────────────┐  ┌───────────────────┐
│  Signal           │  │  Computed         │  │  Effect           │
│                   │  │  (is a _Consumer) │  │  (is a _Consumer) │
│  _value: T        │  │  _fn: callable    │  │  _fn: callable    │
│  _version: int    │  │  _value: T        │  │  _is_async: bool  │
│  _subscribers:    │  │  _dirty: bool     │  │  _disposed: bool  │
│    set[_Consumer] │  │  _version: int    │  │  _running: bool   │
│                   │  │  _subscribers:    │  │                   │
│                   │  │    set[_Consumer] │  │  (from _Consumer) │
│                   │  │  _disposed: bool  │  │  _deps: set[Sig]  │
│                   │  │                   │  │  _id: int         │
│                   │  │  (from _Consumer) │  │                   │
│                   │  │  _deps: set[Sig]  │  └───────────────────┘
│                   │  │  _id: int         │
└───────────────────┘  └───────────────────┘

Three classes, five globals. That’s it.

The double-link pattern: when a consumer reads a Signal, the dependency is recorded on BOTH sides:

consumer._deps.add(signal)          "I depend on this signal"
signal._subscribers.add(consumer)   "this consumer reads me"

Why both? So either side can act on the link: a signal pushes change notifications to its subscribers, and a consumer can remove itself from every signal it read, both before each re-run and when it is disposed.

The Tracking Mechanism: _active_consumer

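from __future__ import annotations   # lets the annotation name _Consumer before it is defined
from contextvars import ContextVar

_active_consumer: ContextVar[_Consumer | None] = ContextVar(
    "_active_consumer", default=None
)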

This one variable makes auto-tracking work. It answers: “if a Signal is read right now, who should subscribe?”

  • None → nobody is tracking. Signal.get() just returns the value; the only cost is one ContextVar lookup.
  • An Effect → that Effect is running. Signal.get() records the dependency.
  • A Computed → that Computed is recomputing. Same thing.

Why one variable is enough for many effects

A common confusion: there are potentially hundreds of effects, but only one _active_consumer. How?

Because effect bodies don’t run side by side on a single execution flow. They take turns, and when one effect triggers another synchronously, the inner run finishes before the outer one resumes. The variable is like a return-address register on a CPU: there’s only one, but every function call uses it, because calls happen one at a time.

The ContextVar’s value at any moment is “the innermost consumer whose body is currently executing” (an Effect running or a Computed recomputing). The rest of the time it’s None. The persistent graph lives elsewhere:

State                 Lives on                  Lifetime                          Answers
_active_consumer      ContextVar (per flow)     one body execution                “Who is reading right now?”
signal._subscribers   each Signal               lifetime of subscribed effects    “When I change, who do I notify?”
effect._deps          each Effect               lifetime of the effect            “What signals do I read?”

What about concurrency?

ContextVar gives each thread and each asyncio Task its own copy of the value automatically:

Main thread:     _active_consumer = EffectA  (then None, then EffectB, ...)
Worker thread:   _active_consumer = EffectC  (independent)
Asyncio task:    _active_consumer = EffectD  (snapshot from parent context)

Parallel effects don’t trample each other’s tracking. The _subscribers set IS shared across flows — that’s what the RLock protects.
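The per-flow isolation can be observed with a small standard-library experiment. Here active is a hypothetical stand-in for _active_consumer and plain strings stand in for effects. A value set in a worker thread or in a child asyncio Task never leaks back into the flow that started it, while a new Task starts from a snapshot of its parent's value. The sketch deliberately does not check what the worker thread sees before its first set(), because whether a new thread inherits the caller's context can differ between Python versions and builds.

```python
import asyncio
import threading
from contextvars import ContextVar
from typing import Optional

# Hypothetical stand-in for _active_consumer; strings play the effects.
active: ContextVar[Optional[str]] = ContextVar("active", default=None)
seen: dict[str, Optional[str]] = {}


def worker() -> None:
    active.set("EffectC")                  # changes only this thread's copy
    seen["worker"] = active.get()


async def task_body() -> None:
    seen["task_start"] = active.get()      # snapshot of the parent's value
    active.set("EffectD")                  # changes only the task's copy


async def main() -> None:
    active.set("EffectA")
    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    await asyncio.create_task(task_body())
    seen["main"] = active.get()            # still EffectA


asyncio.run(main())
```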

What about nesting?

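If effect A’s body triggers effect B synchronously (e.g., A writes a Signal that B subscribes to), the Token returned by ContextVar.set() lets each run restore the previous value, so nesting behaves like a stack (ACV is shorthand for _active_consumer):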

A.run() starts  → ACV: None → A (token_A saved)
  A writes sig  → notifies B → B.run() starts
    B.run()     → ACV: A → B (token_B saved)
      B reads sig3 → registers B ✓
    B.run() ends → ACV: B → A (reset token_B)
  A continues
  A reads sig4  → registers A ✓  (still correct!)
A.run() ends    → ACV: A → None (reset token_A)

The try/finally around reset(token) in every run() is critical: even if the body raises, the slot is restored. Without it, a single exception would poison all subsequent tracking on that flow.
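The stack behaviour of tokens, including recovery from an exception, can be checked in a few lines. run() here is a hypothetical simplification of Effect.run() that keeps only the set / try / finally / reset pattern; strings stand in for effects.

```python
from contextvars import ContextVar
from typing import Callable, Optional

# Hypothetical stand-in for _active_consumer; strings play the effects.
active: ContextVar[Optional[str]] = ContextVar("active", default=None)
trace: list[tuple[str, Optional[str]]] = []


def run(name: str, body: Callable[[], None]) -> None:
    """Simplified run(): set the slot, execute, always restore it."""
    token = active.set(name)
    try:
        body()
    except Exception:
        trace.append(("error in", active.get()))   # logged, not re-raised
    finally:
        active.reset(token)                         # back to the caller's value


def body_b() -> None:
    trace.append(("B sees", active.get()))


def body_a() -> None:
    trace.append(("A sees", active.get()))
    run("B", body_b)                                # nested, like A notifying B
    trace.append(("A sees", active.get()))          # restored to A


def failing() -> None:
    raise RuntimeError("boom")


run("A", body_a)
run("F", failing)
print(active.get())                                 # None: nothing leaked
```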

Signal: The Source of Truth

Signal.get — the tracking read

def get(self) -> T:
    consumer = _active_consumer.get()       # who's asking?
    if consumer is not None:                # someone IS asking
        consumer._track(self)               # consumer records dep
        self._subscribers.add(consumer)     # signal records subscriber
    return self._value                      # return value either way

Outside an effect, consumer is None, so both tracking calls are skipped and it’s a plain value read. Inside an effect (or a recomputing Computed), both sides record the link. This is the double-link.

Signal.set — the notifying write

def set(self, value: T) -> None:
    if value is self._value:        # IDENTITY check, not ==
        return                      # same object → skip entirely
    self._value = value
    self._version += 1
    self._notify_subscribers()      # tell everyone who reads me

Why is and not ==?

Identity comparison means mutating a dict in place won’t trigger notifications:

data = signal.peek()
data["key"] = "new"
signal.set(data)     # data IS data → same object → SKIPPED!

You must create a new object:

data = {**signal.peek(), "key": "new"}  # new dict
signal.set(data)                         # different object → notifies!

This is why ConfigProvider.set() copies the dict internally.
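A minimal sketch of the identity rule, using a hypothetical IdentitySignal whose set() mirrors the is check shown above and calls plain listener functions instead of effects:

```python
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class IdentitySignal(Generic[T]):
    """Hypothetical stand-in whose set() mirrors the `is` check."""

    def __init__(self, value: T) -> None:
        self._value = value
        self.listeners: list[Callable[[T], None]] = []

    def peek(self) -> T:
        return self._value

    def set(self, value: T) -> None:
        if value is self._value:      # identity, not equality
            return                    # same object: nobody is told
        self._value = value
        for listener in list(self.listeners):
            listener(value)


config = IdentitySignal({"key": "old"})
received: list[dict] = []
config.listeners.append(received.append)

data = config.peek()
data["key"] = "new"                   # mutated in place...
config.set(data)                      # ...so this is skipped

config.set({**config.peek(), "key": "newer"})   # new dict: notifies
```

The flip side of the rule: passing a new dict that merely equals the current one still counts as a change and notifies.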

Effect: The Side-Effect Runner

Effect.run — the core execution loop

def run(self) -> None:
    if self._disposed or self._running:      # guards
        return
    self._running = True                      # re-entrancy guard ON
    self._untrack_all()                       # ① clear old deps
    token = _active_consumer.set(self)        # ② "I'm asking now"
    try:
        self._fn()                            # ③ execute — reads track me
    except Exception:
        log.exception("Effect execution error")
    finally:
        _active_consumer.reset(token)         # ④ "I'm done asking"
        self._running = False

Four steps:

① _untrack_all(): removes this effect from the subscriber sets of all the signals it read last time and clears self._deps. Why? The function might take a different code path this time and read different signals, so stale deps must go.

② Set _active_consumer: from now on, any Signal.get() during self._fn() sees this effect and creates the link.

③ Execute: the function runs, and every signal read creates a dependency.

④ Reset: restore the previous _active_consumer and clear _running, even if the body raised.

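Re-entrancy guard: if the body writes to a signal it reads, the write notifies this same effect, and run() would be called again while the first call is still on the stack. self._running makes that nested call return immediately, preventing infinite recursion. The change isn’t lost: the engine queues a re-run for after the current body finishes (that bookkeeping is omitted from the simplified run() above).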
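The four steps can be exercised with a self-contained sketch. MiniSignal and MiniEffect are hypothetical, stripped-down stand-ins (no batching, no disposal, no logging). The example shows step ① at work: once the body switches branches, the old dependency is dropped, so writing to it no longer re-runs the effect.

```python
from contextvars import ContextVar
from typing import Callable, Optional

_active: ContextVar[Optional["MiniEffect"]] = ContextVar("_active", default=None)


class MiniSignal:
    def __init__(self, value: object) -> None:
        self._value = value
        self._subscribers: set["MiniEffect"] = set()

    def get(self) -> object:
        consumer = _active.get()
        if consumer is not None:
            consumer._deps.add(self)
            self._subscribers.add(consumer)
        return self._value

    def set(self, value: object) -> None:
        if value is self._value:
            return
        self._value = value
        for sub in list(self._subscribers):   # copy: run() edits the set
            sub.run()


class MiniEffect:
    def __init__(self, fn: Callable[[], object]) -> None:
        self._fn = fn
        self._deps: set[MiniSignal] = set()
        self._running = False
        self.runs = 0
        self.run()

    def _untrack_all(self) -> None:
        for sig in self._deps:
            sig._subscribers.discard(self)
        self._deps.clear()

    def run(self) -> None:
        if self._running:                 # re-entrancy guard
            return
        self._running = True
        self._untrack_all()               # ① drop last run's deps
        token = _active.set(self)         # ② start tracking
        try:
            self.runs += 1
            self._fn()                    # ③ reads register fresh deps
        finally:
            _active.reset(token)          # ④ always restore
            self._running = False


use_a, a, b = MiniSignal(True), MiniSignal(1), MiniSignal(10)
e = MiniEffect(lambda: a.get() if use_a.get() else b.get())

use_a.set(False)    # re-runs e, which now reads b instead of a
a.set(2)            # a is no longer a dependency: e does not run
```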

Effect._notify — what happens when a dep changes

def _notify(self) -> None:
    if self._disposed:
        return
    if _batch_depth > 0:
        if self not in _batch_queue:   # dedup: each effect queued once
            _batch_queue.append(self)  # batching → defer
    else:
        self.run()                     # not batching → run now

Computed: The Cached Derived Value

Computed is both a consumer (it reads signals) and signal-like (others depend on it). This dual role makes reactive chains work:

Signal  ←  Computed     ←  Computed     ←  Effect
           consumer AND    consumer AND    consumer only
           signal-like     signal-like

Computed.get — lazy evaluation

def get(self) -> T:
    if self._dirty:                  # need recomputation?
        self._recompute()            # yes → run fn, cache result
    consumer = _active_consumer.get()
    if consumer is not None:
        consumer._track(self)        # whoever reads ME becomes my subscriber
        self._subscribers.add(consumer)
    return self._value               # cached

Lazy: doesn’t recompute until someone reads it. Cached: if deps haven’t changed, returns the cached value without running fn.

The push-pull pattern

When a Signal changes:

  1. Push: a change marks downstream Computeds dirty and notifies the Effects at the end of the chain: Signal → Computed → Computed → Effect
  2. Pull: when the Effect runs, it reads Computeds back up the chain, triggering recomputation only where needed

Push ensures effects know to re-run. Pull ensures only necessary recomputation happens.
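A sketch of push-pull under simplifying assumptions: MiniSignal and MiniComputed are hypothetical, there is no Effect (a top-level get() plays the role of the effect's read), and stale subscriptions are never cleaned up. set() only flips dirty flags; the work happens on the next get(), and a clean Computed is served from its cache.

```python
from contextvars import ContextVar
from typing import Callable, Optional

_active: ContextVar[Optional["MiniComputed"]] = ContextVar("_active", default=None)


class MiniSignal:
    def __init__(self, value: object) -> None:
        self._value = value
        self._subscribers: set["MiniComputed"] = set()

    def get(self) -> object:
        consumer = _active.get()
        if consumer is not None:
            self._subscribers.add(consumer)   # reader becomes a subscriber
        return self._value

    def set(self, value: object) -> None:
        if value is not self._value:
            self._value = value
            for sub in list(self._subscribers):
                sub._mark_dirty()             # push: flag only, no work yet


class MiniComputed(MiniSignal):
    def __init__(self, fn: Callable[[], object]) -> None:
        super().__init__(None)
        self._fn = fn
        self._dirty = True
        self.recomputes = 0

    def _mark_dirty(self) -> None:
        if not self._dirty:
            self._dirty = True
            for sub in list(self._subscribers):
                sub._mark_dirty()             # keep pushing downstream

    def get(self) -> object:
        if self._dirty:                       # pull: recompute only if needed
            token = _active.set(self)
            try:
                self._value = self._fn()
                self.recomputes += 1
            finally:
                _active.reset(token)
            self._dirty = False
        return super().get()                  # track whoever reads us


first, last = MiniSignal("Ada"), MiniSignal("Lovelace")
full = MiniComputed(lambda: f"{first.get()} {last.get()}")
upper = MiniComputed(lambda: full.get().upper())

print(upper.get())      # ADA LOVELACE (computes full, then upper)
first.set("Grace")      # push: both flagged dirty, nothing recomputed
print(upper.get())      # GRACE LOVELACE (each recomputes once)
```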

Batch: Grouping Changes

from contextlib import contextmanager

@contextmanager
def batch():
    global _batch_depth
    _batch_depth += 1              # enter batch mode
    try:
        yield
    finally:
        _batch_depth -= 1
        if _batch_depth == 0:      # only the outermost batch flushes
            _flush_batch()         # run all pending effects


def _flush_batch():
    # Simplified: the real loop also stops after 100 iterations.
    while _batch_queue:
        _batch_queue.sort(key=lambda e: e._id)   # creation order
        pending = list(_batch_queue)
        _batch_queue.clear()
        for effect in pending:
            if not effect._disposed:
                effect.run()          # may enqueue more effects

  • Nesting: an inner batch only increments the depth; only the outermost batch triggers the flush.
  • Ordering: effects are sorted by _id (creation order), so parent effects run before the children they create. The order is deterministic.
  • Convergence: running effects may trigger more effects. The loop continues until the queue is empty, capped at 100 iterations as runaway protection.

Scenario 1: Simple Effect Tracking

name = Signal("Alice")
greeting = Signal("Hello")
e = Effect(lambda: print(f"{greeting.get()}, {name.get()}!"))
# prints "Hello, Alice!"

After creation: greeting._subscribers = {e}, name._subscribers = {e}, e._deps = {greeting, name}.

name.set("Bob")
  1. name._notify_subscribers() → calls e._notify() → e.run()
  2. e._untrack_all() — removes e from both signals’ subscriber sets
  3. _active_consumer = e
  4. Body runs: greeting.get() re-tracks, name.get() re-tracks
  5. Prints “Hello, Bob!”

Scenario 2: Computed Chain

first = Signal("Ada")
last = Signal("Lovelace")
full = Computed(lambda: f"{first.get()} {last.get()}")
upper = Computed(lambda: full.get().upper())
e = Effect(lambda: print(upper.get()))
# prints "ADA LOVELACE"

Graph: first/last → full → upper → e

first.set("Grace"):

  1. Push phase: first notifies full → full._dirty = True → full notifies upper → upper._dirty = True → upper notifies e → e.run()
  2. Pull phase: e reads upper.get() → dirty, so upper._recompute() → reads full.get() → dirty, so full._recompute() → reads first (“Grace”) and last (“Lovelace”) → full._value = "Grace Lovelace" → upper._value = "GRACE LOVELACE" → e prints it.

Scenario 3: Batch Deduplication

x = Signal(0)
y = Signal(0)
e = Effect(lambda: print(f"{x.get()}+{y.get()}"))
# prints "0+0"

with batch():
    x.set(1)     # enqueues e
    y.set(2)     # e already in queue (dedup)
# batch exits → flush → e.run() ONCE → prints "1+2"

Without batch: x.set(1) → prints “1+0”, then y.set(2) → prints “1+2”. With batch: the effect never observes the intermediate state “1+0”.

Thread Safety

Every mutation of shared state goes through _lock (a threading.RLock):

  • Subscriber set cleanup and snapshot
  • Batch depth increment/decrement
  • Queue append and dedup

Signal.get() and tracking (consumer._track, signal._subscribers.add) do NOT take the lock — in CPython, set.add() is atomic due to the GIL.

The lock primarily protects the batch queue (multi-writer) and subscriber set iteration (read-while-write from another thread’s set() call).
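The snapshot pattern can be sketched with a hypothetical LockedSignal that notifies plain functions. The subscriber set is only read or changed while holding a threading.RLock, and callbacks run on a copy taken under the lock, so a callback that edits the set (or another thread calling set()) cannot break the iteration. This illustrates the pattern described above; it is not signalpy's code.

```python
import threading
from typing import Callable


class LockedSignal:
    """Hypothetical signal that notifies plain callables."""

    def __init__(self, value: object) -> None:
        self._value = value
        self._lock = threading.RLock()
        self._subscribers: set[Callable[[], None]] = set()

    def subscribe(self, fn: Callable[[], None]) -> None:
        with self._lock:
            self._subscribers.add(fn)

    def unsubscribe(self, fn: Callable[[], None]) -> None:
        with self._lock:
            self._subscribers.discard(fn)

    def set(self, value: object) -> None:
        with self._lock:
            self._value = value
            snapshot = list(self._subscribers)   # copy under the lock
        for fn in snapshot:                      # call outside the lock
            fn()


calls: list[str] = []
sig = LockedSignal(0)


def late() -> None:
    calls.append("late")


def once() -> None:
    calls.append("once")
    sig.unsubscribe(once)    # edits the set mid-notification
    sig.subscribe(late)      # not in this round's snapshot


sig.subscribe(once)
sig.set(1)                   # runs once only
sig.set(2)                   # runs late only

threads = [threading.Thread(target=sig.set, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```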

Class Hierarchy

object
  ├── Signal[T]          _value, _version, _subscribers
  │                      get(), set(), peek(), update()
  │
  └── _Consumer          _deps, _id, _track(), _untrack_all()
        │
        ├── Effect       _fn, _is_async, _disposed, _running
        │                run(), _notify(), dispose()
        │
        └── Computed     _fn, _value, _dirty, _version, _subscribers
                         get(), peek(), _recompute(), _notify(), dispose()

Computed inherits from _Consumer (reads signals) but also has _subscribers (others depend on it). It is both a reader and a source in the reactive graph.

See also