Threading Model
Does this work with threads? How do two threads share a Signal? Are we reinventing the wheel?
When people first read the reactive engine they tend to ask three questions in this order:
- Will this even work in a multi-threaded service?
- If two threads share a Signal, what happens?
- Doesn’t Python already have something like this in the standard library?
This page answers all three, in the same order, with concrete examples.
Q1. Does the kernel work with threads?
Yes. The reactive engine uses two complementary mechanisms:
- `ContextVar` for per-flow state (“who is currently running?”). Each thread or `asyncio.Task` automatically has its own value — no locking needed for the who’s-asking slot.
- `threading.RLock` for shared state mutation (subscriber sets, version counters, batch queue). One reentrant lock guards every read-modify-write that touches data shared between flows.
The result: any number of threads can read and write Signals concurrently without losing notifications, double-firing effects, or producing torn state. This is verified by tests/test_thread_safety.py — five targeted concurrency tests, all green.
Q2. If two threads share a Signal, what happens?
Nothing special from the engine’s point of view: the guarantees above apply. What “works” means precisely: notifications never get dropped, version counters are accurate, and the engine stays consistent. What it does not mean: that an `@effect` body is itself thread-safe — if your effect mutates a shared dict without a lock, that’s still your problem. The engine guarantees the graph is correct; the bodies are your code.
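To make the body-side responsibility concrete, here is a hedged sketch (names like `counts` and `effect_body` are invented for illustration; no SignalPy imports) of an effect body that mutates shared state from several flows and therefore takes its own lock:

```python
import threading

counts = {"hits": 0}     # shared state owned by user code
lock = threading.Lock()  # the engine will NOT take this for you

def effect_body():
    # Imagine this as the body of an @effect that several flows can
    # trigger concurrently. The read-modify-write on the dict needs its
    # own lock; the engine only guarantees *when* the body runs.
    for _ in range(1000):
        with lock:
            counts["hits"] += 1

threads = [threading.Thread(target=effect_body) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counts["hits"])  # 8000 -- no lost updates
```

Drop the `with lock:` and the engine still behaves correctly; only your dict is at risk.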
Q3. Are we reinventing the wheel?
The honest answer in three layers: stdlib primitives — reused. The reactive pattern — borrowed from JS. The combination at the layer SignalPy targets — does not exist as an off-the-shelf library, but one library (reaktiv) overlaps enough that we should be specific about where we differ.
What stdlib gives us, and what we lean on
| Need | Stdlib piece | What we use it for |
|---|---|---|
| Per-flow state | contextvars.ContextVar |
_active_consumer — automatic per-thread/task isolation |
| Mutual exclusion | threading.RLock |
All shared-state mutations in reactive.py |
| Async task context propagation | asyncio.Task (uses copy_context() automatically) |
@effect async def … — child task inherits _active_consumer |
| Cleanup on scope exit | try/finally, ContextVar.reset(token) |
Restoring the consumer slot, even on exceptions |
We do not roll our own threading primitives. We do not roll our own context machinery. The engine is a thin coordinator on top of them.
What stdlib does not give us
- A reactive graph. There’s no `signal.add_subscriber()`/`effect.add_dependency()` in the stdlib. That’s the bookkeeping the ~450-line engine adds.
- Auto dep tracking. Without the `_active_consumer` ContextVar plus the `Signal.get`/`Effect.run` cooperation, you’d be writing callback-registration code by hand — exactly what the reactivity by example page contrasts as “the glue you don’t write.”
- Notification fan-out with batching. Vue’s `nextTick`, our `batch()` context — neither is in stdlib.
“Aren’t signals just a subscriber registry?”
A reasonable critique: at the lowest level, Signal._subscribers is literally a set of consumers. The kernel’s ServiceRegistry is also a registry. Why is one called “reactive” and the other isn’t?
The difference is who writes the registration code. With a plain observer registry:
```python
# Observer pattern (e.g., blinker, PyDispatcher)
config.url_changed.connect(my_callback)     # explicit subscribe
config.token_changed.connect(my_callback)   # one per signal
config.url_changed.disconnect(my_callback)  # explicit unsubscribe
```

With auto-tracking signals:
```python
@effect
def my_effect(self):
    url = self.rt.config.get("url")      # auto-subscribe
    token = self.rt.config.get("token")  # auto-subscribe
    # no disconnect — engine cleans up at dispose
```

Same set underneath. But the effect doesn’t know it’s subscribing, and the framework doesn’t know in advance which signals matter — both sides discover that fact at runtime through the `_active_consumer` ContextVar hand-off during `get()`. The absence of this layer in v1 is what produced the `@bind`/`@unbind` decorators — the kind of glue every user eventually got tired of writing.
So the registry critique is true at the data-structure level and mostly wrong at the code-the-user-writes level. The reactive layer is two things on top of the registry: auto-tracking (saves the subscribe), and propagation policy (saves the diamond/order/supersede bookkeeping — see the propagation algorithm below).
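The hand-off is easiest to see in a toy version. The following is a minimal sketch of the auto-tracking idea only — it is not SignalPy's implementation (no locking, no batching, no versioning, no dispose):

```python
from contextvars import ContextVar

_active_consumer: ContextVar = ContextVar("_active_consumer", default=None)

class Signal:
    def __init__(self, value):
        self._value = value
        self._subscribers = set()

    def get(self):
        consumer = _active_consumer.get()
        if consumer is not None:
            # Auto-subscribe: the reading effect never calls connect().
            self._subscribers.add(consumer)
        return self._value

    def set(self, value):
        self._value = value
        for sub in list(self._subscribers):
            sub.run()

class Effect:
    def __init__(self, fn):
        self._fn = fn
        self.run()  # first run discovers the dependencies

    def run(self):
        token = _active_consumer.set(self)  # "I am the one asking"
        try:
            self._fn()
        finally:
            _active_consumer.reset(token)

url = Signal("https://a")
seen = []
Effect(lambda: seen.append(url.get()))  # subscribing is a side effect of get()
url.set("https://b")                    # re-runs the effect automatically
print(seen)  # ['https://a', 'https://b']
```

The whole "reactive" layer in this toy is four lines: the `ContextVar`, the `set`/`reset` pair in `Effect.run`, and the `if consumer is not None` branch in `Signal.get`.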
The Python landscape, honestly
The closest active competitor is reaktiv (PyPI v0.21.3, Feb 2026) — a Python Signal/Computed/Effect lib explicitly aimed at backend use (FastAPI, IoT, data pipelines), inspired by Angular/SolidJS. We had not benchmarked against it directly until this audit; here is where the libraries differ:
| Library | Maintenance | Cells (Sig/Comp/Eff)? | Auto-track? | Thread-safe? | Async supersede? | DI / component model? |
|---|---|---|---|---|---|---|
| SignalPy | active (this repo) | Yes | Yes | Yes (RLock + ContextVar) | Yes (`is_stale()` + `cancel_on_supersede`) | Yes (`@component`/`@provides`/`@requires`) |
| reaktiv | active (Feb 2026) | Yes | Yes | Not documented | Experimental; docs recommend sync effect that spawns `asyncio.create_task` | No |
| observ (Vue 2 port) | active (Feb 2026) | Yes | Yes | No (class-level `Dep.stack`) | Not officially supported | No (UI-focused; patchdiff dep) |
| traitlets | mature | Cells but no auto-track (`.observe(handler)`) | No | Not claimed | Async observers open since 2017 | No (Jupyter-coupled) |
| HoloViz param | active | `.rx`/`.bind` reactive expressions | Partial | Not claimed | Sync-oriented | No (HoloViz/Panel-coupled) |
| blinker | active (Pallets) | No (pure pub-sub) | No (manual connect) | Yes | `send_async` exists; no graph | No |
| RxPY / aioreactive | RxPY slow / aioreactive niche | Streams, not cells | Manual operator pipelines | N/A | Different paradigm | No |
| Trellis (PEAK) | discontinued | Yes (historical interest) | Yes | — | — | — |
| Solara / Reflex state | active | Yes, framework-bound | Yes | Framework-managed | Framework-managed | Framework-managed (UI) |
The picture: there is exactly one other actively-maintained Python library doing fine-grained reactive cells aimed at backend services (reaktiv). It stops at the primitive layer. SignalPy extends past it in three concrete directions:
- Documented thread safety. Every shared-state mutation goes through one RLock; every “who is asking” lookup goes through a ContextVar. `reaktiv`’s docs do not make a thread-safety claim; `observ`, `param`, `traitlets` are all single-thread by design.
- Async supersede semantics. When a notification arrives during an in-flight async effect, SignalPy schedules a re-run instead of silently dropping it: `is_stale()` for cooperative bail; the `cancel_on_supersede=True` flag for resource-bound bodies. `reaktiv` documents async effects as experimental and recommends working around them by spawning a task from a sync effect.
- Component composition. SignalPy’s reactive primitives are wired into `@component`/`@provides`/`@requires`, so the injected service itself is a Signal and reads via `self.rt.X` are tracked automatically. `reaktiv` is just primitives — you’d build the DI layer yourself.
Could we build SignalPy on top of reaktiv instead? In theory. In practice, the parts we extend most heavily — thread safety, supersede semantics — are at the primitive layer where the two libraries’ implementations are not interchangeable. The honest trade-off is: if all you need are reactive cells in single-thread code, reaktiv is smaller and well-maintained, use it. If you need backend-service composition (DI, lifecycle, hot-reload, structural scoping — per-component isolation of credentials and storage by default, not opt-in) and concurrency guarantees, that combination is what the SignalPy kernel supplies.
The rest of the landscape is in different niches: pub-sub event buses (blinker), push streams (RxPY, aioreactive), notebook data binding (traitlets, param), or framework-bound state (Solara, Reflex). None of those are wheels we’d be reinventing because none target the same problem.
The propagation algorithm — one logic, four corner cases
You may have a hunch about what the engine is doing: “group the affected effects into a subgraph, dedupe, deal with the awkward ones.” That’s exactly right. Every behavior in the rest of this document falls out of one five-step algorithm:
```
Signal.set(new_value)
          │
          ▼
┌─────────────────────────┐
│ ① Track                 │
│   collect the subgraph  │
│   (transitive readers   │
│   of this signal)       │
└────────────┬────────────┘
             │
             ▼
┌─────────────────────────┐
│ ② Queue                 │
│   enqueue each effect   │
│   in a global batch     │
│   (set-dedup)           │
└────────────┬────────────┘
             │
             ▼
┌─────────────────────────┐
│ ③ Resolve               │  ← this is where
│   for each effect:      │    "corner cases" live
│   is it OK to run?      │
│   defer? skip?          │
└────────────┬────────────┘
             │
             ▼
┌─────────────────────────┐
│ ④ Order                 │
│   sort by creation      │
│   order (_id)           │
└────────────┬────────────┘
             │
             ▼
┌─────────────────────────┐
│ ⑤ Flush                 │
│   run each effect       │
│   exactly once          │
└─────────────────────────┘
```
That’s the whole thing. Everything else in this section is one specific situation that step ③ has to make a decision about.
Step ① — Track. When an effect (or computed) reads Signal.get(), the signal records the consumer in its subscriber set. After the body runs, every signal it touched knows about it. This is just lazy graph-building — see Reactivity by Example for the cycle.
Step ② — Queue. When Signal.set fires, the engine walks the subscriber set transitively (including computeds that propagate to their subscribers) and collects every reachable effect into the global batch queue. The queue is a set, so an effect reached via two paths (a diamond) is only enqueued once.
Step ③ — Resolve. For each enqueued effect, the engine asks: can I run this body right now? Four answers are possible — they are the four “corner cases” the rest of this doc discusses:
| Situation at the time the effect is dequeued | What step ③ decides |
|---|---|
| Effect is idle. Normal case. | Run it. |
| Effect is sync, currently running (it just wrote to a signal it itself reads → re-entry). | Skip. The body is already executing on the latest value; running it again would loop. |
| Effect is async, mid-await (a notification arrived while a previous run is still suspended at an await). | Defer. Set `_pending_run=True` on the effect. When the in-flight body finishes, the engine re-runs it from the `finally` block. Optionally cancel the in-flight task (`cancel_on_supersede=True`). |
| Effect is disposed. | Drop. It’s gone. |
Step ④ — Order. Each effect has a creation-time _id counter. The queue is sorted by _id before flushing, so parents created before children always run before them. Without this you’d get non-deterministic order driven by set iteration (hash-randomized between Python processes).
Step ⑤ — Flush. Run each effect’s body. Bodies may themselves call Signal.set — those writes also enter step ① and queue more effects. The flush loop runs until the queue is empty (with a 100-iteration runaway guard, in case the user’s code has a genuine cycle that won’t quiesce).
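Steps ②, ④ and ⑤ condense into a few lines. The sketch below is a deliberately simplified model — no resolve step, no locking, invented helper names — but the set-dedup queue, the `_id` sort, and the 100-iteration guard mirror the mechanisms named above:

```python
import itertools

_ids = itertools.count()

class Effect:
    def __init__(self, fn):
        self._id = next(_ids)  # creation order, used by step ④
        self._fn = fn

queue = set()

def enqueue(effects):
    # Step ②: the queue is a set, so a diamond enqueues each effect once.
    queue.update(effects)

def flush(max_iterations=100):
    # Step ⑤, with the runaway guard for cycles that never quiesce.
    for _ in range(max_iterations):
        if not queue:
            return
        batch = sorted(queue, key=lambda e: e._id)  # step ④: deterministic order
        queue.clear()
        for effect in batch:
            effect._fn()  # bodies may call enqueue() again; the loop picks it up
    raise RuntimeError("effect cascade did not quiesce in 100 iterations")

log = []
parent = Effect(lambda: log.append("parent"))
child = Effect(lambda: log.append("child"))
enqueue([child, parent, child])  # duplicate entry, out of creation order
flush()
print(log)  # ['parent', 'child'] -- deduped, then sorted by _id
```

Without the `sorted(..., key=lambda e: e._id)` line, iteration order would come from the set and vary between processes, which is exactly the non-determinism step ④ exists to remove.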
Where does batch() fit in?
batch() is just a way to defer step ⑤. While _batch_depth > 0, nothing flushes — writes accumulate in the queue. When the outermost batch exits, the queue is sorted and run once. Two writes that would normally trigger two cascades collapse into one.
The engine opens a batch implicitly at three places:
- around every `Signal._notify_subscribers` call — so a single `Signal.set` deduplicates downstream effects (the “diamond” case).
- around every `Computed._recompute` call — same reason at the computed layer.
- around every sync effect body — so writes from inside the body coalesce.
Users only need to write with batch(): themselves when two top-level writes should land atomically:
```python
with batch():
    cfg.set("url", a)
    cfg.set("token", b)
# only one flush — readers see (a, b), never (a, old_token)
```

This is the one place users still need the explicit primitive — the engine cannot guess that two unrelated `set` calls are correlated.
Async bodies are the exception: they do not get an implicit batch, because _batch_depth is process-global and a long-running async body would hold depth>0 across its awaits, swallowing every other task’s cascade — including its own supersede signal. Wrap explicitly inside async if you need write coalescing.
Verified table of cases
Each row is “what step ③ does in this situation,” tied to a test that pins the behavior:
| Case | What you’d fear | What step ③ does | Test |
|---|---|---|---|
| Diamond (A → B and A → C, both feeding E) | E fires twice | Step ② enqueues E once (set-dedup); step ⑤ flushes once. | `test_diamond_runs_effect_once` |
| Multi-effect fan-out | Order varies between Python processes | Step ④ sorts by `_id` (deterministic). | `test_effect_order_outside_batch_matches_creation_order` |
| Sync effect writes two signals | Two cascades, dependents fire twice | Sync body wrapped in implicit batch — writes coalesce. | `test_writes_from_effect_body_coalesce` |
| Computed writes a signal during recompute | Same problem at the computed layer | `_recompute` wrapped in implicit batch. | (covered indirectly) |
| Effect transitively writes a signal it reads | Stack overflow from re-entry | Step ③ skips (`_running=True`). 100-iteration runaway guard catches genuine cycles. | (covered indirectly) |
| Async effect notified mid-await | Notification dropped — body finishes with stale value | Step ③ defers (sets `_pending_run`). `finally` block re-runs after in-flight completes. | `test_async_pending_rerun_fires_after_inflight_finishes` |
| Async effect, body holds a resource | Old in-flight run holds socket/lease the new run needs | Opt into `cancel_on_supersede=True` — step ③ defers AND cancels the in-flight task. | `test_cancel_on_supersede_aborts_inflight` |
Once you see step ③ as the resolution policy, the rest of this doc is just which policy to pick when. Read on for the async cases — they have the most decisions to make because async bodies persist across awaits.
Is batch() the most reliable escape hatch?
Yes — for the problem it actually solves. The three escape hatches sit at different points in the algorithm and address different problems:
| Hatch | Where in the algorithm | Problem it solves |
|---|---|---|
| `with batch():` | Step ⑤ (flush deferral) | Correlated writes: two top-level `set` calls that should land atomically. |
| `is_stale()` | Step ③, async case | Wasted work inside a long async body that’s about to be redone. |
| `cancel_on_supersede=True` | Step ③, async case | A resource held across an await that the next run needs to acquire. |
batch() is the most fundamental and the most reliable for one reason: it controls step ⑤ directly, deterministically, and locally. You decide the boundaries, the engine respects them, no surprises:
```python
# Always works. Never wrong. Reader sees both new values or neither.
with batch():
    cfg.set("url", new_url)
    cfg.set("token", new_token)
```

The async hatches are different — they manage the temporal gap between when an effect starts a run and when it finishes one. No amount of `batch()` wrapping at the call site can help, because the awaiting body is in its own flow, suspended past the moment your batch ended.
```python
# Wrapping the writer in batch does NOT help the long-running effect:
with batch():
    cfg.set("url", new_url)  # batch ends here
# long-poll effect still mid-await:
# no checkpoint, no way to bail —
# batch can't reach into another flow
```

So:
- Correlated writes → always `with batch():`. Reach for it first; it’s the one hatch that’s purely additive (wrong batches just delay the flush; they don’t change semantics).
- Long async body, decomposable work → `is_stale()` between awaits. Cheap, no exception machinery.
- Long async body, resource-bound → `@effect(cancel_on_supersede=True)`. Hard but necessary when nothing else can release the resource in time.
If you’re not sure which problem you have: ask whether the issue is “two writes shouldn’t be observed apart” (use batch()) or “a long-running body is reading stale state” (use the async hatches). The two questions never have the same answer.
Async effect supersede: what happens when notifications race the body
This section explains the mechanism. For a recipe-shelf summary of when each escape hatch (batch(), is_stale(), cancel_on_supersede, and loop.call_soon_threadsafe for cross-thread writes) is the right tool — with copy-pasteable scenarios — see Patterns → Reactive Intent vs Default.
Async effects are where reactive frameworks usually leak abstractions. Imagine this:
```python
@effect
async def reload_connection(self):
    url = self.rt.config.get("url")  # tracked ✓
    self._client = await connect_slowly(url)
```

The user changes `url` while `connect_slowly()` is still in flight. What does the engine do? In a naive implementation, three different bad things can happen:
- Silent drop. The notification arrives during the in-flight body, sees `_running=True`, and gets discarded. The new URL is never reconnected to. (This is what SignalPy v0.x did. It’s the bug we spent the P1 pass fixing.)
- Stale write wins. Both the old and new connections complete; the old one finishes second and overwrites `self._client`, leaving the user pointing at a connection for the wrong URL.
- Stack overflow / re-entry. The notification triggers a synchronous re-run of the body before the first await returns, blowing the stack.
SignalPy now handles all three correctly out of the box, and gives you two escape hatches when the default isn’t what you want.
Default: pending re-run after in-flight finishes
You write the same code you’d write if races didn’t exist:
```python
@effect
async def reload_connection(self):
    url = self.rt.config.get("url")
    self._client = await connect_slowly(url)
```

What happens when `cfg.set("url", "https://b")` lands while the first `connect_slowly("https://a")` is still mid-await:
| Step | Time | What happens |
|---|---|---|
| 1 | t=0 | First run starts. _running=True. Body reads url="https://a", calls await connect_slowly("https://a"). |
| 2 | t=10ms | Outside, cfg.set("url", "https://b") fires. The engine sees _running=True and sets _pending_run=True instead of dropping the notification. |
| 3 | t=2s | First connect_slowly resolves. Body assigns self._client = <client_a>. |
| 4 | t=2s+ε | The finally block notices _pending_run=True, clears the flag, and re-runs the effect. Second body reads url="https://b" and connects. |
| 5 | t=4s | self._client = <client_b>. Done. The “wrong” client briefly existed at step 3, but step 5 overwrote it before any code that depends on self._client could read it (because that code is also reactive and re-runs in step 4’s same flush). |
Zero user code changes required. The body runs to completion every time. The supersede signal is never lost. Verified by test_async_pending_rerun_fires_after_inflight_finishes.
Tracking across await
Python’s contextvars persist across await within a task, so signal reads after the first await in an async effect’s body are usually tracked correctly — _active_consumer is still set to the effect when control resumes. This is more robust than Vue 3’s situation, where JS’s lack of contextvars makes post-await tracking unreliable.
That said, the kernel’s source comment (reactive.py:412) is conservative on this point: it does not guarantee tracking after the first await, because exotic body shapes (sub-tasks via asyncio.gather, libraries that manipulate context, framework re-entry) can in principle lose the contextvar. The portable, defensive convention — read all reactive deps before the first await — is recommended for code that might run under adversarial async setups, but is not strictly required for vanilla async def bodies in SignalPy.
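The persistence claim is easy to check with the stdlib alone. A small demonstration (names invented; no SignalPy involved) of a ContextVar surviving an await inside one task while staying invisible to the caller:

```python
import asyncio
from contextvars import ContextVar

consumer = ContextVar("consumer", default=None)
observed = []

async def effect_body():
    token = consumer.set("reload_connection")
    try:
        observed.append(consumer.get())  # before the first await
        await asyncio.sleep(0)           # suspension point
        observed.append(consumer.get())  # after resuming: still set
    finally:
        consumer.reset(token)

async def main():
    # create_task copies the current context; mutations inside the task
    # do not leak back into the caller's context.
    await asyncio.create_task(effect_body())
    observed.append(consumer.get())      # caller's view: still the default

asyncio.run(main())
print(observed)  # ['reload_connection', 'reload_connection', None]
```

This is the vanilla case; the caveat in the source comment is about bodies that spawn sub-tasks or run under frameworks that manipulate the context themselves.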
This default suits the vast majority of cases — config reloads, fetch data on dependency change, debounced rebuilds — where the work is either idempotent or cheap to redo.
Escape hatch 1: is_stale() for cooperative bail-out
Default behavior runs the in-flight body to completion. That’s wasteful when the work is expensive (a 5-second batch insert, a network upload) and the result is going to be thrown away anyway. Use is_stale() between awaits to skip the rest of the body voluntarily:
```python
from signalpy.kernel import is_stale

@effect
async def index_documents(self):
    docs = self.rt.docs.get()
    for batch in chunked(docs, 100):
        await self._index_one_batch(batch)
        if is_stale():
            return  # newer doc set arrived; bail
```

`is_stale()` flips to True the moment a notification arrives during your body. Check it at any await-point boundary that’s safe to abandon work at. Verified by `test_is_stale_inside_async_body`.
Why not always check it? It’s purely advisory — the engine will run the next body whether you check or not. It’s a hint to your code that the rest of the current body is wasted work.
For sync effects, is_stale() always returns False (sync bodies can’t be superseded mid-execution under the global lock; the running guard catches re-entry differently). Verified by test_is_stale_false_for_sync_effect.
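The mechanism behind the flag can be sketched with plain asyncio. This is a toy model — the real engine flips its stale flag from the notification path in reactive.py — but the shape is the same: a notification landing mid-body sets a flag the body polls at await boundaries:

```python
import asyncio

class Effect:
    """Toy model of the engine's async-effect bookkeeping (not SignalPy's code)."""

    def __init__(self):
        self._running = False
        self._stale = False  # what is_stale() reads in the real engine

    def notify(self):
        # A dependency changed. If the body is mid-flight, record the
        # fact instead of dropping it (the step-③ "defer" decision).
        if self._running:
            self._stale = True

    async def run(self, chunks, processed):
        self._running = True
        self._stale = False
        try:
            for chunk in chunks:
                await asyncio.sleep(0)  # stands in for real async work
                if self._stale:
                    return              # cooperative bail: the rest is wasted
                processed.append(chunk)
        finally:
            self._running = False

async def main():
    eff = Effect()
    done = []

    async def writer():
        await asyncio.sleep(0)
        eff.notify()  # notification lands while the body is mid-await

    await asyncio.gather(eff.run([1, 2, 3], done), writer())
    return done

done = asyncio.run(main())
print(done)  # the body bailed before processing all three chunks
```

The check is purely advisory here too: delete the `if self._stale` branch and the loop simply finishes with work that a re-run would redo.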
Escape hatch 2: cancel_on_supersede=True for hard abort
When the in-flight work isn’t just wasteful but actively wrong to finish — e.g., a long-poll that’s still holding a connection to the old endpoint, or a CPU loop you’d rather kill than coast through — opt into task cancellation:
```python
@effect(cancel_on_supersede=True)
async def long_poll(self):
    url = self.rt.endpoint.get()
    async with httpx.AsyncClient() as client:
        try:
            async with client.stream("GET", url) as response:
                async for line in response.aiter_lines():
                    self.rt.events.publish(line)
        except asyncio.CancelledError:
            # connection torn down; let the next run open the new one
            raise
```

When a dependency changes, the engine calls `task.cancel()` on the in-flight task. The body sees `CancelledError` at the next await point. The engine then re-runs the effect with the latest values. Verified by `test_cancel_on_supersede_aborts_inflight`.
Pick this when: the in-flight work owns a resource (socket, file handle, GPU lease) that the next run needs to acquire. Without cancellation, the next run waits for the old one to finish releasing.
Don’t pick this when: the body has cleanup that must run on the old value (a finally block writing partial progress to disk; a two-phase commit). Cancellation interrupts mid-await — your finally will run, but only what hadn’t yet awaited will execute synchronously.
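What "sees CancelledError at the next await point, but finally still runs" means can be demonstrated with plain asyncio, independent of the engine (names here are invented stand-ins; `task.cancel()` plays the role the engine plays on supersede):

```python
import asyncio

events = []

async def long_poll():
    try:
        events.append("connected")
        await asyncio.sleep(60)          # mid-await when cancel() lands
        events.append("never reached")
    except asyncio.CancelledError:
        events.append("cancelled")       # delivered at the await point
        raise                            # re-raise, as an effect body should
    finally:
        events.append("cleanup ran")     # finally still executes

async def main():
    task = asyncio.create_task(long_poll())
    await asyncio.sleep(0)               # let the body reach its await
    task.cancel()                        # what the engine does on supersede
    try:
        await task
    except asyncio.CancelledError:
        pass

asyncio.run(main())
print(events)  # ['connected', 'cancelled', 'cleanup ran']
```

Note what does not run: anything after the interrupted await. That is exactly the caveat above — cleanup that itself needs to await (flushing partial progress, a commit phase) is not safe under this flag.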
Caveat: write coalescing inside async bodies
Sync effect bodies get an implicit with batch(): wrapper, so two writes back-to-back inside an effect coalesce — dependents see one update, not two. Async bodies do not. The reason: _batch_depth is process-global, and a long-running async body would hold depth>0 across its awaits, swallowing every other task’s notification cascade — including its own supersede signal. The cure was worse than the disease, so we don’t apply it.
If you need write-coalescing inside an async body, do it explicitly:
```python
@effect
async def aggregate(self):
    docs = self.rt.docs.get()
    summary = await self._summarize(docs)
    with batch():
        self.rt.summary.set(summary.text)
        self.rt.tags.set(summary.tags)
        self.rt.last_updated.set(time.time())
    # Dependent effects re-run once for these three writes, not three times.
```

Without the explicit `with batch():`, each `set()` triggers a separate flush, and effects that read more than one of these signals fire once per write.
Putting it together: a decision flow
Default — read deps before await, write results after, ignore supersede:
```python
@effect
async def thing(self):
    x = self.rt.x.get()
    y = self.rt.y.get()
    result = await compute(x, y)
    self.rt.result.set(result)
```

Bail out early on supersede if the work is expensive:
```python
@effect
async def thing(self):
    items = self.rt.items.get()
    for item in items:
        await process(item)
        if is_stale():
            return
```

Cancel in-flight on supersede if the resource must be released first:
```python
@effect(cancel_on_supersede=True)
async def thing(self):
    url = self.rt.url.get()
    async with open_connection(url) as conn:
        await conn.read_forever()
```

No flag answers all three. The default is what most code wants, and the two opt-ins exist precisely because the engine cannot guess whether your body is cheap-to-redo (default), expensive-but-bailable (`is_stale`), or holds a resource (`cancel_on_supersede`).
When the threading model is not enough
Two cases where you still need to think:
2. Effects that span an await
```python
@effect
async def reload(self):
    url = self.rt.config.get("url")   # ✓ tracked
    key = self.rt.creds.get("key")    # ✓ tracked
    await self._reconnect(url, key)   # ✗ not tracked — fine here,
                                      #   because we already read what we needed
    other = self.rt.something.peek()  # ✗ not tracked even if we'd wanted
                                      #   it to be — read before the await
```

Read all signals you depend on before the first await. After the await, the microtask resumes in a different context and `_active_consumer` may be reset. This is the same restriction Vue 3 and SolidJS document for their async-effect equivalents — see the reactive engine line by line doc for the detailed mechanics.
This is purely a tracking limitation, not a correctness one — if a dependency changes while the body is awaiting, the engine still re-fires the effect (see “Async effect supersede” above). You just have to read the signal at the top so the engine knows it’s a dependency in the first place.
See also
- Reactivity by Example — start here for the cycle
- Context, ContextVar, and the Active Consumer — why one slot is enough
- Reactive Engine: Line by Line — every lock acquisition annotated
- `tests/test_thread_safety.py` — the five concurrency tests that pin this behavior
- PEP 567 — `ContextVar` design rationale
- Python `contextvars` docs