3. Dynamic Services

Services come and go. Your component reacts automatically.

The Problem

You have two dictionaries (English, French) and a spell checker. @requires(dictionary=IDictionary) injects one dictionary. But the spell checker needs all dictionaries — and it needs to handle new ones appearing and old ones disappearing at runtime.

In traditional service-oriented frameworks, you’d write @bind/@unbind callbacks: manual wiring for every service that comes and goes. In SignalPy, you use @requires(dicts=list[IDictionary]) plus a method decorated with @effect (introduced below), which the kernel re-runs automatically when its tracked dependencies change. The reactive engine handles propagation:

sequenceDiagram
    participant H as hot_add(GermanDict)
    participant R as Registry
    participant S as Signal (rt.dicts)
    participant E as @effect rebuild_index

    H->>R: provide(IDictionary, german)
    R->>S: Signal.set([en, fr, de])
    S->>E: notify: deps changed
    E->>E: re-run → rebuilds index
    Note over E: Index now has EN, FR, DE

Aggregate Injection

@component("checker")
@requires(dictionaries=list[IDictionary])
class SpellChecker:
    @lifecycle.activate
    def activate(self):
        print(f"Checker ready with {len(self.rt.dictionaries)} dictionaries")
Checker ready with 2 dictionaries
Note

@requires(dictionaries=list[IDictionary]) injects a list of all services matching the contract. The list[C] type hint tells the kernel “I want all of them.”

Reactive Effects

An @effect is a method that re-runs whenever its tracked dependencies change. Read self.rt.dictionaries inside an effect → the reactive engine tracks that. When the list changes, the effect re-runs automatically.

@component("checker")
@requires(dictionaries=list[IDictionary])
class SpellChecker:

    @lifecycle.activate
    def activate(self):
        self.by_language = {}

    @effect
    def rebuild_index(self):
        # Reads self.rt.dictionaries — tracked automatically.
        # When the list changes, this method re-runs.
        self.by_language = {}
        for d in self.rt.dictionaries:
            lang = getattr(d, "_language", "??")
            self.by_language[lang] = d
        print(f"  Index rebuilt: {list(self.by_language.keys())}")
  Index rebuilt: ['EN', 'FR']
@effect replaces @bind/@unbind

Instead of separate add/remove callbacks, write one method that reads the current state. The reactive engine tracks what it reads and re-runs it when those values change.
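
To make "tracks what it reads" concrete, here is a minimal toy model of read tracking. It is a sketch only and not SignalPy's engine: the Signal and Effect classes and the _active_effect variable are hypothetical names invented for this illustration.

```python
# Toy model only: not SignalPy's engine. All names here are hypothetical.
_active_effect = None  # the effect currently running, if any


class Signal:
    """Holds a value and remembers which effects have read it."""

    def __init__(self, value):
        self._value = value
        self._subscribers = set()

    def get(self):
        # A read made while an effect is running registers that effect.
        if _active_effect is not None:
            self._subscribers.add(_active_effect)
        return self._value

    def set(self, value):
        self._value = value
        # Copy first: re-running an effect re-subscribes it during iteration.
        for eff in list(self._subscribers):
            eff.run()


class Effect:
    """Runs a function once, then again after any signal it read changes."""

    def __init__(self, fn):
        self._fn = fn
        self.run()

    def run(self):
        global _active_effect
        previous, _active_effect = _active_effect, self
        try:
            self._fn()
        finally:
            _active_effect = previous


dictionaries = Signal(["EN", "FR"])
index_history = []


def rebuild_index():
    # One method that reads current state; no add/remove callbacks.
    index_history.append(list(dictionaries.get()))


Effect(rebuild_index)
dictionaries.set(["EN", "FR", "DE"])   # stands in for a hot-add
dictionaries.set(["EN", "DE"])         # stands in for a hot-remove
print(index_history)
```

The function never subscribes to anything explicitly. The subscription is a side effect of calling get() while the effect is running, which is why a single method can replace a pair of add/remove callbacks.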

Hot-Add at Runtime

@component("dict-de")
@provides(IDictionary)
@prop("_language", "language", "DE")    # @prop(attr_name, service_property, default)
class GermanDict:
    @lifecycle.activate
    def activate(self):
        self.words = {"hallo", "welt"}
    def check_word(self, word):
        return word.lower() in self.words

await kernel.hot_add(GermanDict)
@prop(attr_name, service_property, default)

Three arguments: the Python attribute name on the instance (self._language), the service-registry property name other components match against ("language"), and the default value. Service properties are how the registry filters and ranks providers — see “Properties & Ranking” below.

  Index rebuilt: ['EN', 'FR', 'DE']

The spell checker’s @effect fires automatically. Zero changes to the spell checker code.

Hot-Remove

await kernel.hot_remove("dict-fr")
  Index rebuilt: ['EN', 'DE']
Zero code changes in the spell checker

It didn’t know German was coming. It doesn’t know French left. The @effect kept its index current automatically.

Computed Properties

A @computed property caches its result and recomputes only when dependencies change:

@computed
def available_languages(self):
    return [getattr(d, "_language", "?") for d in self.rt.dictionaries]

Read self.available_languages() — always current, cached until the dictionary list changes.
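
The caching behavior can be sketched with a dirty flag. This is a toy model under stated assumptions, not SignalPy's implementation: the Signal and Computed classes are hypothetical, and dependencies are passed in explicitly here, whereas the text above says the real engine tracks them from reads.

```python
# Toy model only: hypothetical classes, explicit dependency wiring.
class Signal:
    def __init__(self, value):
        self._value = value
        self._dependents = []   # Computed objects to invalidate on change

    def get(self):
        return self._value

    def set(self, value):
        self._value = value
        for computed in self._dependents:
            computed.invalidate()


class Computed:
    """Caches fn() and recomputes only after a source signal has changed."""

    def __init__(self, fn, *sources):
        self._fn = fn
        self._dirty = True
        self._cached = None
        self.recomputations = 0   # counter kept only for the demonstration
        for source in sources:
            source._dependents.append(self)

    def invalidate(self):
        self._dirty = True

    def __call__(self):
        if self._dirty:
            self._cached = self._fn()
            self._dirty = False
            self.recomputations += 1
        return self._cached


dictionaries = Signal(["EN", "FR"])
available_languages = Computed(lambda: list(dictionaries.get()), dictionaries)

first = available_languages()
second = available_languages()          # served from the cache
runs_before_change = available_languages.recomputations

dictionaries.set(["EN", "FR", "DE"])    # marks the cache dirty
third = available_languages()           # recomputed on the next read
print(runs_before_change, available_languages.recomputations, third)
```

Note that the write only marks the value dirty. The recomputation is deferred until something actually reads the property.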

When two writes belong together: batch()

A reactive engine fires effects after every write. That’s what you want most of the time — but sometimes two writes belong together, and you don’t want anyone to observe the half-updated state in between.

The problem, made concrete

You’re rotating an API endpoint and its credential at the same time. Both live in config, written one after the other.

(IConfig is one of the kernel’s built-in contracts from signalpy.kernel.contracts. ConfigProvider ships in the kernel and implements it. You can @requires(config=IConfig) without writing a provider yourself.)

@component("client")
@requires(config=IConfig)
class APIClient:
    @effect
    async def reconnect(self):
        url   = self.rt.config.get("api.url")
        token = self.rt.config.get("api.token")
        await self._open(url, token)        # uses BOTH on every change

In a deploy script:

config.set("api.url",   "https://api-v2.example.com")
config.set("api.token", "tok_v2_xxxxx")

Without batching, the effect runs twice:

| Step | Trigger | Body sees | Outcome |
|------|---------|-----------|---------|
| 1 | set("api.url", v2_url) fires | v2_url, v1_token | tries to open v2 endpoint with v1 token → auth error |
| 2 | set("api.token", v2_token) fires | v2_url, v2_token | opens correctly |

The first run wasn’t just wasted work — it was wrong. A real auth attempt went out with mismatched creds.

The fix

Wrap the correlated writes:

from signalpy.kernel import batch

with batch():
    config.set("api.url",   "https://api-v2.example.com")
    config.set("api.token", "tok_v2_xxxxx")
# effect fires ONCE, with both new values

Now the body sees (v2_url, v2_token) on its single run. No half-state ever exists from the effect’s point of view.
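
One way to picture what a batch does is a depth counter plus a queue of deferred effects. The sketch below is a toy model, not the signalpy.kernel.batch implementation: the Config class, its subscribe method, and the module-level _batch_depth and _pending names are all hypothetical.

```python
from contextlib import contextmanager

# Toy model only: hypothetical names, not SignalPy's batch().
_batch_depth = 0
_pending = []   # effects deferred until the outermost batch closes


class Config:
    def __init__(self, values):
        self._values = dict(values)
        self._effects = []

    def subscribe(self, fn):
        self._effects.append(fn)

    def get(self, key):
        return self._values[key]

    def set(self, key, value):
        self._values[key] = value
        for fn in self._effects:
            if _batch_depth > 0:
                if fn not in _pending:      # coalesce repeated notifications
                    _pending.append(fn)
            else:
                fn()


@contextmanager
def batch():
    global _batch_depth
    _batch_depth += 1
    try:
        yield
    finally:
        _batch_depth -= 1
        if _batch_depth == 0:
            queued = _pending[:]
            _pending.clear()
            for fn in queued:
                fn()


config = Config({"api.url": "v1_url", "api.token": "v1_token"})
seen = []
config.subscribe(lambda: seen.append((config.get("api.url"), config.get("api.token"))))

config.set("api.url", "v2_url")        # unbatched: effect observes a mixed pair
config.set("api.token", "v2_token")

with batch():                          # batched: effect runs once at exit
    config.set("api.url", "v3_url")
    config.set("api.token", "v3_token")

print(seen)
```

The first entry in seen is the mismatched pair from the unbatched writes. The batched writes contribute exactly one entry, with both new values.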

When you don’t need it

The kernel batches automatically in two common situations, so you don’t need to reach for batch():

  • Hot-add / hot-remove cascades: a single kernel.hot_add(GermanDict) triggers one cascade and therefore one effect run.
  • Diamond dependencies — if A → B and A → C and an effect reads both B and C, a single update to A runs the effect once, not twice.

You only need batch() for two top-level set calls that are semantically one update. The kernel cannot guess they’re related; you have to tell it.

Long-running async effects: when work outlives a notification

Async effects raise a new question: what happens if a dependency changes while the body is mid-await? The body has already read the old values, and the new write arrives before that work has finished. We say the new run supersedes the old one: the fresh notification makes the in-flight body’s result obsolete.

The default is always safe: the engine remembers the new notification and re-runs the effect once the in-flight body (the one currently mid-await, not yet returned) completes. You write the obvious code and it works:

@component("client")
@requires(config=IConfig)
class APIClient:
    @effect
    async def reconnect(self):
        url = self.rt.config.get("api.url")
        self._client = await connect_slowly(url)   # 2-second handshake

If config.set("api.url", new_url) fires while connect_slowly is mid-handshake, the engine queues a re-run. When the old handshake returns, the body re-fires with the new URL. Nothing gets dropped. No code changes from you.
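
The queue-a-re-run behavior described above can be modeled with plain asyncio. This is a sketch under assumptions, not SignalPy's scheduler: the AsyncEffect class and its notify and wait methods are hypothetical, and asyncio.sleep stands in for the slow handshake.

```python
import asyncio


class AsyncEffect:
    """Toy model: one body at a time; notifications during a run queue one re-run."""

    def __init__(self, body):
        self._body = body
        self._task = None
        self._rerun_requested = False

    def notify(self):
        if self._task is not None and not self._task.done():
            self._rerun_requested = True       # remembered, not dropped
        else:
            self._task = asyncio.get_running_loop().create_task(self._drain())

    async def _drain(self):
        while True:
            self._rerun_requested = False
            await self._body()
            if not self._rerun_requested:
                break

    async def wait(self):
        await self._task


async def main():
    state = {"url": "v1"}
    connected = []

    async def reconnect():
        url = state["url"]                     # read before the slow part
        await asyncio.sleep(0.05)              # stand-in for a slow handshake
        connected.append(url)

    effect = AsyncEffect(reconnect)
    effect.notify()                            # first run starts with v1
    await asyncio.sleep(0.01)
    state["url"] = "v2"                        # write arrives mid-handshake
    effect.notify()
    state["url"] = "v3"
    effect.notify()                            # coalesces with the queued re-run
    await effect.wait()
    return connected


connected = asyncio.run(main())
print(connected)
```

In this model the in-flight run still completes with the old URL, and the two later notifications collapse into a single follow-up run that reads the latest value.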

This works for the vast majority of cases (config reloads, fetches on dependency change, debounced rebuilds). Two specific situations need explicit handling.

When the in-flight work is expensive and about to be redone — is_stale()

Default behavior runs the in-flight body all the way to the end, even if its result is going to be discarded. That’s fine for a quick handshake. It’s not fine for a 30-second batch indexing job that’s about to be re-run with newer data.

is_stale() lets the body bail out voluntarily between awaits:

from signalpy.kernel import is_stale

@component("indexer")
@requires(docs=IDocStore, search=ISearchIndex)
class Indexer:
    @effect
    async def reindex(self):
        documents = self.rt.docs.list()           # tracked
        for batch_of_docs in chunked(documents, 100):
            await self.rt.search.write(batch_of_docs)
            if is_stale():
                return                            # newer doc set arrived

is_stale() flips to True the moment a notification arrives. Check it at any await-point boundary that’s safe to abandon work at. The engine will fire the next run regardless — is_stale() is purely your hint that the rest of the current body is wasted.
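
A zero-argument is_stale() needs some way to know which run is asking. One plausible mechanism is a context variable set per run; whether SignalPy does it this way is an assumption. In the toy sketch below, the Run class, _current_run, and run_effect are hypothetical names, and asyncio.sleep(0) stands in for the awaited index write.

```python
import asyncio
import contextvars

# Hypothetical stand-in for the engine's bookkeeping: one Run object per effect run.
_current_run = contextvars.ContextVar("current_run", default=None)


class Run:
    def __init__(self):
        self.stale = False   # flipped when a newer notification arrives


def is_stale():
    run = _current_run.get()
    return run is not None and run.stale


async def run_effect(body, run):
    _current_run.set(run)    # set inside the task, so only this run sees it
    return await body()


written = []


async def reindex():
    documents = list(range(10))
    for start in range(0, len(documents), 2):
        written.append(documents[start:start + 2])
        await asyncio.sleep(0)           # stand-in for the awaited index write
        if is_stale():
            return "abandoned"           # the rest of this run would be wasted
    return "finished"


async def main():
    run = Run()
    task = asyncio.create_task(run_effect(reindex, run))
    await asyncio.sleep(0)               # let the body start writing
    run.stale = True                     # a newer document set arrives
    return await task


outcome = asyncio.run(main())
print(outcome, len(written))
```

The body stops at the first checkpoint after the flag flips, so fewer than the five possible chunks are written. Nothing interrupts the body; it only stops where it chooses to check.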

When the in-flight work holds a resource: cancel_on_supersede

Sometimes the in-flight body isn’t just wasteful but blocking. A long-poll holding a connection, a file lock, a GPU lease — the next run can’t even begin until the previous one releases.

@component("event-stream")
@requires(config=IConfig)
class EventStream:
    @effect(cancel_on_supersede=True)
    async def stream(self):
        url = self.rt.config.get("stream.url")
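        async with httpx.AsyncClient() as client:
            # client.stream() is an async context manager; iterate the response inside it.
            async with client.stream("GET", url) as response:
                async for line in response.aiter_lines():
                    self.rt.publish("stream.event", line)   # bus event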

When config.set("stream.url", new_url) fires:

  1. The engine calls task.cancel() on the in-flight task.
  2. The async generator inside client.stream raises CancelledError. The async with block closes the HTTP connection cleanly.
  3. The engine re-runs the effect with the new URL.

The next stream connects without waiting for an event from the old endpoint to release the loop.
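
The three steps above rely on standard asyncio cancellation, which can be sketched without any network code. This is a toy model and not the engine's implementation: FakeConnection, stream, and supersede are hypothetical names, and waiting on an asyncio.Event that is never set stands in for a quiet long-poll.

```python
import asyncio

events = []


class FakeConnection:
    """Hypothetical resource that records when it is opened and closed."""

    def __init__(self, url):
        self.url = url

    async def __aenter__(self):
        events.append(f"open {self.url}")
        return self

    async def __aexit__(self, exc_type, exc, tb):
        events.append(f"close {self.url}")   # runs on cancellation too
        return False


async def stream(url):
    async with FakeConnection(url):
        await asyncio.Event().wait()         # never set: blocks until cancelled


async def supersede(old_task, url):
    if old_task is not None:
        old_task.cancel()                    # step 1: cancel the in-flight task
        try:
            await old_task                   # step 2: let its cleanup finish
        except asyncio.CancelledError:
            pass
    return asyncio.create_task(stream(url))  # step 3: start the new run


async def main():
    task = await supersede(None, "old")
    await asyncio.sleep(0)                   # let the first stream open
    task = await supersede(task, "new")
    await asyncio.sleep(0)                   # let the second stream open
    task.cancel()                            # shut the demo down
    try:
        await task
    except asyncio.CancelledError:
        pass


asyncio.run(main())
print(events)
```

The old connection is closed before the new one opens, even though the old body was blocked with no event arriving. That ordering is what the default mode cannot provide for a body that never returns on its own.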

Picking which mode

You almost never need to think about this. The default works for “fetch / reload / rebuild on dependency change” type effects. Reach for is_stale() only when the body is genuinely expensive and decomposable into checkpoint-able chunks. Reach for cancel_on_supersede=True only when the body owns a long-held resource the next run needs.

If you find yourself reaching for both in the same effect, that’s a sign the effect is doing too much — split it. See Threading Model → Async effect supersede for the full mechanism, including the table of which engine step each escape hatch controls.

For a recipe-shelf summary of all four escape hatches (the three above plus cross-thread writes via loop.call_soon_threadsafe), with worked-example scenarios you can lift into your own code, see Patterns → Reactive Intent vs Default.

Properties & Ranking

Properties are metadata attached to a service when it registers:

@component("dict-en")
@provides(IDictionary)
@prop("_language", "language", "EN")          # property: language = "EN"
@prop("_ranking",  "service.ranking", 0)      # priority: 0 = highest
class EnglishDict: ...

The special property service.ranking controls priority. When multiple services match a single @requires, the one with the lowest ranking wins.
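
The selection rule can be sketched as a filter followed by a minimum. This is an illustration only: the Registration class and the select_single and filter_by helpers are hypothetical, and both the default ranking of 0 for services that declare none and the registration-order tie-break are assumptions, not documented SignalPy behavior.

```python
from dataclasses import dataclass, field


@dataclass
class Registration:
    """Hypothetical registry entry: a service name plus its properties."""
    name: str
    properties: dict = field(default_factory=dict)


def filter_by(candidates, **wanted):
    # Keep registrations whose properties match every requested key/value.
    return [
        r for r in candidates
        if all(r.properties.get(k) == v for k, v in wanted.items())
    ]


def select_single(candidates):
    # Lowest service.ranking wins. Assumption: a missing ranking counts as 0,
    # and ties go to the earliest registration (min() keeps the first minimum).
    return min(candidates, key=lambda r: r.properties.get("service.ranking", 0))


registry = [
    Registration("dict-fr", {"language": "FR", "service.ranking": 5}),
    Registration("dict-en", {"language": "EN", "service.ranking": 0}),
    Registration("dict-en-legacy", {"language": "EN", "service.ranking": 10}),
]

winner = select_single(registry)
english_only = [r.name for r in filter_by(registry, language="EN")]
print(winner.name, english_only)
```

A list[IDictionary] requirement would correspond to taking every match, while a single-service requirement corresponds to select_single over the same candidates.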