Context, ContextVar, and the Active Consumer

How a single ContextVar can correctly route reactive tracking for any number of effects, providers, and threads.

A common confusion when reading the reactive engine for the first time:

There’s only one _active_consumer. It’s a single ContextVar. The program has many effects, many signals, many computeds. How can one variable possibly track all of that?

The short answer: _active_consumer doesn’t track all of them. It tracks exactly one effect at a time — whichever one is currently running — and only for as long as that effect’s body is on the call stack. The rest of the time it’s None.

The persistent “who depends on what” graph lives somewhere else entirely: in the _subscribers: set on each Signal and the _deps: set on each Effect.

This page walks through that distinction with a concrete program and a timeline.

The two kinds of state in the engine

State Lives on Lifetime What it answers
_active_consumer a ContextVar (per-flow current value) nanoseconds — set just before a body runs, reset after “If a Signal is read this instant, who should subscribe?”
signal._subscribers each Signal object the lifetime of the effects that read it “When I change, who do I notify?”
effect._deps each Effect object the lifetime of the effect “What signals do I read, so I can unsubscribe at dispose time?”

The ContextVar is transient bookkeeping during a single function call. The subscriber/dep sets are the actual reactive graph. The ContextVar exists only to bridge a Signal read to its caller without making the caller pass itself explicitly.

Once you internalize this split, “how does one variable serve many effects” stops being mysterious. It’s the same way one return-address register can serve any number of function calls — calls happen one at a time, the register is reused.

A concrete program

Three signals (call them “providers” loosely), two effects, then a mutation.

from signalpy.kernel.reactive import Signal, Effect

port    = Signal(8080)
db_url  = Signal("postgres://prod")
log_lvl = Signal("INFO")

# Effect A reads only `port`
def render_port():
    print(f"Port is {port.get()}")
Effect(render_port)

# Effect B reads `port` AND `db_url`
def render_full():
    print(f"Server :{port.get()}, DB={db_url.get()}")
Effect(render_full)

# Mutation
port.set(9090)
db_url.set("postgres://staging")

What _active_consumer looks like over time

The whole point: at every moment, this variable holds at most one effect, and only while that effect’s body is mid-execution.

moment                       | _active_consumer | port._subs | db_url._subs
-----------------------------|------------------|------------|--------------
program start                | None             | {}         | {}
Signals constructed          | None             | {}         | {}
                             |                  |            |
Effect(render_port) created  |                  |            |
  set(A) → token             | A                | {}         | {}
  render_port() runs         | A                | {}         | {}
    port.get():              |                  |            |
      sees A in ACV          | A                | {}         | {}
      port._subscribers.add  | A                | {A}        | {}
      A._deps.add(port)      | A                | {A}        | {}
    print "Port is 8080"     | A                | {A}        | {}
  reset(token)               | None             | {A}        | {}
                             |                  |            |
Effect(render_full) created  |                  |            |
  set(B) → token             | B                | {A}        | {}
  render_full() runs         | B                | {A}        | {}
    port.get():              |                  |            |
      sees B in ACV          | B                | {A}        | {}
      port._subscribers.add  | B                | {A,B}      | {}
      B._deps.add(port)      | B                | {A,B}      | {}
    db_url.get():            |                  |            |
      sees B in ACV          | B                | {A,B}      | {}
      db_url._subs.add       | B                | {A,B}      | {B}
      B._deps.add(db_url)    | B                | {A,B}      | {B}
    print                    | B                | {A,B}      | {B}
  reset(token)               | None             | {A,B}      | {B}
                             |                  |            |
port.set(9090)               | None             | {A,B}      | {B}
  iterate port._subscribers  |                  |            |
    A._notify() → A.run():   |                  |            |
      set(A) → token         | A                | {A,B}      | {B}
      render_port() (re-run) | A                | {A,B}      | {B}
        port.get() re-tracks | A                | {A,B}      | {B}   (idempotent)
      reset(token)           | None             | {A,B}      | {B}
    B._notify() → B.run():   |                  |            |
      set(B) → token         | B                | {A,B}      | {B}
      render_full() (re-run) | B                | {A,B}      | {B}
      reset(token)           | None             | {A,B}      | {B}
                             |                  |            |
db_url.set("postgres://...") | None             | {A,B}      | {B}
  iterate db_url._subs       |                  |            |
    B._notify() → B.run():   |                  |            |
      set(B) → token         | B                | {A,B}      | {B}
      render_full() (re-run) | B                | {A,B}      | {B}
      reset(token)           | None             | {A,B}      | {B}

Two things jump out:

  1. _active_consumer is None most of the time. It only flips to a real value briefly, while one specific effect’s body is on the call stack. Once that body returns, the slot goes back to whatever it was before (here, None).
  2. The subscriber sets only ever grow during effect runs. They are the persistent graph. The ContextVar is the temporary “who’s asking” badge that lets a Signal write into the right Effect’s dep set.

Sequence diagram (one flow, two effects)

