Audit Trail
Cross-cutting compliance logging with @subscribe wildcards and set_policy – no changes to business components.
The Problem
Compliance regulations (SOX, PCI-DSS, SOC 2) require that every financial transaction is logged with an immutable record of who did what and when. The logging mechanism must satisfy two constraints:
- Non-invasive. Business components should not contain audit logic. A transfer service transfers money; it should not know or care that an audit system is watching.
- Complete. Every bus invocation and every domain event must be capturable – not just the ones a developer remembered to instrument.
Traditional approaches either scatter logging calls throughout business code (invasive, fragile, incomplete) or require an AOP framework with bytecode weaving (heavy, opaque). In SignalPy, two kernel primitives solve both constraints: @subscribe with wildcard patterns captures domain events, and kernel.set_policy(audit=True) enables invoke-level logging orthogonally.
Architecture
+-----------------+
| AccountService |
| @runnable |
| rt.publish() |---> "account.transfer"
+-----------------+ |
^ |
kernel.bus.invoke() | | fnmatch("account.*")
| v
+-----------------+ +-----------------+
| Notification | | AuditTrail |
| Service | | @subscribe |
| @subscribe | | ("account.*") |
| ("account. | | entries.append()|
| transfer") | +-----------------+
+-----------------+
|
rt.invoke("notifications.send")
Four moving parts, zero kernel changes:
| Component | Role | Key mechanism |
|---|---|---|
AccountService |
Transfers money, publishes domain events | rt.publish("account.transfer", ...) |
NotificationService |
Sends user-facing notifications | @subscribe("account.transfer") + rt.invoke() |
AuditTrail |
Captures all account events into a queryable log | @subscribe("account.*") wildcard |
| Policy layer | Logs every rt.invoke() and rt.publish() call |
kernel.set_policy("accounts", {"audit": True}) |
The AuditTrail and NotificationService are just components. Add them, remove them, replace them – the AccountService never changes.
How It Works
Step 1: Domain events via rt.publish()
The AccountService publishes a structured event after each transfer. This is the only audit-relevant line in the business component:
@component("accounts", version="1.0", depends=["config"])
@requires(config="IConfig")
class AccountService:
@lifecycle.activate
def activate(self):
self._balances = {"alice": 1000.0, "bob": 500.0, "charlie": 250.0}
@runnable("transfer", params=TransferParams,
description="Transfer money")
async def transfer(self, params):
if self._balances[params.from_account] < params.amount:
return {"error": "Insufficient funds"}
self._balances[params.from_account] -= params.amount
self._balances.setdefault(params.to_account, 0.0)
self._balances[params.to_account] += params.amount
1 await self.rt.publish("account.transfer", {
"from": params.from_account,
"to": params.to_account,
"amount": params.amount,
})
return {"ok": True, "balances": dict(self._balances)}- 1
-
rt.publish()emits a domain event to the bus. The AccountService does not know who is listening. It publishes facts about what happened; consumers decide what to do with them.
Step 2: Wildcard capture with @subscribe
The AuditTrail subscribes to "account.*", which matches any event whose type starts with account. – including account.transfer, account.close, or any future event type:
@component("audit-trail", version="1.0", depends=["config"])
@requires(config="IConfig")
class AuditTrail:
@lifecycle.activate
def activate(self):
1 self.entries = []
2 @subscribe("account.*",
description="Audit all account events")
def on_account_event(self, event_type, data):
entry = {"event": event_type, "data": data}
3 self.entries.append(entry)- 1
- In-memory log. Production would write to an append-only store.
- 2
-
The wildcard pattern uses
fnmatchsemantics (Python’sfnmatch.fnmatch)."account.*"matches any event type where the prefix isaccount.followed by any single segment. Use"*"to capture everything. - 3
-
The handler is synchronous –
def, notasync def. The kernel detects this at decoration time and dispatches correctly. Audit handlers that only append to a list do not need async.
The bus resolves wildcards at publish time. When AccountService publishes "account.transfer", the bus collects both exact-match subscribers (NotificationService on "account.transfer") and wildcard-match subscribers (AuditTrail on "account.*"), then invokes all of them.
Step 3: Cross-component notification chain
The NotificationService subscribes to the same event and uses rt.invoke() to call its own runnable, demonstrating that domain events can trigger cross-component bus calls:
@subscribe("account.transfer", description="Notify on transfer")
async def on_transfer(self, event_type, data):
await self.rt.invoke("notifications.send", {
"user": data["to"],
"message": f"You received ${data['amount']:.2f} from {data['from']}",
})A single rt.publish() in AccountService fans out to both the audit trail and the notification system, each acting independently.
Step 4: Policy-driven invoke audit
Wildcard subscriptions capture domain events. But what about bus invocations – the accounts.transfer and notifications.send calls themselves? That is where set_policy comes in:
kernel.set_policy("accounts", {"audit": True})
kernel.set_policy("notifications", {"audit": True})This sets _audit = True on the Runtime for each named component. When _audit is true, every rt.invoke() and rt.publish() call emits a structured JSON log line via Python’s logging module before executing:
# From runtime.py — the actual implementation
if self._audit:
log.info('{"action":"invoke","caller":"%s","target":"%s"}',
self.component_name, target)This is not a subscription – it is a Runtime-level interceptor. It captures the caller identity, the target, and the action type (invoke or publish). The policy is set on the kernel, not on the component. The component code is unchanged.
@subscribe("account.*") captures domain events (what happened). set_policy(audit=True) captures bus operations (who called what). Together they provide complete audit coverage – event-level and invocation-level.
Running It
PYTHONPATH=src python -m signalpy.examples.audit_trailExpected output:
=== Transfers with audit trail ===
[audit] {"event": "account.transfer", "data": {"from": "alice", "to": "bob", "amount": 150.0}}
[audit] {"event": "account.transfer", "data": {"from": "bob", "to": "charlie", "amount": 75.0}}
=== Audit trail ===
{'event': 'account.transfer', 'data': {'from': 'alice', 'to': 'bob', 'amount': 150.0}}
{'event': 'account.transfer', 'data': {'from': 'bob', 'to': 'charlie', 'amount': 75.0}}
=== Final balances ===
alice: $850.00
bob: $575.00
charlie: $325.00
=== Notifications sent ===
bob: You received $150.00 from alice
charlie: You received $75.00 from bob
Audit trail demo complete.
The [audit] lines come from the AuditTrail’s @subscribe handler. The policy-level audit logs (invoke/publish JSON lines) go to Python’s logging module at INFO level – enable logging.basicConfig(level=logging.INFO) to see them on stderr.
The test suite validates the same scenario programmatically:
PYTHONPATH=src python -m pytest src/signalpy/tests/test_examples.py::TestAuditTrail -vProduction Considerations
Append-only event storage. The example uses an in-memory list. Production audit trails need durable, append-only storage – a write-ahead log, an event store (EventStoreDB), or a database table with no UPDATE/DELETE permissions. The AuditTrail component is the natural place to swap in a real backend; business components remain untouched.
Event schema versioning. As domain events evolve (new fields, renamed fields), the audit log must handle both old and new schemas. Stamp each event with a schema version at publish time ({"schema": "account.transfer/v2", ...}) and version your audit readers accordingly. The @subscribe wildcard pattern does not change when the payload schema changes.
GDPR and PII filtering. Audit logs often contain personal data (account names, email addresses, IP addresses). Apply PII filtering in the AuditTrail’s handler before persisting – redact, hash, or tokenize sensitive fields. The handler runs before storage, so filtering is centralized in one component rather than scattered across publishers.
Audit query patterns. The example exposes a query runnable on the AuditTrail component. In production, index entries by event type, timestamp, and actor. Common queries: “all transfers by alice in the last 24 hours”, “all events for account X”, “all failed operations”. The bus makes this queryable via the same kernel.bus.invoke("audit-trail.query", ...) interface.
Policy granularity. set_policy accepts invoke_allow, invoke_deny, and publish_allow lists in addition to audit. You can selectively audit high-risk components while leaving low-risk ones unaudited to reduce log volume. Policies are set at boot time or changed at runtime – no component restarts needed.
Key Takeaway
@subscribe with wildcard patterns captures domain events across an entire namespace. kernel.set_policy(audit=True) captures bus invocations at the Runtime level. Together, they provide complete audit coverage – event-level and invocation-level – without a single line of audit code in any business component. The audit trail is just another component: add it, remove it, replace it.