Implementation Plan: Event Bus & Observability
Implementation Plan: Event Bus & Observability
Section 9 of Forge System Design Date: 2026-02-07 Status: Implementation Ready
Overview
This document provides a complete implementation blueprint for the Event Bus & Observability system — the nervous system of Forge. The event bus is the P0 foundation that enables all feedback loops, audit trails, cost tracking, and learning mechanisms. It must be built in Week 1 alongside the core types.
Design Philosophy:
- Start simple: In-memory pub/sub with SQLite persistence
- Real-time first: Subscribers receive events immediately
- Audit-ready: Every event persisted for replay and compliance
- Type-safe: Full TypeScript discriminated unions for event types
- Performance-conscious: Batched writes, capped buffer, async-first
1. EventBus Class — Complete Implementation
1.1 Core Interface
typescript// src/core/bus.ts import { ulid } from 'ulid'; import type { DrizzleDB } from './db'; import { events } from './schema'; type EventHandler<T = ForgeEvent> = (event: T) => void | Promise<void>; interface SubscriptionOptions { once?: boolean; // Auto-unsubscribe after first invocation filter?: (event: ForgeEvent) => boolean; // Optional predicate priority?: number; // Higher = called first (default 0) } export class EventBus { private handlers = new Map<string, Set<{ handler: EventHandler; options: SubscriptionOptions; }>>(); private db: DrizzleDB; private eventBuffer: ForgeEvent[] = []; private bufferSize = 100; // Flush to DB when buffer reaches this private flushInterval: NodeJS.Timeout; constructor(db: DrizzleDB, options?: { bufferSize?: number; flushIntervalMs?: number }) { this.db = db; this.bufferSize = options?.bufferSize ?? 100; // Periodic flush to ensure events aren't lost const intervalMs = options?.flushIntervalMs ?? 5000; // 5 seconds this.flushInterval = setInterval(() => this.flush(), intervalMs); } /** * Emit an event to all subscribers + persist to SQLite */ async emit(event: Omit<ForgeEvent, 'id' | 'timestamp'>): Promise<ForgeEvent> { const fullEvent: ForgeEvent = { ...event, id: ulid(), timestamp: new Date(), }; // Buffer for batch persistence this.eventBuffer.push(fullEvent); // Flush if buffer full if (this.eventBuffer.length >= this.bufferSize) { await this.flush(); } // Notify subscribers immediately (don't wait for DB write) await this.notify(fullEvent); return fullEvent; } /** * Subscribe to events by type or wildcard */ on( type: string | '*', handler: EventHandler, options: SubscriptionOptions = {} ): () => void { if (!this.handlers.has(type)) { this.handlers.set(type, new Set()); } const subscription = { handler, options }; this.handlers.get(type)!.add(subscription); // Return unsubscribe function return () => { this.handlers.get(type)?.delete(subscription); }; } /** * Subscribe once (auto-unsubscribe after first event) */ once(type: string | '*', handler: EventHandler): () => void { return this.on(type, handler, { once: true }); } /** * Unsubscribe all handlers for a type */ off(type: string): void { this.handlers.delete(type); } /** * Replay events by traceId (for debugging, reflection, audit) */ async replay(traceId: string): Promise<ForgeEvent[]> { return this.db .select() .from(events) .where(eq(events.traceId, traceId)) .orderBy(events.timestamp); } /** * Replay events by time window */ async replaySince(since: Date, filter?: Partial<ForgeEvent>): Promise<ForgeEvent[]> { let query = this.db .select() .from(events) .where(gte(events.timestamp, since.getTime())); if (filter?.source) { query = query.where(eq(events.source, filter.source)); } if (filter?.type) { query = query.where(eq(events.type, filter.type)); } if (filter?.phase) { query = query.where(eq(events.phase, filter.phase)); } return query.orderBy(events.timestamp); } /** * Flush buffered events to SQLite */ private async flush(): Promise<void> { if (this.eventBuffer.length === 0) return; const toFlush = [...this.eventBuffer]; this.eventBuffer = []; try { await this.db.insert(events).values( toFlush.map(e => ({ id: e.id, traceId: e.traceId, timestamp: e.timestamp.getTime(), source: e.source, type: e.type, phase: e.phase ?? null, payload: JSON.stringify(e.payload), tokensUsed: e.cost?.tokens ?? null, costUsd: e.cost?.usd ?? null, durationMs: e.durationMs ?? null, })) ); } catch (error) { // On error, put events back in buffer to avoid loss this.eventBuffer.unshift(...toFlush); throw error; } } /** * Notify subscribers (in priority order) */ private async notify(event: ForgeEvent): Promise<void> { // Collect handlers for this event type + wildcard const typeHandlers = this.handlers.get(event.type) ?? new Set(); const wildcardHandlers = this.handlers.get('*') ?? new Set(); const allHandlers = [...typeHandlers, ...wildcardHandlers]; // Sort by priority (highest first) allHandlers.sort((a, b) => (b.options.priority ?? 0) - (a.options.priority ?? 0) ); for (const { handler, options } of allHandlers) { // Check filter predicate if (options.filter && !options.filter(event)) { continue; } // Invoke handler (catch errors to avoid breaking other subscribers) try { await handler(event); } catch (error) { console.error(`Event handler error for ${event.type}:`, error); } // Auto-unsubscribe if "once" if (options.once) { this.handlers.get(event.type)?.delete({ handler, options }); } } } /** * Cleanup on shutdown */ async dispose(): Promise<void> { clearInterval(this.flushInterval); await this.flush(); } }
1.2 Wildcard and Namespace Subscriptions
typescript// Support namespace matching: "review.*" matches "review.started", "review.finding" export class EventBus { // ... existing code ... /** * Check if a subscription pattern matches an event type */ private matches(pattern: string, eventType: string): boolean { if (pattern === '*') return true; if (pattern === eventType) return true; // Namespace wildcard: "review.*" if (pattern.endsWith('.*')) { const namespace = pattern.slice(0, -2); return eventType.startsWith(namespace + '.'); } return false; } /** * Enhanced notify that checks pattern matching */ private async notify(event: ForgeEvent): Promise<void> { const matchingHandlers: Array<{ handler: EventHandler; options: SubscriptionOptions; }> = []; // Find all matching subscriptions for (const [pattern, handlers] of this.handlers.entries()) { if (this.matches(pattern, event.type)) { matchingHandlers.push(...handlers); } } // Sort by priority matchingHandlers.sort((a, b) => (b.options.priority ?? 0) - (a.options.priority ?? 0) ); // Invoke handlers for (const { handler, options } of matchingHandlers) { if (options.filter && !options.filter(event)) continue; try { await handler(event); } catch (error) { console.error(`Event handler error for ${event.type}:`, error); } if (options.once) { // Find the pattern this came from and remove it for (const [pattern, handlers] of this.handlers.entries()) { if (handlers.has({ handler, options })) { handlers.delete({ handler, options }); } } } } } }
2. Event Persistence — SQLite Integration
2.1 Schema (from SYSTEM-DESIGN.md Section 5)
typescript// src/core/schema.ts import { sqliteTable, text, integer, real } from 'drizzle-orm/sqlite-core'; export const events = sqliteTable('events', { id: text('id').primaryKey(), // ulid traceId: text('trace_id').notNull(), // groups one pipeline run timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(), source: text('source').notNull(), // agent or component id type: text('type').notNull(), // "plan.started", "review.finding", etc. phase: text('phase'), // current pipeline phase payload: text('payload', { mode: 'json' }), // event-specific data tokensUsed:integer('tokens_used'), costUsd: real('cost_usd'), durationMs:integer('duration_ms'), }); // Indexes for common queries export const eventsIndexes = { traceIdIdx: index('idx_events_trace_id').on(events.traceId), typeIdx: index('idx_events_type').on(events.type), timestampIdx: index('idx_events_timestamp').on(events.timestamp), sourceIdx: index('idx_events_source').on(events.source), };
2.2 Async vs Sync Write Strategy
Decision: Async write with batching
- Emit is non-blocking: Events go to in-memory buffer immediately
- Subscribers notified immediately: No wait for DB write
- Batch writes: Every N events or M seconds, whichever comes first
- Durability: On shutdown, flush all buffered events synchronously
Rationale:
- Event throughput can spike during agent loops (100+ events/sec)
- SQLite write lock would bottleneck if every emit waited for disk
- Buffering allows batch inserts (much faster)
- Risk: If process crashes, lose up to
bufferSizeevents — acceptable for observability (not transactional data)
2.3 Batching Configuration
typescriptinterface EventBusConfig { bufferSize: number; // Flush when buffer reaches this (default 100) flushIntervalMs: number; // Flush every N ms regardless (default 5000) maxRetries: number; // Retry failed writes (default 3) } const DEFAULT_CONFIG: EventBusConfig = { bufferSize: 100, flushIntervalMs: 5000, maxRetries: 3, };
2.4 Error Handling for Persistence
typescriptprivate async flush(): Promise<void> { if (this.eventBuffer.length === 0) return; const toFlush = [...this.eventBuffer]; this.eventBuffer = []; let retries = 0; while (retries < this.config.maxRetries) { try { await this.db.insert(events).values( toFlush.map(e => ({ id: e.id, traceId: e.traceId, timestamp: e.timestamp.getTime(), source: e.source, type: e.type, phase: e.phase ?? null, payload: JSON.stringify(e.payload), tokensUsed: e.cost?.tokens ?? null, costUsd: e.cost?.usd ?? null, durationMs: e.durationMs ?? null, })) ); return; // Success } catch (error) { retries++; if (retries >= this.config.maxRetries) { // Last resort: log to file system await this.dumpToFile(toFlush); throw new Error(`Failed to persist events after ${retries} retries`); } // Exponential backoff await sleep(Math.pow(2, retries) * 100); } } } private async dumpToFile(events: ForgeEvent[]): Promise<void> { const filename = `./events-dump-${Date.now()}.jsonl`; const lines = events.map(e => JSON.stringify(e)).join('\n'); await fs.writeFile(filename, lines); console.error(`Event persistence failed. Dumped to ${filename}`); }
3. Event Replay — Reconstructing History
3.1 Replay by TraceId
typescript/** * Replay all events for a specific pipeline run * Used for: debugging, reflection, human review, audit trail */ async replay(traceId: string): Promise<ForgeEvent[]> { const rows = await this.db .select() .from(events) .where(eq(events.traceId, traceId)) .orderBy(events.timestamp); return rows.map(row => this.deserializeEvent(row)); } private deserializeEvent(row: typeof events.$inferSelect): ForgeEvent { return { id: row.id, traceId: row.traceId, timestamp: new Date(row.timestamp), source: row.source, type: row.type, phase: row.phase ?? undefined, payload: row.payload ? JSON.parse(row.payload) : undefined, cost: row.tokensUsed ? { tokens: row.tokensUsed, usd: row.costUsd ?? 0, } : undefined, durationMs: row.durationMs ?? undefined, }; }
3.2 Replay by Time Window
typescript/** * Replay events since a timestamp * Used for: daily cost reports, trend analysis, error spike detection */ async replaySince( since: Date, options?: { type?: string; source?: string; phase?: string; limit?: number; } ): Promise<ForgeEvent[]> { let query = this.db .select() .from(events) .where(gte(events.timestamp, since.getTime())); if (options?.type) { query = query.where(eq(events.type, options.type)); } if (options?.source) { query = query.where(eq(events.source, options.source)); } if (options?.phase) { query = query.where(eq(events.phase, options.phase)); } if (options?.limit) { query = query.limit(options.limit); } const rows = await query.orderBy(events.timestamp); return rows.map(row => this.deserializeEvent(row)); }
3.3 Replay with Filters
typescriptinterface EventFilter { traceId?: string; type?: string | string[]; // Exact match or array of types typePattern?: string; // Wildcard: "review.*" source?: string | string[]; phase?: string | string[]; since?: Date; until?: Date; costMin?: number; // Filter by cost > threshold durationMin?: number; // Filter by duration > threshold limit?: number; offset?: number; } async query(filter: EventFilter): Promise<ForgeEvent[]> { let query = this.db.select().from(events); // Build WHERE clauses const conditions = []; if (filter.traceId) { conditions.push(eq(events.traceId, filter.traceId)); } if (filter.type) { if (Array.isArray(filter.type)) { conditions.push(inArray(events.type, filter.type)); } else { conditions.push(eq(events.type, filter.type)); } } if (filter.typePattern) { // Convert wildcard to SQL LIKE const pattern = filter.typePattern.replace('*', '%'); conditions.push(like(events.type, pattern)); } if (filter.since) { conditions.push(gte(events.timestamp, filter.since.getTime())); } if (filter.until) { conditions.push(lte(events.timestamp, filter.until.getTime())); } if (filter.costMin) { conditions.push(gte(events.costUsd, filter.costMin)); } if (conditions.length > 0) { query = query.where(and(...conditions)); } if (filter.limit) { query = query.limit(filter.limit); } if (filter.offset) { query = query.offset(filter.offset); } const rows = await query.orderBy(events.timestamp); return rows.map(row => this.deserializeEvent(row)); }
4. Event Type Registry — Full Type Safety
4.1 Complete Event Type Definitions
typescript// src/core/event-types.ts /** * Base event structure */ export interface ForgeEvent { id: string; traceId: string; timestamp: Date; source: string; type: string; phase?: PhaseName; payload: unknown; cost?: { tokens: number; usd: number }; durationMs?: number; } /** * Phase names */ export type PhaseName = | 'planning' | 'implementation' | 'review' | 'testing' | 'deployment' | 'monitoring'; /** * Event catalog — discriminated union for type safety */ export type ForgeEventType = // ─── Pipeline lifecycle events ─── | RunStartedEvent | RunCompletedEvent | RunFailedEvent | RunCancelledEvent // ─── Phase transition events ─── | PhaseStartedEvent | PhaseCompletedEvent | PhaseFailedEvent // ─── Agent events ─── | AgentIterationEvent | AgentReflectionEvent | AgentDecisionEvent // ─── Tool events ─── | ToolExecutedEvent | ToolFailedEvent // ─── Planning events ─── | PlanningStartedEvent | PlanningCompletedEvent | ArchitectureProposedEvent | TasksGeneratedEvent // ─── Implementation events ─── | ImplementationStartedEvent | ImplementationCompletedEvent | CodeGeneratedEvent | ValidationFailedEvent // ─── Review events ─── | ReviewStartedEvent | ReviewCompletedEvent | FindingDetectedEvent | RiskAssessedEvent // ─── Testing events ─── | TestingStartedEvent | TestingCompletedEvent | TestPassedEvent | TestFailedEvent | CoverageReportedEvent // ─── Deployment events ─── | DeploymentStartedEvent | DeploymentCompletedEvent | DeploymentFailedEvent | RollbackInitiatedEvent // ─── Safety events ─── | GateRequestedEvent | GateApprovedEvent | GateRejectedEvent | BreakerTrippedEvent // ─── Memory events ─── | MemoryStoredEvent | MemoryRecalledEvent | PatternLearnedEvent // ─── Human interaction events ─── | HumanInputRequestedEvent | HumanInputReceivedEvent | FindingDismissedEvent; /** * Individual event type definitions with payloads */ // ─── Pipeline Events ─── export interface RunStartedEvent extends ForgeEvent { type: 'run.started'; payload: { task: string; config: Record<string, unknown>; }; } export interface RunCompletedEvent extends ForgeEvent { type: 'run.completed'; payload: { status: 'success'; totalCostUsd: number; totalTokens: number; durationMs: number; }; } export interface RunFailedEvent extends ForgeEvent { type: 'run.failed'; payload: { error: string; phase: PhaseName; recoverable: boolean; }; } export interface RunCancelledEvent extends ForgeEvent { type: 'run.cancelled'; payload: { reason: string; cancelledBy: 'human' | 'breaker' | 'timeout'; }; } // ─── Phase Events ─── export interface PhaseStartedEvent extends ForgeEvent { type: 'phase.started'; payload: { phase: PhaseName; }; } export interface PhaseCompletedEvent extends ForgeEvent { type: 'phase.completed'; payload: { phase: PhaseName; iterations: number; durationMs: number; }; } export interface PhaseFailedEvent extends ForgeEvent { type: 'phase.failed'; payload: { phase: PhaseName; error: string; }; } // ─── Agent Events ─── export interface AgentIterationEvent extends ForgeEvent { type: 'agent.iteration'; payload: { iteration: number; agentType: AgentType; reasoning: string; action: string; outcome: 'success' | 'failure' | 'retry'; }; } export interface AgentReflectionEvent extends ForgeEvent { type: 'agent.reflection'; payload: { agentType: AgentType; learnings: Array<{ content: string; confidence: number; }>; }; } export interface AgentDecisionEvent extends ForgeEvent { type: 'agent.decision'; payload: { agentType: AgentType; decision: string; confidence: number; alternatives: string[]; }; } // ─── Tool Events ─── export interface ToolExecutedEvent extends ForgeEvent { type: 'tool.executed'; payload: { toolName: string; inputHash: string; outputHash: string; durationMs: number; cached: boolean; }; } export interface ToolFailedEvent extends ForgeEvent { type: 'tool.failed'; payload: { toolName: string; error: string; retryCount: number; willRetry: boolean; }; } // ─── Review Events ─── export interface ReviewStartedEvent extends ForgeEvent { type: 'review.started'; payload: { filesChanged: number; linesChanged: number; }; } export interface ReviewCompletedEvent extends ForgeEvent { type: 'review.completed'; payload: { approved: boolean; findingsCount: number; riskScore: number; requiresHuman: boolean; }; } export interface FindingDetectedEvent extends ForgeEvent { type: 'review.finding'; payload: { findingId: string; severity: 'info' | 'warning' | 'error' | 'critical'; category: string; file: string; line: number; message: string; confidence: number; }; } export interface RiskAssessedEvent extends ForgeEvent { type: 'review.risk_assessed'; payload: { score: number; level: 'low' | 'medium' | 'high' | 'critical'; factors: Record<string, number>; }; } // ─── Testing Events ─── export interface TestingStartedEvent extends ForgeEvent { type: 'testing.started'; payload: { testCount: number; strategy: string; }; } export interface TestingCompletedEvent extends ForgeEvent { type: 'testing.completed'; payload: { total: number; passed: number; failed: number; skipped: number; coverage: { line: number; branch: number; function: number; }; }; } export interface TestFailedEvent extends ForgeEvent { type: 'test.failed'; payload: { testName: string; suite: string; error: string; rootCause?: string; suggestedFix?: string; }; } // ─── Deployment Events ─── export interface DeploymentStartedEvent extends ForgeEvent { type: 'deployment.started'; payload: { deploymentId: string; strategy: 'canary' | 'blue_green' | 'rolling' | 'immediate'; environment: string; }; } export interface DeploymentCompletedEvent extends ForgeEvent { type: 'deployment.completed'; payload: { deploymentId: string; url: string; durationMs: number; }; } export interface RollbackInitiatedEvent extends ForgeEvent { type: 'deployment.rollback'; payload: { deploymentId: string; reason: string; trigger: 'automatic' | 'manual'; }; } // ─── Safety Events ─── export interface GateRequestedEvent extends ForgeEvent { type: 'gate.requested'; payload: { gateId: string; reason: string; urgency: 'low' | 'medium' | 'high'; timeout: number; }; } export interface GateApprovedEvent extends ForgeEvent { type: 'gate.approved'; payload: { gateId: string; approvedBy: string; comment?: string; }; } export interface BreakerTrippedEvent extends ForgeEvent { type: 'breaker.tripped'; payload: { breakerType: 'iteration' | 'cost' | 'time' | 'error_rate'; threshold: number; currentValue: number; }; } // ─── Memory Events ─── export interface MemoryStoredEvent extends ForgeEvent { type: 'memory.stored'; payload: { memoryId: string; type: 'episodic' | 'semantic' | 'procedural'; confidence: number; }; } export interface PatternLearnedEvent extends ForgeEvent { type: 'memory.pattern_learned'; payload: { pattern: string; trigger: string; successRate: number; }; } // ─── Human Interaction Events ─── export interface HumanInputRequestedEvent extends ForgeEvent { type: 'human.input_requested'; payload: { requestId: string; prompt: string; timeout: number; }; } export interface FindingDismissedEvent extends ForgeEvent { type: 'human.finding_dismissed'; payload: { findingId: string; reason?: string; }; }
4.2 Type-Safe Event Emission
typescript// Type-safe emit helpers export class EventBus { // ... existing code ... /** * Type-safe event emission */ async emitTyped<T extends ForgeEventType>( event: Omit<T, 'id' | 'timestamp'> ): Promise<T> { return this.emit(event) as Promise<T>; } /** * Type-safe subscription */ onTyped<T extends ForgeEventType>( type: T['type'], handler: (event: T) => void | Promise<void>, options?: SubscriptionOptions ): () => void { return this.on(type, handler as EventHandler, options); } } // Usage example: bus.emitTyped<ReviewCompletedEvent>({ traceId: 'xyz', source: 'reviewer-agent', type: 'review.completed', phase: 'review', payload: { approved: true, findingsCount: 3, riskScore: 0.4, requiresHuman: false, }, }); bus.onTyped<FindingDetectedEvent>('review.finding', async (event) => { // event.payload is typed as FindingDetectedEvent['payload'] console.log(`Finding: ${event.payload.message} at ${event.payload.file}:${event.payload.line}`); });
5. Trace Context — W3C Compatible
5.1 TraceContext Class
typescript// src/core/trace-context.ts import { ulid } from 'ulid'; /** * W3C Trace Context implementation * See: https://www.w3.org/TR/trace-context/ */ export class TraceContext { readonly traceId: string; // Root trace ID (ULID) readonly spanId: string; // Current span ID (ULID) readonly parentSpanId?: string; readonly traceFlags: number; // 01 = sampled private constructor( traceId: string, spanId: string, parentSpanId?: string, traceFlags: number = 0x01 ) { this.traceId = traceId; this.spanId = spanId; this.parentSpanId = parentSpanId; this.traceFlags = traceFlags; } /** * Create a new root trace context */ static create(): TraceContext { const traceId = ulid(); const spanId = ulid(); return new TraceContext(traceId, spanId); } /** * Create a child span from this context */ createChildSpan(): TraceContext { const childSpanId = ulid(); return new TraceContext( this.traceId, childSpanId, this.spanId, this.traceFlags ); } /** * Serialize to W3C traceparent header * Format: 00-{trace_id}-{span_id}-{flags} */ toTraceparent(): string { return `00-${this.traceId}-${this.spanId}-${this.traceFlags.toString(16).padStart(2, '0')}`; } /** * Parse from W3C traceparent header */ static fromTraceparent(traceparent: string): TraceContext { const [version, traceId, spanId, flags] = traceparent.split('-'); if (version !== '00') { throw new Error(`Unsupported traceparent version: ${version}`); } return new TraceContext(traceId, spanId, undefined, parseInt(flags, 16)); } /** * Check if this trace is sampled */ isSampled(): boolean { return (this.traceFlags & 0x01) === 0x01; } }
5.2 Span Helpers
typescript/** * Span represents a unit of work within a trace */ export interface Span { traceId: string; spanId: string; parentSpanId?: string; name: string; startTime: Date; endTime?: Date; attributes: Record<string, string | number | boolean>; events: Array<{ timestamp: Date; name: string; attributes?: Record<string, unknown> }>; } /** * Span builder for fluent API */ export class SpanBuilder { private span: Partial<Span> = { events: [], attributes: {}, }; constructor(private ctx: TraceContext, name: string) { this.span.traceId = ctx.traceId; this.span.spanId = ctx.spanId; this.span.parentSpanId = ctx.parentSpanId; this.span.name = name; this.span.startTime = new Date(); } setAttribute(key: string, value: string | number | boolean): this { this.span.attributes![key] = value; return this; } addEvent(name: string, attributes?: Record<string, unknown>): this { this.span.events!.push({ timestamp: new Date(), name, attributes }); return this; } end(): Span { this.span.endTime = new Date(); return this.span as Span; } } // Helper to run code within a span export async function withSpan<T>( ctx: TraceContext, name: string, fn: (span: SpanBuilder) => Promise<T> ): Promise<T> { const span = new SpanBuilder(ctx, name); try { const result = await fn(span); span.end(); return result; } catch (error) { span.setAttribute('error', true); span.addEvent('exception', { message: error instanceof Error ? error.message : String(error), }); span.end(); throw error; } } // Usage example: const ctx = TraceContext.create(); await withSpan(ctx, 'review_phase', async (span) => { span.setAttribute('files_changed', 5); const findings = await runReview(); span.addEvent('findings_detected', { count: findings.length }); return findings; });
6. Structured Logging — Integration with Events
6.1 Log Levels and Mapping
typescript// src/core/logger.ts export enum LogLevel { DEBUG = 0, INFO = 1, WARN = 2, ERROR = 3, CRITICAL = 4, } export interface LogEntry { level: LogLevel; timestamp: Date; traceId?: string; spanId?: string; source: string; message: string; context?: Record<string, unknown>; } export class Logger { constructor( private bus: EventBus, private minLevel: LogLevel = LogLevel.INFO ) {} debug(source: string, message: string, context?: Record<string, unknown>) { this.log(LogLevel.DEBUG, source, message, context); } info(source: string, message: string, context?: Record<string, unknown>) { this.log(LogLevel.INFO, source, message, context); } warn(source: string, message: string, context?: Record<string, unknown>) { this.log(LogLevel.WARN, source, message, context); } error(source: string, message: string, context?: Record<string, unknown>) { this.log(LogLevel.ERROR, source, message, context); } critical(source: string, message: string, context?: Record<string, unknown>) { this.log(LogLevel.CRITICAL, source, message, context); } private log( level: LogLevel, source: string, message: string, context?: Record<string, unknown> ) { if (level < this.minLevel) return; const entry: LogEntry = { level, timestamp: new Date(), traceId: context?.traceId as string | undefined, spanId: context?.spanId as string | undefined, source, message, context, }; // Emit as event (allows subscribers to capture logs) this.bus.emit({ traceId: entry.traceId ?? 'system', source, type: 'log.' + LogLevel[level].toLowerCase(), payload: entry, }); // Also write to console this.writeToConsole(entry); } private writeToConsole(entry: LogEntry) { const levelName = LogLevel[entry.level]; const timestamp = entry.timestamp.toISOString(); const traceInfo = entry.traceId ? ` [${entry.traceId.slice(0, 8)}]` : ''; const line = `${timestamp} ${levelName}${traceInfo} ${entry.source}: ${entry.message}`; switch (entry.level) { case LogLevel.DEBUG: case LogLevel.INFO: console.log(line); break; case LogLevel.WARN: console.warn(line); break; case LogLevel.ERROR: case LogLevel.CRITICAL: console.error(line); break; } // Log context if present (DEBUG only) if (entry.level === LogLevel.DEBUG && entry.context) { console.log(' Context:', entry.context); } } }
6.2 Log Retention Policy
typescriptinterface LogRetentionPolicy { DEBUG: string; // '7d' INFO: string; // '30d' WARN: string; // '90d' ERROR: string; // '1y' CRITICAL: string; // 'permanent' } const RETENTION_POLICY: LogRetentionPolicy = { DEBUG: '7d', INFO: '30d', WARN: '90d', ERROR: '1y', CRITICAL: 'permanent', }; // Cleanup job (run daily) async function cleanupOldLogs(db: DrizzleDB) { const now = Date.now(); // Delete DEBUG logs older than 7 days await db.delete(events) .where( and( eq(events.type, 'log.debug'), lt(events.timestamp, now - 7 * 24 * 60 * 60 * 1000) ) ); // Delete INFO logs older than 30 days await db.delete(events) .where( and( eq(events.type, 'log.info'), lt(events.timestamp, now - 30 * 24 * 60 * 60 * 1000) ) ); // ... similar for WARN, ERROR // CRITICAL logs are never deleted }
7. Metrics Collection — Cost, Tokens, Duration
7.1 MetricsCollector Class
typescript// src/core/metrics.ts export interface Metrics { // Run-level metrics runs: { total: number; succeeded: number; failed: number; cancelled: number; }; // Cost metrics cost: { total: number; byPhase: Record<PhaseName, number>; byAgent: Record<AgentType, number>; perDay: number; perMonth: number; }; // Token usage tokens: { total: number; byPhase: Record<PhaseName, number>; byAgent: Record<AgentType, number>; }; // Duration metrics duration: { avgPerRun: number; byPhase: Record<PhaseName, number>; }; // Iteration counts iterations: { avgPerPhase: Record<PhaseName, number>; }; // Tool usage tools: { totalCalls: number; byTool: Record<string, number>; avgDuration: Record<string, number>; }; // Error rates errors: { total: number; byPhase: Record<PhaseName, number>; }; } export class MetricsCollector { private bus: EventBus; private db: DrizzleDB; private cache: Map<string, number> = new Map(); constructor(bus: EventBus, db: DrizzleDB) { this.bus = bus; this.db = db; // Subscribe to cost/duration events to maintain in-memory aggregates this.bus.on('*', async (event) => { if (event.cost) { this.incrementCounter('cost.total', event.cost.usd); this.incrementCounter(`cost.phase.${event.phase}`, event.cost.usd); } if (event.durationMs) { this.incrementCounter('duration.total', event.durationMs); this.incrementCounter(`duration.phase.${event.phase}`, event.durationMs); } }); } private incrementCounter(key: string, value: number) { const current = this.cache.get(key) ?? 0; this.cache.set(key, current + value); } /** * Get current metrics (computed from events + cache) */ async getMetrics(since?: Date): Promise<Metrics> { const filter = since ? { since } : {}; const events = await this.bus.query(filter); return { runs: this.computeRunMetrics(events), cost: this.computeCostMetrics(events), tokens: this.computeTokenMetrics(events), duration: this.computeDurationMetrics(events), iterations: this.computeIterationMetrics(events), tools: this.computeToolMetrics(events), errors: this.computeErrorMetrics(events), }; } private computeCostMetrics(events: ForgeEvent[]): Metrics['cost'] { const total = events .filter(e => e.cost) .reduce((sum, e) => sum + e.cost!.usd, 0); const byPhase: Record<PhaseName, number> = {} as any; for (const phase of ['planning', 'implementation', 'review', 'testing', 'deployment'] as PhaseName[]) { byPhase[phase] = events .filter(e => e.phase === phase && e.cost) .reduce((sum, e) => sum + e.cost!.usd, 0); } // For byAgent, need to know which agent emitted the event const byAgent: Record<AgentType, number> = {} as any; for (const event of events) { if (event.cost) { const agentType = this.extractAgentType(event.source); if (agentType) { byAgent[agentType] = (byAgent[agentType] ?? 0) + event.cost.usd; } } } // Per-day: events from last 24h const oneDayAgo = new Date(Date.now() - 24 * 60 * 60 * 1000); const perDay = events .filter(e => e.timestamp >= oneDayAgo && e.cost) .reduce((sum, e) => sum + e.cost!.usd, 0); // Per-month: events from last 30 days const thirtyDaysAgo = new Date(Date.now() - 30 * 24 * 60 * 60 * 1000); const perMonth = events .filter(e => e.timestamp >= thirtyDaysAgo && e.cost) .reduce((sum, e) => sum + e.cost!.usd, 0); return { total, byPhase, byAgent, perDay, perMonth }; } private computeTokenMetrics(events: ForgeEvent[]): Metrics['tokens'] { const total = events .filter(e => e.cost?.tokens) .reduce((sum, e) => sum + e.cost!.tokens, 0); const byPhase: Record<PhaseName, number> = {} as any; for (const phase of ['planning', 'implementation', 'review', 'testing', 'deployment'] as PhaseName[]) { byPhase[phase] = events .filter(e => e.phase === phase && e.cost?.tokens) .reduce((sum, e) => sum + e.cost!.tokens, 0); } const byAgent: Record<AgentType, number> = {} as any; // ... similar to cost return { total, byPhase, byAgent }; } private computeDurationMetrics(events: ForgeEvent[]): Metrics['duration'] { // Average per run: sum of 'run.completed' durations / count const completedRuns = events.filter(e => e.type === 'run.completed'); const avgPerRun = completedRuns.length > 0 ? completedRuns.reduce((sum, e) => sum + (e.durationMs ?? 0), 0) / completedRuns.length : 0; // By phase: average duration of 'phase.completed' events const byPhase: Record<PhaseName, number> = {} as any; for (const phase of ['planning', 'implementation', 'review', 'testing', 'deployment'] as PhaseName[]) { const phaseCompletions = events.filter( e => e.type === 'phase.completed' && (e.payload as any)?.phase === phase ); byPhase[phase] = phaseCompletions.length > 0 ? phaseCompletions.reduce((sum, e) => sum + (e.durationMs ?? 0), 0) / phaseCompletions.length : 0; } return { avgPerRun, byPhase }; } private computeIterationMetrics(events: ForgeEvent[]): Metrics['iterations'] { const byPhase: Record<PhaseName, number> = {} as any; for (const phase of ['planning', 'implementation', 'review', 'testing', 'deployment'] as PhaseName[]) { const iterations = events.filter(e => e.type === 'agent.iteration' && e.phase === phase); byPhase[phase] = iterations.length; } return { avgPerPhase: byPhase }; } private computeToolMetrics(events: ForgeEvent[]): Metrics['tools'] { const toolEvents = events.filter(e => e.type === 'tool.executed'); const byTool: Record<string, number> = {}; const durations: Record<string, number[]> = {}; for (const event of toolEvents) { const toolName = (event.payload as any)?.toolName; if (toolName) { byTool[toolName] = (byTool[toolName] ?? 0) + 1; if (event.durationMs) { if (!durations[toolName]) durations[toolName] = []; durations[toolName].push(event.durationMs); } } } const avgDuration: Record<string, number> = {}; for (const [tool, durs] of Object.entries(durations)) { avgDuration[tool] = durs.reduce((a, b) => a + b, 0) / durs.length; } return { totalCalls: toolEvents.length, byTool, avgDuration, }; } private computeErrorMetrics(events: ForgeEvent[]): Metrics['errors'] { const errorEvents = events.filter(e => e.type.includes('.failed') || e.type === 'log.error' || e.type === 'log.critical' ); const byPhase: Record<PhaseName, number> = {} as any; for (const phase of ['planning', 'implementation', 'review', 'testing', 'deployment'] as PhaseName[]) { byPhase[phase] = errorEvents.filter(e => e.phase === phase).length; } return { total: errorEvents.length, byPhase, }; } private extractAgentType(source: string): AgentType | null { // Assumes source is like "planner-agent" or "implementer-agent" const match = source.match(/^(\w+)-agent$/); return match ? (match[1] as AgentType) : null; } }
8. Cost Tracking — LLM and Tool Costs
8.1 Cost Calculator
typescript// src/core/cost-calculator.ts interface ModelPricing { model: string; inputCostPer1M: number; // USD per 1M input tokens outputCostPer1M: number; // USD per 1M output tokens } const MODEL_PRICING: ModelPricing[] = [ { model: 'claude-sonnet-4-5-20250929', inputCostPer1M: 3.00, outputCostPer1M: 15.00, }, { model: 'claude-haiku-4-5-20251001', inputCostPer1M: 0.25, outputCostPer1M: 1.25, }, { model: 'gpt-4-turbo', inputCostPer1M: 10.00, outputCostPer1M: 30.00, }, ]; export class CostCalculator { private pricing: Map<string, ModelPricing>; constructor() { this.pricing = new Map( MODEL_PRICING.map(p => [p.model, p]) ); } /** * Calculate cost for an LLM call */ calculateLLMCost( model: string, inputTokens: number, outputTokens: number ): number { const pricing = this.pricing.get(model); if (!pricing) { throw new Error(`Unknown model: ${model}`); } const inputCost = (inputTokens / 1_000_000) * pricing.inputCostPer1M; const outputCost = (outputTokens / 1_000_000) * pricing.outputCostPer1M; return inputCost + outputCost; } /** * Calculate cost for tool execution (if applicable) */ calculateToolCost(toolName: string, durationMs: number): number { // Most tools are free (local execution) // Some tools have costs (external APIs) switch (toolName) { case 'github_api': // GitHub API is free for reasonable usage return 0; case 'cloud_deployment': // Hypothetical: deployment costs X per minute return (durationMs / 60_000) * 0.10; default: return 0; } } } // Integration with event bus export async function trackLLMCost( bus: EventBus, ctx: TraceContext, source: string, model: string, inputTokens: number, outputTokens: number ) { const calculator = new CostCalculator(); const cost = calculator.calculateLLMCost(model, inputTokens, outputTokens); await bus.emit({ traceId: ctx.traceId, source, type: 'llm.called', payload: { model, inputTokens, outputTokens, }, cost: { tokens: inputTokens + outputTokens, usd: cost, }, }); }
8.2 Cost Queries
typescript// Get cost for a specific run async function getRunCost(bus: EventBus, traceId: string): Promise<number> { const events = await bus.replay(traceId); return events .filter(e => e.cost) .reduce((sum, e) => sum + e.cost!.usd, 0); } // Get cost for a specific day async function getDailyCost(bus: EventBus, date: Date): Promise<number> { const start = new Date(date); start.setHours(0, 0, 0, 0); const end = new Date(date); end.setHours(23, 59, 59, 999); const events = await bus.query({ since: start, until: end, }); return events .filter(e => e.cost) .reduce((sum, e) => sum + e.cost!.usd, 0); } // Get cost for a specific month async function getMonthlyCost(bus: EventBus, year: number, month: number): Promise<number> { const start = new Date(year, month - 1, 1); const end = new Date(year, month, 0, 23, 59, 59, 999); const events = await bus.query({ since: start, until: end, }); return events .filter(e => e.cost) .reduce((sum, e) => sum + e.cost!.usd, 0); }
9. Observable Interface — Real-Time Updates
9.1 Observable Pattern
typescript// src/core/observable.ts export interface Observer<T = ForgeEvent> { next(value: T): void; error?(err: Error): void; complete?(): void; } export class Observable<T = ForgeEvent> { constructor( private subscribe: (observer: Observer<T>) => (() => void) ) {} /** * Subscribe to the observable */ on(observer: Observer<T>): () => void { return this.subscribe(observer); } /** * Transform events */ map<U>(fn: (value: T) => U): Observable<U> { return new Observable<U>((observer) => { return this.subscribe({ next: (value) => observer.next(fn(value)), error: observer.error, complete: observer.complete, }); }); } /** * Filter events */ filter(predicate: (value: T) => boolean): Observable<T> { return new Observable<T>((observer) => { return this.subscribe({ next: (value) => { if (predicate(value)) observer.next(value); }, error: observer.error, complete: observer.complete, }); }); } } // Create observable from event bus export function createEventObservable( bus: EventBus, type?: string ): Observable<ForgeEvent> { return new Observable<ForgeEvent>((observer) => { const unsubscribe = bus.on(type ?? '*', (event) => { observer.next(event); }); return unsubscribe; }); } // Usage example: const observable = createEventObservable(bus, 'review.*'); const unsubscribe = observable .filter(e => (e.payload as any)?.severity === 'critical') .map(e => ({ message: `Critical finding: ${(e.payload as any)?.message}`, timestamp: e.timestamp, })) .on({ next: (value) => console.log(value), error: (err) => console.error(err), });
9.2 CLI UI Integration
typescript// CLI progress UI can subscribe to events import { EventBus } from './bus'; export class CLIProgressUI { constructor(private bus: EventBus) { this.setupSubscriptions(); } private setupSubscriptions() { // Listen for phase transitions this.bus.on('phase.started', (event) => { const phase = (event.payload as any).phase; console.log(`\n🚀 Starting phase: ${phase}`); }); this.bus.on('phase.completed', (event) => { const { phase, durationMs } = event.payload as any; console.log(`✅ Completed ${phase} in ${durationMs}ms`); }); // Listen for findings this.bus.on('review.finding', (event) => { const { severity, message, file, line } = event.payload as any; const icon = severity === 'critical' ? '🔴' : severity === 'error' ? '🟠' : '🟡'; console.log(` ${icon} ${file}:${line} - ${message}`); }); // Listen for test results this.bus.on('test.failed', (event) => { const { testName, error } = event.payload as any; console.log(` ❌ ${testName}: ${error}`); }); // Listen for cost updates this.bus.on('*', (event) => { if (event.cost) { // Update running cost counter this.updateCostDisplay(event.cost.usd); } }); } private updateCostDisplay(incrementUsd: number) { // Update in-place cost counter (e.g., using terminal escape codes) // Implementation depends on CLI framework (ink, blessed, etc.) } }
10. Snapshot & Restore — Checkpoint Integration
10.1 Snapshotting Event Bus State
typescriptexport interface EventBusSnapshot { subscriptions: Array<{ type: string; handlerCount: number; }>; bufferedEvents: ForgeEvent[]; timestamp: Date; } export class EventBus { // ... existing code ... /** * Capture current bus state for checkpoint */ snapshot(): EventBusSnapshot { const subscriptions = Array.from(this.handlers.entries()).map( ([type, handlers]) => ({ type, handlerCount: handlers.size, }) ); return { subscriptions, bufferedEvents: [...this.eventBuffer], timestamp: new Date(), }; } /** * Restore from snapshot (replay buffered events) */ async restore(snapshot: EventBusSnapshot): Promise<void> { // Re-emit buffered events that weren't persisted for (const event of snapshot.bufferedEvents) { await this.notify(event); // Notify subscribers only (don't re-persist) } } }
10.2 Integration with Checkpoint System
typescript// From orchestrator/checkpoint.ts import { EventBus, EventBusSnapshot } from '../core/bus'; export interface Checkpoint { id: string; traceId: string; phase: PhaseName; state: Record<string, unknown>; eventBusSnapshot: EventBusSnapshot; timestamp: Date; } export class CheckpointManager { constructor( private bus: EventBus, private db: DrizzleDB ) {} async createCheckpoint( traceId: string, phase: PhaseName, state: Record<string, unknown> ): Promise<Checkpoint> { const checkpoint: Checkpoint = { id: ulid(), traceId, phase, state, eventBusSnapshot: this.bus.snapshot(), timestamp: new Date(), }; await this.db.insert(checkpoints).values({ id: checkpoint.id, traceId: checkpoint.traceId, phase: checkpoint.phase, state: JSON.stringify({ ...checkpoint.state, eventBusSnapshot: checkpoint.eventBusSnapshot, }), timestamp: checkpoint.timestamp.getTime(), }); return checkpoint; } async restoreCheckpoint(checkpointId: string): Promise<Checkpoint> { const row = await this.db .select() .from(checkpoints) .where(eq(checkpoints.id, checkpointId)) .limit(1); if (row.length === 0) { throw new Error(`Checkpoint not found: ${checkpointId}`); } const state = JSON.parse(row[0].state); const eventBusSnapshot = state.eventBusSnapshot; delete state.eventBusSnapshot; // Restore event bus state await this.bus.restore(eventBusSnapshot); return { id: row[0].id, traceId: row[0].traceId, phase: row[0].phase as PhaseName, state, eventBusSnapshot, timestamp: new Date(row[0].timestamp), }; } }
11. Performance Targets and Constraints
11.1 Throughput Targets
- Event emission rate: 1000+ events/second
- Subscription invocation: < 1ms per handler
- Batch write to SQLite: < 100ms for 100 events
- Replay query: < 500ms for 10,000 events
- Memory footprint: < 50MB for in-memory buffer
11.2 Memory Limits
typescriptinterface EventBusConfig { bufferSize: number; // Max events in buffer (default 100) maxBufferMemoryMB: number; // Max memory for buffer (default 10) maxSubscribers: number; // Max subscribers per event type (default 100) flushIntervalMs: number; // Periodic flush (default 5000) } export class EventBus { private config: EventBusConfig; constructor(db: DrizzleDB, config?: Partial<EventBusConfig>) { this.config = { bufferSize: 100, maxBufferMemoryMB: 10, maxSubscribers: 100, flushIntervalMs: 5000, ...config, }; // ... rest of constructor } /** * Check if buffer exceeds memory limit */ private isBufferOverMemoryLimit(): boolean { const bufferSizeBytes = JSON.stringify(this.eventBuffer).length; const bufferSizeMB = bufferSizeBytes / (1024 * 1024); return bufferSizeMB > this.config.maxBufferMemoryMB; } /** * Enhanced flush check */ async emit(event: Omit<ForgeEvent, 'id' | 'timestamp'>): Promise<ForgeEvent> { const fullEvent: ForgeEvent = { ...event, id: ulid(), timestamp: new Date(), }; this.eventBuffer.push(fullEvent); // Flush if buffer size OR memory limit exceeded if ( this.eventBuffer.length >= this.config.bufferSize || this.isBufferOverMemoryLimit() ) { await this.flush(); } await this.notify(fullEvent); return fullEvent; } }
11.3 Old Event Pruning
typescript/** * Prune old events from SQLite to prevent unbounded growth * Run this as a periodic background job */ async function pruneOldEvents( db: DrizzleDB, retentionDays: number = 90 ): Promise<number> { const cutoff = Date.now() - retentionDays * 24 * 60 * 60 * 1000; // Keep all events with these types (permanent audit trail) const keepTypes = [ 'run.started', 'run.completed', 'run.failed', 'deployment.completed', 'deployment.rollback', 'gate.approved', 'gate.rejected', ]; const result = await db .delete(events) .where( and( lt(events.timestamp, cutoff), notInArray(events.type, keepTypes) ) ); return result.rowsAffected ?? 0; } // Recommended: Run daily via cron or scheduled job // Example: keep last 90 days, prune older (except critical events)
12. Testing — TestEventBus
12.1 TestEventBus Implementation
typescript// src/core/bus.test-helper.ts /** * In-memory event bus for testing (no SQLite dependency) */ export class TestEventBus extends EventBus { private capturedEvents: ForgeEvent[] = []; constructor() { // Pass a mock DB (no actual persistence) super(null as any, { bufferSize: 1000, flushIntervalMs: 999999 }); } /** * Override flush to capture events instead of writing to DB */ protected async flush(): Promise<void> { this.capturedEvents.push(...this.eventBuffer); this.eventBuffer = []; } /** * Get all captured events */ getCapturedEvents(): ForgeEvent[] { return [...this.capturedEvents]; } /** * Get events of a specific type */ getEventsByType(type: string): ForgeEvent[] { return this.capturedEvents.filter(e => e.type === type); } /** * Assert event was emitted */ assertEventEmitted(type: string, count: number = 1): void { const events = this.getEventsByType(type); if (events.length !== count) { throw new Error( `Expected ${count} events of type "${type}", got ${events.length}` ); } } /** * Assert event order */ assertEventOrder(types: string[]): void { const actualTypes = this.capturedEvents.map(e => e.type); const matchingIndices = types.map(t => actualTypes.indexOf(t)); // Check if indices are in ascending order for (let i = 1; i < matchingIndices.length; i++) { if (matchingIndices[i] <= matchingIndices[i - 1]) { throw new Error( `Event order violation: expected ${types[i]} after ${types[i - 1]}` ); } } } /** * Assert payload correctness */ assertEventPayload( type: string, predicate: (payload: unknown) => boolean ): void { const events = this.getEventsByType(type); if (events.length === 0) { throw new Error(`No events of type "${type}" found`); } const event = events[0]; if (!predicate(event.payload)) { throw new Error( `Event payload does not match predicate: ${JSON.stringify(event.payload)}` ); } } /** * Clear captured events */ clear(): void { this.capturedEvents = []; } }
12.2 Example Test Cases
typescript// tests/event-bus.test.ts import { describe, it, expect } from 'bun:test'; import { TestEventBus } from '../src/core/bus.test-helper'; describe('EventBus', () => { it('should emit events to subscribers', async () => { const bus = new TestEventBus(); const received: ForgeEvent[] = []; bus.on('test.event', (event) => { received.push(event); }); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'test.event', payload: { foo: 'bar' }, }); expect(received).toHaveLength(1); expect(received[0].type).toBe('test.event'); expect(received[0].payload).toEqual({ foo: 'bar' }); }); it('should support wildcard subscriptions', async () => { const bus = new TestEventBus(); const received: ForgeEvent[] = []; bus.on('*', (event) => { received.push(event); }); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'event.one', payload: {}, }); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'event.two', payload: {}, }); expect(received).toHaveLength(2); }); it('should support namespace subscriptions', async () => { const bus = new TestEventBus(); const received: ForgeEvent[] = []; bus.on('review.*', (event) => { received.push(event); }); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'review.started', payload: {}, }); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'review.finding', payload: {}, }); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'test.started', // Should NOT match payload: {}, }); expect(received).toHaveLength(2); expect(received[0].type).toBe('review.started'); expect(received[1].type).toBe('review.finding'); }); it('should invoke handlers in priority order', async () => { const bus = new TestEventBus(); const order: number[] = []; bus.on('test', () => order.push(1), { priority: 1 }); bus.on('test', () => order.push(3), { priority: 3 }); bus.on('test', () => order.push(2), { priority: 2 }); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'test', payload: {}, }); expect(order).toEqual([3, 2, 1]); // Highest priority first }); it('should support once() for auto-unsubscribe', async () => { const bus = new TestEventBus(); let callCount = 0; bus.once('test', () => callCount++); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'test', payload: {}, }); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'test', payload: {}, }); expect(callCount).toBe(1); // Only called once }); it('should assert event emission', async () => { const bus = new TestEventBus(); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'review.finding', payload: {}, }); bus.assertEventEmitted('review.finding', 1); }); it('should assert event order', async () => { const bus = new TestEventBus(); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'phase.started', payload: {}, }); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'agent.iteration', payload: {}, }); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'phase.completed', payload: {}, }); bus.assertEventOrder(['phase.started', 'agent.iteration', 'phase.completed']); }); it('should assert payload correctness', async () => { const bus = new TestEventBus(); await bus.emit({ traceId: 'test-trace', source: 'test', type: 'review.finding', payload: { severity: 'critical', message: 'Security issue', }, }); bus.assertEventPayload( 'review.finding', (payload: any) => payload.severity === 'critical' ); }); });
Summary
This implementation plan provides a complete, production-ready design for the Event Bus & Observability system. All code is TypeScript, fully type-safe, and ready to build.
Key Implementation Files:
/src/core/bus.ts— EventBus class with pub/sub and persistence/src/core/event-types.ts— Complete event type registry (20+ events)/src/core/trace-context.ts— W3C trace context and span helpers/src/core/logger.ts— Structured logging integrated with event bus/src/core/metrics.ts— MetricsCollector for cost, tokens, duration/src/core/cost-calculator.ts— LLM and tool cost tracking/src/core/observable.ts— Observable interface for real-time UI/src/core/bus.test-helper.ts— TestEventBus for unit tests/src/core/schema.ts— SQLite events table schema
Build Order:
- Week 1: Core types, EventBus, TraceContext, Logger, Schema
- Week 2: MetricsCollector, CostCalculator, Observable, TestEventBus
- Week 3+: Integration with agents (emit events from agent loop)
This is the foundation that makes feedback loops, audit trails, cost tracking, and learning possible.