# Kernel API

Reference for Kernel, ServiceRegistry, Bus, Runtime, and Signal/Computed/Effect.
## Kernel

The orchestrator. `from signalpy.kernel import Kernel`
### Methods

| Method | Description |
|---|---|
| `discover(classes)` | Register `@component` classes as factories |
| `instantiate(factory, name?, props?)` | Create an instance from a factory |
| `await boot()` | Auto-instantiate, resolve deps, activate, set up reactive effects |
| `await shutdown()` | Dispose effects, deactivate in reverse order |
| `await hot_add(cls, name?, props?)` | Add a component to a running kernel; consumer `@effect` methods auto-re-run |
| `await hot_remove(name)` | Remove a component; consumer `@effect` methods auto-re-run |
| `await retry_erroneous(name)` | Re-attempt activation of an ERRORED component |
| `await drain(timeout_s=30)` | Stop accepting invocations, wait for in-flight work |
| `set_policy(name, policy)` | Set the invoke/publish policy for a component |
| `status() -> dict` | Components, services, bus handlers, reactive metadata |
### Properties

| Property | Description |
|---|---|
| `.state` | `KernelState`: CREATED, BOOTING, HEALTHY, DRAINING, STOPPED |
| `.healthy` | `True` when HEALTHY |
| `.traits` | `TraitRegistry` |
| `.registry` | `ServiceRegistry` |
| `.bus` | `Bus` |
| `.lifecycle` | `LifecycleManager` |
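`boot()` activates components in dependency order and `shutdown()` deactivates them in reverse. A toy sketch of that ordering contract (illustrative only, not signalpy internals; the component names are made up):

```python
import asyncio

events = []

class ToyKernel:
    """Toy model of the boot/shutdown ordering contract only."""

    def __init__(self, names):
        self.names = names  # pretend this is the dependency-resolved order

    async def boot(self):
        for n in self.names:            # activate in dependency order
            events.append(f"activate {n}")

    async def shutdown(self):
        for n in reversed(self.names):  # deactivate in reverse order
            events.append(f"deactivate {n}")

async def main():
    k = ToyKernel(["db", "cache", "api"])
    await k.boot()
    await k.shutdown()

asyncio.run(main())
print(events)
# ['activate db', 'activate cache', 'activate api',
#  'deactivate api', 'deactivate cache', 'deactivate db']
```

The reverse order on shutdown matters because later components may still be using services provided by earlier ones.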
## ServiceRegistry

`from signalpy.kernel import ServiceRegistry`

| Method | Description |
|---|---|
| `provide(contract, impl, name, props?, factory?)` | Register a service |
| `unprovide(entry)` | Remove a service |
| `require(contract, **filter)` | Single highest-ranked match; raises `KeyError` if none |
| `require_optional(contract)` | Returns `None` instead of raising |
| `require_all(contract)` | All matches, sorted by ranking |
| `require_map(contract, key_prop)` | Dict keyed by a property |
| `require_for(contract, consumer)` | Factory-aware, cached per consumer |
| `acquire(contract, consumer)` | Increment the reference count |
| `release(contract, consumer)` | Decrement the reference count |
| `ref_count(contract)` | Total references across all consumers |
| `query(contract?)` | Query entries |
| `on_change(listener)` | Listen for provide/unprovide events |
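`require()` returns the single highest-ranked provider and `require_all()` returns every match sorted by ranking. A toy sketch of those lookup semantics (not signalpy's implementation; the `ranking` prop name is an assumption for illustration):

```python
class ToyRegistry:
    """Toy model of require/require_all ranking semantics only."""

    def __init__(self):
        self._entries = []  # (contract, impl, props) tuples

    def provide(self, contract, impl, name, props=None):
        self._entries.append((contract, impl, {"name": name, **(props or {})}))

    def require_all(self, contract):
        # All matching impls, highest ranking first
        matches = [(i, p) for c, i, p in self._entries if c == contract]
        matches.sort(key=lambda e: e[1].get("ranking", 0), reverse=True)
        return [i for i, _ in matches]

    def require(self, contract):
        found = self.require_all(contract)
        if not found:
            raise KeyError(contract)  # per the table: require raises KeyError
        return found[0]

reg = ToyRegistry()
reg.provide("cache", "memory", "mem", {"ranking": 1})
reg.provide("cache", "redis", "redis", {"ranking": 10})
print(reg.require("cache"))      # redis (highest ranking wins)
print(reg.require_all("cache"))  # ['redis', 'memory']
```

`require_optional` and the filter kwargs follow the same lookup path; only the no-match behavior differs.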
## Bus

`from signalpy.kernel import Bus`

| Method | Description |
|---|---|
| `register_handler(target, handler)` | Register an invocation handler |
| `await invoke(target, params?)` | Call a handler: exact match → target routing → transports |
| `subscribe(event_type, handler)` | Register an event handler |
| `await publish(event_type, data?)` | Fan out to subscribers + transports |
| `add_transport(transport)` | Add a cross-process transport |
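The split between `invoke` (one target, one result) and `publish` (fan-out, no result) can be sketched with a toy bus. This only models exact-match routing; the real bus also tries target routing and transports:

```python
import asyncio

class ToyBus:
    """Toy bus: exact-match invoke routing plus publish fan-out."""

    def __init__(self):
        self._handlers = {}  # target -> single invocation handler
        self._subs = {}      # event_type -> list of event handlers

    def register_handler(self, target, handler):
        self._handlers[target] = handler

    async def invoke(self, target, params=None):
        # Exact match only in this toy (no target routing, no transports)
        return await self._handlers[target](params or {})

    def subscribe(self, event_type, handler):
        self._subs.setdefault(event_type, []).append(handler)

    async def publish(self, event_type, data=None):
        for h in self._subs.get(event_type, []):  # fan out to every subscriber
            await h(data or {})

async def main():
    bus = ToyBus()

    async def add(params):
        return params["a"] + params["b"]
    bus.register_handler("math.add", add)

    seen = []
    async def on_tick(data):
        seen.append(data["n"])
    bus.subscribe("tick", on_tick)

    result = await bus.invoke("math.add", {"a": 2, "b": 3})
    await bus.publish("tick", {"n": 1})
    return result, seen

result, seen = asyncio.run(main())
print(result, seen)  # 5 [1]
```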
## Runtime (`self.rt`)

Per-component reactive context.

| Access | Description |
|---|---|
| `self.rt.config` | Injected service (reactive read) |
| `self.rt.peek("config")` | Read without reactive tracking |
| `await self.rt.invoke(target, params?)` | Bus call (policy-checked) |
| `await self.rt.publish(event_type, data?)` | Bus event (policy-checked) |
| `self.rt.on(event_type, handler)` | Subscribe to events |
| `await self.rt.spawn(factory, name?, props?)` | Create a child component |
| `self.rt.bus` | Raw bus (escape hatch for adapters) |
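`rt.invoke` and `rt.publish` are policy-checked before anything reaches the bus (see `Kernel.set_policy`). A toy sketch of that gate; the allow-list policy shape here is an assumption for illustration, not signalpy's policy API:

```python
import asyncio

class ToyRuntime:
    """Toy: gate invoke() behind a per-component target allow-list."""

    def __init__(self, allowed_targets):
        self._allowed = set(allowed_targets)
        self.calls = []  # stand-in for the real bus

    async def invoke(self, target, params=None):
        if target not in self._allowed:  # policy check happens first
            raise PermissionError(f"policy denies invoke of {target!r}")
        self.calls.append((target, params))
        return "ok"

rt = ToyRuntime({"cache.get"})
print(asyncio.run(rt.invoke("cache.get", {"key": "a"})))  # ok

try:
    asyncio.run(rt.invoke("db.drop"))
except PermissionError as e:
    print(e)  # policy denies invoke of 'db.drop'
```

Going through `self.rt.bus` bypasses this gate, which is why the table calls it an escape hatch.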
## Signal / Computed / Effect

`from signalpy.kernel.reactive import Signal, Computed, Effect`

### Signal[T]

Reactive value container. Reading tracks consumers; writing notifies them.

```python
s = Signal(0)
s.get()                    # read (tracks if inside an Effect/Computed)
s.set(5)                   # write (notifies subscribers)
s.peek()                   # read without tracking
s.update(lambda x: x + 1)  # read-modify-write
```

### Computed[T]

Derived value. Lazy: recomputes only when read AND dependencies are dirty.

```python
doubled = Computed(lambda: counter.get() * 2)
doubled.get()      # recomputes if counter changed
doubled.dispose()  # stop tracking
```

### Effect

Side effect. Re-runs when tracked dependencies change.

```python
e = Effect(lambda: print(f"Counter: {counter.get()}"))
counter.set(1)  # effect re-runs
e.dispose()     # stop tracking
```

### batch()

Group changes; effects fire once at the end.

```python
from signalpy.kernel import batch

with batch():
    a.set(1)
    b.set(2)
# effects run ONCE here
```
### is_stale()

Returns `True` if a newer run has been scheduled while the currently running effect's body is in flight. Returns `False` outside any effect, and `False` for sync effects (sync bodies can't be superseded mid-execution under the global lock). Useful inside async effect bodies for cooperative supersede checks:

```python
from signalpy.kernel import is_stale

@effect
async def reload(self):
    items = self.rt.items.get()
    for item in items:
        await self._process(item)
        if is_stale():
            return  # a newer run has been scheduled; bail
```

See Threading Model → Async effect supersede for the full lifecycle and the alternative `cancel_on_supersede=True` escape hatch.