Audit Trail

Cross-cutting compliance logging with @subscribe wildcards and set_policy – no changes to business components.

The Problem

Compliance regulations (SOX, PCI-DSS, SOC 2) require that every financial transaction is logged with an immutable record of who did what and when. The logging mechanism must satisfy two constraints:

  • Non-invasive. Business components should not contain audit logic. A transfer service transfers money; it should not know or care that an audit system is watching.
  • Complete. Every bus invocation and every domain event must be capturable – not just the ones a developer remembered to instrument.

Traditional approaches either scatter logging calls throughout business code (invasive, fragile, incomplete) or require an AOP framework with bytecode weaving (heavy, opaque). In SignalPy, two kernel primitives satisfy both constraints: @subscribe with wildcard patterns captures domain events, and kernel.set_policy(name, {"audit": True}) enables invoke-level logging independently of the components being logged.

Architecture

                     +-----------------+
                     |  AccountService |
                     |  @runnable      |
                     |  rt.publish()   |---> "account.transfer"
                     +-----------------+           |
                            ^                      |
    kernel.bus.invoke()     |                      |  fnmatch("account.*")
                            |                      v
                     +-----------------+    +-----------------+
                     | Notification    |    |   AuditTrail    |
                     | Service         |    | @subscribe      |
                     | @subscribe      |    |   ("account.*") |
                     | ("account.      |    | entries.append()|
                     |   transfer")    |    +-----------------+
                     +-----------------+
                            |
                   rt.invoke("notifications.send")

Four moving parts, zero kernel changes:

Component            Role                                              Key mechanism
-------------------  ------------------------------------------------  ----------------------------------------------
AccountService       Transfers money, publishes domain events          rt.publish("account.transfer", ...)
NotificationService  Sends user-facing notifications                   @subscribe("account.transfer") + rt.invoke()
AuditTrail           Captures all account events into a queryable log  @subscribe("account.*") wildcard
Policy layer         Logs every rt.invoke() and rt.publish() call      kernel.set_policy("accounts", {"audit": True})

The AuditTrail and NotificationService are just components. Add them, remove them, replace them – the AccountService never changes.

How It Works

Step 1: Domain events via rt.publish()

The AccountService publishes a structured event after each transfer. This is the only audit-relevant line in the business component:

@component("accounts", version="1.0", depends=["config"])
@requires(config="IConfig")
class AccountService:

    @lifecycle.activate
    def activate(self):
        self._balances = {"alice": 1000.0, "bob": 500.0, "charlie": 250.0}

    @runnable("transfer", params=TransferParams,
              description="Transfer money")
    async def transfer(self, params):
        if self._balances[params.from_account] < params.amount:
            return {"error": "Insufficient funds"}
        self._balances[params.from_account] -= params.amount
        self._balances.setdefault(params.to_account, 0.0)
        self._balances[params.to_account] += params.amount
        await self.rt.publish("account.transfer", {  # (1)
            "from": params.from_account,
            "to": params.to_account,
            "amount": params.amount,
        })
        return {"ok": True, "balances": dict(self._balances)}

(1) rt.publish() emits a domain event to the bus. The AccountService does not know who is listening. It publishes facts about what happened; consumers decide what to do with them.

Step 2: Wildcard capture with @subscribe

The AuditTrail subscribes to "account.*", which matches any event whose type starts with account. – including account.transfer, account.close, or any future event type:

@component("audit-trail", version="1.0", depends=["config"])
@requires(config="IConfig")
class AuditTrail:

    @lifecycle.activate
    def activate(self):
        self.entries = []  # (1)

    @subscribe("account.*",  # (2)
               description="Audit all account events")
    def on_account_event(self, event_type, data):
        entry = {"event": event_type, "data": data}
        self.entries.append(entry)  # (3)

(1) In-memory log. Production would write to an append-only store.

(2) The wildcard pattern uses fnmatch semantics (Python’s fnmatch.fnmatch). "account.*" matches any event type that begins with account. Under fnmatch, * matches any run of characters, dots included, so the pattern also matches multi-segment names such as account.transfer.failed. Use "*" to capture everything.

(3) The handler is synchronous – def, not async def. The kernel detects this at decoration time and dispatches correctly. Audit handlers that only append to a list do not need async.

The bus resolves wildcards at publish time. When AccountService publishes "account.transfer", the bus collects both exact-match subscribers (NotificationService on "account.transfer") and wildcard-match subscribers (AuditTrail on "account.*"), then invokes all of them.
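
The resolution step can be illustrated without SignalPy. What follows is a minimal stand-alone sketch, not the kernel's implementation: MiniBus, subscribe, and publish are hypothetical names, fnmatchcase stands in for whatever matching call the bus actually makes, and the sync/async check happens at call time here rather than at decoration time. It shows one publish reaching both an exact-match and a wildcard subscriber.

```python
import asyncio
import inspect
from fnmatch import fnmatchcase


class MiniBus:
    """Hypothetical stand-in for the bus; not SignalPy's implementation."""

    def __init__(self):
        self._subs = []  # list of (pattern, handler) pairs

    def subscribe(self, pattern, handler):
        self._subs.append((pattern, handler))

    async def publish(self, event_type, data):
        # Patterns are evaluated at publish time against the event type.
        for pattern, handler in self._subs:
            if not fnmatchcase(event_type, pattern):
                continue
            result = handler(event_type, data)
            if inspect.isawaitable(result):  # handler was an async def
                await result


audit_log = []
notified = []


def audit(event_type, data):  # synchronous, like AuditTrail
    audit_log.append(event_type)


async def notify(event_type, data):  # asynchronous, like NotificationService
    notified.append(data["to"])


async def demo():
    bus = MiniBus()
    bus.subscribe("account.*", audit)
    bus.subscribe("account.transfer", notify)
    await bus.publish("account.transfer", {"to": "bob"})
    await bus.publish("account.close", {"to": "alice"})
    await bus.publish("account.transfer.failed", {"to": "bob"})
    await bus.publish("billing.invoice", {"to": "carol"})


asyncio.run(demo())
print(audit_log)  # ['account.transfer', 'account.close', 'account.transfer.failed']
print(notified)   # ['bob']
```

Note that the wildcard handler also receives account.transfer.failed, because * in fnmatch is not limited to a single dot-separated segment.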

Step 3: Cross-component notification chain

The NotificationService subscribes to the same event and uses rt.invoke() to call its own runnable, demonstrating that domain events can trigger cross-component bus calls:

@subscribe("account.transfer", description="Notify on transfer")
async def on_transfer(self, event_type, data):
    await self.rt.invoke("notifications.send", {
        "user": data["to"],
        "message": f"You received ${data['amount']:.2f} from {data['from']}",
    })

A single rt.publish() in AccountService fans out to both the audit trail and the notification system, each acting independently.

Step 4: Policy-driven invoke audit

Wildcard subscriptions capture domain events. But what about bus invocations – the accounts.transfer and notifications.send calls themselves? That is where set_policy comes in:

kernel.set_policy("accounts", {"audit": True})
kernel.set_policy("notifications", {"audit": True})

This sets _audit = True on the Runtime for each named component. When _audit is true, every rt.invoke() and rt.publish() call emits a structured JSON log line via Python’s logging module before executing:

# From runtime.py — the actual implementation
if self._audit:
    log.info('{"action":"invoke","caller":"%s","target":"%s"}',
             self.component_name, target)

This is not a subscription – it is a Runtime-level interceptor. It captures the caller identity, the target, and the action type (invoke or publish). The policy is set on the kernel, not on the component. The component code is unchanged.
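
To make the interceptor idea concrete, here is a small stand-alone sketch of a Runtime-like object with an audit flag. It is an illustration under assumptions, not a copy of runtime.py: SketchRuntime, ListHandler, and the logger name audit_sketch are hypothetical, and the bus call is replaced by a placeholder. The sketch builds the log line with json.dumps, which is one way to keep the output valid JSON if a name ever contains a quote character.

```python
import asyncio
import json
import logging

log = logging.getLogger("audit_sketch")  # hypothetical logger name


class SketchRuntime:
    """Hypothetical stand-in for a per-component Runtime."""

    def __init__(self, component_name, audit=False):
        self.component_name = component_name
        self._audit = audit
        self.calls = []

    async def invoke(self, target, params=None):
        if self._audit:
            # Log before executing, so failed calls are recorded too.
            log.info(json.dumps({
                "action": "invoke",
                "caller": self.component_name,
                "target": target,
            }))
        self.calls.append(target)  # placeholder for the real bus call
        return {"ok": True}


class ListHandler(logging.Handler):
    """Collects formatted messages so the demo can inspect them."""

    def __init__(self):
        super().__init__()
        self.lines = []

    def emit(self, record):
        self.lines.append(record.getMessage())


handler = ListHandler()
log.addHandler(handler)
log.setLevel(logging.INFO)

audited = SketchRuntime("accounts", audit=True)
silent = SketchRuntime("reports")  # no policy set, so nothing is logged
asyncio.run(audited.invoke("notifications.send"))
asyncio.run(silent.invoke("notifications.send"))
print(handler.lines)
```

Only the audited runtime produces a log line, while both runtimes execute the call. The component code that calls invoke is identical in both cases.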

Two audit layers

@subscribe("account.*") captures domain events (what happened). set_policy(name, {"audit": True}) captures bus operations (who called what). Together they cover both the event level and the invocation level.

Running It

PYTHONPATH=src python -m signalpy.examples.audit_trail

Expected output:

  === Transfers with audit trail ===
    [audit] {"event": "account.transfer", "data": {"from": "alice", "to": "bob", "amount": 150.0}}
    [audit] {"event": "account.transfer", "data": {"from": "bob", "to": "charlie", "amount": 75.0}}

  === Audit trail ===
    {'event': 'account.transfer', 'data': {'from': 'alice', 'to': 'bob', 'amount': 150.0}}
    {'event': 'account.transfer', 'data': {'from': 'bob', 'to': 'charlie', 'amount': 75.0}}

  === Final balances ===
    alice: $850.00
    bob: $575.00
    charlie: $325.00

  === Notifications sent ===
    bob: You received $150.00 from alice
    charlie: You received $75.00 from bob

  Audit trail demo complete.

The [audit] lines come from the AuditTrail’s @subscribe handler. The policy-level audit logs (invoke/publish JSON lines) go to Python’s logging module at INFO level; call logging.basicConfig(level=logging.INFO) to see them on stderr.

The test suite validates the same scenario programmatically:

PYTHONPATH=src python -m pytest src/signalpy/tests/test_examples.py::TestAuditTrail -v

Production Considerations

Append-only event storage. The example uses an in-memory list. Production audit trails need durable, append-only storage – a write-ahead log, an event store (EventStoreDB), or a database table with no UPDATE/DELETE permissions. The AuditTrail component is the natural place to swap in a real backend; business components remain untouched.
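
One way to approximate a table without UPDATE/DELETE rights is shown below with SQLite, which has no permission system of its own, so triggers reject the statements instead. This is a sketch under assumptions: the table name audit_log, its columns, and append_entry are hypothetical, and an in-memory database is used only to keep the example self-contained.

```python
import json
import sqlite3

conn = sqlite3.connect(":memory:")  # use a file path for durable storage
conn.executescript("""
CREATE TABLE audit_log (
    id    INTEGER PRIMARY KEY AUTOINCREMENT,
    event TEXT NOT NULL,
    data  TEXT NOT NULL
);
CREATE TRIGGER audit_no_update BEFORE UPDATE ON audit_log
BEGIN SELECT RAISE(ABORT, 'audit_log is append-only'); END;
CREATE TRIGGER audit_no_delete BEFORE DELETE ON audit_log
BEGIN SELECT RAISE(ABORT, 'audit_log is append-only'); END;
""")


def append_entry(event_type, data):
    """Insert one audit row; the only write path the sketch offers."""
    with conn:  # commits on success, rolls back on error
        conn.execute(
            "INSERT INTO audit_log (event, data) VALUES (?, ?)",
            (event_type, json.dumps(data, sort_keys=True)),
        )


append_entry("account.transfer", {"from": "alice", "to": "bob", "amount": 150.0})

try:
    conn.execute("DELETE FROM audit_log WHERE id = 1")
    tampered = True
except sqlite3.DatabaseError:
    tampered = False  # the trigger aborted the statement
    conn.rollback()

row_count = conn.execute("SELECT COUNT(*) FROM audit_log").fetchone()[0]
print(tampered, row_count)
```

Triggers are a guard rail rather than a security boundary: anyone who can write to the database file can also drop them. A server database with revoked UPDATE and DELETE grants, or a dedicated event store, gives stronger guarantees.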

Event schema versioning. As domain events evolve (new fields, renamed fields), the audit log must handle both old and new schemas. Stamp each event with a schema version at publish time ({"schema": "account.transfer/v2", ...}) and version your audit readers accordingly. The @subscribe wildcard pattern does not change when the payload schema changes.
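
On the reading side, a versioned reader can normalise older payloads to the newest shape before any query logic runs. The sketch below assumes that unstamped payloads are v1 and invents a currency field as the v2 addition; both are hypothetical and serve only to show the pattern.

```python
def upgrade_transfer(payload):
    """Normalise any known account.transfer payload to the v2 shape."""
    schema = payload.get("schema", "account.transfer/v1")  # unstamped = v1
    if schema == "account.transfer/v2":
        return payload
    if schema == "account.transfer/v1":
        return {
            "schema": "account.transfer/v2",
            "from": payload["from"],
            "to": payload["to"],
            "amount": payload["amount"],
            "currency": "USD",  # hypothetical field introduced in v2
        }
    raise ValueError(f"unknown schema: {schema}")


old = {"from": "alice", "to": "bob", "amount": 150.0}
print(upgrade_transfer(old)["schema"])
```

Raising on an unknown schema, rather than passing the payload through, makes a forgotten reader upgrade visible instead of silently producing incomplete audit reports.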

GDPR and PII filtering. Audit logs often contain personal data (account names, email addresses, IP addresses). Apply PII filtering in the AuditTrail’s handler before persisting – redact, hash, or tokenize sensitive fields. The handler runs before storage, so filtering is centralized in one component rather than scattered across publishers.
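
A filtering step of this kind could be sketched as below. The field names, the SECRET key, and the scrub function are assumptions for illustration. Keyed hashing (HMAC) is used for account names so that entries for the same account remain joinable without storing the name itself; fields listed for redaction are dropped outright.

```python
import hashlib
import hmac

SECRET = b"rotate-me"              # hypothetical key; load from a secret store
HASHED_FIELDS = {"from", "to"}     # pseudonymise but keep joinable
REDACTED_FIELDS = {"email", "ip"}  # discard the value entirely


def scrub(data):
    """Return a copy of a flat event payload with PII removed or hashed."""
    clean = {}
    for key, value in data.items():
        if key in REDACTED_FIELDS:
            clean[key] = "[redacted]"
        elif key in HASHED_FIELDS:
            digest = hmac.new(SECRET, str(value).encode(), hashlib.sha256)
            clean[key] = digest.hexdigest()[:16]
        else:
            clean[key] = value
    return clean


event = {"from": "alice", "to": "bob", "amount": 150.0, "email": "alice@example.com"}
print(scrub(event))
```

The handler would call scrub(data) before appending or persisting the entry. This sketch only inspects top-level keys; nested payloads would need a recursive walk.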

Audit query patterns. The example exposes a query runnable on the AuditTrail component. In production, index entries by event type, timestamp, and actor. Common queries: “all transfers by alice in the last 24 hours”, “all events for account X”, “all failed operations”. The bus makes this queryable via the same kernel.bus.invoke("audit-trail.query", ...) interface.
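
The first of those queries can be sketched as a plain filter over entries. This is not the example's query runnable, whose parameters are not shown here; the query function and the ts timestamp field are assumptions, and a real backend would answer the same question from an index instead of a linear scan.

```python
from datetime import datetime, timedelta, timezone


def query(entries, event=None, actor=None, since=None):
    """Filter audit entries; every criterion is optional."""
    results = []
    for entry in entries:
        if event is not None and entry["event"] != event:
            continue
        if actor is not None and entry["data"].get("from") != actor:
            continue
        if since is not None and entry["ts"] < since:
            continue
        results.append(entry)
    return results


now = datetime(2024, 1, 2, 12, 0, tzinfo=timezone.utc)  # fixed clock for the demo
entries = [
    {"event": "account.transfer", "ts": now - timedelta(hours=1),
     "data": {"from": "alice", "to": "bob", "amount": 150.0}},
    {"event": "account.transfer", "ts": now - timedelta(days=2),
     "data": {"from": "alice", "to": "charlie", "amount": 20.0}},
    {"event": "account.transfer", "ts": now - timedelta(hours=1),
     "data": {"from": "bob", "to": "charlie", "amount": 75.0}},
]

# "All transfers by alice in the last 24 hours"
recent_by_alice = query(entries, event="account.transfer", actor="alice",
                        since=now - timedelta(hours=24))
print(len(recent_by_alice))
```

Answering the query requires a timestamp on every entry, which the in-memory example does not record; stamping entries in the audit handler is the natural place to add it.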

Policy granularity. set_policy accepts invoke_allow, invoke_deny, and publish_allow lists in addition to audit. You can selectively audit high-risk components while leaving low-risk ones unaudited to reduce log volume. Policies are set at boot time or changed at runtime – no component restarts needed.

Key Takeaway

@subscribe with wildcard patterns captures domain events across an entire namespace. kernel.set_policy(name, {"audit": True}) captures bus invocations at the Runtime level. Together, they cover both the event level and the invocation level without a single line of audit code in any business component. The audit trail is just another component: add it, remove it, replace it.