Decorators
All 11 decorators — signature, parameters, example.
What do these decorators actually do to the function? Almost nothing at decoration time: they attach a small marker dataclass (e.g. fn.__effect__ = EffectDef(...)) and return the function unchanged. The kernel scans for those markers when it registers the class as a factory (the free function _finalize_meta(cls), called by LifecycleManager.register_factory); the real wrapping (in ReactiveEffect, bus handler registration, etc.) happens at activation time. See Architecture → Decoration → Activation for the three-stage walkthrough with the actual code, plus the rationale for the marker pattern over closures, descriptors, or global registries.
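To make the marker pattern concrete, here is a minimal sketch of a decorator that only tags a function, plus a registration-time scan. The fields of EffectDef and the helper collect_effects are assumptions made for illustration; the real kernel's scan lives in _finalize_meta and may look quite different.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EffectDef:
    """Hypothetical marker; the real EffectDef may carry different fields."""
    cancel_on_supersede: bool = False


def effect(fn=None, *, cancel_on_supersede=False):
    """Attach a marker and return the function itself (no wrapper)."""
    def mark(f):
        f.__effect__ = EffectDef(cancel_on_supersede=cancel_on_supersede)
        return f
    # Support both bare @effect and @effect(...).
    return mark(fn) if fn is not None else mark


def collect_effects(cls):
    """Stand-in for the registration-time scan: find marked attributes."""
    return {
        name: attr.__effect__
        for name, attr in vars(cls).items()
        if hasattr(attr, "__effect__")
    }


class Demo:
    @effect
    def refresh(self): ...

    @effect(cancel_on_supersede=True)
    async def poll(self): ...

    def helper(self): ...


markers = collect_effects(Demo)
```

Because the decorator returns the original function, Demo.refresh stays an ordinary method until something chooses to wrap it later.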
Core
@component
@component(name: str, *, namespace: str = "", version: str = "0.0.0", **transports)
Mark a class as a component factory. Transport configuration (formerly @api) is passed as keyword arguments. Boot order is derived entirely from @requires contracts; there is no separate ordering parameter.
@component("greeter", version="1.0")
class GreeterApp: ...
# With transport config:
@component("search", version="1.0",
           rest={"prefix": "/search", "version": "v1"},
           mcp={"name": "search-tools"})
class SearchApp: ...
@provides
@provides(*contracts: str | type)
Declare services this component provides. Accepts types or strings.
@provides(IDictionary)          # type
@provides("IDictionary")        # string
@provides(IDictionary, ICache)  # multiple
@provides(IFoo) is the only thing a provider needs to be discoverable. The kernel’s registry listener wires the new provider into every consumer of that contract automatically: at boot, on hot_add(), and on hot-remove. There is no manager.attach(self), no register() call, and you should not add a @requires(consumer=IConsumer) “for symmetry.” Consumers point at providers; providers do not point back. If your component never reads self.rt.consumer, that requirement is dead weight that confuses boot order and the dependency graph.
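The one-directional wiring described above can be pictured with a toy registry. This is only a sketch under assumed names (Registry, watch, add, remove are all hypothetical), not the kernel's actual listener: consumers subscribe to a contract, and providers simply appear or disappear.

```python
from collections import defaultdict


class Registry:
    """Toy service registry; names and methods here are illustrative only."""

    def __init__(self):
        self._providers = defaultdict(list)   # contract -> providers
        self._watchers = defaultdict(list)    # contract -> (on_add, on_remove)

    def watch(self, contract, on_add, on_remove):
        # A consumer subscribes once; existing providers are replayed to it.
        self._watchers[contract].append((on_add, on_remove))
        for provider in self._providers[contract]:
            on_add(provider)

    def add(self, contract, provider):
        # Roughly what a @provides declaration leads to at boot or on hot-add.
        self._providers[contract].append(provider)
        for on_add, _ in self._watchers[contract]:
            on_add(provider)

    def remove(self, contract, provider):
        self._providers[contract].remove(provider)
        for _, on_remove in self._watchers[contract]:
            on_remove(provider)


registry = Registry()
dicts = []  # the consumer's live view of the contract

registry.add("IDictionary", "english")        # provider exists before the consumer
registry.watch("IDictionary", dicts.append, dicts.remove)
registry.add("IDictionary", "french")         # hot-add
snapshot_after_add = list(dicts)
registry.remove("IDictionary", "english")     # hot-remove
```

Note that the providers never reference the consumer; only the registry knows who is watching.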
@requires
@requires(*, optional: bool = False, key: str = "", **deps: str | type)
Declare services this component reads. One decorator, four shapes, distinguished by the type annotation and keyword arguments. The shape determines boot semantics, not just injection style:
| Shape | Field becomes | Boot semantics |
|---|---|---|
| @requires(x=IFoo) | single highest-ranked IFoo | Boot-blocking. Toposort puts an edge from every IFoo provider to this component. Activation waits. |
| @requires(x=list[IFoo]) | live list[IFoo] | Non-blocking, reactive. No toposort edge. Component boots with [] and the registry listener appends/removes providers as they come online or are hot-removed. |
| @requires(x=IFoo, key="prop") | live dict[str, IFoo] keyed by service property | Non-blocking, reactive. No toposort edge. Like aggregate, but indexed by a service property. |
| @requires(x=IFoo, optional=True) | single IFoo or None | Non-blocking. No toposort edge. Component is valid with no provider. |
@requires(config=IConfig)                     # single: boot-blocking
@requires(dicts=list[IDictionary])            # aggregate: reactive, non-blocking
@requires(dicts=IDictionary, key="language")  # map: reactive, non-blocking
@requires(cache=ICache, optional=True)        # optional: non-blocking
list[X], key=, and optional=True don’t create boot-order constraints: they’re reactive subscriptions on the registry. Your component starts up regardless of who’s there; the runtime keeps your view in sync as providers come and go. Use @effect to react to changes, or just read self.rt.x whenever you need the current value.
Only the bare scalar shape @requires(x=IFoo) says “I cannot start until this exists.” Use it for the things you genuinely cannot run without (config, logger, the one true backend).
See Contracts for the built-in contract types and Tutorial 3 — Dynamic Services for the aggregate/reactive pattern in depth.
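The boot-order rule can be sketched with the standard library's graphlib: only the bare scalar shape contributes an edge. The COMPONENTS table, the shape labels, and boot_graph are hypothetical stand-ins for whatever the kernel derives from the decorators.

```python
from graphlib import TopologicalSorter

# Hypothetical declarations: component -> (provided contracts, requirements).
# Each requirement is (contract, shape) with shape in
# {"single", "aggregate", "map", "optional"}.
COMPONENTS = {
    "config":  ({"IConfig"}, []),
    "english": ({"IDictionary"}, [("IConfig", "single")]),
    "cache":   ({"ICache"}, []),
    "search":  (set(), [("IConfig", "single"),
                        ("IDictionary", "aggregate"),
                        ("ICache", "optional")]),
}


def boot_graph(components):
    """Map each component to the providers it must wait for."""
    providers = {}
    for name, (provided, _) in components.items():
        for contract in provided:
            providers.setdefault(contract, set()).add(name)

    graph = {name: set() for name in components}
    for name, (_, requirements) in components.items():
        for contract, shape in requirements:
            if shape == "single":  # only the bare scalar shape blocks boot
                graph[name] |= providers.get(contract, set())
    return graph


graph = boot_graph(COMPONENTS)
# static_order() yields every component after the ones it waits for.
order = list(TopologicalSorter(graph).static_order())
```

In this sketch search waits only for config; english and cache may start before or after it, and its aggregate and optional fields are filled in reactively.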
Reactive
@computed
@computed
def method(self) -> T: ...
Reactive cached property. Reads from self.rt.* are tracked. Recomputes only when dependencies change.
@computed
def base_url(self):
    return self.rt.config.get("url", "http://localhost")
Access: self.base_url() (always current, cached).
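For readers unfamiliar with tracked reads, the following is a minimal sketch of the general technique: a cell records which computation read it, and a write marks that computation dirty. Signal and Computed are hypothetical classes written for this illustration; they are not the library's ReactiveEffect machinery.

```python
class Signal:
    """Hypothetical reactive cell standing in for a self.rt.* value."""
    _active = None  # the Computed currently evaluating, if any

    def __init__(self, value):
        self._value = value
        self._dependents = set()

    def get(self):
        if Signal._active is not None:
            self._dependents.add(Signal._active)  # record the read
        return self._value

    def set(self, value):
        if value != self._value:
            self._value = value
            for dependent in self._dependents:
                dependent.invalidate()


class Computed:
    """Cache the result of fn until one of the signals it read changes."""

    def __init__(self, fn):
        self._fn = fn
        self._dirty = True
        self._cache = None
        self.runs = 0  # counts real recomputations, for demonstration

    def invalidate(self):
        self._dirty = True

    def __call__(self):
        if self._dirty:
            previous, Signal._active = Signal._active, self
            try:
                self._cache = self._fn()
            finally:
                Signal._active = previous
            self._dirty = False
            self.runs += 1
        return self._cache


url = Signal("http://localhost")
base_url = Computed(lambda: url.get().rstrip("/") + "/api")

first = base_url()
base_url()                        # served from cache, no recompute
runs_before_change = base_url.runs
url.set("http://example.test/")   # invalidates the cached value
second = base_url()
```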
@effect
@effect
def method(self) -> None: ...
@effect(cancel_on_supersede: bool = False)
def method(self) -> None: ...
Reactive side effect. Reads from self.rt.* are tracked. Re-runs when dependencies change. Both def and async def work.
@effect
async def on_config_change(self):
    url = self.rt.config.get("url")
    await self._reconnect(url)
Async supersede behavior. If a dependency changes while an async body is mid-await, the engine sets a pending-rerun flag and fires the effect again once the in-flight body finishes, so notifications are never silently dropped. See Threading Model → Async effect supersede for the full mechanism with examples.
Two opt-in escape hatches for when the default isn’t right:
- is_stale() inside an async body: returns True once a newer run has been scheduled. Use it to short-circuit expensive work that’s about to be redone:
from signalpy.kernel import is_stale
@effect
async def index(self):
    docs = self.rt.docs.get()
    for batch in chunked(docs, 100):
        await self._index_one(batch)
        if is_stale():
            return  # newer doc set arrived; bail
- @effect(cancel_on_supersede=True): when a dependency changes mid-await, the engine calls task.cancel() on the in-flight task. The body sees CancelledError at the next await; the engine then re-runs with the latest values. Pick this when the in-flight work owns a resource (socket, connection, lease) the next run needs to acquire:
@effect(cancel_on_supersede=True)
async def long_poll(self):
    url = self.rt.endpoint.get()
    async with httpx.AsyncClient() as client:
        # httpx's stream() is an async context manager, not an async iterator
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                ...
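The default pending-rerun behavior can be approximated in a few lines of asyncio. This is a sketch of the idea only; AsyncEffect, notify, and wait are hypothetical names, and the real engine's scheduling is documented under Threading Model.

```python
import asyncio


class AsyncEffect:
    """Toy scheduler for one async effect; not the real engine."""

    def __init__(self, body):
        self._body = body
        self._task = None
        self._pending = False

    def notify(self):
        """Called when a dependency changes."""
        if self._task is not None and not self._task.done():
            self._pending = True          # remember the change, don't drop it
        else:
            self._task = asyncio.create_task(self._run())

    async def _run(self):
        while True:
            self._pending = False
            await self._body()
            if not self._pending:         # nothing changed while we awaited
                return

    async def wait(self):
        if self._task is not None:
            await self._task


async def demo():
    value = {"url": "a"}
    seen = []

    async def body():
        snapshot = value["url"]
        await asyncio.sleep(0.01)         # the "mid-await" window
        seen.append(snapshot)

    eff = AsyncEffect(body)
    eff.notify()                          # first run starts with "a"
    await asyncio.sleep(0)                # let the body reach its await
    value["url"] = "b"
    eff.notify()                          # arrives mid-await: sets the flag
    value["url"] = "c"
    eff.notify()                          # coalesced into the same rerun
    await eff.wait()
    return seen


seen = asyncio.run(demo())
```

Two notifications during one in-flight run collapse into a single rerun that reads the latest value, so the intermediate "b" is never processed but the final state is not lost.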
Lifecycle
@lifecycle.activate
@lifecycle.activate
def method(self): ...
Called when the component is activated. Both def and async def work.
@lifecycle.deactivate
@lifecycle.deactivate
def method(self): ...
Called when the component is deactivated. Both def and async def work.
@lifecycle.health
@lifecycle.health
def method(self) -> dict: ...
Health check callback.
@lifecycle.supervision
@lifecycle.supervision(
*,
strategy: str = "one_for_one",
max_restarts: int = 3,
within_seconds: float = 60.0,
backoff: str = "exponential",
base_delay: float = 1.0,
)
def method(self, child_name, error, attempt, context) -> bool: ...
Declare a supervision callback for child component failures. The decorated method is called when a child spawned via self.rt.spawn() fails activation. Return True to restart, False to give up.
Strategies: one_for_one (restart failed child), one_for_all (restart all siblings), rest_for_one (restart failed + everything after it).
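As a rough illustration of the three strategies, the helper below picks which children would be restarted. restart_targets is a hypothetical function, and two points are assumptions: that "everything after it" means later in spawn order, and that one_for_all includes the failed child itself.

```python
def restart_targets(strategy, children, failed):
    """Return the children to restart, in their original order."""
    index = children.index(failed)
    if strategy == "one_for_one":
        return [failed]
    if strategy == "one_for_all":
        return list(children)
    if strategy == "rest_for_one":
        return children[index:]
    raise ValueError(f"unknown strategy: {strategy!r}")


children = ["a", "b", "c", "d"]  # assumed to be in spawn order
one = restart_targets("one_for_one", children, "b")
all_ = restart_targets("one_for_all", children, "b")
rest = restart_targets("rest_for_one", children, "b")
```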
@lifecycle.supervision(
strategy="one_for_one",
max_restarts=3,
within_seconds=60,
backoff="exponential",
base_delay=2.0,
)
async def on_child_failure(self, child_name, error, attempt, context):
    if isinstance(error, ConnectionError):
        context.update_properties({"endpoint": "fallback.url"})
    return True
See Supervision and Reliability for the full explanation.
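One plausible reading of max_restarts, within_seconds, backoff, and base_delay is a sliding-window restart budget combined with a doubling delay. The sketch below encodes that reading; the doubling formula, the constant-delay fallback, and the names RestartBudget and backoff_delay are assumptions, not the library's documented behavior.

```python
from collections import deque


def backoff_delay(attempt, *, backoff="exponential", base_delay=1.0):
    """Delay before restart number `attempt` (1-based). Formula is assumed."""
    if backoff == "exponential":
        return base_delay * 2 ** (attempt - 1)
    return base_delay  # assumed fallback: constant delay


class RestartBudget:
    """Allow at most max_restarts restarts inside a sliding time window."""

    def __init__(self, max_restarts=3, within_seconds=60.0):
        self.max_restarts = max_restarts
        self.within_seconds = within_seconds
        self._stamps = deque()

    def allow(self, now):
        # Forget restarts that have aged out of the window.
        while self._stamps and now - self._stamps[0] >= self.within_seconds:
            self._stamps.popleft()
        if len(self._stamps) >= self.max_restarts:
            return False
        self._stamps.append(now)
        return True


budget = RestartBudget(max_restarts=3, within_seconds=60.0)
decisions = [budget.allow(t) for t in (0.0, 1.0, 2.0, 3.0, 61.5)]
delays = [backoff_delay(n, base_delay=2.0) for n in (1, 2, 3)]
```

Here the fourth failure inside the window is refused, and a later failure is allowed again once the earliest restarts have aged out.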
Surface
@runnable
@runnable(name: str, *, params: type, returns: type | None = None,
          description: str = "", timeout_s: float | None = None,
          destructive: bool = False, transports: list[str] | None = None,
          requires_action: str = "", requires_role: str = "")
Schema-only declaration. Declares name, params, and description. No bus handler registration: transport adapters discover schemas via kernel.runnables() and call schema.handler directly.
Use transports=["native"] to restrict a runnable to kernel-internal calls only. By default all transports can discover the runnable.
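The discovery rule can be pictured as a simple filter over schema records. RunnableSchema and visible_to below are hypothetical; the sketch assumes that transports=None means "visible to every transport" and that the kernel-internal caller identifies itself as "native".

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class RunnableSchema:
    """Hypothetical schema record; the real one carries more fields."""
    name: str
    transports: Optional[list] = None   # None means "visible everywhere"


def visible_to(schemas, transport):
    """Return the names a given transport adapter would discover."""
    return [
        s.name for s in schemas
        if s.transports is None or transport in s.transports
    ]


schemas = [
    RunnableSchema("search"),
    RunnableSchema("delete"),
    RunnableSchema("_reindex", transports=["native"]),
]

rest_view = visible_to(schemas, "rest")
native_view = visible_to(schemas, "native")
```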
@runnable("search", params=SearchParams, description="Search")
async def search(self, params):
    return {"results": [...]}
@runnable("delete", params=DeleteParams, requires_role="admin")
async def delete(self, params): ...
@runnable("_reindex", params=BaseModel, transports=["native"],
          description="Internal reindex")
async def reindex(self, params): ...
Metadata
@prop
@prop(attr_name: str, prop_name: str, default: Any = None)
Mutable component property. Propagated to the service registry. The service.ranking property controls priority.
@prop("_language", "language", "EN")
@prop("_ranking", "service.ranking", 0)
class EnglishDict: ...
@kind
@kind(name: str, *, model: type, description: str = "")
Register a data schema.
@kind("alert", model=AlertModel, description="Security alert")
class SecurityApp: ...
@skill
@skill(name: str, *, content: str, triggers: list[str] | None = None, description: str = "")
AI knowledge bundle.
@skill("spl-writer", content="# SPL Guide...", triggers=["splunk", "spl"])
class SplunkApp: ...
Events
@subscribe
@subscribe(event_type: str, *, description: str = "")
Declare a bus event handler. Both def and async def work.
@subscribe("order.created", description="Handle new orders")
async def on_order(self, event_type, data):
    ...
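To show how one bus could serve both def and async def handlers, here is a small sketch that awaits a handler's result only when it is awaitable. The Bus class and its register/publish methods are hypothetical; the real bus API is not described in this reference.

```python
import asyncio
import inspect
from collections import defaultdict


class Bus:
    """Toy event bus used only to illustrate sync and async dispatch."""

    def __init__(self):
        self._handlers = defaultdict(list)

    def register(self, event_type, handler):
        self._handlers[event_type].append(handler)

    async def publish(self, event_type, data):
        for handler in self._handlers[event_type]:
            result = handler(event_type, data)
            if inspect.isawaitable(result):   # async def handlers
                await result


received = []


def on_order_sync(event_type, data):
    received.append(("sync", event_type, data["id"]))


async def on_order_async(event_type, data):
    received.append(("async", event_type, data["id"]))


async def main():
    bus = Bus()
    bus.register("order.created", on_order_sync)
    bus.register("order.created", on_order_async)
    await bus.publish("order.created", {"id": 7})
    await bus.publish("order.cancelled", {"id": 8})   # no handlers: ignored


asyncio.run(main())
```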