Kernel API

Kernel, ServiceRegistry, Bus, Runtime, Signal/Computed/Effect.

Kernel

The orchestrator. from signalpy.kernel import Kernel

Methods

Method Description
discover(classes) Register @component classes as factories
instantiate(factory, name?, props?) Create an instance from a factory
await boot() Auto-instantiate, resolve deps, activate, set up reactive effects
await shutdown() Dispose effects, deactivate in reverse order
await hot_add(cls, name?, props?) Add component to running kernel. Consumer @effect methods auto-re-run
await hot_remove(name) Remove component. Consumer @effect methods auto-re-run
await retry_erroneous(name) Re-attempt activation of ERRORED component
await drain(timeout_s=30) Stop accepting invocations, wait for in-flight
set_policy(name, policy) Set invoke/publish policy for a component
status() -> dict Components, services, bus handlers, reactive metadata

Properties

Property Description
.state KernelState: CREATED, BOOTING, HEALTHY, DRAINING, STOPPED
.healthy True when HEALTHY
.traits TraitRegistry
.registry ServiceRegistry
.bus Bus
.lifecycle LifecycleManager

ServiceRegistry

from signalpy.kernel import ServiceRegistry

Method Description
provide(contract, impl, name, props?, factory?) Register a service
unprovide(entry) Remove a service
require(contract, **filter) Single highest-ranked. Raises KeyError
require_optional(contract) Returns None instead of raising
require_all(contract) All, sorted by ranking
require_map(contract, key_prop) Dict keyed by property
require_for(contract, consumer) Factory-aware, cached per consumer
acquire(contract, consumer) Increment ref count
release(contract, consumer) Decrement ref count
ref_count(contract) Total refs across all consumers
query(contract?) Query entries
on_change(listener) Listen for provide/unprovide events

Bus

from signalpy.kernel import Bus

Method Description
register_handler(target, handler) Register invocation handler
await invoke(target, params?) Call a handler. Exact → target routing → transports
subscribe(event_type, handler) Register event handler
await publish(event_type, data?) Fan-out to subscribers + transports
add_transport(transport) Add cross-process transport

Runtime (self.rt)

Per-component reactive context.

Access Description
self.rt.config Injected service — reactive read
self.rt.peek("config") Read without reactive tracking
await self.rt.invoke(target, params?) Bus call (policy-checked)
await self.rt.publish(event_type, data?) Bus event (policy-checked)
self.rt.on(event_type, handler) Subscribe to events
await self.rt.spawn(factory, name?, props?) Create child component
self.rt.bus Raw bus (escape hatch for adapters)

Signal / Computed / Effect

from signalpy.kernel.reactive import Signal, Computed, Effect

Signal[T]

Reactive value container. Reading tracks consumers. Writing notifies.

s = Signal(0)
s.get()          # read (tracks if inside Effect/Computed)
s.set(5)         # write (notifies subscribers)
s.peek()         # read without tracking
s.update(lambda x: x + 1)

Computed[T]

Derived value. Lazy — recomputes only when read AND dependencies dirty.

doubled = Computed(lambda: counter.get() * 2)
doubled.get()    # recomputes if counter changed
doubled.dispose() # stop tracking

Effect

Side effect. Re-runs when tracked dependencies change.

e = Effect(lambda: print(f"Counter: {counter.get()}"))
counter.set(1)   # effect re-runs
e.dispose()      # stop tracking

batch()

Group changes. Effects fire once at the end.

from signalpy.kernel import batch

with batch():
    a.set(1)
    b.set(2)
# effects run ONCE here

is_stale()

Returns True if a newer run has been scheduled while the currently running effect’s body is in flight. Returns False outside any effect, and False for sync effects (sync bodies can’t be superseded mid-execution under the global lock). Useful inside async effect bodies for cooperative supersede checks:

from signalpy.kernel import is_stale

@effect
async def reload(self):
    items = self.rt.items.get()
    for item in items:
        await self._process(item)
        if is_stale():
            return    # a newer run has been scheduled — bail

See Threading Model → Async effect supersede for the full lifecycle and the alternative cancel_on_supersede=True escape hatch.