```mermaid
sequenceDiagram
    participant H as hot_add(GermanDict)
    participant R as Registry
    participant S as Signal (rt.dicts)
    participant E as @effect rebuild_index
    H->>R: provide(IDictionary, german)
    R->>S: Signal.set([en, fr, de])
    S->>E: notify: deps changed
    E->>E: re-run → rebuilds index
    Note over E: Index now has EN, FR, DE
```
## 3. Dynamic Services
Services come and go. Your component reacts automatically.
### The Problem
You have two dictionaries (English, French) and a spell checker. `@requires(dictionary=IDictionary)` injects one dictionary. But the spell checker needs all dictionaries — and it needs to handle new ones appearing and old ones disappearing at runtime.
In traditional service-oriented frameworks, you’d write `@bind`/`@unbind` callbacks — manual wiring for every service that comes and goes. In SignalPy, you use `@requires(dicts=list[IDictionary])` plus a reactive method called `@effect` (introduced below) — a method the kernel re-runs automatically when its tracked dependencies change. The reactive engine handles propagation:
### Aggregate Injection
```python
@component("checker")
@requires(dictionaries=list[IDictionary])
class SpellChecker:
    @lifecycle.activate
    def activate(self):
        print(f"Checker ready with {len(self.rt.dictionaries)} dictionaries")
```

```
Checker ready with 2 dictionaries
```
`@requires(dictionaries=list[IDictionary])` injects a list of all services matching the contract. The `list[C]` type hint tells the kernel “I want all of them.”
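Under the hood, `list[C]` is just an inspectable type hint. Here is a minimal sketch, not SignalPy internals, of how a kernel could branch on it with the standard library (`IDictionary` and the dictionary classes are stand-ins):

```python
# Sketch (assumed mechanics, not SignalPy source): distinguishing
# "give me one provider" from "give me all of them" via the hint.
from typing import get_origin, get_args

class IDictionary: ...                 # stand-in contract
class EnglishDict(IDictionary): ...
class FrenchDict(IDictionary): ...

registry = [EnglishDict(), FrenchDict()]

def resolve(hint):
    """Return all matching services for list[C], one for a bare C."""
    if get_origin(hint) is list:
        (contract,) = get_args(hint)   # unwrap list[C] -> C
        return [s for s in registry if isinstance(s, contract)]
    matches = [s for s in registry if isinstance(s, hint)]
    return matches[0]                  # single injection (ranking elided)

print(len(resolve(list[IDictionary])))       # → 2
print(type(resolve(IDictionary)).__name__)   # → EnglishDict
```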
### Reactive Effects
An `@effect` is a method that re-runs whenever its tracked dependencies change. Read `self.rt.dictionaries` inside an effect → the reactive engine tracks that. When the list changes, the effect re-runs automatically.
```python
@component("checker")
@requires(dictionaries=list[IDictionary])
class SpellChecker:
    @lifecycle.activate
    def activate(self):
        self.by_language = {}

    @effect
    def rebuild_index(self):
        # Reads self.rt.dictionaries — tracked automatically.
        # When the list changes, this method re-runs.
        self.by_language = {}
        for d in self.rt.dictionaries:
            lang = getattr(d, "_language", "??")
            self.by_language[lang] = d
        print(f"Index rebuilt: {list(self.by_language.keys())}")
```

```
Index rebuilt: ['EN', 'FR']
```
Instead of separate add/remove callbacks, write one method that reads the current state. The reactive engine tracks what it reads and re-runs it when those values change.
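The tracking mechanism itself is small enough to sketch. The following is a toy signal/effect pair with assumed mechanics (it is not SignalPy source): a read inside an effect subscribes that effect, and a write re-runs every subscriber.

```python
# Toy read-tracking: the running effect is recorded globally, reads
# subscribe it, writes re-run the subscribers.
_current_effect = None

class Signal:
    def __init__(self, value):
        self._value, self._subs = value, set()

    def get(self):
        if _current_effect is not None:
            self._subs.add(_current_effect)   # track the reader
        return self._value

    def set(self, value):
        self._value = value
        for fn in list(self._subs):           # notify every reader
            fn()

def effect(fn):
    global _current_effect
    _current_effect = fn
    fn()                                      # first run records deps
    _current_effect = None
    return fn

dicts = Signal(["EN", "FR"])
log = []

@effect
def rebuild_index():
    log.append(list(dicts.get()))             # read → tracked

dicts.set(["EN", "FR", "DE"])                 # write → effect re-runs
print(log)  # → [['EN', 'FR'], ['EN', 'FR', 'DE']]
```

The real engine adds unsubscription, scheduling, and batching on top, but the core loop is exactly this: read, record, re-run.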
### Hot-Add at Runtime
```python
@component("dict-de")
@provides(IDictionary)
@prop("_language", "language", "DE")  # @prop(attr_name, service_property, default)
class GermanDict:
    @lifecycle.activate
    def activate(self):
        self.words = {"hallo", "welt"}

    def check_word(self, word):
        return word.lower() in self.words

await kernel.hot_add(GermanDict)
```

`@prop(attr_name, service_property, default)` takes three arguments: the Python attribute name on the instance (`self._language`), the service-registry property name other components match against (`"language"`), and the default value. Service properties are how the registry filters and ranks providers — see “Properties & Ranking” below.
```
Index rebuilt: ['EN', 'FR', 'DE']
```
The spell checker’s `@effect` fires automatically. Zero changes to the spell checker code.
### Hot-Remove
```python
await kernel.hot_remove("dict-fr")
```

```
Index rebuilt: ['EN', 'DE']
```
The spell checker didn’t know German was coming. It didn’t know French left. The `@effect` kept its index current automatically.
### Computed Properties
A `@computed` property caches its result and recomputes only when its dependencies change:
```python
@computed
def available_languages(self):
    return [getattr(d, "_language", "?") for d in self.rt.dictionaries]
```

Read `self.available_languages()` — always current, cached until the dictionary list changes.
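The caching idea can be sketched in a few lines. This is a toy model of the assumed semantics, not SignalPy source: the result is memoised, and a dependency change just marks it dirty.

```python
# Toy computed: recompute only when marked dirty by a dependency write.
class Computed:
    def __init__(self, fn):
        self.fn, self._cache, self._dirty = fn, None, True

    def invalidate(self):
        # Called when a tracked dependency changes.
        self._dirty = True

    def __call__(self):
        if self._dirty:
            self._cache = self.fn()   # recompute only when stale
            self._dirty = False
        return self._cache

calls = 0
def languages():
    global calls
    calls += 1
    return ["EN", "FR"]

available = Computed(languages)
available(); available()      # second call hits the cache
print(calls)                  # → 1
available.invalidate()        # dependency changed
available()
print(calls)                  # → 2
```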
### When two writes belong together: `batch()`
A reactive engine fires effects after every write. That’s what you want most of the time — but sometimes two writes belong together, and you don’t want anyone to observe the half-updated state in between.
#### The problem, made concrete
You’re rotating an API endpoint and its credential at the same time. Both live in config, written one after the other.
(`IConfig` is one of the kernel’s built-in contracts from `signalpy.kernel.contracts` — `ConfigProvider` ships in the kernel and implements it. You can `@requires(config=IConfig)` without writing a provider yourself.)
```python
@component("client")
@requires(config=IConfig)
class APIClient:
    @effect
    async def reconnect(self):
        url = self.rt.config.get("api.url")
        token = self.rt.config.get("api.token")
        await self._open(url, token)  # uses BOTH on every change
```

In a deploy script:

```python
config.set("api.url", "https://api-v2.example.com")
config.set("api.token", "tok_v2_xxxxx")
```

Without batching, the effect runs twice:
| Step | Trigger | Body sees | Outcome |
|---|---|---|---|
| 1 | `set("api.url", v2_url)` fires | `v2_url`, `v1_token` | tries to open the v2 endpoint with the v1 token → auth error |
| 2 | `set("api.token", v2_token)` fires | `v2_url`, `v2_token` | opens correctly |
The first run wasn’t just wasted work — it was wrong. A real auth attempt went out with mismatched creds.
#### The fix
Wrap the correlated writes:
```python
from signalpy.kernel import batch

with batch():
    config.set("api.url", "https://api-v2.example.com")
    config.set("api.token", "tok_v2_xxxxx")
# effect fires ONCE, with both new values
```

Now the body sees `(v2_url, v2_token)` on its single run. No half-state ever exists from the effect’s point of view.
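What `batch()` has to do is modest: let writes land immediately, but defer effect notifications to block exit and deduplicate them. A minimal self-contained model of the assumed semantics (the keys and effect body mirror the deploy-script example above):

```python
# Toy batching: writes apply immediately; effects fire once, at exit.
from contextlib import contextmanager

store = {"api.url": "v1_url", "api.token": "v1_token"}
runs = []
pending = set()
in_batch = False

def reconnect_effect():
    # The effect body reads BOTH keys on every run.
    runs.append((store["api.url"], store["api.token"]))

def set_key(key, value):
    store[key] = value                 # the write itself lands now
    if in_batch:
        pending.add(reconnect_effect)  # defer notification to batch exit
    else:
        reconnect_effect()             # unbatched: fires per write

@contextmanager
def batch():
    global in_batch
    in_batch = True
    try:
        yield
    finally:
        in_batch = False
        for fn in pending:             # each deferred effect fires once
            fn()
        pending.clear()

with batch():
    set_key("api.url", "v2_url")
    set_key("api.token", "v2_token")

print(runs)  # → [('v2_url', 'v2_token')]: one run, both new values
```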
#### When you don’t need it
The kernel batches automatically in two common situations, so you don’t need to reach for `batch()`:
- **Hot-add / hot-remove cascades** — a single `kernel.hot_add(GermanDict)` causes one cascade; one effect run.
- **Diamond dependencies** — if `A → B` and `A → C` and an effect reads both `B` and `C`, a single update to `A` runs the effect once, not twice.
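The diamond case comes down to deduplication. A toy model of the assumed mechanism: during one propagation turn, dirty effects are collected in a set, so an effect reached through both `B` and `C` still appears once.

```python
# Toy diamond propagation: one update to A touches B and C, but the
# shared effect runs once because dirty effects are deduplicated.
runs = []

def effect_bc():
    runs.append((derived["B"], derived["C"]))

derived = {"B": 0, "C": 0}
subscribers = {"B": {effect_bc}, "C": {effect_bc}}

def set_A(value):
    dirty = set()                      # one turn, one dedup set
    derived["B"] = value + 1           # A → B
    dirty |= subscribers["B"]
    derived["C"] = value * 2           # A → C
    dirty |= subscribers["C"]
    for fn in dirty:                   # effect_bc appears only once
        fn()

set_A(10)
print(runs)  # → [(11, 20)]: a single run saw both updates
```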
You only need `batch()` for two top-level `set` calls that are semantically one update. The kernel cannot guess they’re related; you have to tell it.
### Long-running async effects: when work outlives a notification
Async effects open a new question: what happens if a dependency changes while the body is mid-await? The body has already read old values; the new write arrives before the body’s old work has finished. We say the new run supersedes the old one — a fresh notification cancels the relevance of an in-flight body.
The default is always safe: the engine remembers the new notification and re-runs the effect once the in-flight body (the one currently mid-await, not yet returned) completes. You write the obvious code and it works:
```python
@component("client")
@requires(config=IConfig)
class APIClient:
    @effect
    async def reconnect(self):
        url = self.rt.config.get("api.url")
        self._client = await connect_slowly(url)  # 2-second handshake
```

If `config.set("api.url", new_url)` fires while `connect_slowly` is mid-handshake, the engine queues a re-run. When the old handshake returns, the body re-fires with the new URL. Nothing gets dropped. No code changes from you.
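The queueing behaviour can be modelled in a few lines of plain asyncio. This is a sketch of the assumed default policy, not SignalPy source: a notification arriving mid-body sets a flag, and the runner re-fires the body once the in-flight one returns.

```python
# Toy supersede queue: a write mid-await is remembered; the effect
# re-runs once the in-flight body completes.
import asyncio

class EffectRunner:
    def __init__(self, body):
        self.body, self.task, self.rerun = body, None, False

    def notify(self):
        if self.task and not self.task.done():
            self.rerun = True          # body mid-await: queue a re-run
        else:
            self.task = asyncio.ensure_future(self._run())

    async def _run(self):
        await self.body()
        if self.rerun:                 # a write arrived while we awaited
            self.rerun = False
            await self.body()          # re-fire, reading the new values

config, seen = {"api.url": "v1"}, []

async def reconnect():
    url = config["api.url"]
    await asyncio.sleep(0.01)          # stand-in for a slow handshake
    seen.append(url)

async def main():
    runner = EffectRunner(reconnect)
    runner.notify()                    # initial run reads v1
    await asyncio.sleep(0.001)
    config["api.url"] = "v2"
    runner.notify()                    # lands mid-handshake: queued
    await runner.task

asyncio.run(main())
print(seen)  # → ['v1', 'v2']
```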
This works for the vast majority of cases (config reloads, fetches on dependency change, debounced rebuilds). Two specific situations need explicit handling.
#### When the in-flight work is expensive and about to be redone — `is_stale()`
Default behavior runs the in-flight body all the way to the end, even if its result is going to be discarded. That’s fine for a quick handshake. It’s not fine for a 30-second batch indexing job that’s about to be re-run with newer data.
`is_stale()` lets the body bail out voluntarily between awaits:
```python
from signalpy.kernel import is_stale

@component("indexer")
@requires(docs=IDocStore, search=ISearchIndex)
class Indexer:
    @effect
    async def reindex(self):
        documents = self.rt.docs.list()  # tracked
        for batch_of_docs in chunked(documents, 100):
            await self.rt.search.write(batch_of_docs)
            if is_stale():
                return  # newer doc set arrived
```

`is_stale()` flips to `True` the moment a notification arrives. Check it at any await-point boundary where it’s safe to abandon the work. The engine will fire the next run regardless — `is_stale()` is purely your hint that the rest of the current body is wasted.
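A stripped-down model of the assumed behaviour, runnable without the kernel: a module-level flag stands in for the engine’s staleness signal, and the body checks it after each chunk.

```python
# Toy is_stale(): a flag flips when a notification arrives; the body
# checks it between chunks and abandons work that would be redone.
import asyncio

stale = False
def is_stale():
    return stale

indexed = []

async def reindex(documents):
    # Chunks of 2 documents; 6 documents -> 3 chunks if uninterrupted.
    for chunk in (documents[i:i + 2] for i in range(0, len(documents), 2)):
        await asyncio.sleep(0)        # stand-in for a slow index write
        indexed.append(chunk)
        if is_stale():
            return                    # newer doc set arrived: stop early

async def main():
    global stale
    task = asyncio.ensure_future(reindex([1, 2, 3, 4, 5, 6]))
    await asyncio.sleep(0)            # let the first chunk go through
    stale = True                      # simulate a notification arriving
    await task

asyncio.run(main())
print(indexed)  # bailed out early: fewer than all 3 chunks were indexed
```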
#### When the in-flight work holds a resource — `cancel_on_supersede`
Sometimes the in-flight body isn’t just wasteful but blocking. A long-poll holding a connection, a file lock, a GPU lease — the next run can’t even begin until the previous one releases.
```python
import httpx

@component("event-stream")
@requires(config=IConfig)
class EventStream:
    @effect(cancel_on_supersede=True)
    async def stream(self):
        url = self.rt.config.get("stream.url")
        async with httpx.AsyncClient() as client:
            async with client.stream("GET", url) as response:
                async for line in response.aiter_lines():
                    self.rt.publish("stream.event", line)  # bus event
```

When `config.set("stream.url", new_url)` fires:
- The engine calls `task.cancel()` on the in-flight task.
- The async generator inside `client.stream` raises `CancelledError`. The `async with` block closes the HTTP connection cleanly.
- The engine re-runs the effect with the new URL.
The next stream connects without waiting for an event from the old endpoint to release the loop.
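That cancel-then-restart sequence is plain asyncio underneath. A toy model of the assumed semantics (no kernel, no HTTP; a sleep loop stands in for the long-poll):

```python
# Toy cancel_on_supersede: a new notification cancels the in-flight
# task, cleanup runs via CancelledError, then the effect restarts.
import asyncio

config = {"stream.url": "old"}
events = []

async def stream():
    url = config["stream.url"]
    try:
        while True:                      # stand-in for a long-poll loop
            await asyncio.sleep(0.01)
            events.append(url)
    except asyncio.CancelledError:
        events.append(f"closed {url}")   # resource released cleanly
        raise

async def supersede(task):
    task.cancel()                        # step 1: cancel the old body
    try:
        await task                       # step 2: cleanup runs
    except asyncio.CancelledError:
        pass
    return asyncio.ensure_future(stream())  # step 3: re-run, new URL

async def main():
    task = asyncio.ensure_future(stream())
    await asyncio.sleep(0.015)           # old stream emits one event
    config["stream.url"] = "new"
    task = await supersede(task)
    await asyncio.sleep(0.015)           # new stream emits one event
    task.cancel()

asyncio.run(main())
print(events)  # 'old' closed cleanly before 'new' ever connected
```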
#### Picking which mode
You almost never need to think about this. The default works for “fetch / reload / rebuild on dependency change” type effects. Reach for `is_stale()` only when the body is genuinely expensive and decomposable into checkpoint-able chunks. Reach for `cancel_on_supersede=True` only when the body owns a long-held resource the next run needs.
If you find yourself reaching for both in the same effect, that’s a sign the effect is doing too much — split it. See Threading Model → Async effect supersede for the full mechanism, including the table of which engine step each escape hatch controls.
For a recipe-shelf summary of all four escape hatches (the three above plus cross-thread writes via loop.call_soon_threadsafe), with worked-example scenarios you can lift into your own code, see Patterns → Reactive Intent vs Default.
### Properties & Ranking
Properties are metadata attached to a service when it registers:
```python
@component("dict-en")
@provides(IDictionary)
@prop("_language", "language", "EN")     # property: language = "EN"
@prop("_ranking", "service.ranking", 0)  # priority: 0 = highest
class EnglishDict: ...
```

The special property `service.ranking` controls priority. When multiple services match a single `@requires`, the one with the lowest ranking wins.
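Filtering and ranking compose naturally. A minimal sketch of the assumed registry mechanics (the `(name, properties)` pairs and the `select` helper are illustrative, not SignalPy API): match on properties first, then sort survivors by `service.ranking`, lowest first.

```python
# Toy registry: services carry property dicts; selection filters on
# properties and picks the lowest service.ranking among matches.
registry = [
    ("dict-en", {"language": "EN", "service.ranking": 0}),
    ("dict-fr", {"language": "FR", "service.ranking": 5}),
    ("dict-de", {"language": "DE", "service.ranking": 1}),
]

def select(query=None):
    """All matches for a property filter, best (lowest-ranked) first."""
    matches = [
        (name, props) for name, props in registry
        if query is None or all(props.get(k) == v for k, v in query.items())
    ]
    return sorted(matches, key=lambda m: m[1].get("service.ranking", 0))

print(select()[0][0])                     # → dict-en (ranking 0 wins)
print(select({"language": "DE"})[0][0])   # → dict-de
```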