Data Model Implementation Plan (Section 5)
Plan: 05-data-model.md
Date: 2026-02-07
Status: Implementation Ready
Dependencies: Core types, event bus foundation
Executive Summary
This plan details the complete SQLite database implementation for Forge using Drizzle ORM. The data model is the system's ground truth, storing all events, memories, patterns, execution state, and learning artifacts. Every decision, action, and outcome flows through this schema.
Key Design Principles:
- Event-sourced where possible — append-only events enable full replay
- Optimized for common queries — indexed for typical access patterns
- Embeddings in SQLite — avoid external vector DB in MVP
- Supports learning loops — memory, patterns, and consolidation built-in
- Migration-friendly — schema can evolve without breaking existing data
1. Drizzle Setup
1.1 Dependencies
```json
{
  "dependencies": {
    "drizzle-orm": "^0.29.0",
    "better-sqlite3": "^9.2.0"
  },
  "devDependencies": {
    "drizzle-kit": "^0.20.0",
    "@types/better-sqlite3": "^7.6.8"
  }
}
```
Why better-sqlite3 over bun:sqlite:
- More mature, battle-tested
- Better TypeScript support
- Works with Drizzle's migration tooling
- Can switch to bun:sqlite later if needed (API compatible; see the sketch below)
Alternative: For production at scale, use libSQL (turso) which is SQLite-compatible but adds replication.
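If that swap is ever needed, the change stays inside the connection module. A sketch of the bun:sqlite variant (illustrative, not used by the MVP; pragmas are issued through run rather than better-sqlite3's pragma helper):

```typescript
// Hypothetical bun:sqlite variant of createDatabase (not used by the MVP)
import { Database } from 'bun:sqlite';
import { drizzle } from 'drizzle-orm/bun-sqlite';
import * as schema from './schema';

export function createBunDatabase(path: string = '.forge/memory.db') {
  const sqlite = new Database(path);
  sqlite.run('PRAGMA journal_mode = WAL;');
  sqlite.run('PRAGMA busy_timeout = 30000;');
  sqlite.run('PRAGMA foreign_keys = ON;');
  return drizzle(sqlite, { schema });
}
```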
1.2 Configuration
drizzle.config.ts:
```typescript
import type { Config } from 'drizzle-kit';

export default {
  schema: './src/memory/schema.ts',
  out: './drizzle/migrations',
  driver: 'better-sqlite',
  dbCredentials: {
    url: process.env.DATABASE_URL || '.forge/memory.db',
  },
  verbose: true,
  strict: true,
} satisfies Config;
```
Connection configuration (src/memory/db.ts):
```typescript
import Database from 'better-sqlite3';
import { drizzle } from 'drizzle-orm/better-sqlite3';
import * as schema from './schema';

export function createConnection(path: string = '.forge/memory.db'): Database.Database {
  const connection = new Database(path);

  // Enable WAL mode for better concurrency
  connection.pragma('journal_mode = WAL');

  // Busy timeout for concurrent access (30 seconds)
  connection.pragma('busy_timeout = 30000');

  // Foreign keys enabled
  connection.pragma('foreign_keys = ON');

  // Synchronous mode: NORMAL (fast, safe enough for local dev)
  connection.pragma('synchronous = NORMAL');

  // Memory-mapped I/O for performance (256MB)
  connection.pragma('mmap_size = 268435456');

  // Cache size (64MB)
  connection.pragma('cache_size = -64000');

  return connection;
}

export function createDatabase(path: string = '.forge/memory.db') {
  return drizzle(createConnection(path), { schema });
}

// Singletons for application use. The raw handle is exported because the
// WAL checkpoint and shutdown code in section 9 needs direct pragma access.
export const sqlite = createConnection(process.env.DATABASE_URL || '.forge/memory.db');
export const db = drizzle(sqlite, { schema });
```
1.3 Migration Strategy
Workflow:
```bash
# Generate migration from schema changes
bun drizzle-kit generate:sqlite

# Review migration in drizzle/migrations/
cat drizzle/migrations/0001_*.sql

# Apply migration
bun drizzle-kit push:sqlite

# Or for programmatic migration:
bun run src/memory/migrate.ts
```
Programmatic migration (src/memory/migrate.ts):
```typescript
import { migrate } from 'drizzle-orm/better-sqlite3/migrator';
import { db } from './db';

export async function runMigrations() {
  console.log('Running migrations...');
  migrate(db, {
    migrationsFolder: './drizzle/migrations',
    migrationsTable: '__drizzle_migrations__',
  });
  console.log('Migrations complete');
}

// Run if executed directly
if (import.meta.main) {
  runMigrations();
}
```
Backwards compatibility approach:
- Never drop columns (mark deprecated instead)
- Use nullable columns for new fields
- Write backfill scripts for data migrations
- Version the schema in a schema_version table (see the sketch below)
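One possible shape for that version table, assuming the same drizzle-orm/sqlite-core imports as schema.ts (the column names are illustrative, and this table is not part of the section 2 schema):

```typescript
// Illustrative schema_version table (not part of the section 2 schema)
export const schemaVersion = sqliteTable('schema_version', {
  version: integer('version').primaryKey(),   // monotonically increasing
  description: text('description').notNull(), // what the migration changed
  appliedAt: integer('applied_at', { mode: 'timestamp_ms' }).notNull(),
});
```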
2. Complete Schema
2.1 Events Table (Append-Only Audit Trail)
```typescript
import { sqliteTable, text, integer, real, blob, index } from 'drizzle-orm/sqlite-core';
import { sql } from 'drizzle-orm';

export const events = sqliteTable('events', {
  // Identity
  id: text('id').primaryKey(),         // ULID for time-sortable IDs
  traceId: text('trace_id').notNull(), // Groups one pipeline run
  timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(),

  // Source attribution
  source: text('source').notNull(),    // agent or component id
  type: text('type').notNull(),        // "plan.started", "review.finding", etc.
  phase: text('phase'),                // current pipeline phase

  // Payload
  payload: text('payload', { mode: 'json' }), // event-specific data

  // Cost tracking
  tokensUsed: integer('tokens_used'),
  costUsd: real('cost_usd'),

  // Performance
  durationMs: integer('duration_ms'),

  // Metadata
  level: text('level', { enum: ['DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'] }).default('INFO'),
}, (table) => ({
  // Indexes for common queries
  traceIdIdx: index('events_trace_id_idx').on(table.traceId),
  typeIdx: index('events_type_idx').on(table.type),
  timestampIdx: index('events_timestamp_idx').on(table.timestamp),
  sourceIdx: index('events_source_idx').on(table.source),
  phaseIdx: index('events_phase_idx').on(table.phase),
  // Composite index for cost queries
  costIdx: index('events_cost_idx').on(table.traceId, table.costUsd),
  // Composite for error queries
  errorIdx: index('events_error_idx').on(table.level, table.timestamp)
    .where(sql`level IN ('ERROR', 'CRITICAL')`),
}));

// Type inference
export type Event = typeof events.$inferSelect;
export type NewEvent = typeof events.$inferInsert;
```
Why this structure:
- Append-only: never update/delete events (enables replay)
- JSON payload: flexible event-specific data
- Indexed for: trace replay, type filtering, time-series queries, cost analysis
- Partial index on errors: faster error queries without full table scan
2.2 Memories Table
```typescript
export const memories = sqliteTable('memories', {
  // Identity
  id: text('id').primaryKey(),
  type: text('type', { enum: ['episodic', 'semantic', 'procedural'] }).notNull(),

  // Content
  content: text('content').notNull(), // Human-readable description
  context: text('context').notNull(), // When is this relevant?

  // Embedding for similarity search
  embedding: blob('embedding', { mode: 'buffer' }), // Float32Array serialized

  // Confidence and decay
  confidence: real('confidence').notNull().default(0.5), // 0.0 - 1.0

  // Provenance
  source: text('source'), // What event created this?
  tags: text('tags', { mode: 'json' }).$type<string[]>(), // ["typescript", "testing"]

  // Temporal tracking
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
  lastAccessed: integer('last_accessed', { mode: 'timestamp_ms' }).notNull(),
  accessCount: integer('access_count').notNull().default(0),

  // Lifecycle
  archived: integer('archived', { mode: 'boolean' }).default(false),
}, (table) => ({
  typeIdx: index('memories_type_idx').on(table.type),
  confidenceIdx: index('memories_confidence_idx').on(table.confidence),
  lastAccessedIdx: index('memories_last_accessed_idx').on(table.lastAccessed),
  // Partial index for active memories
  activeIdx: index('memories_active_idx').on(table.confidence, table.lastAccessed)
    .where(sql`archived = 0 AND confidence > 0.2`),
  // Plain index on the serialized JSON tags: useful for exact matches only;
  // tag containment queries need json_each at query time
  tagsIdx: index('memories_tags_idx').on(table.tags),
}));

export type Memory = typeof memories.$inferSelect;
export type NewMemory = typeof memories.$inferInsert;
```
Embedding storage details:
```typescript
// Helper functions for embedding serialization
export function serializeEmbedding(embedding: Float32Array): Buffer {
  return Buffer.from(embedding.buffer);
}

export function deserializeEmbedding(buffer: Buffer): Float32Array {
  return new Float32Array(
    buffer.buffer,
    buffer.byteOffset,
    buffer.byteLength / Float32Array.BYTES_PER_ELEMENT
  );
}

// Usage:
// const embedding = await embedText(memory.content);
// await db.insert(memories).values({
//   ...memory,
//   embedding: serializeEmbedding(embedding)
// });
```
2.3 Patterns Table
```typescript
export const patterns = sqliteTable('patterns', {
  // Identity
  id: text('id').primaryKey(),
  type: text('type', { enum: ['success', 'failure', 'approach'] }).notNull(),

  // Pattern definition
  trigger: text('trigger').notNull(), // What situation activates this?
  pattern: text('pattern').notNull(), // The pattern itself
  resolution: text('resolution'),     // What to do when triggered

  // Statistics
  frequency: integer('frequency').notNull().default(1), // Times observed
  successRate: real('success_rate'), // How often this works (0.0 - 1.0)
  confidence: real('confidence').notNull().default(0.5),

  // Temporal
  lastSeen: integer('last_seen', { mode: 'timestamp_ms' }).notNull(),
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),

  // GEP Protocol fields
  priority: integer('priority').default(1),   // Gene priority
  mutations: integer('mutations').default(0), // Evolution tracking
}, (table) => ({
  typeIdx: index('patterns_type_idx').on(table.type),
  triggerIdx: index('patterns_trigger_idx').on(table.trigger),
  frequencyIdx: index('patterns_frequency_idx').on(table.frequency),
  // Composite for pattern selection
  scoreIdx: index('patterns_score_idx').on(table.type, table.confidence, table.frequency),
}));

export type Pattern = typeof patterns.$inferSelect;
export type NewPattern = typeof patterns.$inferInsert;
```
2.4 Checkpoints Table
```typescript
export const checkpoints = sqliteTable('checkpoints', {
  // Identity
  id: text('id').primaryKey(),
  traceId: text('trace_id').notNull(),
  phase: text('phase').notNull(),

  // State snapshot (JSON-serialized phase output)
  state: text('state', { mode: 'json' }).notNull().$type<Record<string, unknown>>(),

  // Temporal
  timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(),

  // Metadata
  sizeBytes: integer('size_bytes'), // Compressed state size
  isValid: integer('is_valid', { mode: 'boolean' }).default(true),
}, (table) => ({
  traceIdIdx: index('checkpoints_trace_id_idx').on(table.traceId),
  phaseIdx: index('checkpoints_phase_idx').on(table.phase),
  timestampIdx: index('checkpoints_timestamp_idx').on(table.timestamp),
}));

export type Checkpoint = typeof checkpoints.$inferSelect;
export type NewCheckpoint = typeof checkpoints.$inferInsert;
```
2.5 Runs Table (Pipeline Execution History)
```typescript
export const runs = sqliteTable('runs', {
  // Identity (same as traceId)
  id: text('id').primaryKey(),

  // Task description
  task: text('task').notNull(),

  // Status
  status: text('status', { enum: ['pending', 'running', 'completed', 'failed', 'cancelled'] }).notNull(),
  currentPhase: text('current_phase'),

  // Configuration snapshot
  config: text('config', { mode: 'json' }).$type<Record<string, unknown>>(),

  // Temporal
  startedAt: integer('started_at', { mode: 'timestamp_ms' }).notNull(),
  completedAt: integer('completed_at', { mode: 'timestamp_ms' }),

  // Cost tracking
  totalCostUsd: real('total_cost_usd').default(0),
  totalTokens: integer('total_tokens').default(0),

  // Outcome
  error: text('error'),          // If failed, why
  errorType: text('error_type'), // Classification

  // Metadata
  humanInterventions: integer('human_interventions').default(0),
  bounces: integer('bounces').default(0), // Review/test bounce count
}, (table) => ({
  statusIdx: index('runs_status_idx').on(table.status),
  startedAtIdx: index('runs_started_at_idx').on(table.startedAt),
  costIdx: index('runs_cost_idx').on(table.totalCostUsd),
  // Composite for active runs
  activeIdx: index('runs_active_idx').on(table.status, table.startedAt)
    .where(sql`status IN ('pending', 'running')`),
}));

export type Run = typeof runs.$inferSelect;
export type NewRun = typeof runs.$inferInsert;
```
2.6 Findings Table (Review/Test Issues)
```typescript
export const findings = sqliteTable('findings', {
  // Identity
  id: text('id').primaryKey(),
  runId: text('run_id').notNull().references(() => runs.id, { onDelete: 'cascade' }),

  // Source
  phase: text('phase', { enum: ['review', 'testing'] }).notNull(),
  analyzer: text('analyzer'), // Which tool/agent found this

  // Severity
  severity: text('severity', { enum: ['info', 'warning', 'error', 'critical'] }).notNull(),
  category: text('category', {
    enum: ['style', 'security', 'correctness', 'performance', 'maintainability', 'test'],
  }).notNull(),

  // Location
  file: text('file'),
  line: integer('line'),
  column: integer('column'),

  // Content
  message: text('message').notNull(),
  detail: text('detail'), // Extended explanation

  // Fix suggestion
  fixable: integer('fixable', { mode: 'boolean' }).default(false),
  fix: text('fix'),               // Suggested code change
  confidence: real('confidence'), // 0.0 - 1.0

  // Human feedback
  dismissed: integer('dismissed', { mode: 'boolean' }).default(false),
  dismissedBy: text('dismissed_by'), // Reason for dismissal
  dismissedAt: integer('dismissed_at', { mode: 'timestamp_ms' }),

  // Applied
  applied: integer('applied', { mode: 'boolean' }).default(false),
  appliedAt: integer('applied_at', { mode: 'timestamp_ms' }),

  // Temporal
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  runIdIdx: index('findings_run_id_idx').on(table.runId),
  severityIdx: index('findings_severity_idx').on(table.severity),
  categoryIdx: index('findings_category_idx').on(table.category),
  // Partial index for active findings (not dismissed)
  activeIdx: index('findings_active_idx').on(table.severity, table.runId)
    .where(sql`dismissed = 0`),
  // Composite for file-based queries
  fileIdx: index('findings_file_idx').on(table.file, table.line),
}));

export type Finding = typeof findings.$inferSelect;
export type NewFinding = typeof findings.$inferInsert;
```
3. Additional Tables
3.1 Sessions Table
```typescript
export const sessions = sqliteTable('sessions', {
  // Identity
  id: text('id').primaryKey(),

  // Association
  runId: text('run_id').references(() => runs.id, { onDelete: 'set null' }),

  // Context
  workingMemory: text('working_memory', { mode: 'json' }).$type<unknown>(),
  taskStack: text('task_stack', { mode: 'json' }).$type<string[]>(),

  // Temporal
  startedAt: integer('started_at', { mode: 'timestamp_ms' }).notNull(),
  lastActivityAt: integer('last_activity_at', { mode: 'timestamp_ms' }).notNull(),
  endedAt: integer('ended_at', { mode: 'timestamp_ms' }),

  // Status
  active: integer('active', { mode: 'boolean' }).default(true),
}, (table) => ({
  activeIdx: index('sessions_active_idx').on(table.active, table.lastActivityAt)
    .where(sql`active = 1`),
}));

export type Session = typeof sessions.$inferSelect;
export type NewSession = typeof sessions.$inferInsert;
```
3.2 Tool Executions Table
```typescript
export const toolExecutions = sqliteTable('tool_executions', {
  // Identity
  id: text('id').primaryKey(),
  eventId: text('event_id').references(() => events.id, { onDelete: 'cascade' }),
  traceId: text('trace_id').notNull(),

  // Tool info
  toolName: text('tool_name').notNull(),
  toolVersion: text('tool_version'),

  // Input/Output (hashed for deduplication)
  inputHash: text('input_hash').notNull(),
  outputHash: text('output_hash'),
  input: text('input', { mode: 'json' }).$type<unknown>(),
  output: text('output', { mode: 'json' }).$type<unknown>(),

  // Execution
  success: integer('success', { mode: 'boolean' }).notNull(),
  error: text('error'),
  durationMs: integer('duration_ms').notNull(),
  retryCount: integer('retry_count').default(0),

  // Cache
  cacheHit: integer('cache_hit', { mode: 'boolean' }).default(false),

  // Temporal
  executedAt: integer('executed_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  toolNameIdx: index('tool_executions_tool_name_idx').on(table.toolName),
  traceIdIdx: index('tool_executions_trace_id_idx').on(table.traceId),
  inputHashIdx: index('tool_executions_input_hash_idx').on(table.inputHash),
  cacheIdx: index('tool_executions_cache_idx').on(table.toolName, table.inputHash),
}));

export type ToolExecution = typeof toolExecutions.$inferSelect;
export type NewToolExecution = typeof toolExecutions.$inferInsert;
```
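The input_hash and output_hash columns assume a stable digest of the JSON payloads, which this plan does not otherwise define. A minimal sketch using node:crypto (note that plain JSON.stringify is key-order sensitive, so callers must serialize inputs consistently):

```typescript
import { createHash } from 'node:crypto';

// Hypothetical helper: digest a tool input for the input_hash column.
// JSON.stringify is key-order sensitive; serialize inputs consistently.
export function hashToolInput(input: unknown): string {
  return createHash('sha256').update(JSON.stringify(input)).digest('hex');
}
```

A cache lookup then becomes a query against the (tool_name, input_hash) composite index defined above.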
3.3 Cost Tracking Table
```typescript
export const costTracking = sqliteTable('cost_tracking', {
  // Identity
  id: text('id').primaryKey(),

  // Association
  traceId: text('trace_id'),
  runId: text('run_id').references(() => runs.id, { onDelete: 'cascade' }),
  eventId: text('event_id').references(() => events.id, { onDelete: 'cascade' }),

  // Agent/Model
  agentId: text('agent_id').notNull(),
  agentType: text('agent_type').notNull(),
  modelName: text('model_name').notNull(),

  // Usage
  tokensPrompt: integer('tokens_prompt').notNull(),
  tokensCompletion: integer('tokens_completion').notNull(),
  tokensTotal: integer('tokens_total').notNull(),

  // Cost
  costUsd: real('cost_usd').notNull(),

  // Temporal
  timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  runIdIdx: index('cost_tracking_run_id_idx').on(table.runId),
  timestampIdx: index('cost_tracking_timestamp_idx').on(table.timestamp),
  agentTypeIdx: index('cost_tracking_agent_type_idx').on(table.agentType),
  // Composite for cost aggregation
  aggregationIdx: index('cost_tracking_aggregation_idx')
    .on(table.timestamp, table.agentType, table.modelName),
}));

export type CostTracking = typeof costTracking.$inferSelect;
export type NewCostTracking = typeof costTracking.$inferInsert;
```
3.4 Human Decisions Table
```typescript
export const humanDecisions = sqliteTable('human_decisions', {
  // Identity
  id: text('id').primaryKey(),

  // Association
  runId: text('run_id').references(() => runs.id, { onDelete: 'cascade' }),

  // Gate
  gateType: text('gate_type', {
    enum: ['architecture_approval', 'production_deploy', 'security_findings', 'cost_overrun'],
  }).notNull(),
  phase: text('phase').notNull(),

  // Context
  contextSummary: text('context_summary').notNull(),
  riskScore: real('risk_score'),

  // Decision
  decision: text('decision', { enum: ['approved', 'rejected', 'changes_requested'] }).notNull(),
  feedback: text('feedback'),
  edits: text('edits'), // What the human changed

  // Temporal
  requestedAt: integer('requested_at', { mode: 'timestamp_ms' }).notNull(),
  decidedAt: integer('decided_at', { mode: 'timestamp_ms' }).notNull(),
  durationMs: integer('duration_ms').notNull(),
}, (table) => ({
  runIdIdx: index('human_decisions_run_id_idx').on(table.runId),
  gateTypeIdx: index('human_decisions_gate_type_idx').on(table.gateType),
  decisionIdx: index('human_decisions_decision_idx').on(table.decision),
}));

export type HumanDecision = typeof humanDecisions.$inferSelect;
export type NewHumanDecision = typeof humanDecisions.$inferInsert;
```
3.5 Agent Configs Table
```typescript
export const agentConfigs = sqliteTable('agent_configs', {
  // Identity
  id: text('id').primaryKey(),
  agentType: text('agent_type').notNull().unique(),

  // Configuration
  config: text('config', { mode: 'json' }).notNull().$type<Record<string, unknown>>(),

  // Model selection
  modelName: text('model_name').notNull(),
  temperature: real('temperature').default(0.7),
  maxTokens: integer('max_tokens'),

  // Safety
  maxIterations: integer('max_iterations').default(10),
  maxCostUsd: real('max_cost_usd').default(5),

  // Metadata
  version: integer('version').notNull().default(1),
  active: integer('active', { mode: 'boolean' }).default(true),
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
  updatedAt: integer('updated_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  agentTypeIdx: index('agent_configs_agent_type_idx').on(table.agentType),
}));

export type AgentConfig = typeof agentConfigs.$inferSelect;
export type NewAgentConfig = typeof agentConfigs.$inferInsert;
```
4. Index Strategy
4.1 Single-Column Indexes
Purpose: Fast filtering on commonly queried columns.
| Table | Column | Rationale |
|---|---|---|
| events | trace_id | Replay all events for a run |
| events | type | Filter by event type |
| events | timestamp | Time-series queries |
| events | source | Filter by agent/component |
| events | phase | Filter by pipeline phase |
| memories | type | Filter by memory type |
| memories | confidence | Filter low-confidence memories |
| memories | last_accessed | Find stale memories |
| patterns | type | Filter by pattern type |
| patterns | trigger | Find patterns by trigger |
| runs | status | Find active/failed runs |
| runs | started_at | Time-series run history |
| findings | run_id | Get all findings for a run |
| findings | severity | Filter critical findings |
4.2 Composite Indexes
Purpose: Optimize multi-column filters and sorts.
```typescript
// events: cost queries by run
index('events_cost_idx').on(table.traceId, table.costUsd)

// memories: active memory queries
index('memories_active_idx')
  .on(table.confidence, table.lastAccessed)
  .where(sql`archived = 0 AND confidence > 0.2`)

// patterns: pattern selection scoring
index('patterns_score_idx')
  .on(table.type, table.confidence, table.frequency)

// runs: active run queries
index('runs_active_idx')
  .on(table.status, table.startedAt)
  .where(sql`status IN ('pending', 'running')`)

// findings: active findings by severity
index('findings_active_idx')
  .on(table.severity, table.runId)
  .where(sql`dismissed = 0`)

// cost_tracking: aggregation queries
index('cost_tracking_aggregation_idx')
  .on(table.timestamp, table.agentType, table.modelName)
```
4.3 Partial Indexes
Purpose: Index only relevant rows to save space and improve performance.
```sql
-- Only index errors, not all events
CREATE INDEX events_error_idx ON events(level, timestamp)
  WHERE level IN ('ERROR', 'CRITICAL');

-- Only index active memories
CREATE INDEX memories_active_idx ON memories(confidence, last_accessed)
  WHERE archived = 0 AND confidence > 0.2;

-- Only index non-dismissed findings
CREATE INDEX findings_active_idx ON findings(severity, run_id)
  WHERE dismissed = 0;

-- Only index active runs
CREATE INDEX runs_active_idx ON runs(status, started_at)
  WHERE status IN ('pending', 'running');
```
Benefit: Smaller indexes = faster queries, less disk space.
4.4 Index Maintenance
SQLite automatically maintains indexes on insert/update/delete. No manual REINDEX needed unless:
- Database corruption (rare)
- After bulk imports
- After schema changes
Monitor index usage:
```sql
-- Check index usage
EXPLAIN QUERY PLAN
SELECT * FROM events WHERE type = 'plan.started';

-- Analyze table statistics
ANALYZE;

-- Get table sizes
SELECT name, SUM(pgsize) AS size
FROM dbstat
GROUP BY name;
```
5. Query Patterns
5.1 Insert Event
```typescript
import { ulid } from 'ulid';
import { events, type NewEvent } from './schema';
import { db } from './db';

async function insertEvent(event: Omit<NewEvent, 'id' | 'timestamp'>) {
  await db.insert(events).values({
    id: ulid(),
    timestamp: new Date(),
    ...event,
  });
}

// Usage
await insertEvent({
  traceId: 'run-123',
  source: 'planner-agent',
  type: 'plan.completed',
  phase: 'planning',
  payload: { storiesCount: 5 },
  tokensUsed: 1500,
  costUsd: 0.02,
  durationMs: 3500,
  level: 'INFO',
});
```
5.2 Recall Memories by Similarity
```typescript
import { memories, type Memory } from './schema';
import { deserializeEmbedding } from './embedding-utils'; // path illustrative; helpers from section 2.2
import { db } from './db';
import { sql, desc } from 'drizzle-orm';

async function recallSimilar(
  queryEmbedding: Float32Array,
  type?: 'episodic' | 'semantic' | 'procedural',
  limit = 10
): Promise<Memory[]> {
  // Build WHERE clause
  const conditions = [sql`archived = 0`, sql`confidence > 0.2`];
  if (type) {
    conditions.push(sql`type = ${type}`);
  }

  // Fetch candidate memories
  const candidates = await db
    .select()
    .from(memories)
    .where(sql.join(conditions, sql` AND `))
    .orderBy(desc(memories.confidence))
    .limit(limit * 10); // Fetch 10x for filtering

  // Calculate cosine similarity
  const scored = candidates.map(memory => {
    if (!memory.embedding) return { memory, score: 0 };
    const embedding = deserializeEmbedding(memory.embedding);
    const score = cosineSimilarity(queryEmbedding, embedding);
    return { memory, score };
  });

  // Sort by similarity and take top K
  scored.sort((a, b) => b.score - a.score);
  return scored.slice(0, limit).map(s => s.memory);
}

// Cosine similarity helper
function cosineSimilarity(a: Float32Array, b: Float32Array): number {
  if (a.length !== b.length) throw new Error('Dimension mismatch');

  let dotProduct = 0;
  let normA = 0;
  let normB = 0;

  for (let i = 0; i < a.length; i++) {
    dotProduct += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }

  return dotProduct / (Math.sqrt(normA) * Math.sqrt(normB));
}
```
Performance characteristics:
- Brute-force similarity on ~100 memories: <10ms
- ~1000 memories: ~50ms
- ~10,000 memories: ~500ms
When to upgrade to vector DB:
- Memory count > 100,000
- Query latency > 200ms
- Need approximate nearest neighbor (ANN) algorithms
5.3 Get Run History
```typescript
import { runs } from './schema';
import { db } from './db';
import { desc, eq, sql } from 'drizzle-orm';

async function getRunHistory(limit = 50) {
  return db
    .select({
      id: runs.id,
      task: runs.task,
      status: runs.status,
      startedAt: runs.startedAt,
      completedAt: runs.completedAt,
      totalCostUsd: runs.totalCostUsd,
      error: runs.error,
    })
    .from(runs)
    .orderBy(desc(runs.startedAt))
    .limit(limit);
}

async function getRunById(runId: string) {
  const [run] = await db
    .select()
    .from(runs)
    .where(eq(runs.id, runId))
    .limit(1);
  return run;
}

async function getActiveRuns() {
  return db
    .select()
    .from(runs)
    .where(sql`status IN ('pending', 'running')`)
    .orderBy(desc(runs.startedAt));
}
```
5.4 Find Patterns by Trigger
```typescript
import { patterns } from './schema';
import { db } from './db';
import { like, desc, eq } from 'drizzle-orm';

async function findPatternsByTrigger(triggerQuery: string) {
  return db
    .select()
    .from(patterns)
    .where(like(patterns.trigger, `%${triggerQuery}%`))
    .orderBy(desc(patterns.frequency), desc(patterns.confidence))
    .limit(20);
}

async function getBestPatternsForType(type: 'success' | 'failure' | 'approach') {
  return db
    .select()
    .from(patterns)
    .where(eq(patterns.type, type))
    .orderBy(
      desc(patterns.confidence),
      desc(patterns.frequency),
      desc(patterns.successRate)
    )
    .limit(10);
}
```
5.5 Checkpoint CRUD
```typescript
import { checkpoints } from './schema';
import { db } from './db';
import { eq, desc, sql } from 'drizzle-orm';
import { ulid } from 'ulid';

async function createCheckpoint(
  traceId: string,
  phase: string,
  state: Record<string, unknown>
): Promise<string> {
  const id = ulid();
  const stateStr = JSON.stringify(state);

  await db.insert(checkpoints).values({
    id,
    traceId,
    phase,
    state,
    timestamp: new Date(),
    sizeBytes: Buffer.byteLength(stateStr, 'utf8'),
    isValid: true,
  });

  return id;
}

async function getLatestCheckpoint(traceId: string) {
  const [checkpoint] = await db
    .select()
    .from(checkpoints)
    .where(eq(checkpoints.traceId, traceId))
    .orderBy(desc(checkpoints.timestamp))
    .limit(1);
  return checkpoint;
}

async function getCheckpointsByPhase(traceId: string, phase: string) {
  return db
    .select()
    .from(checkpoints)
    .where(sql`trace_id = ${traceId} AND phase = ${phase} AND is_valid = 1`)
    .orderBy(desc(checkpoints.timestamp));
}

async function invalidateCheckpoint(checkpointId: string) {
  await db
    .update(checkpoints)
    .set({ isValid: false })
    .where(eq(checkpoints.id, checkpointId));
}
```
5.6 Findings by Severity
```typescript
import { findings } from './schema';
import { db } from './db';
import { eq, sql, desc } from 'drizzle-orm';

async function getFindingsByRun(runId: string) {
  return db
    .select()
    .from(findings)
    .where(eq(findings.runId, runId))
    .orderBy(
      desc(findings.severity),
      findings.file,
      findings.line
    );
}

async function getCriticalFindings(runId?: string) {
  const conditions = [eq(findings.severity, 'critical'), eq(findings.dismissed, false)];
  if (runId) {
    conditions.push(eq(findings.runId, runId));
  }

  return db
    .select()
    .from(findings)
    .where(sql.join(conditions, sql` AND `))
    .orderBy(desc(findings.createdAt));
}

async function getFindingStats(runId: string) {
  const result = await db
    .select({
      severity: findings.severity,
      count: sql<number>`COUNT(*)`,
    })
    .from(findings)
    .where(sql`run_id = ${runId} AND dismissed = 0`)
    .groupBy(findings.severity);

  return Object.fromEntries(result.map(r => [r.severity, r.count]));
}

async function dismissFinding(findingId: string, reason: string) {
  await db
    .update(findings)
    .set({
      dismissed: true,
      dismissedBy: reason,
      dismissedAt: new Date(),
    })
    .where(eq(findings.id, findingId));
}
```
5.7 Cost Aggregation
```typescript
import { costTracking } from './schema';
import { db } from './db';
import { sql, between, eq, desc } from 'drizzle-orm';

async function getTotalCostByRun(runId: string) {
  const result = await db
    .select({
      totalCost: sql<number>`SUM(cost_usd)`,
      totalTokens: sql<number>`SUM(tokens_total)`,
    })
    .from(costTracking)
    .where(eq(costTracking.runId, runId));

  return result[0];
}

async function getCostByAgentType(startDate: Date, endDate: Date) {
  return db
    .select({
      agentType: costTracking.agentType,
      totalCost: sql<number>`SUM(cost_usd)`,
      totalTokens: sql<number>`SUM(tokens_total)`,
      operationCount: sql<number>`COUNT(*)`,
    })
    .from(costTracking)
    .where(between(costTracking.timestamp, startDate, endDate))
    .groupBy(costTracking.agentType)
    .orderBy(desc(sql`SUM(cost_usd)`));
}

async function getCostByModel(startDate: Date, endDate: Date) {
  return db
    .select({
      modelName: costTracking.modelName,
      totalCost: sql<number>`SUM(cost_usd)`,
      avgCostPerOperation: sql<number>`AVG(cost_usd)`,
      operationCount: sql<number>`COUNT(*)`,
    })
    .from(costTracking)
    .where(between(costTracking.timestamp, startDate, endDate))
    .groupBy(costTracking.modelName)
    .orderBy(desc(sql`SUM(cost_usd)`));
}

async function getDailyCostSummary(days = 30) {
  const startDate = new Date();
  startDate.setDate(startDate.getDate() - days);

  return db
    .select({
      date: sql<string>`DATE(timestamp / 1000, 'unixepoch')`,
      totalCost: sql<number>`SUM(cost_usd)`,
      runCount: sql<number>`COUNT(DISTINCT run_id)`,
    })
    .from(costTracking)
    .where(sql`timestamp >= ${startDate.getTime()}`)
    .groupBy(sql`DATE(timestamp / 1000, 'unixepoch')`)
    .orderBy(desc(sql`DATE(timestamp / 1000, 'unixepoch')`));
}
```
5.8 Event Replay
```typescript
import { events } from './schema';
import { db } from './db';
import { eq, sql } from 'drizzle-orm';

async function replayTrace(traceId: string) {
  return db
    .select()
    .from(events)
    .where(eq(events.traceId, traceId))
    .orderBy(events.timestamp);
}

async function getEventsByType(traceId: string, eventType: string) {
  return db
    .select()
    .from(events)
    .where(sql`trace_id = ${traceId} AND type = ${eventType}`)
    .orderBy(events.timestamp);
}

async function getErrorEvents(traceId: string) {
  return db
    .select()
    .from(events)
    .where(sql`trace_id = ${traceId} AND level IN ('ERROR', 'CRITICAL')`)
    .orderBy(events.timestamp);
}
```
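Replay is where the append-only design pays off: any trace can be folded back into derived state. A minimal sketch building on replayTrace above (the shape of the derived summary is illustrative, not a defined Forge type):

```typescript
// Fold a trace's event stream into a derived summary (illustrative shape)
async function deriveRunSummary(traceId: string) {
  const trace = await replayTrace(traceId);
  return trace.reduce(
    (acc, e) => ({
      phases: e.phase && !acc.phases.includes(e.phase)
        ? [...acc.phases, e.phase]
        : acc.phases,
      costUsd: acc.costUsd + (e.costUsd ?? 0),
      errorCount: e.level === 'ERROR' || e.level === 'CRITICAL'
        ? acc.errorCount + 1
        : acc.errorCount,
    }),
    { phases: [] as string[], costUsd: 0, errorCount: 0 }
  );
}
```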
6. Embedding Storage
6.1 Storage Format
Embeddings are stored as binary blobs (Float32Array serialized to Buffer):
```typescript
// Embedding dimension (depends on model)
// OpenAI text-embedding-3-small: 1536
// voyage-2: 1024
// Custom models: varies
const EMBEDDING_DIMENSION = 1536;

// Serialize embedding for storage
function serializeEmbedding(embedding: Float32Array): Buffer {
  if (embedding.length !== EMBEDDING_DIMENSION) {
    throw new Error(`Expected ${EMBEDDING_DIMENSION} dimensions, got ${embedding.length}`);
  }
  return Buffer.from(embedding.buffer);
}

// Deserialize from storage
function deserializeEmbedding(buffer: Buffer): Float32Array {
  return new Float32Array(
    buffer.buffer,
    buffer.byteOffset,
    buffer.byteLength / Float32Array.BYTES_PER_ELEMENT
  );
}
```
6.2 Similarity Search
Brute-force cosine similarity:
```typescript
import { memories, type Memory } from './schema';
import { db } from './db';
import { sql, desc } from 'drizzle-orm';

interface SimilaritySearchOptions {
  type?: 'episodic' | 'semantic' | 'procedural';
  minConfidence?: number;
  excludeArchived?: boolean;
  limit?: number;
}

async function similaritySearch(
  queryEmbedding: Float32Array,
  options: SimilaritySearchOptions = {}
): Promise<Array<{ memory: Memory; score: number }>> {
  const {
    type,
    minConfidence = 0.2,
    excludeArchived = true,
    limit = 10,
  } = options;

  // Build filter conditions
  const conditions = [sql`confidence >= ${minConfidence}`];
  if (excludeArchived) {
    conditions.push(sql`archived = 0`);
  }
  if (type) {
    conditions.push(sql`type = ${type}`);
  }

  // Fetch candidates
  const candidates = await db
    .select()
    .from(memories)
    .where(sql.join(conditions, sql` AND `))
    .orderBy(desc(memories.confidence));

  // Calculate similarities
  const results = candidates
    .filter(m => m.embedding !== null)
    .map(memory => ({
      memory,
      score: cosineSimilarity(
        queryEmbedding,
        deserializeEmbedding(memory.embedding!)
      ),
    }))
    .sort((a, b) => b.score - a.score)
    .slice(0, limit);

  return results;
}

function cosineSimilarity(a: Float32Array, b: Float32Array): number {
  let dotProduct = 0;
  let normA = 0;
  let normB = 0;

  for (let i = 0; i < a.length; i++) {
    dotProduct += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }

  const denominator = Math.sqrt(normA) * Math.sqrt(normB);
  return denominator === 0 ? 0 : dotProduct / denominator;
}
```
6.3 Performance Characteristics
| Memory Count | Search Time | Mitigation |
|---|---|---|
| 100 | <10ms | None needed |
| 1,000 | ~50ms | Acceptable |
| 10,000 | ~500ms | Pre-filter by type/tags |
| 100,000 | ~5s | Upgrade to vector DB |
Optimizations:
- Pre-filter aggressively: Use SQL WHERE to reduce candidates
- Cache embeddings: Keep hot embeddings in memory (see the sketch after this list)
- Parallel search: Use workers for multiple queries
- Hybrid search: Combine keyword + vector search
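As one concrete shape for the embedding cache mentioned above: a small in-process map keyed by memory id, populated on first deserialization. The capacity and FIFO eviction here are assumptions to tune, and Memory plus deserializeEmbedding come from the earlier sections:

```typescript
// Hot-embedding cache: avoids re-deserializing blobs on every search.
// CACHE_CAPACITY is an assumed tuning knob, not a measured value.
const embeddingCache = new Map<string, Float32Array>();
const CACHE_CAPACITY = 5_000;

function getCachedEmbedding(memory: Memory): Float32Array | null {
  if (!memory.embedding) return null;

  const hit = embeddingCache.get(memory.id);
  if (hit) return hit;

  const vec = deserializeEmbedding(memory.embedding);
  if (embeddingCache.size >= CACHE_CAPACITY) {
    // Crude FIFO eviction: Map preserves insertion order
    const oldest = embeddingCache.keys().next().value;
    if (oldest !== undefined) embeddingCache.delete(oldest);
  }
  embeddingCache.set(memory.id, vec);
  return vec;
}
```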
6.4 When to Upgrade
Switch to a dedicated vector database when:
- Memory count > 100,000
- Query latency > 200ms
- Need approximate nearest neighbor (ANN)
- Need distributed search
Options:
- Qdrant — Rust-based, fast, good TypeScript SDK
- Weaviate — GraphQL API, good integrations
- Pinecone — Managed, easy to use
- pgvector — PostgreSQL extension (if switching DB)
7. Migration Strategy
7.1 Initial Schema Creation
```bash
# Generate initial migration
bun drizzle-kit generate:sqlite

# Review the migration
cat drizzle/migrations/0000_initial.sql

# Apply migration
bun drizzle-kit push:sqlite
```
7.2 Schema Evolution
Adding a new column:
```typescript
// schema.ts
export const runs = sqliteTable('runs', {
  // ... existing columns ...

  // NEW: Add automation level
  automationLevel: integer('automation_level').default(1),
});
```
```bash
# Generate migration
bun drizzle-kit generate:sqlite

# Result: drizzle/migrations/0001_add_automation_level.sql
# ALTER TABLE runs ADD COLUMN automation_level INTEGER DEFAULT 1;

# Apply
bun drizzle-kit push:sqlite
```
Renaming a column:
```typescript
// Don't rename directly — it breaks existing queries
// Instead: add new column, backfill, deprecate old
export const runs = sqliteTable('runs', {
  // OLD (deprecated)
  error: text('error'),

  // NEW
  errorMessage: text('error_message'),
  errorType: text('error_type'),
});
```
Backfill script:
```typescript
// scripts/backfill-error-fields.ts
import { db } from '../src/memory/db';
import { runs } from '../src/memory/schema';
import { eq, isNotNull } from 'drizzle-orm';

async function backfillErrorFields() {
  const runsWithError = await db
    .select()
    .from(runs)
    .where(isNotNull(runs.error));

  for (const run of runsWithError) {
    await db
      .update(runs)
      .set({
        errorMessage: run.error,
        errorType: 'unknown', // default
      })
      .where(eq(runs.id, run.id));
  }

  console.log(`Backfilled ${runsWithError.length} runs`);
}

backfillErrorFields();
```
7.3 Backwards Compatibility
Approach:
- Add new columns as nullable — old code still works
- Backfill data — migrate existing data
- Update application code — use new columns
- Deprecate old columns — mark as deprecated in comments
- Remove old columns — after grace period (major version bump)
Example:
```typescript
// v1.0.0
export const runs = sqliteTable('runs', {
  error: text('error'), // deprecated: use errorMessage + errorType
  errorMessage: text('error_message'),
  errorType: text('error_type'),
});

// v2.0.0
export const runs = sqliteTable('runs', {
  errorMessage: text('error_message'),
  errorType: text('error_type'),
  // error column removed
});
```
8. Data Lifecycle
8.1 Retention Policies
```typescript
interface RetentionPolicies {
  events: {
    DEBUG: '7 days',
    INFO: '30 days',
    WARN: '90 days',
    ERROR: '1 year',
    CRITICAL: 'permanent'
  };
  memories: {
    episodic: 'archive after 90 days if not accessed',
    semantic: 'keep if confidence > 0.3',
    procedural: 'keep if success_rate > 0.5'
  };
  patterns: {
    success: 'keep if frequency > 1',
    failure: 'keep if frequency > 1',
    approach: 'keep if success_rate > 0.4'
  };
  checkpoints: {
    valid: '30 days',
    invalid: '7 days'
  };
  runs: {
    completed: '1 year',
    failed: '1 year',
    cancelled: '30 days'
  };
  findings: {
    all: 'keep for run lifetime'
  };
}
```
8.2 Archival Job
```typescript
// src/memory/archive.ts
import { db } from './db';
import { events, memories, checkpoints } from './schema';
import { sql } from 'drizzle-orm';

export async function archiveOldData() {
  const now = Date.now();

  // Delete old DEBUG events (7 days)
  await db
    .delete(events)
    .where(sql`level = 'DEBUG' AND timestamp < ${now - 7 * 24 * 60 * 60 * 1000}`);

  // Delete old INFO events (30 days)
  await db
    .delete(events)
    .where(sql`level = 'INFO' AND timestamp < ${now - 30 * 24 * 60 * 60 * 1000}`);

  // Archive stale episodic memories (not accessed in 90 days)
  await db
    .update(memories)
    .set({ archived: true })
    .where(sql`type = 'episodic' AND last_accessed < ${now - 90 * 24 * 60 * 60 * 1000}`);

  // Archive low-confidence semantic memories
  await db
    .update(memories)
    .set({ archived: true })
    .where(sql`type = 'semantic' AND confidence < 0.3`);

  // Delete invalid checkpoints (7 days old)
  await db
    .delete(checkpoints)
    .where(sql`is_valid = 0 AND timestamp < ${now - 7 * 24 * 60 * 60 * 1000}`);

  // Delete old valid checkpoints (30 days)
  await db
    .delete(checkpoints)
    .where(sql`is_valid = 1 AND timestamp < ${now - 30 * 24 * 60 * 60 * 1000}`);

  console.log('Archival complete');
}
```
8.3 Memory Pruning
```typescript
// src/memory/prune.ts
import { db } from './db';
import { memories, patterns } from './schema';
import { sql } from 'drizzle-orm';

export async function pruneMemories() {
  // Delete archived memories with low confidence
  const deleted = await db
    .delete(memories)
    .where(sql`archived = 1 AND confidence < 0.2`);

  console.log(`Pruned ${deleted.changes} archived memories`);
}

export async function prunePatterns() {
  // Delete patterns with low frequency and low success rate
  const deleted = await db
    .delete(patterns)
    .where(sql`frequency = 1 AND (success_rate < 0.4 OR success_rate IS NULL)`);

  console.log(`Pruned ${deleted.changes} low-value patterns`);
}
```
8.4 Consolidation Job
```typescript
// src/memory/consolidate.ts
import { ulid } from 'ulid';
import { db } from './db';
import { memories, patterns, type Memory, type NewMemory } from './schema';
import { eq, sql } from 'drizzle-orm';

export async function consolidateMemories() {
  // Find similar episodic memories and merge them
  const episodic = await db
    .select()
    .from(memories)
    .where(sql`type = 'episodic' AND archived = 0`);

  // Group by similarity (simplified — use embedding similarity in production)
  const groups = groupBySimilarity(episodic);

  for (const group of groups) {
    if (group.length > 1) {
      // Merge into one semantic memory
      const merged = mergeMemories(group);

      // Store as semantic; merged omits the id, so generate a fresh one
      await db.insert(memories).values({
        ...merged,
        id: ulid(),
        type: 'semantic',
        confidence: Math.max(...group.map(m => m.confidence)),
      });

      // Archive originals
      for (const memory of group) {
        await db
          .update(memories)
          .set({ archived: true })
          .where(eq(memories.id, memory.id));
      }
    }
  }
}

export async function consolidatePatterns() {
  // Promote frequent, successful patterns to procedural memories
  const frequentPatterns = await db
    .select()
    .from(patterns)
    .where(sql`frequency >= 3 AND success_rate >= 0.7`);

  for (const pattern of frequentPatterns) {
    // Convert to procedural memory
    await db.insert(memories).values({
      id: ulid(),
      type: 'procedural',
      content: pattern.pattern,
      context: pattern.trigger,
      confidence: pattern.confidence,
      source: `pattern:${pattern.id}`,
      tags: ['promoted'],
      createdAt: new Date(),
      lastAccessed: new Date(),
      accessCount: pattern.frequency,
    });
  }
}

function groupBySimilarity(memories: Memory[]): Memory[][] {
  // Simplified — use actual embedding similarity in production
  const groups: Memory[][] = [];
  const used = new Set<string>();

  for (const memory of memories) {
    if (used.has(memory.id)) continue;
    const similar = memories.filter(m =>
      !used.has(m.id) && isSimilar(memory.content, m.content)
    );
    if (similar.length > 0) {
      groups.push(similar);
      similar.forEach(m => used.add(m.id));
    }
  }

  return groups;
}

// Placeholder predicate: substitute cosine similarity over embeddings in production
function isSimilar(a: string, b: string): boolean {
  return a === b;
}

function mergeMemories(memories: Memory[]): Omit<NewMemory, 'id'> {
  // Combine content, keep highest confidence
  return {
    type: 'semantic',
    content: memories.map(m => m.content).join('\n'),
    context: memories[0].context,
    confidence: Math.max(...memories.map(m => m.confidence)),
    tags: Array.from(new Set(memories.flatMap(m => m.tags || []))),
    source: `consolidated:${memories.map(m => m.id).join(',')}`,
    createdAt: new Date(),
    lastAccessed: new Date(),
    accessCount: memories.reduce((sum, m) => sum + m.accessCount, 0),
  };
}
```
Run consolidation daily:
```typescript
// src/memory/scheduler.ts
import { consolidateMemories, consolidatePatterns } from './consolidate';
import { archiveOldData } from './archive';
import { pruneMemories, prunePatterns } from './prune';

export function startMaintenanceJobs() {
  // Run every 24 hours from process start (use a cron library
  // if maintenance must land at a fixed time of day)
  const DAILY = 24 * 60 * 60 * 1000;

  setInterval(async () => {
    console.log('Running daily maintenance...');
    await consolidateMemories();
    await consolidatePatterns();
    await archiveOldData();
    await pruneMemories();
    await prunePatterns();
    console.log('Maintenance complete');
  }, DAILY);
}
```
9. Connection Management
9.1 Single Connection vs Pool
SQLite is single-writer — only one write transaction at a time.
For Forge (single Bun process):
- Use a single connection (exported singleton)
- Enable WAL mode for concurrent reads
- Set busy timeout for write contention
Why this works:
- Forge orchestrator is single-threaded
- Agents run sequentially (MVP)
- WAL allows multiple readers
When to use a pool:
- Multi-process deployment (workers)
- HTTP API with concurrent requests
- Parallel agent execution (post-MVP)
9.2 WAL Mode
Write-Ahead Logging benefits:
- Readers don't block writers
- Writers don't block readers
- Better concurrency
```typescript
// Enabled in db.ts
sqlite.pragma('journal_mode = WAL');
```
Files created:
- memory.db — main database
- memory.db-wal — write-ahead log
- memory.db-shm — shared memory
Checkpoint WAL periodically:
```typescript
import { sqlite } from './db'; // raw better-sqlite3 handle exported in section 1.2

export function checkpointWAL() {
  sqlite.pragma('wal_checkpoint(TRUNCATE)');
}

// Run after large writes
```
9.3 Busy Timeout
Handle write contention:
```typescript
// Wait up to 30 seconds for lock
sqlite.pragma('busy_timeout = 30000');
```
If timeout exceeded:
```typescript
try {
  await db.insert(events).values(event);
} catch (error: any) {
  if (error.code === 'SQLITE_BUSY') {
    // Database locked — retry with exponential backoff
    await retry(() => db.insert(events).values(event));
  } else {
    throw error;
  }
}
```
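The retry helper referenced above is not defined elsewhere in this plan; a minimal exponential-backoff sketch (attempt count and base delay are illustrative defaults):

```typescript
// Hypothetical retry helper with exponential backoff
async function retry<T>(
  fn: () => Promise<T>,
  maxAttempts = 5,
  baseDelayMs = 100
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error: any) {
      if (error?.code !== 'SQLITE_BUSY') throw error; // only retry lock contention
      lastError = error;
      await new Promise(resolve => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
  throw lastError;
}
```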
9.4 Concurrent Reads/Writes
Read pattern:
```typescript
// Multiple concurrent reads are fine with WAL
const [run1, run2, run3] = await Promise.all([
  db.select().from(runs).where(eq(runs.id, 'run-1')),
  db.select().from(runs).where(eq(runs.id, 'run-2')),
  db.select().from(runs).where(eq(runs.id, 'run-3')),
]);
```
Write pattern:
```typescript
// Writes are serialized automatically
await db.transaction(async (tx) => {
  await tx.insert(events).values(event1);
  await tx.insert(events).values(event2);
  // Scope the update: without a WHERE this would touch every run
  await tx.update(runs).set({ status: 'completed' }).where(eq(runs.id, traceId));
});
```
9.5 Graceful Shutdown
```typescript
import { sqlite } from './db'; // raw handle from section 1.2

export function closeDatabase() {
  // Checkpoint WAL
  sqlite.pragma('wal_checkpoint(TRUNCATE)');

  // Close connection
  sqlite.close();

  console.log('Database closed gracefully');
}

// Register shutdown handler
process.on('SIGINT', () => {
  closeDatabase();
  process.exit(0);
});
```
10. Seed Data
10.1 Default Patterns
```typescript
// src/memory/seed.ts
import { db } from './db';
import { patterns, agentConfigs } from './schema';
import { ulid } from 'ulid';

export async function seedDefaultPatterns() {
  const defaultPatterns = [
    {
      id: ulid(),
      type: 'failure' as const,
      trigger: 'null pointer exception',
      pattern: 'Missing null check before property access',
      resolution: 'Add null/undefined check',
      frequency: 0,
      confidence: 0.8,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 1,
    },
    {
      id: ulid(),
      type: 'success' as const,
      trigger: 'test coverage low',
      pattern: 'Generated property-based tests',
      resolution: 'Use fast-check for property testing',
      frequency: 0,
      successRate: 0.85,
      confidence: 0.7,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 2,
    },
    {
      id: ulid(),
      type: 'approach' as const,
      trigger: 'circular dependency',
      pattern: 'Extract interface to break cycle',
      resolution: 'Create interface in separate file',
      frequency: 0,
      successRate: 0.9,
      confidence: 0.9,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 1,
    },
  ];

  await db.insert(patterns).values(defaultPatterns);
  console.log(`Seeded ${defaultPatterns.length} default patterns`);
}
```
10.2 Error Taxonomy
```typescript
export async function seedErrorTaxonomy() {
  const errorCategories = [
    {
      id: ulid(),
      type: 'failure' as const,
      trigger: 'syntax error',
      pattern: 'Code does not parse',
      resolution: 'Fix syntax before continuing',
      frequency: 0,
      confidence: 1.0,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 1,
    },
    {
      id: ulid(),
      type: 'failure' as const,
      trigger: 'type error',
      pattern: 'Type mismatch',
      resolution: 'Correct type annotations or cast',
      frequency: 0,
      confidence: 1.0,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 1,
    },
    {
      id: ulid(),
      type: 'failure' as const,
      trigger: 'import error',
      pattern: 'Module not found',
      resolution: 'Install dependency or fix import path',
      frequency: 0,
      confidence: 1.0,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 1,
    },
  ];

  await db.insert(patterns).values(errorCategories);
  console.log(`Seeded ${errorCategories.length} error categories`);
}
```
10.3 Built-in Tool Definitions
```typescript
export async function seedToolDefinitions() {
  const tools = [
    {
      id: ulid(),
      agentType: 'implementer',
      config: {
        tools: [
          { name: 'read_file', description: 'Read file contents' },
          { name: 'write_file', description: 'Write file contents' },
          { name: 'run_command', description: 'Execute shell command' },
          { name: 'search_code', description: 'Search codebase' },
        ],
      },
      modelName: 'claude-sonnet-4-5-20250929',
      temperature: 0.7,
      maxIterations: 50,
      maxCostUsd: 10,
      version: 1,
      active: true,
      createdAt: new Date(),
      updatedAt: new Date(),
    },
    {
      id: ulid(),
      agentType: 'reviewer',
      config: {
        tools: [
          { name: 'run_linter', description: 'Run ESLint/Biome' },
          { name: 'run_security_scan', description: 'Scan for vulnerabilities' },
          { name: 'llm_review', description: 'AI code review' },
        ],
      },
      modelName: 'claude-haiku-4-5-20251001',
      temperature: 0.3,
      maxIterations: 5,
      maxCostUsd: 2,
      version: 1,
      active: true,
      createdAt: new Date(),
      updatedAt: new Date(),
    },
  ];

  await db.insert(agentConfigs).values(tools);
  console.log(`Seeded ${tools.length} agent configs`);
}
```
10.4 Seed Script
```typescript
// scripts/seed.ts
import { seedDefaultPatterns, seedErrorTaxonomy, seedToolDefinitions } from '../src/memory/seed';

async function seed() {
  console.log('Seeding database...');
  await seedDefaultPatterns();
  await seedErrorTaxonomy();
  await seedToolDefinitions();
  console.log('Seed complete');
}

seed();
```
Run on first setup:
```bash
bun run scripts/seed.ts
```
11. Testing
11.1 In-Memory SQLite for Tests
```typescript
// src/memory/db.test.ts
import { beforeEach, describe, expect, test } from 'bun:test';
import Database from 'better-sqlite3';
import { drizzle } from 'drizzle-orm/better-sqlite3';
import { eq } from 'drizzle-orm';
import * as schema from './schema';

describe('Database operations', () => {
  let db: ReturnType<typeof drizzle>;
  let sqlite: Database.Database;

  beforeEach(() => {
    // Create in-memory database
    sqlite = new Database(':memory:');
    db = drizzle(sqlite, { schema });

    // Run migrations (simplified for tests)
    sqlite.exec(`
      CREATE TABLE events (
        id TEXT PRIMARY KEY,
        trace_id TEXT NOT NULL,
        timestamp INTEGER NOT NULL,
        source TEXT NOT NULL,
        type TEXT NOT NULL,
        phase TEXT,
        payload TEXT,
        tokens_used INTEGER,
        cost_usd REAL,
        duration_ms INTEGER,
        level TEXT DEFAULT 'INFO'
      );
      CREATE INDEX events_trace_id_idx ON events(trace_id);
      CREATE INDEX events_type_idx ON events(type);
      CREATE INDEX events_timestamp_idx ON events(timestamp);
    `);
  });

  test('insert and retrieve event', async () => {
    await db.insert(schema.events).values({
      id: 'event-1',
      traceId: 'trace-1',
      timestamp: new Date(),
      source: 'test',
      type: 'test.event',
      phase: 'testing',
      payload: { foo: 'bar' },
      level: 'INFO',
    });

    const [event] = await db
      .select()
      .from(schema.events)
      .where(eq(schema.events.id, 'event-1'));

    expect(event).toBeDefined();
    expect(event.type).toBe('test.event');
    expect(event.payload).toEqual({ foo: 'bar' });
  });

  test('replay events by trace', async () => {
    // Insert multiple events
    await db.insert(schema.events).values([
      {
        id: 'event-1',
        traceId: 'trace-1',
        timestamp: new Date(1000),
        source: 'test',
        type: 'event.1',
        level: 'INFO',
      },
      {
        id: 'event-2',
        traceId: 'trace-1',
        timestamp: new Date(2000),
        source: 'test',
        type: 'event.2',
        level: 'INFO',
      },
    ]);

    // Replay
    const events = await db
      .select()
      .from(schema.events)
      .where(eq(schema.events.traceId, 'trace-1'))
      .orderBy(schema.events.timestamp);

    expect(events).toHaveLength(2);
    expect(events[0].type).toBe('event.1');
    expect(events[1].type).toBe('event.2');
  });
});
```
11.2 Fixture Factories
```typescript
// src/memory/fixtures.ts
import { ulid } from 'ulid';
import type { NewEvent, NewMemory, NewRun, NewFinding } from './schema';

export function createEventFixture(overrides?: Partial<NewEvent>): NewEvent {
  return {
    id: ulid(),
    traceId: `trace-${ulid()}`,
    timestamp: new Date(),
    source: 'test-agent',
    type: 'test.event',
    phase: 'testing',
    payload: { test: true },
    level: 'INFO',
    ...overrides,
  };
}

export function createMemoryFixture(overrides?: Partial<NewMemory>): NewMemory {
  return {
    id: ulid(),
    type: 'episodic',
    content: 'Test memory',
    context: 'test context',
    confidence: 0.5,
    createdAt: new Date(),
    lastAccessed: new Date(),
    accessCount: 0,
    ...overrides,
  };
}

export function createRunFixture(overrides?: Partial<NewRun>): NewRun {
  return {
    id: ulid(),
    task: 'Test task',
    status: 'pending',
    startedAt: new Date(),
    totalCostUsd: 0,
    totalTokens: 0,
    ...overrides,
  };
}

export function createFindingFixture(overrides?: Partial<NewFinding>): NewFinding {
  return {
    id: ulid(),
    runId: `run-${ulid()}`,
    phase: 'review',
    severity: 'warning',
    category: 'style',
    message: 'Test finding',
    createdAt: new Date(),
    ...overrides,
  };
}
```
11.3 Test Helpers
```typescript
// src/memory/test-helpers.ts
import type { BetterSQLite3Database } from 'drizzle-orm/better-sqlite3';
import { events, memories, runs, type NewRun } from './schema';
import { createEventFixture, createRunFixture } from './fixtures';

export async function insertTestEvents(
  db: BetterSQLite3Database,
  traceId: string,
  count: number
) {
  const testEvents = Array.from({ length: count }, (_, i) =>
    createEventFixture({
      traceId,
      type: `test.event.${i}`,
      timestamp: new Date(Date.now() + i * 1000),
    })
  );

  await db.insert(events).values(testEvents);
  return testEvents;
}

export async function createTestRun(
  db: BetterSQLite3Database,
  overrides?: Partial<NewRun>
) {
  const run = createRunFixture(overrides);
  await db.insert(runs).values(run);
  return run;
}

export async function clearTestData(db: BetterSQLite3Database) {
  await db.delete(events);
  await db.delete(memories);
  await db.delete(runs);
}
```
11.4 Integration Tests
```typescript
// src/memory/queries.test.ts
import { beforeEach, describe, expect, test } from 'bun:test';
import { ulid } from 'ulid';
import { createTestDatabase } from './test-db';
import { costTracking } from './schema';
import { insertTestEvents, createTestRun } from './test-helpers';
import { replayTrace, getTotalCostByRun } from './queries';

describe('Query integration tests', () => {
  let db: ReturnType<typeof createTestDatabase>;

  beforeEach(() => {
    db = createTestDatabase();
  });

  test('replay trace returns events in order', async () => {
    const traceId = 'trace-123';
    await insertTestEvents(db, traceId, 5);

    const events = await replayTrace(traceId);

    expect(events).toHaveLength(5);
    expect(events[0].type).toBe('test.event.0');
    expect(events[4].type).toBe('test.event.4');
  });

  test('cost aggregation sums correctly', async () => {
    const run = await createTestRun(db);

    await db.insert(costTracking).values([
      {
        id: ulid(),
        runId: run.id,
        agentId: 'agent-1',
        agentType: 'planner',
        modelName: 'claude-sonnet',
        tokensPrompt: 1000,
        tokensCompletion: 500,
        tokensTotal: 1500,
        costUsd: 0.02,
        timestamp: new Date(),
      },
      {
        id: ulid(),
        runId: run.id,
        agentId: 'agent-2',
        agentType: 'implementer',
        modelName: 'claude-sonnet',
        tokensPrompt: 2000,
        tokensCompletion: 1000,
        tokensTotal: 3000,
        costUsd: 0.04,
        timestamp: new Date(),
      },
    ]);

    const cost = await getTotalCostByRun(run.id);

    // Floating-point sums are inexact; compare approximately
    expect(cost.totalCost).toBeCloseTo(0.06, 5);
    expect(cost.totalTokens).toBe(4500);
  });
});
```
Complete Schema Export
src/memory/schema.ts (full file):
```typescript
import { sqliteTable, text, integer, real, blob, index } from 'drizzle-orm/sqlite-core';
import { sql } from 'drizzle-orm';

// ─── Events ───────────────────────────────────────────────

export const events = sqliteTable('events', {
  id: text('id').primaryKey(),
  traceId: text('trace_id').notNull(),
  timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(),
  source: text('source').notNull(),
  type: text('type').notNull(),
  phase: text('phase'),
  payload: text('payload', { mode: 'json' }),
  tokensUsed: integer('tokens_used'),
  costUsd: real('cost_usd'),
  durationMs: integer('duration_ms'),
  level: text('level', { enum: ['DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'] }).default('INFO'),
}, (table) => ({
  traceIdIdx: index('events_trace_id_idx').on(table.traceId),
  typeIdx: index('events_type_idx').on(table.type),
  timestampIdx: index('events_timestamp_idx').on(table.timestamp),
  sourceIdx: index('events_source_idx').on(table.source),
  phaseIdx: index('events_phase_idx').on(table.phase),
  costIdx: index('events_cost_idx').on(table.traceId, table.costUsd),
  errorIdx: index('events_error_idx').on(table.level, table.timestamp)
    .where(sql`level IN ('ERROR', 'CRITICAL')`),
}));

// ─── Memories ─────────────────────────────────────────────

export const memories = sqliteTable('memories', {
  id: text('id').primaryKey(),
  type: text('type', { enum: ['episodic', 'semantic', 'procedural'] }).notNull(),
  content: text('content').notNull(),
  context: text('context').notNull(),
  embedding: blob('embedding', { mode: 'buffer' }),
  confidence: real('confidence').notNull().default(0.5),
  source: text('source'),
  tags: text('tags', { mode: 'json' }).$type<string[]>(),
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
  lastAccessed: integer('last_accessed', { mode: 'timestamp_ms' }).notNull(),
  accessCount: integer('access_count').notNull().default(0),
  archived: integer('archived', { mode: 'boolean' }).default(false),
}, (table) => ({
  typeIdx: index('memories_type_idx').on(table.type),
  confidenceIdx: index('memories_confidence_idx').on(table.confidence),
  lastAccessedIdx: index('memories_last_accessed_idx').on(table.lastAccessed),
  activeIdx: index('memories_active_idx').on(table.confidence, table.lastAccessed)
    .where(sql`archived = 0 AND confidence > 0.2`),
}));

// ─── Patterns ─────────────────────────────────────────────

export const patterns = sqliteTable('patterns', {
  id: text('id').primaryKey(),
  type: text('type', { enum: ['success', 'failure', 'approach'] }).notNull(),
  trigger: text('trigger').notNull(),
  pattern: text('pattern').notNull(),
  resolution: text('resolution'),
  frequency: integer('frequency').notNull().default(1),
  successRate: real('success_rate'),
  confidence: real('confidence').notNull().default(0.5),
  lastSeen: integer('last_seen', { mode: 'timestamp_ms' }).notNull(),
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
  priority: integer('priority').default(1),
  mutations: integer('mutations').default(0),
}, (table) => ({
  typeIdx: index('patterns_type_idx').on(table.type),
  triggerIdx: index('patterns_trigger_idx').on(table.trigger),
  frequencyIdx: index('patterns_frequency_idx').on(table.frequency),
  scoreIdx: index('patterns_score_idx').on(table.type, table.confidence, table.frequency),
}));

// ─── Checkpoints ──────────────────────────────────────────

export const checkpoints = sqliteTable('checkpoints', {
  id: text('id').primaryKey(),
  traceId: text('trace_id').notNull(),
  phase: text('phase').notNull(),
  state: text('state', { mode: 'json' }).notNull().$type<Record<string, unknown>>(),
  timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(),
  sizeBytes: integer('size_bytes'),
  isValid: integer('is_valid', { mode: 'boolean' }).default(true),
}, (table) => ({
  traceIdIdx: index('checkpoints_trace_id_idx').on(table.traceId),
  phaseIdx: index('checkpoints_phase_idx').on(table.phase),
  timestampIdx: index('checkpoints_timestamp_idx').on(table.timestamp),
}));

// ─── Runs ─────────────────────────────────────────────────

export const runs = sqliteTable('runs', {
  id: text('id').primaryKey(),
  task: text('task').notNull(),
  status: text('status', { enum: ['pending', 'running', 'completed', 'failed', 'cancelled'] }).notNull(),
  currentPhase: text('current_phase'),
  config: text('config', { mode: 'json' }).$type<Record<string, unknown>>(),
  startedAt: integer('started_at', { mode: 'timestamp_ms' }).notNull(),
  completedAt: integer('completed_at', { mode: 'timestamp_ms' }),
  totalCostUsd: real('total_cost_usd').default(0),
  totalTokens: integer('total_tokens').default(0),
  error: text('error'),
  errorType: text('error_type'),
  humanInterventions: integer('human_interventions').default(0),
  bounces: integer('bounces').default(0),
}, (table) => ({
  statusIdx: index('runs_status_idx').on(table.status),
  startedAtIdx: index('runs_started_at_idx').on(table.startedAt),
  costIdx: index('runs_cost_idx').on(table.totalCostUsd),
  activeIdx: index('runs_active_idx').on(table.status, table.startedAt)
    .where(sql`status IN ('pending', 'running')`),
}));

// ─── Findings ─────────────────────────────────────────────

export const findings = sqliteTable('findings', {
  id: text('id').primaryKey(),
  runId: text('run_id').notNull().references(() => runs.id, { onDelete: 'cascade' }),
  phase: text('phase', { enum: ['review', 'testing'] }).notNull(),
  analyzer: text('analyzer'),
  severity: text('severity', { enum: ['info', 'warning', 'error', 'critical'] }).notNull(),
  category: text('category', { enum: ['style', 'security', 'correctness', 'performance', 'maintainability', 'test'] }).notNull(),
  file: text('file'),
  line: integer('line'),
  column: integer('column'),
  message: text('message').notNull(),
  detail: text('detail'),
  fixable: integer('fixable', { mode: 'boolean' }).default(false),
  fix: text('fix'),
  confidence: real('confidence'),
  dismissed: integer('dismissed', { mode: 'boolean' }).default(false),
  dismissedBy: text('dismissed_by'),
  dismissedAt: integer('dismissed_at', { mode: 'timestamp_ms' }),
  applied: integer('applied', { mode: 'boolean' }).default(false),
  appliedAt: integer('applied_at', { mode: 'timestamp_ms' }),
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  runIdIdx: index('findings_run_id_idx').on(table.runId),
  severityIdx: index('findings_severity_idx').on(table.severity),
  categoryIdx: index('findings_category_idx').on(table.category),
  activeIdx: index('findings_active_idx').on(table.severity, table.runId)
    .where(sql`dismissed = 0`),
  fileIdx: index('findings_file_idx').on(table.file, table.line),
}));

// ─── Sessions ─────────────────────────────────────────────

export const sessions = sqliteTable('sessions', {
  id: text('id').primaryKey(),
  runId: text('run_id').references(() => runs.id, { onDelete: 'set null' }),
  workingMemory: text('working_memory', { mode: 'json' }).$type<unknown>(),
  taskStack: text('task_stack', { mode: 'json' }).$type<string[]>(),
  startedAt: integer('started_at', { mode: 'timestamp_ms' }).notNull(),
  lastActivityAt: integer('last_activity_at', { mode: 'timestamp_ms' }).notNull(),
  endedAt: integer('ended_at', { mode: 'timestamp_ms' }),
  active: integer('active', { mode: 'boolean' }).default(true),
}, (table) => ({
  activeIdx: index('sessions_active_idx').on(table.active, table.lastActivityAt)
    .where(sql`active = 1`),
}));

// ─── Tool Executions ──────────────────────────────────────

export const toolExecutions = sqliteTable('tool_executions', {
  id: text('id').primaryKey(),
  eventId: text('event_id').references(() => events.id, { onDelete: 'cascade' }),
  traceId: text('trace_id').notNull(),
  toolName: text('tool_name').notNull(),
  toolVersion: text('tool_version'),
  inputHash: text('input_hash').notNull(),
  outputHash: text('output_hash'),
  input: text('input', { mode: 'json' }).$type<unknown>(),
  output: text('output', { mode: 'json' }).$type<unknown>(),
  success: integer('success', { mode: 'boolean' }).notNull(),
  error: text('error'),
  durationMs: integer('duration_ms').notNull(),
  retryCount: integer('retry_count').default(0),
  cacheHit: integer('cache_hit', { mode: 'boolean' }).default(false),
  executedAt: integer('executed_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  toolNameIdx: index('tool_executions_tool_name_idx').on(table.toolName),
  traceIdIdx: index('tool_executions_trace_id_idx').on(table.traceId),
  inputHashIdx: index('tool_executions_input_hash_idx').on(table.inputHash),
  cacheIdx: index('tool_executions_cache_idx').on(table.toolName, table.inputHash),
}));

// ─── Cost Tracking ────────────────────────────────────────

export const costTracking = sqliteTable('cost_tracking', {
  id: text('id').primaryKey(),
  traceId: text('trace_id'),
  runId: text('run_id').references(() => runs.id, { onDelete: 'cascade' }),
  eventId: text('event_id').references(() => events.id, { onDelete: 'cascade' }),
  agentId: text('agent_id').notNull(),
  agentType: text('agent_type').notNull(),
  modelName: text('model_name').notNull(),
  tokensPrompt: integer('tokens_prompt').notNull(),
  tokensCompletion: integer('tokens_completion').notNull(),
  tokensTotal: integer('tokens_total').notNull(),
  costUsd: real('cost_usd').notNull(),
  timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  runIdIdx: index('cost_tracking_run_id_idx').on(table.runId),
  timestampIdx: index('cost_tracking_timestamp_idx').on(table.timestamp),
  agentTypeIdx: index('cost_tracking_agent_type_idx').on(table.agentType),
  aggregationIdx: index('cost_tracking_aggregation_idx')
    .on(table.timestamp, table.agentType, table.modelName),
}));

// ─── Human Decisions ──────────────────────────────────────

export const humanDecisions = sqliteTable('human_decisions', {
  id: text('id').primaryKey(),
  runId: text('run_id').references(() => runs.id, { onDelete: 'cascade' }),
  gateType: text('gate_type', { enum: ['architecture_approval', 'production_deploy', 'security_findings', 'cost_overrun'] }).notNull(),
  phase: text('phase').notNull(),
  contextSummary: text('context_summary').notNull(),
  riskScore: real('risk_score'),
  decision: text('decision', { enum: ['approved', 'rejected', 'changes_requested'] }).notNull(),
  feedback: text('feedback'),
  edits: text('edits'),
  requestedAt: integer('requested_at', { mode: 'timestamp_ms' }).notNull(),
  decidedAt: integer('decided_at', { mode: 'timestamp_ms' }).notNull(),
  durationMs: integer('duration_ms').notNull(),
}, (table) => ({
  runIdIdx: index('human_decisions_run_id_idx').on(table.runId),
  gateTypeIdx: index('human_decisions_gate_type_idx').on(table.gateType),
  decisionIdx: index('human_decisions_decision_idx').on(table.decision),
}));

// ─── Agent Configs ────────────────────────────────────────

export const agentConfigs = sqliteTable('agent_configs', {
  id: text('id').primaryKey(),
  agentType: text('agent_type').notNull().unique(),
  config: text('config', { mode: 'json' }).notNull().$type<Record<string, unknown>>(),
  modelName: text('model_name').notNull(),
  temperature: real('temperature').default(0.7),
  maxTokens: integer('max_tokens'),
  maxIterations: integer('max_iterations').default(10),
  maxCostUsd: real('max_cost_usd').default(5),
  version: integer('version').notNull().default(1),
  active: integer('active', { mode: 'boolean' }).default(true),
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
  updatedAt: integer('updated_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  agentTypeIdx: index('agent_configs_agent_type_idx').on(table.agentType),
}));

// ─── Type Exports ─────────────────────────────────────────

export type Event = typeof events.$inferSelect;
export type NewEvent = typeof events.$inferInsert;
export type Memory = typeof memories.$inferSelect;
export type NewMemory = typeof memories.$inferInsert;
export type Pattern = typeof patterns.$inferSelect;
export type NewPattern = typeof patterns.$inferInsert;
export type Checkpoint = typeof checkpoints.$inferSelect;
export type NewCheckpoint = typeof checkpoints.$inferInsert;
export type Run = typeof runs.$inferSelect;
export type NewRun = typeof runs.$inferInsert;
export type Finding = typeof findings.$inferSelect;
export type NewFinding = typeof findings.$inferInsert;
export type Session = typeof sessions.$inferSelect;
export type NewSession = typeof sessions.$inferInsert;
export type ToolExecution = typeof toolExecutions.$inferSelect;
export type NewToolExecution = typeof toolExecutions.$inferInsert;
export type CostTracking = typeof costTracking.$inferSelect;
export type NewCostTracking = typeof costTracking.$inferInsert;
export type HumanDecision = typeof humanDecisions.$inferSelect;
export type NewHumanDecision = typeof humanDecisions.$inferInsert;
export type AgentConfig = typeof agentConfigs.$inferSelect;
export type NewAgentConfig = typeof agentConfigs.$inferInsert;
```
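Usage note: the `$inferSelect` / `$inferInsert` pairs mean query results and insert payloads are typed directly from the table definitions, with no hand-maintained interfaces. A minimal sketch (the `crypto.randomUUID()` ID scheme is illustrative, not prescribed by the schema):

```typescript
import { db } from './db';
import { events, type NewEvent } from './schema';

// NewEvent is inferred from the table definition: the compiler rejects
// missing required columns and mistyped fields at the call site.
const event: NewEvent = {
  id: crypto.randomUUID(),   // ID scheme is illustrative, not prescribed
  traceId: 'trace-123',
  timestamp: new Date(),     // timestamp_ms columns accept Date objects
  source: 'planner',
  type: 'phase.started',
  payload: { phase: 'planning' },
};

db.insert(events).values(event).run();
```

Because better-sqlite3 is synchronous, Drizzle's `.run()` / `.all()` / `.get()` calls execute immediately with no `await`.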
Implementation Checklist
Week 1: Foundation
- Install dependencies (drizzle-orm, better-sqlite3, drizzle-kit)
- Create drizzle.config.ts
- Implement schema.ts (all tables)
- Create db.ts (connection management)
- Write migration script (migrate.ts)
- Generate and apply initial migration
- Test in-memory database setup
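A minimal sketch of that last item, assuming Bun's built-in test runner and that `createDatabase` forwards its path argument straight to better-sqlite3 (so `':memory:'` yields a private, throwaway database per test):

```typescript
import { expect, test } from 'bun:test';
import { migrate } from 'drizzle-orm/better-sqlite3/migrator';
import { createDatabase } from '../src/memory/db';
import { runs } from '../src/memory/schema';

test('in-memory database round-trips a run', () => {
  // ':memory:' keeps the test hermetic; nothing touches .forge/memory.db
  const db = createDatabase(':memory:');
  migrate(db, { migrationsFolder: './drizzle/migrations' });

  db.insert(runs).values({
    id: 'run-1',
    task: 'smoke test',
    status: 'pending',
    startedAt: new Date(),
  }).run();

  expect(db.select().from(runs).all()).toHaveLength(1);
});
```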
Week 2: Core Queries
- Implement event insertion
- Implement event replay
- Implement memory recall with similarity search (replay and recall are both sketched after this list)
- Implement run CRUD
- Implement checkpoint CRUD
- Implement findings CRUD
- Test all query patterns
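Two of these queries are worth sketching. Event replay is just an ordered scan over a trace; memory recall does brute-force cosine similarity over the embedding blobs in process, which is the price of keeping vectors in SQLite for the MVP. This sketch assumes embeddings are stored as little-endian float32 buffers (the schema only mandates a BLOB):

```typescript
import { and, asc, eq, gt } from 'drizzle-orm';
import { db } from '../src/memory/db';
import { events, memories, type Memory } from '../src/memory/schema';

// Replay: events are append-only, so replaying a trace is an ordered
// scan served by events_trace_id_idx.
export function replayTrace(traceId: string) {
  return db.select()
    .from(events)
    .where(eq(events.traceId, traceId))
    .orderBy(asc(events.timestamp))
    .all();
}

function cosine(a: Float32Array, b: Float32Array): number {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return dot / (Math.sqrt(na) * Math.sqrt(nb) || 1);
}

// Decode a BLOB column into floats; copying via slice() guarantees
// 4-byte alignment regardless of how the Buffer was allocated.
function toFloats(buf: Buffer): Float32Array {
  return new Float32Array(buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength));
}

export function recallMemories(queryEmbedding: Float32Array, limit = 10): Memory[] {
  // The filter mirrors the memories_active_idx partial index predicate.
  const candidates = db.select()
    .from(memories)
    .where(and(eq(memories.archived, false), gt(memories.confidence, 0.2)))
    .all();

  return candidates
    .filter((m) => m.embedding !== null)
    .map((m) => ({ m, score: cosine(queryEmbedding, toFloats(m.embedding!)) }))
    .sort((x, y) => y.score - x.score)
    .slice(0, limit)
    .map((x) => x.m);
}
```

Brute-force scoring is O(n) per query; if the memories table grows past tens of thousands of rows, that is the signal to move to a real vector index.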
Week 3: Additional Features
- Implement cost aggregation queries (see the sketch after this list)
- Implement tool execution tracking
- Implement human decision tracking
- Create seed data script
- Test seed data
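A sketch of one cost aggregation query, grouping spend by model and agent type (the function name and result shape are illustrative):

```typescript
import { gte, sql } from 'drizzle-orm';
import { db } from '../src/memory/db';
import { costTracking } from '../src/memory/schema';

// Spend per model and agent since a cutoff; served by
// cost_tracking_aggregation_idx (timestamp, agent_type, model_name).
export function costByModel(since: Date) {
  return db.select({
    modelName: costTracking.modelName,
    agentType: costTracking.agentType,
    totalUsd: sql<number>`sum(${costTracking.costUsd})`,
    totalTokens: sql<number>`sum(${costTracking.tokensTotal})`,
    calls: sql<number>`count(*)`,
  })
    .from(costTracking)
    .where(gte(costTracking.timestamp, since))
    .groupBy(costTracking.modelName, costTracking.agentType)
    .all();
}
```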
Week 4: Maintenance Jobs
- Implement archival job (see the sketch after this list)
- Implement pruning job
- Implement consolidation job
- Schedule daily maintenance
- Test lifecycle management
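A sketch of the archival job. The confidence threshold matches the memories_active_idx partial index predicate; the 30-day staleness cutoff is a placeholder to tune:

```typescript
import { and, eq, lt } from 'drizzle-orm';
import { db } from '../src/memory/db';
import { memories } from '../src/memory/schema';

const STALE_AFTER_MS = 30 * 24 * 60 * 60 * 1000; // 30 days; tune as needed

// Archive rather than delete: flipping the flag drops the row out of
// the memories_active_idx partial index, but the data is never lost,
// consistent with the never-drop-columns migration philosophy.
export function archiveStaleMemories(now = new Date()) {
  const cutoff = new Date(now.getTime() - STALE_AFTER_MS);
  return db.update(memories)
    .set({ archived: true })
    .where(and(
      eq(memories.archived, false),
      lt(memories.confidence, 0.2),
      lt(memories.lastAccessed, cutoff),
    ))
    .run();
}
```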
Week 5: Testing
- Create fixture factories (see the sketch after this list)
- Write integration tests for all queries
- Write tests for maintenance jobs
- Test migration workflow
- Performance testing (large datasets)
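A sketch of what fixture factories might look like: every required column filled with a sane default, overridable per test (factory names and defaults are illustrative):

```typescript
import { randomUUID } from 'node:crypto';
import type { NewEvent, NewRun } from '../src/memory/schema';

// Each factory returns a valid insert payload so tests only specify
// the fields they actually care about.
export function makeRun(overrides: Partial<NewRun> = {}): NewRun {
  return {
    id: randomUUID(),
    task: 'fixture task',
    status: 'pending',
    startedAt: new Date(),
    ...overrides,
  };
}

export function makeEvent(overrides: Partial<NewEvent> = {}): NewEvent {
  return {
    id: randomUUID(),
    traceId: randomUUID(),
    timestamp: new Date(),
    source: 'test',
    type: 'test.event',
    ...overrides,
  };
}
```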
Next Steps
After implementing the data model:
- Build the event bus (Section 9) — connects to events table
- Build the memory system (Section 11) — uses memories, patterns tables
- Build agent base class — logs to events, checkpoints state
- Build orchestrator — manages runs, creates checkpoints
- Build feedback loops — learns from events, stores patterns
The data model is the foundation. Everything else reads from and writes to these tables.