Decorators

All 12 decorators — signature, parameters, example.

What do these decorators actually do to the function? Almost nothing at decoration time: each one attaches a small marker dataclass (e.g. fn.__effect__ = EffectDef(...)) and returns the function unchanged. The kernel scans for those markers when it registers the class as a factory; the scan is the free function _finalize_meta(cls), called by LifecycleManager.register_factory. The real wrapping (in ReactiveEffect, bus handler registration, etc.) happens at activation time. See Architecture → Decoration → Activation for the three-stage walkthrough with the actual code, plus the rationale for the marker pattern over closures, descriptors, or global registries.
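The marker pattern is easiest to see in miniature. The sketch below is not the kernel's code: the EffectDef field, the body of effect(), and finalize_meta() are simplified stand-ins written for illustration, and only the names EffectDef and __effect__ come from the paragraph above. It shows a decorator that tags a function and hands back the same object, followed by a registration-time scan that collects the tags.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class EffectDef:
    # Hypothetical marker field; the real EffectDef may carry more.
    cancel_on_supersede: bool = False


def effect(fn=None, *, cancel_on_supersede=False):
    """Attach a marker and return the function itself (no wrapper)."""
    def mark(f):
        f.__effect__ = EffectDef(cancel_on_supersede=cancel_on_supersede)
        return f
    # Support both bare @effect and @effect(...).
    return mark(fn) if fn is not None else mark


def finalize_meta(cls):
    """Stand-in for the registration-time scan: collect marked methods."""
    return {
        name: member.__effect__
        for name, member in vars(cls).items()
        if hasattr(member, "__effect__")
    }


class Demo:
    @effect
    def on_change(self):
        return "ran"

    @effect(cancel_on_supersede=True)
    async def poll(self):
        ...

    def helper(self):  # unmarked, so the scan skips it
        ...


def probe():
    return "untouched"


same_object = effect(probe) is probe  # decoration returns the very same function
meta = finalize_meta(Demo)            # maps method name to its marker
```

Because the decorator returns the original function, a marked method stays directly callable in a unit test without any kernel running.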

Core

@component

@component(name: str, *, namespace: str = "", version: str = "0.0.0", depends: list[str] | None = None)

Mark a class as a component factory.

@component("greeter", version="1.0")
class GreeterApp: ...

@provides

@provides(*contracts: str | type)

Declare services this component provides. Accepts types or strings.

@provides(IDictionary)          # type
@provides("IDictionary")        # string
@provides(IDictionary, ICache)  # multiple

@requires

@requires(*, optional: bool = False, key: str = "", **deps: str | type)

Declare services this component requires. One decorator covers three injection modes (single, aggregate, map); optional=True injects None when the service is missing:

@requires(config=IConfig)                   # single (highest-ranked)
@requires(dicts=list[IDictionary])          # aggregate: all as list
@requires(dicts=IDictionary, key="language") # map: dict keyed by property
@requires(cache=ICache, optional=True)      # optional: None if missing
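To make the modes concrete, here is a rough sketch of how a resolver might choose what to inject. Everything in it is hypothetical: the Service record, the resolve() helper, and the rule that a larger service.ranking wins are assumptions made for illustration, not the framework's resolver.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Service:
    # Hypothetical registry entry: an instance plus its published properties.
    instance: Any
    props: dict = field(default_factory=dict)


def resolve(candidates, *, aggregate=False, key="", optional=False):
    """Pick what gets injected for one dependency (illustrative only)."""
    # Assumption: a larger service.ranking means higher priority.
    ranked = sorted(
        candidates,
        key=lambda s: s.props.get("service.ranking", 0),
        reverse=True,
    )
    if key:
        # Map mode: dict keyed by a property. Walking lowest-ranked first
        # lets the highest-ranked provider win a key clash (an assumption).
        return {s.props[key]: s.instance for s in reversed(ranked)}
    if aggregate:
        # Aggregate mode: every provider, as a list.
        return [s.instance for s in ranked]
    if ranked:
        # Single mode: the highest-ranked provider.
        return ranked[0].instance
    if optional:
        return None
    raise LookupError("required service is missing")


en = Service("en-dict", {"language": "EN", "service.ranking": 10})
de = Service("de-dict", {"language": "DE", "service.ranking": 0})

single = resolve([de, en])
everything = resolve([de, en], aggregate=True)
by_language = resolve([de, en], key="language")
missing = resolve([], optional=True)
```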

Reactive

@computed

@computed
def method(self) -> T: ...

Reactive cached value, exposed as a method. Reads from self.rt.* inside the body are tracked, and the value is recomputed only when one of those dependencies changes.

@computed
def base_url(self):
    return self.rt.config.get("url", "http://localhost")

Access: self.base_url() — always current, cached.
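The tracking-plus-caching idea can be sketched in a few lines. This is a toy model under stated assumptions, not the framework's engine: Signal and Computed are hypothetical classes, and Signal.get() takes no key, unlike the self.rt.config.get("url", ...) call above.

```python
class Signal:
    """Hypothetical reactive cell; stands in for a value under self.rt."""

    _active = None  # the Computed currently being evaluated, if any

    def __init__(self, value):
        self._value = value
        self._dependents = set()

    def get(self):
        if Signal._active is not None:
            self._dependents.add(Signal._active)  # record the read
        return self._value

    def set(self, value):
        self._value = value
        for dep in self._dependents:
            dep.dirty = True  # invalidate only; recompute lazily on next call


class Computed:
    def __init__(self, fn):
        self.fn = fn
        self.dirty = True
        self.cached = None
        self.runs = 0  # counts real evaluations, for the demo

    def __call__(self):
        if self.dirty:
            previous, Signal._active = Signal._active, self
            try:
                self.cached = self.fn()
                self.runs += 1
            finally:
                Signal._active = previous
            self.dirty = False
        return self.cached


url = Signal("http://localhost")
base_url = Computed(lambda: url.get().rstrip("/"))

first = base_url()    # evaluates the body and caches the result
second = base_url()   # served from cache, body does not run again
runs_before_change = base_url.runs
url.set("http://example.test/")
third = base_url()    # dependency changed, so the body runs once more
```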

@effect

@effect
def method(self) -> None: ...

@effect(cancel_on_supersede: bool = False)
def method(self) -> None: ...

Reactive side effect. Reads from self.rt.* are tracked. Re-runs when dependencies change. Both def and async def work.

@effect
async def on_config_change(self):
    url = self.rt.config.get("url")
    await self._reconnect(url)

Async supersede behavior. If a dependency changes while an async body is mid-await, the engine sets a pending-rerun flag and fires the effect again once the in-flight body finishes — notifications are never silently dropped. See Threading Model → Async effect supersede for the full mechanism with examples.
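The pending-rerun idea described above can be approximated with plain asyncio. The AsyncEffectRunner class below is a hypothetical stand-in, not the engine's implementation; it only illustrates how a notification that arrives mid-await is remembered and how several such notifications collapse into one follow-up run.

```python
import asyncio


class AsyncEffectRunner:
    """Illustrative only: coalesces notifications that arrive mid-run."""

    def __init__(self, body):
        self.body = body
        self.running = False
        self.pending = False

    async def notify(self):
        if self.running:
            self.pending = True  # remember the change instead of dropping it
            return
        self.running = True
        try:
            while True:
                self.pending = False
                await self.body()
                if not self.pending:
                    break  # nothing changed while the body was awaiting
        finally:
            self.running = False


async def demo():
    seen = []
    state = {"url": "a"}

    async def body():
        url = state["url"]
        await asyncio.sleep(0.01)  # simulate being mid-await
        seen.append(url)

    runner = AsyncEffectRunner(body)
    first = asyncio.create_task(runner.notify())
    await asyncio.sleep(0)  # let the first run start
    state["url"] = "b"
    await runner.notify()   # arrives mid-run: only sets the flag
    state["url"] = "c"
    await runner.notify()   # coalesced with the previous notification
    await first
    return seen


seen = asyncio.run(demo())
```

In this toy model the intermediate value "b" is never processed on its own: the rerun reads whatever is current when it starts.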

Two opt-in escape hatches for when the default isn’t right:

  • is_stale() inside an async body: returns True once a newer run has been scheduled. Use it to short-circuit expensive work that’s about to be redone:

    from more_itertools import chunked  # assumed source of chunked(); any batching helper works
    from signalpy.kernel import is_stale

    @effect
    async def index(self):
        docs = self.rt.docs.get()
        for batch in chunked(docs, 100):
            await self._index_one(batch)
            if is_stale():
                return    # newer doc set arrived; bail

  • @effect(cancel_on_supersede=True): when a dependency changes mid-await, the engine calls task.cancel() on the in-flight task. The body sees CancelledError at the next await; the engine then re-runs with the latest values. Pick this when the in-flight work owns a resource (socket, connection, lease) the next run needs to acquire:

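    import httpx

    @effect(cancel_on_supersede=True)
    async def long_poll(self):
        url = self.rt.endpoint.get()
        async with httpx.AsyncClient() as client:
            # client.stream() is an async context manager, not an iterator
            async with client.stream("GET", url) as response:
                async for line in response.aiter_lines():
                    ...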
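The cancel-then-rerun sequence can be reproduced with asyncio alone. The snippet below is a hand-driven sketch, not the engine: the body() coroutine, the state dict, and the log are hypothetical, and the "engine" is just the few lines in demo() that cancel the task and start a new one. It shows that the first run releases its resource before the second run opens its own.

```python
import asyncio


async def demo():
    log = []
    state = {"endpoint": "a"}

    async def body():
        endpoint = state["endpoint"]
        log.append(f"open {endpoint}")
        try:
            await asyncio.sleep(10)  # stands in for a long poll
        except asyncio.CancelledError:
            log.append(f"cancelled {endpoint}")
            raise  # let the cancellation propagate
        finally:
            log.append(f"close {endpoint}")  # resource released before the rerun

    task = asyncio.create_task(body())
    await asyncio.sleep(0)  # the body is now parked at its await

    # A dependency changes: cancel the in-flight run, wait for cleanup, rerun.
    state["endpoint"] = "b"
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    rerun = asyncio.create_task(body())
    await asyncio.sleep(0)

    rerun.cancel()  # tidy up the demo itself
    try:
        await rerun
    except asyncio.CancelledError:
        pass
    return log


log = asyncio.run(demo())
```

Re-raising CancelledError after cleanup matters here: swallowing it would leave the task looking as if it had finished normally.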

Lifecycle

@lifecycle.activate

@lifecycle.activate
def method(self): ...

Called when the component is activated. Both def and async def.

@lifecycle.deactivate

@lifecycle.deactivate
def method(self): ...

Called when the component is deactivated. Both def and async def.
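Accepting both def and async def hooks usually comes down to checking whether the call returned an awaitable. The helper below is a generic sketch of that technique; call_hook() and the Greeter class are hypothetical and are not taken from the framework.

```python
import asyncio
import inspect


async def call_hook(hook):
    """Invoke a lifecycle hook whether it is def or async def."""
    result = hook()
    if inspect.isawaitable(result):
        result = await result
    return result


class Greeter:
    def __init__(self):
        self.events = []

    def start(self):  # plain def hook
        self.events.append("activated")

    async def stop(self):  # async def hook
        await asyncio.sleep(0)
        self.events.append("deactivated")


async def demo():
    greeter = Greeter()
    await call_hook(greeter.start)
    await call_hook(greeter.stop)
    return greeter.events


events = asyncio.run(demo())
```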

@lifecycle.health

@lifecycle.health
def method(self) -> dict: ...

Health check callback. Returns a dict describing the component's health.

Surface

@runnable

@runnable(name: str, *, params: type, returns: type | None = None,
          description: str = "", timeout_s: float | None = None,
          destructive: bool = False, internal: bool = False,
          requires_action: str = "", requires_role: str = "")

Typed callable on the bus. Transport adapters auto-generate REST/MCP/CLI bindings.

@runnable("search", params=SearchParams, description="Search")
async def search(self, params):
    return {"results": [...]}

@runnable("delete", params=DeleteParams, requires_role="admin")
async def delete(self, params): ...

@api

@api(transport: str, *, prefix: str = "", name: str = "", auth: bool = False,
     version: str = "", rate_limit: str = "", include: list[str] | None = None,
     exclude: list[str] | None = None)

Declare an API surface for a transport. Stack multiple for multi-transport.

@api("rest", prefix="/search", version="v1")
@api("mcp", name="search-tools")
class SearchApp: ...

Metadata

@prop

@prop(attr_name: str, prop_name: str, default: Any = None)

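Mutable component property, propagated to the service registry. attr_name is the attribute on the instance, prop_name is the name published to the registry, and default is the initial value. The property named service.ranking controls priority.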

@prop("_language", "language", "EN")
@prop("_ranking", "service.ranking", 0)
class EnglishDict: ...

@kind

@kind(name: str, *, model: type, description: str = "")

Register a data schema.

@kind("alert", model=AlertModel, description="Security alert")
class SecurityApp: ...

@skill

@skill(name: str, *, content: str, triggers: list[str] | None = None, description: str = "")

AI knowledge bundle.

@skill("spl-writer", content="# SPL Guide...", triggers=["splunk", "spl"])
class SplunkApp: ...

Events

@subscribe

@subscribe(event_type: str, *, description: str = "")

Declare a bus event handler. Both def and async def.

@subscribe("order.created", description="Handle new orders")
async def on_order(self, event_type, data):
    ...
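As a rough picture of how a marked handler could end up on a bus, here is a self-contained sketch. The __subscribe__ attribute, the Bus class, and its register() and publish() methods are all hypothetical names chosen for illustration; only the decorator signature and the (self, event_type, data) handler shape come from the reference above.

```python
import asyncio
import inspect
from collections import defaultdict


def subscribe(event_type, *, description=""):
    def mark(fn):
        fn.__subscribe__ = (event_type, description)  # hypothetical marker
        return fn
    return mark


class Bus:
    def __init__(self):
        self.handlers = defaultdict(list)

    def register(self, component):
        # Bind every marked method of the instance to its event type.
        for name, member in vars(type(component)).items():
            marker = getattr(member, "__subscribe__", None)
            if marker:
                self.handlers[marker[0]].append(getattr(component, name))

    async def publish(self, event_type, data):
        for handler in self.handlers[event_type]:
            result = handler(event_type, data)
            if inspect.isawaitable(result):  # def and async def both work
                await result


class Orders:
    def __init__(self):
        self.seen = []

    @subscribe("order.created", description="Handle new orders")
    async def on_order(self, event_type, data):
        self.seen.append((event_type, data["id"]))


orders = Orders()
bus = Bus()
bus.register(orders)
asyncio.run(bus.publish("order.created", {"id": 7}))
asyncio.run(bus.publish("order.cancelled", {"id": 7}))  # no handler, so nothing runs
```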