Decorators

All 11 decorators — signature, parameters, example.

What do these decorators actually do to the function? Almost nothing at decoration time — they attach a small marker dataclass (e.g. fn.__effect__ = EffectDef(...)) and return the function unchanged. The kernel scans for those markers when it registers the class as a factory (free function _finalize_meta(cls), called by LifecycleManager.register_factory); the real wrapping (in ReactiveEffect, bus handler registration, etc.) happens at activation time. See Architecture → Decoration → Activation for the three-stage walkthrough with the actual code, plus the rationale for the marker pattern over closures, descriptors, or global registries.

Core

@component

@component(name: str, *, namespace: str = "", version: str = "0.0.0", **transports)

Mark a class as a component factory. Transport configuration (formerly @api) is passed as keyword arguments. Boot order is derived entirely from @requires contracts – there is no separate ordering parameter.

@component("greeter", version="1.0")
class GreeterApp: ...

# With transport config:
@component("search", version="1.0",
           rest={"prefix": "/search", "version": "v1"},
           mcp={"name": "search-tools"})
class SearchApp: ...

@provides

@provides(*contracts: str | type)

Declare services this component provides. Accepts types or strings.

@provides(IDictionary)          # type
@provides("IDictionary")        # string
@provides(IDictionary, ICache)  # multiple
Providers don’t register against consumers

@provides(IFoo) is the only thing a provider needs to be discoverable. The kernel’s registry listener wires the new provider into every consumer of that contract automatically — at boot, on hot_add(), on hot-remove. There is no manager.attach(self), no register() call, and you should not add a @requires(consumer=IConsumer) “for symmetry.” Consumers point at providers; providers do not point back. If your component never reads self.rt.consumer, that requirement is dead weight that confuses boot order and the dependency graph.

@requires

@requires(*, optional: bool = False, key: str = "", **deps: str | type)

Declare services this component reads. One decorator, four shapes — distinguished by the type annotation and keyword arguments. The shape determines boot semantics, not just injection style:

Shape Field becomes Boot semantics
@requires(x=IFoo) single highest-ranked IFoo Boot-blocking. Toposort puts an edge from every IFoo provider to this component. Activation waits.
@requires(x=list[IFoo]) live list[IFoo] Non-blocking, reactive. No toposort edge. Component boots with [] and the registry listener appends/removes providers as they come online or are hot-removed.
@requires(x=IFoo, key="prop") live dict[str, IFoo] keyed by service property Non-blocking, reactive. No toposort edge. Like aggregate, but indexed by a service property.
@requires(x=IFoo, optional=True) single IFoo or None Non-blocking. No toposort edge. Component is valid with no provider.
@requires(config=IConfig)                    # single — boot-blocking
@requires(dicts=list[IDictionary])           # aggregate — reactive, non-blocking
@requires(dicts=IDictionary, key="language") # map — reactive, non-blocking
@requires(cache=ICache, optional=True)       # optional — non-blocking
The three non-blocking shapes look like dependencies but behave like subscriptions

list[X], key=, and optional=True don’t create boot-order constraints — they’re reactive subscriptions on the registry. Your component starts up regardless of who’s there; the runtime keeps your view in sync as providers come and go. Use @effect to react to changes, or just read self.rt.x whenever you need the current value.

Only the bare scalar shape @requires(x=IFoo) says “I cannot start until this exists.” Use it for the things you genuinely cannot run without (config, logger, the one true backend).

See Contracts for the built-in contract types and Tutorial 3 — Dynamic Services for the aggregate/reactive pattern in depth.

Reactive

@computed

@computed
def method(self) -> T: ...

Reactive cached property. Reads from self.rt.* are tracked. Recomputes only when dependencies change.

@computed
def base_url(self):
    return self.rt.config.get("url", "http://localhost")

Access: self.base_url() — always current, cached.

@effect

@effect
def method(self) -> None: ...

@effect(cancel_on_supersede: bool = False)
def method(self) -> None: ...

Reactive side effect. Reads from self.rt.* are tracked. Re-runs when dependencies change. Both def and async def work.

@effect
async def on_config_change(self):
    url = self.rt.config.get("url")
    await self._reconnect(url)