sequenceDiagram
    participant Main
    participant ACV as _active_consumer
    participant A as Effect A<br/>(render_port)
    participant B as Effect B<br/>(render_full)
    participant Port as Signal port
    participant DB as Signal db_url

    Note over ACV: None
    Main->>A: create
    A->>ACV: set(A)
    Note over ACV: A
    A->>Port: get()
    Port->>ACV: get()
    ACV-->>Port: A
    Port->>Port: _subs.add(A)
    Port->>A: _deps.add(port)
    Port-->>A: 8080
    A->>ACV: reset
    Note over ACV: None

    Main->>B: create
    B->>ACV: set(B)
    Note over ACV: B
    B->>Port: get()
    Port->>ACV: get()
    ACV-->>Port: B
    Port->>Port: _subs.add(B)
    Port-->>B: 8080
    B->>DB: get()
    DB->>ACV: get()
    ACV-->>DB: B
    DB->>DB: _subs.add(B)
    DB-->>B: "postgres://prod"
    B->>ACV: reset
    Note over ACV: None

    Main->>Port: set(9090)
    Port->>A: notify
    A->>ACV: set(A)
    Note over ACV: A
    A->>Port: get() (re-track)
    A->>ACV: reset
    Note over ACV: None
    Port->>B: notify
    B->>ACV: set(B)
    Note over ACV: B
    B->>Port: get() (re-track)
    B->>DB: get() (re-track)
    B->>ACV: reset
    Note over ACV: None

Why one variable is enough

A single execution flow can only run one piece of code at a time. So “currently running” is always a slot of size one, never a list. Effects are sequential within a flow even when there are millions of them — the engine just enters and exits them one after another, reusing the same slot.

The reason this seems counter-intuitive at first is that most programmers think of “tracking” as something a long-lived structure does. Here it’s the opposite: the ContextVar’s job is intentionally brief. It’s a baton that gets passed in, used for one read or one effect’s body, then dropped. The data that persists is in the Signal/Effect objects themselves.

A useful analogy: a return-address register on a CPU. There’s only one such register, but every function call uses it. It works because calls don’t overlap — each one saves the register, fills it in, runs, and restores. _active_consumer is doing the same thing for “current effect” instead of “return address.”

What about concurrency?

If a single execution flow can only run one effect at a time, what happens when multiple effects need to run concurrently — e.g., two threads, or two asyncio tasks?

This is exactly where ContextVar (rather than a plain global or threading.local) matters. Each execution flow has its own copy of the variable’s value.

Main thread (event loop)
  ┌─────────────────────────────────┐
  │ _active_consumer in this flow:  │
  │   …None → A → None → B → None…  │
  └─────────────────────────────────┘

Worker thread (e.g. background poller)
  ┌─────────────────────────────────┐
  │ _active_consumer in THIS flow:  │
  │   …None → C → None…             │
  └─────────────────────────────────┘

Asyncio task spawned from main
  ┌─────────────────────────────────┐
  │ _active_consumer in THIS flow:  │
  │   (snapshot copy of parent's)   │
  │   …D → None…                    │
  └─────────────────────────────────┘

Each flow sees its own value. There is no global “current effect” — each running thread/task has its own. So the slot-of-size-one model holds per flow, and parallel effects don’t trample each other’s tracking. This is the key reason the engine works correctly under threading and asyncio without any locking on _active_consumer itself.

(The Signal’s _subscribers set, however, is shared across flows when threads share a Signal. That needs a lock — which is exactly what the kernel’s RLock is for. Two orthogonal problems, two different mechanisms.)

What if effects nest?

If effect A’s body somehow triggers effect B’s body synchronously on the same flow — for instance, A calls a Signal whose set logic synchronously fires B — what happens?

A.run() begins
  ACV: None → A (token_A saved, prior was None)
  A's fn() runs
    A reads sig1   → registers A
    A writes sig2  → notifies B → B.run() runs synchronously
      ACV: A → B (token_B saved, prior was A)
      B's fn() runs
        B reads sig3 → registers B  ✓ (correct!)
      ACV: B → A (reset(token_B))
    A continues
    A reads sig4   → registers A    ✓ (still correct!)
  ACV: A → None (reset(token_A))

The Token mechanism on set() / reset() is what makes this work. Each set saves the prior value; each reset restores it. The slot acts like a stack frame — push, run, pop. Because the token is opaque and tied to the matching set, you can’t accidentally restore the wrong value.

This is why the engine wraps every effect/computed body in try / finally: reset(token). The finally is critical even if the body raises — without it, the consumer slot would stay polluted and the next signal read on that flow would attribute itself to a dead effect.

Summary

  • _active_consumer is a ContextVar. Conceptually it’s a key into a per-execution-flow dictionary; it doesn’t store anything itself.
  • Its value is “the effect/computed whose body is currently executing on this flow,” set briefly with a Token and reset immediately after the body returns.
  • The persistent reactive graph is in the Signals’ _subscribers and the consumers’ _deps, not in the ContextVar.
  • One slot is enough because effects don’t overlap on a single flow — they take turns.
  • Multiple flows (threads/tasks) each have their own slot, automatically, courtesy of ContextVar’s per-flow semantics.
  • Nesting works because set() returns a Token that lets reset() restore the prior value, making the slot behave like a stack.

The whole thing is small because the work is done by Python’s runtime context machinery; the engine just leans on it.

See also