Reactive Intent vs Reactive Default

Seven situations where a naive @effect doesn’t reflect what the programmer meant — and the escape hatches for each.

Why this pattern exists

The reactive default — every write notifies; every notified effect re-runs — is what you want most of the time. But seven recurring situations break that assumption, and a programmer who hasn’t seen them tends to either ship subtle bugs (auth attempts with mismatched creds, half-cancelled streams) or build elaborate workarounds (manual debouncing, state-machine glue, ad-hoc locks).

This page is the recipe shelf. Each section: what the default does → what goes wrong → the hatch → when not to reach for it.

Each entry: situation, then default behavior, then the bug it causes, then the hatch.

  • Two writes that mean one update. Default: effect fires twice on a half-state. Bug: acts with mismatched values (auth error, partial commit). Hatch: batch()
  • Async effect mid-flight, deps change, body decomposable. Default: old run completes fully, then re-runs. Bug: wasted work — minutes of compute thrown away. Hatch: is_stale()
  • Async effect mid-flight, body holds a resource. Default: old run blocks the next. Bug: streams stuck on an old endpoint, leases never released. Hatch: @effect(cancel_on_supersede=True)
  • Cross-thread signal.set() from a worker / blocking SDK callback. Default: sync effects fire on the worker thread; async effects silently fail. Bug: async effects never run; thread-affine work crashes. Hatch: loop.call_soon_threadsafe(signal.set, v)
  • Mutating a value in place and re-set()-ing it. Default: no notification fires (identity check, no change). Bug: “I changed the dict — why did nothing react?” Hatch: replace, don’t mutate: signal.set({**old, "k": v})
  • Effect runs once at activation, before any “change”. Default: body fires on startup. Bug: phantom events, e.g. a “rotation handler” fires on first boot. Hatch: _initialized guard, or move setup into @lifecycle.activate
  • Effect allocates a resource on each run. Default: old resource stays open when the new one replaces it on self. Bug: connection / file-descriptor / thread leaks accumulating per re-run. Hatch: close-prev-then-open-new at the top of each run

The first three are documented inline in Tutorial 3 as part of teaching @effect. The fourth (cross-thread) and last three (value semantics, activation timing, resource cleanup) are new here. All seven reference Threading Model for the underlying algorithm.


1. Two writes mean one thing — batch()

What goes wrong

You’re rotating an API endpoint and its credential together. Both live in config; the deploy script writes them one after the other:

config.set("api.url",   "https://api-v2.example.com")
config.set("api.token", "tok_v2_xxxxx")

A naive consumer:

@effect
async def reconnect(self):
    url   = self.rt.config.get("api.url")
    token = self.rt.config.get("api.token")
    await self._open(url, token)

The effect runs twice. First run: (v2_url, v1_token) — a real auth attempt with mismatched creds, going to a real server, generating a real audit-log alarm. Second run: (v2_url, v2_token), succeeds.

The first run wasn’t just wasted — it was wrong.

The hatch

from signalpy.kernel import batch

with batch():
    config.set("api.url",   "https://api-v2.example.com")
    config.set("api.token", "tok_v2_xxxxx")
# effect fires ONCE, with both new values

batch() defers notifications until the block exits, then flushes once. From the effect’s point of view no half-state ever existed.
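The defer-then-flush idea fits in a few lines of plain Python. This is an illustrative toy, not SignalPy’s implementation; set_value, notify, and delivered are made-up names standing in for Signal.set and effect dispatch:

```python
import contextlib

delivered = []  # notifications actually dispatched, in order


def notify(key, value):
    """Stand-in for dispatching to subscribed effects."""
    delivered.append((key, value))


_pending = None  # None = not batching; a dict = coalesced writes awaiting flush


@contextlib.contextmanager
def batch():
    """Defer notifications until the block exits, then flush once per key."""
    global _pending
    if _pending is not None:   # nested batch(): let the outermost block flush
        yield
        return
    _pending = {}
    try:
        yield
    finally:
        pending, _pending = _pending, None
        for key, value in pending.items():
            notify(key, value)  # one flush; last write per key wins


def set_value(key, value):
    """Toy config.set(): notifies immediately unless a batch is open."""
    if _pending is not None:
        _pending[key] = value   # coalesce: only the final value notifies
    else:
        notify(key, value)
```

Inside the with block, writes land in the pending dict; on exit each key notifies exactly once, so observers never see the half-state.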

When not to reach for it

The kernel batches automatically in two common cases:

  • Hot-add / hot-remove cascades. A single kernel.hot_add(GermanDict) is one cascade, one effect run, even though it touches multiple internal Signals.
  • Diamond dependencies. If A → B, A → C, and an effect reads both B and C, a single A update runs the effect once, not twice.

You only need batch() for two top-level set() calls that are semantically one update. The kernel cannot tell two unrelated writes from two correlated writes; you have to declare the correlation.


2. Async work about to be redone — is_stale()

What goes wrong

The default async-supersede behavior is: if a notification arrives while the body is mid-await, the engine queues a re-run for after the current body finishes. Always safe.

That’s fine for a 200ms config reload. It’s not fine for a 30-second batch indexer that’s already obsolete:

@effect
async def reindex(self):
    documents = self.rt.docs.list()           # tracked
    for chunk in chunked(documents, 100):
        await self.rt.search.write(chunk)     # 30s total
    # docs changed at second 5 — but we just spent 25s writing stale data

The default will re-run after the 30s body finishes. The 25 seconds of stale writes happened anyway.

The hatch

from signalpy.kernel import is_stale

@effect
async def reindex(self):
    documents = self.rt.docs.list()
    for chunk in chunked(documents, 100):
        await self.rt.search.write(chunk)
        if is_stale():
            return                            # newer doc set arrived

is_stale() flips to True the moment a notification arrives. Check it at any await point where it’s safe to abandon the remaining work. The engine will fire the next run regardless — is_stale() is purely your hint to skip the rest of this run.
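The checkpoint pattern is easy to demonstrate without the framework. In this sketch, StaleFlag is a hypothetical stand-in for the engine’s per-run staleness flag; only the shape of the loop (do a chunk, then check) is the point:

```python
import asyncio


class StaleFlag:
    """Toy stand-in for the engine's staleness flag (illustrative only)."""

    def __init__(self):
        self._stale = False

    def mark(self):
        self._stale = True       # a newer notification arrived

    def is_stale(self):
        return self._stale


async def reindex(docs, flag, written):
    """Write docs in chunks; bail at the first checkpoint after going stale."""
    for i in range(0, len(docs), 2):
        await asyncio.sleep(0)   # stand-in for the real chunk write
        written.extend(docs[i:i + 2])
        if flag.is_stale():      # checkpoint: a safe place to abandon the run
            return


async def main():
    flag, written = StaleFlag(), []
    task = asyncio.create_task(reindex(list(range(10)), flag, written))
    await asyncio.sleep(0)       # let the run get under way
    flag.mark()                  # "docs changed" mid-flight
    await task
    return written               # far fewer than 10 items were written
```

Because the flag is marked early, the run stops at its next checkpoint instead of writing all ten items; in the real engine the superseding run would then start with fresh data.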

When not to reach for it

  • Fast bodies (< ~100ms). The bookkeeping is more code than it’s worth.
  • Bodies that aren’t decomposable into checkpoints. If you can only bail at the very end, there’s nothing to bail from.
  • Bodies that mutate external state in ways that can’t be left half-applied. is_stale() returns from the body, which means partial work is committed, not rolled back. If half-written results are corrupt, use a transaction or a staging buffer — is_stale() doesn’t undo.

3. Async work holding a resource — cancel_on_supersede

What goes wrong

Sometimes the in-flight body isn’t just wasteful but blocking the next run. A long-poll holding a connection, a file lock, a GPU lease, a backend session that licenses one consumer at a time:

@effect
async def stream(self):
    url = self.rt.config.get("stream.url")
    async with httpx.AsyncClient() as client:
        async for event in client.stream("GET", url):   # holds the conn
            self.rt.publish("stream.event", event)

If config.set("stream.url", new_url) fires, the default behavior queues a re-run. But the current body never returns — async for event in client.stream(...) blocks inside a single network read, waiting for the old server to send the next event. If that takes 10 minutes, your body sits inside that one await for 10 more minutes. The next run never starts. Worse, the connection to the old endpoint stays open until something kicks it loose.

“Why isn’t is_stale() + return enough here?”

A natural question if you just read section 2: why introduce a second hatch instead of putting if is_stale(): return after the self.rt.publish(...) line?

Two reasons:

1. is_stale() is cooperative — it’s a function you call, and Python only runs your code between await points. So if is_stale(): return takes effect only at checkpoints you wrote, right after some await returns control to your code. task.cancel() fires CancelledError at the next await, including the one currently blocked. You don’t have to be holding the CPU.

2. The stream body has no checkpoints to reach. The indexer in section 2 awaits self.rt.search.write(chunk) — short calls returning control to your loop frequently. Many checkpoints. The stream body has one effective await: the next iteration of async for, which is itself blocked on a network read that may take minutes. Even if you added if is_stale(): return after publish, you’d still wait for the next event from the old server before reaching it. Cancel doesn’t wait.

  • is_stale() + return. Mechanism: you poll a flag at points you choose. Works when the body returns control to you frequently: short awaits in a loop.
  • cancel_on_supersede. Mechanism: the runtime injects CancelledError at the next await. Works when the body is stuck in a long await you don’t control.

Mental model: is_stale() is “I’ll bail when I get a chance.” cancel_on_supersede is “stop now, I don’t care what you’re doing.”

The hatch

@effect(cancel_on_supersede=True)
async def stream(self):
    url = self.rt.config.get("stream.url")
    async with httpx.AsyncClient() as client:
        async for event in client.stream("GET", url):
            self.rt.publish("stream.event", event)

When the dep changes, the engine calls task.cancel() on the in-flight task. The async generator inside client.stream raises CancelledError, the async with block closes the connection cleanly, and the engine re-runs the effect with the new URL.
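You can watch this mechanism work with plain asyncio, no framework involved. FakeStream below is a made-up stand-in for the HTTP stream; the point is that task.cancel() reaches a body parked inside a long await and still runs its async-with cleanup:

```python
import asyncio


class FakeStream:
    """Stand-in for a long-poll connection whose cleanup must run on cancel."""

    def __init__(self, log):
        self.log = log

    async def __aenter__(self):
        self.log.append("open")
        return self

    async def __aexit__(self, *exc):
        self.log.append("close")    # runs even when cancelled mid-await


async def stream_body(log):
    async with FakeStream(log):
        await asyncio.sleep(3600)   # "blocked waiting for the old server"


async def main():
    log = []
    task = asyncio.create_task(stream_body(log))
    await asyncio.sleep(0)          # let the body reach its long await
    task.cancel()                   # what the engine does on supersede
    try:
        await task
    except asyncio.CancelledError:
        pass
    return log                      # ["open", "close"]: cleanup ran
```

The CancelledError is raised inside the sleep(3600) the body was parked in, propagates out through the async with, and the exit handler closes the connection before the task finishes.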

When not to reach for it

  • The body has finalizers that must complete and that don’t tolerate CancelledError. Cancellation is cooperative — Python raises CancelledError at the next await, but finally: blocks must themselves yield to actually run their cleanup. If your finally: does a blocking os.remove() of a temp file, it’ll run; if it does await self._send_goodbye(), the cancel will likely propagate through it. Test it.
  • Idempotent bodies that just compute and return. Use the default — there’s nothing to cancel that the GC won’t clean up.

If you find yourself wanting both is_stale() and cancel_on_supersede in the same effect, that’s a sign the effect is doing too much. Split it.


4. Cross-thread writes — marshalling to the kernel loop

What goes wrong

The kernel guards shared state with a single global RLock, so calling signal.set(v) from another thread is value-safe: no torn writes, no corrupt subscriber sets. But two things still go wrong if you do it naively.

Sync effects fire on the writing thread. A worker-pool thread calls signal.set(v); any @effect subscribed to that signal runs on the worker thread, not the kernel’s event loop thread. If the effect calls self.rt.invoke(...), touches a connection pool that’s loop-affine, or logs through a handler that expects loop context, things break in hard-to-diagnose ways.

Async effects silently fail. An async effect’s Effect._notify() calls asyncio.get_running_loop() to schedule its body. If the writing thread doesn’t have a running loop (worker threads don’t), it raises RuntimeError, which the engine swallows by setting _running = False and moving on. The effect simply does not run. No exception bubbles up.

This bites in three common situations:

  • Blocking SDK callbacks. A library hands you a callback that fires on its own thread (Kafka consumer, gRPC stream, native audio).
  • run_in_executor results. You await loop.run_in_executor(...) and inside the worker callable you call signal.set(result).
  • GUI integrations. Tkinter / Qt fire events on the GUI thread.

The hatch

Marshal the set() back to the kernel’s loop:

import asyncio

class KafkaBridge:
    @lifecycle.activate
    def activate(self):
        self._loop = asyncio.get_running_loop()      # capture once
        self._latest = Signal(None)
        self._consumer = KafkaConsumer(...)
        self._consumer.on_message = self._on_kafka_message

    def _on_kafka_message(self, msg):
        # called from Kafka's own thread — DO NOT touch self._latest directly
        self._loop.call_soon_threadsafe(self._latest.set, msg.value)

loop.call_soon_threadsafe is the asyncio-blessed primitive for “schedule this on the loop thread.” When the loop picks it up, the set() runs inside the loop, Effect._notify() finds the running loop, and async effects schedule normally.
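Here is a minimal, framework-free demonstration of the primitive itself. deliver stands in for signal.set; the property of interest is that it executes on the loop thread even though the write originated on a worker thread:

```python
import asyncio
import threading


async def main():
    loop = asyncio.get_running_loop()    # capture once, on the loop thread
    received = []
    done = asyncio.Event()

    def deliver(value):
        # runs ON the loop thread: safe for loop-affine state, async effects
        received.append((value, threading.get_ident()))
        done.set()

    def worker():
        # runs on its own thread: must NOT call deliver() or Event.set() directly
        loop.call_soon_threadsafe(deliver, "msg")

    threading.Thread(target=worker).start()
    await done.wait()
    # deliver recorded the same thread id the loop coroutine runs on
    return received, threading.get_ident()
```

The worker never touches the shared state itself; it only schedules deliver, and the loop executes it on its own thread on the next iteration.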

Worked example: blocking executor callback

@component("transcoder")
@requires(config=IConfig)
class Transcoder:

    @lifecycle.activate
    def activate(self):
        self._loop = asyncio.get_running_loop()
        self._progress = Signal(0.0)

    @runnable("transcode", params=TranscodeParams, description="Transcode video")
    async def transcode(self, params):
        await self._loop.run_in_executor(
            None, self._do_transcode, params.path)

    def _do_transcode(self, path):                   # runs on a worker thread
        for pct in ffmpeg_pipeline(path):
            # WRONG: self._progress.set(pct)        # async effects won't fire
            self._loop.call_soon_threadsafe(self._progress.set, pct)

A consumer can now subscribe to self._progress with an async @effect and it will reliably fire from the loop thread.

When not to reach for it

  • Pure async code. If every write to the signal happens inside an async def reached via the kernel’s normal call paths, you’re already on the loop thread. No marshalling needed.
  • Sync-only effects that don’t touch loop-affine resources. A sync @effect that just appends to a list or updates a metric counter is fine on a worker thread — the RLock covers it.

The hatch is for bodies that cross the boundary: data produced on one thread, observed by effects that need to be on another.


5. Mutating a value in place doesn’t notify

What goes wrong

old = self._cache.peek()
old["new_key"] = "value"        # mutate in place
self._cache.set(old)            # NO-OP

The kernel uses identity comparison (is not) in Signal.set — see reactive.py:144. Calling set() with the same object the Signal already holds is silently dropped, regardless of whether you mutated its contents. The intent (“I changed the data, please react”) is foiled by the default (“nothing changed by identity”).
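A few lines of plain Python reproduce the behavior. This Signal is a toy sketch of the identity check, not the reactive.py implementation:

```python
class Signal:
    """Minimal sketch of an identity-checked set() (illustrative only)."""

    def __init__(self, value):
        self._value = value
        self._subscribers = []

    def peek(self):
        return self._value

    def subscribe(self, fn):
        self._subscribers.append(fn)

    def set(self, value):
        if value is self._value:    # identity, not equality
            return                  # same object: silently dropped
        self._value = value
        for fn in self._subscribers:
            fn(value)
```

Mutating the held dict and re-set()-ing it hits the identity bail-out; building a new dict passes it and notifies.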

Why identity, not equality

This is a category-wide convention, not a SignalPy quirk:

Bail-out check by framework:

  • React useState: Object.is (≈ identity for objects)
  • Vue 3 reactivity: Object.is
  • Solid signals: === (reference equality)
  • MobX: reference equality (configurable)
  • SignalPy: is not

The reasoning every framework converged on:

  • Cost. == on a 10MB dict is O(n). is is one pointer compare. The check runs on every set().
  • Predictability. With identity, you decide when notifications fire by choosing whether to create a new object. With ==, the framework decides for you and you can’t override it.
  • Pedagogy. Identity-comparison forces immutable updates. That friction is intentional — frameworks that allow in-place mutation grow subtle propagation bugs.

And the kicker: == wouldn’t help with the mutate-in-place case either. A mutated dict is == to itself (same object). Switching to == would only suppress notifications when you write a new object with equal contents — a rare case better handled by checking before set().

The hatch

Replace, don’t mutate:

self._cache.set({**self._cache.peek(), "new_key": "value"})

or use the convenience wrapper:

self._cache.update(lambda d: {**d, "new_key": "value"})

For lists: set([*old, new_item]). For sets: set(old | {new_item}). The pattern generalizes — produce a new container, don’t mutate the held one.

When not to reach for it

This isn’t an opt-in pattern; it’s how the framework expects you to write state updates. If you find yourself wanting to mutate-in-place, the question to ask is: does anything reactive depend on this value? If no, fine — it’s just a regular Python dict, mutate freely. If yes, you owe the engine a new object.


6. The first run at activation isn’t always what you wanted

What goes wrong

@effect runs once at activation, before any dependency has “changed.” For some effects this is desired (initialize from current state). For others it’s a phantom event:

@effect
async def on_url_rotation(self):
    url = self.rt.config.get("api.url")
    await self._notify_ops_team(f"URL rotated to {url}")   # PAGES SOMEONE

The intent: react to future rotations. The default: also fire on startup, because the initial dependency-tracking read is itself a run. Ops gets paged every time the service boots.

The hatch

A _initialized guard:

@effect
async def on_url_rotation(self):
    url = self.rt.config.get("api.url")     # always read — establishes tracking
    if not getattr(self, "_initialized", False):
        self._initialized = True
        return
    await self._notify_ops_team(f"URL rotated to {url}")

The first read still has to happen — that’s how the engine learns this effect depends on api.url. The guard skips only the side effects.

For a cleaner separation, split the work between @lifecycle.activate (one-shot setup) and @effect (delta handler):

@lifecycle.activate
def activate(self):
    self._url = self.rt.config.peek("api.url")     # peek — no tracking

@effect
async def on_url_rotation(self):
    new_url = self.rt.config.get("api.url")        # tracked
    if new_url == self._url:
        return                                      # phantom (also covers 1st run)
    old, self._url = self._url, new_url
    await self._notify_ops_team(f"URL rotated: {old} → {new_url}")

This pattern (compare against a stored “last known value”) doubles as duplicate-write suppression — useful for any effect whose body costs real money or wakes a human up.

When not to reach for it

If the body is idempotent and cheap (rebuild an in-memory index, recompute a string), the first run is harmless and saves you a “first-time initialization” path. The guard is for effects with side effects you wouldn’t want to fire spuriously: paging, billing, notifications, audit log entries, rate-limited API calls.


7. Effects don’t auto-clean what they allocated last time

What goes wrong

An effect re-runs because a dep changed. Whatever it allocated last time (a connection, a file watcher, a background thread) is still alive — the variable just got overwritten:

@effect
def open_listener(self):
    url = self.rt.config.get("api.url")
    self._listener = SDK.connect(url)   # opens a TCP connection

Run 1 (url=v1): opens TCP connection #1 to v1, stores in self._listener.

Run 2 (url changed to v2): opens TCP connection #2 to v2, overwrites self._listener. Connection #1 is still open — the SDK has no idea we abandoned it. Now you have two open connections, one of them useless. Each rotation adds another. By midnight you’re out of file descriptors.

The effect expressed intent: “the listener should be configured for the current url.” The default does only the forward half of that (open new); the backward half (close old) it leaves to you.

Why isn’t this cancel_on_supersede?

cancel_on_supersede is for async effects whose body is currently mid-await. This effect is sync — it returned immediately. There’s no in-flight body to cancel. The cleanup we need is for state stored on self between runs, not work happening during a run.

The hatch

Track on self, close at the top of every run:

@effect
def open_listener(self):
    # close previous, if any
    prev = getattr(self, "_listener", None)
    if prev is not None:
        prev.close()
    # open current
    url = self.rt.config.get("api.url")
    self._listener = SDK.connect(url)

Yes, this is React’s useEffect cleanup pattern, just inverted. Instead of “return a cleanup callback that runs before the next effect,” you “open with the previous still around, close it at the start of the next.” Same lifecycle, different syntax.

For deactivation cleanup (when the component shuts down), pair with @lifecycle.deactivate:

@lifecycle.deactivate
def deactivate(self):
    if getattr(self, "_listener", None) is not None:
        self._listener.close()

The kernel disposes the effect on deactivate but doesn’t know what resources it owned — that’s still your responsibility.

When not to reach for it

  • Pure-compute effects. A body that rebuilds an index or recomputes a string doesn’t allocate anything beyond Python objects — GC handles it.
  • Effects whose “allocation” is a Signal you own. self._cache.set(new) doesn’t leak; Signal replacement is just an attribute swap.
  • Async resources mid-await. Use cancel_on_supersede=True (section 3). The async with block closes them via CancelledError propagation.

What ties them together

All seven hatches solve the same shape of problem: the reactive engine’s defaults are locally correct mechanical answers that sometimes diverge from the developer’s notion of what matters. The engine’s “the deps changed” is precise — every write to a tracked Signal, fire each subscribed effect once. The developer’s “this matters” is fuzzier — these two writes go together; this in-flight work is wasted; that mutation should propagate; this resource shouldn’t leak.

The kernel cannot infer any of those from your code. Each hatch is a small, explicit declaration that this situation isn’t the default one:

  • batch() — “these writes are one update.”
  • is_stale() — “abandon if newer notification arrived.”
  • cancel_on_supersede — “interrupt the in-flight body.”
  • call_soon_threadsafe — “marshal this write to the loop thread.”
  • Replace-don’t-mutate — “I produced a new value, please notify.”
  • _initialized guard — “only react to changes after the first run.”
  • Close-prev-then-open — “the resource lifecycle follows the effect.”

For the underlying mechanism — the 5-step propagation algorithm and which algorithmic step each hatch controls — see Threading Model → The propagation algorithm. For the introductory teaching of batch(), is_stale(), and cancel_on_supersede, see Tutorial 3: Dynamic Services.