Async supersede behavior. If a dependency changes while an async body is mid-await, the engine sets a pending-rerun flag and fires the effect again once the in-flight body finishes — notifications are never silently dropped. See Threading Model → Async effect supersede for the full mechanism with examples.

Two opt-in escape hatches for when the default isn’t right:

  • is_stale() inside an async body — returns True once a newer run has been scheduled. Use it to short-circuit expensive work that’s about to be redone:

    from signalpy.kernel import is_stale
    
    @effect
    async def index(self):
        docs = self.rt.docs.get()
        for batch in chunked(docs, 100):
            await self._index_one(batch)
            if is_stale():
                return    # newer doc set arrived; bail
  • @effect(cancel_on_supersede=True) — when a dependency changes mid-await, the engine calls task.cancel() on the in-flight task. The body sees CancelledError at the next await; the engine then re-runs with the latest values. Pick this when the in-flight work owns a resource (socket, connection, lease) the next run needs to acquire:

    @effect(cancel_on_supersede=True)
    async def long_poll(self):
        url = self.rt.endpoint.get()
        async with httpx.AsyncClient() as client:
            async for event in client.stream("GET", url):
                ...

Lifecycle

@lifecycle.activate

@lifecycle.activate
def method(self): ...

Called when the component is activated. Both def and async def.

@lifecycle.deactivate

@lifecycle.deactivate
def method(self): ...

Called when the component is deactivated. Both def and async def.

@lifecycle.health

@lifecycle.health
def method(self) -> dict: ...

Health check callback.

@lifecycle.supervision

@lifecycle.supervision(
    *,
    strategy: str = "one_for_one",
    max_restarts: int = 3,
    within_seconds: float = 60.0,
    backoff: str = "exponential",
    base_delay: float = 1.0,
)
def method(self, child_name, error, attempt, context) -> bool: ...

Declare a supervision callback for child component failures. The decorated method is called when a child spawned via self.rt.spawn() fails activation. Return True to restart, False to give up.

Strategies: one_for_one (restart failed child), one_for_all (restart all siblings), rest_for_one (restart failed + everything after it).

@lifecycle.supervision(
    strategy="one_for_one",
    max_restarts=3,
    within_seconds=60,
    backoff="exponential",
    base_delay=2.0,
)
async def on_child_failure(self, child_name, error, attempt, context):
    if isinstance(error, ConnectionError):
        context.update_properties({"endpoint": "fallback.url"})
    return True

See Supervision and Reliability for the full explanation.

Surface

@runnable

@runnable(name: str, *, params: type, returns: type | None = None,
          description: str = "", timeout_s: float | None = None,
          destructive: bool = False, transports: list[str] | None = None,
          requires_action: str = "", requires_role: str = "")

Schema-only declaration. Declares name, params, and description. No bus handler registration – transport adapters discover schemas via kernel.runnables() and call schema.handler directly.

Use transports=["native"] to restrict a runnable to kernel-internal calls only. By default all transports can discover the runnable.

@runnable("search", params=SearchParams, description="Search")
async def search(self, params):
    return {"results": [...]}

@runnable("delete", params=DeleteParams, requires_role="admin")
async def delete(self, params): ...

@runnable("_reindex", params=BaseModel, transports=["native"],
          description="Internal reindex")
async def reindex(self, params): ...

Metadata

@prop

@prop(attr_name: str, prop_name: str, default: Any = None)

Mutable component property. Propagated to service registry. service.ranking controls priority.

@prop("_language", "language", "EN")
@prop("_ranking", "service.ranking", 0)
class EnglishDict: ...

@kind

@kind(name: str, *, model: type, description: str = "")

Register a data schema.

@kind("alert", model=AlertModel, description="Security alert")
class SecurityApp: ...

@skill

@skill(name: str, *, content: str, triggers: list[str] | None = None, description: str = "")

AI knowledge bundle.

@skill("spl-writer", content="# SPL Guide...", triggers=["splunk", "spl"])
class SplunkApp: ...

Events

@subscribe

@subscribe(event_type: str, *, description: str = "")

Declare a bus event handler. Both def and async def.

@subscribe("order.created", description="Handle new orders")
async def on_order(self, event_type, data):
    ...