
Data Model Implementation Plan (Section 5)

Plan: 05-data-model.md
Date: 2026-02-07
Status: Implementation Ready
Dependencies: Core types, event bus foundation


Executive Summary

This plan details the complete SQLite database implementation for Forge using Drizzle ORM. The data model is the system's ground truth, storing all events, memories, patterns, execution state, and learning artifacts. Every decision, action, and outcome flows through this schema.

Key Design Principles:

  1. Event-sourced where possible — append-only events enable full replay
  2. Optimized for common queries — indexed for typical access patterns
  3. Embeddings in SQLite — avoid external vector DB in MVP
  4. Supports learning loops — memory, patterns, and consolidation built-in
  5. Migration-friendly — schema can evolve without breaking existing data

1. Drizzle Setup

1.1 Dependencies

```json
{
  "dependencies": {
    "drizzle-orm": "^0.29.0",
    "better-sqlite3": "^9.2.0"
  },
  "devDependencies": {
    "drizzle-kit": "^0.20.0",
    "@types/better-sqlite3": "^7.6.8"
  }
}
```

Why better-sqlite3 over bun:sqlite:

  • More mature, battle-tested
  • Better TypeScript support
  • Works with Drizzle's migration tooling
  • Can switch to bun:sqlite later if needed (largely API-compatible; see the sketch after this list)

Alternative: For production at scale, use libSQL (Turso), which is SQLite-compatible but adds replication.
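Since Drizzle's query API is driver-agnostic, swapping drivers mostly means changing the adapter import. A minimal sketch of a bun:sqlite variant of Section 1.2's factory, assuming the rest of the codebase only touches the Drizzle instance (note that bun:sqlite issues PRAGMAs via `exec` rather than better-sqlite3's `pragma` helper):

```typescript
// Hypothetical bun:sqlite variant of createDatabase (cf. Section 1.2)
import { Database } from 'bun:sqlite';
import { drizzle } from 'drizzle-orm/bun-sqlite';
import * as schema from './schema';

export function createDatabase(path: string = '.forge/memory.db') {
  const sqlite = new Database(path);

  // bun:sqlite has no .pragma() helper; issue PRAGMAs as SQL
  sqlite.exec('PRAGMA journal_mode = WAL;');
  sqlite.exec('PRAGMA foreign_keys = ON;');
  sqlite.exec('PRAGMA synchronous = NORMAL;');

  return drizzle(sqlite, { schema });
}
```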

1.2 Configuration

drizzle.config.ts:

```typescript
import type { Config } from 'drizzle-kit';

export default {
  schema: './src/memory/schema.ts',
  out: './drizzle/migrations',
  driver: 'better-sqlite',
  dbCredentials: {
    url: process.env.DATABASE_URL || '.forge/memory.db',
  },
  verbose: true,
  strict: true,
} satisfies Config;
```

Connection configuration (src/memory/db.ts):

```typescript
import Database from 'better-sqlite3';
import { drizzle } from 'drizzle-orm/better-sqlite3';
import * as schema from './schema';

export function createDatabase(path: string = '.forge/memory.db') {
  // Create SQLite connection
  const sqlite = new Database(path);

  // Enable WAL mode for better concurrency
  sqlite.pragma('journal_mode = WAL');

  // Busy timeout for concurrent access (30 seconds)
  sqlite.pragma('busy_timeout = 30000');

  // Enforce foreign keys
  sqlite.pragma('foreign_keys = ON');

  // Synchronous mode: NORMAL (fast, safe enough for local dev)
  sqlite.pragma('synchronous = NORMAL');

  // Memory-mapped I/O for performance (256MB)
  sqlite.pragma('mmap_size = 268435456');

  // Cache size (64MB)
  sqlite.pragma('cache_size = -64000');

  // Return both handles: Drizzle for queries, raw connection for
  // PRAGMA maintenance and shutdown (see Section 9)
  return { sqlite, db: drizzle(sqlite, { schema }) };
}

// Export singleton instances for application use
export const { sqlite, db } = createDatabase();
```

1.3 Migration Strategy

Workflow:

```bash
# Generate migration from schema changes
bun drizzle-kit generate:sqlite

# Review migration in drizzle/migrations/
cat drizzle/migrations/0001_*.sql

# Apply migration
bun drizzle-kit push:sqlite

# Or for programmatic migration:
bun run src/memory/migrate.ts
```

Programmatic migration (src/memory/migrate.ts):

```typescript
import { migrate } from 'drizzle-orm/better-sqlite3/migrator';
import { db } from './db';

export async function runMigrations() {
  console.log('Running migrations...');
  migrate(db, {
    migrationsFolder: './drizzle/migrations',
    migrationsTable: '__drizzle_migrations__',
  });
  console.log('Migrations complete');
}

// Run if executed directly
if (import.meta.main) {
  runMigrations();
}
```

Backwards compatibility approach:

  • Never drop columns (mark deprecated instead)
  • Use nullable columns for new fields
  • Write backfill scripts for data migrations
  • Version the schema in a schema_version table (see the sketch after this list)
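The schema_version table referenced in the last item is not part of the Section 2 schema; a minimal sketch, assuming a single-row table that each backfill script bumps on completion:

```typescript
import { sqliteTable, integer } from 'drizzle-orm/sqlite-core';

// Hypothetical single-row table tracking the data-migration version
// (structural migrations are already versioned by __drizzle_migrations__)
export const schemaVersion = sqliteTable('schema_version', {
  id: integer('id').primaryKey(),       // always 1 (single row)
  version: integer('version').notNull(), // monotonically increasing
  appliedAt: integer('applied_at', { mode: 'timestamp_ms' }).notNull(),
});
```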

2. Complete Schema

2.1 Events Table (Append-Only Audit Trail)

```typescript
import { sqliteTable, text, integer, real, blob, index } from 'drizzle-orm/sqlite-core';
import { sql } from 'drizzle-orm';

export const events = sqliteTable('events', {
  // Identity
  id: text('id').primaryKey(),          // ULID for time-sortable IDs
  traceId: text('trace_id').notNull(),  // Groups one pipeline run
  timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(),

  // Source attribution
  source: text('source').notNull(),  // Agent or component id
  type: text('type').notNull(),      // "plan.started", "review.finding", etc.
  phase: text('phase'),              // Current pipeline phase

  // Payload
  payload: text('payload', { mode: 'json' }),  // Event-specific data

  // Cost tracking
  tokensUsed: integer('tokens_used'),
  costUsd: real('cost_usd'),

  // Performance
  durationMs: integer('duration_ms'),

  // Metadata
  level: text('level', { enum: ['DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'] }).default('INFO'),
}, (table) => ({
  // Indexes for common queries
  traceIdIdx: index('events_trace_id_idx').on(table.traceId),
  typeIdx: index('events_type_idx').on(table.type),
  timestampIdx: index('events_timestamp_idx').on(table.timestamp),
  sourceIdx: index('events_source_idx').on(table.source),
  phaseIdx: index('events_phase_idx').on(table.phase),
  // Composite index for cost queries
  costIdx: index('events_cost_idx').on(table.traceId, table.costUsd),
  // Partial index for error queries
  errorIdx: index('events_error_idx').on(table.level, table.timestamp)
    .where(sql`level IN ('ERROR', 'CRITICAL')`),
}));

// Type inference
export type Event = typeof events.$inferSelect;
export type NewEvent = typeof events.$inferInsert;
```

Why this structure:

  • Append-only: never update/delete events (enables replay)
  • JSON payload: flexible event-specific data
  • Indexed for: trace replay, type filtering, time-series queries, cost analysis
  • Partial index on errors: faster error queries without full table scan

2.2 Memories Table

```typescript
export const memories = sqliteTable('memories', {
  // Identity
  id: text('id').primaryKey(),
  type: text('type', { enum: ['episodic', 'semantic', 'procedural'] }).notNull(),

  // Content
  content: text('content').notNull(),  // Human-readable description
  context: text('context').notNull(),  // When is this relevant?

  // Embedding for similarity search
  embedding: blob('embedding', { mode: 'buffer' }),  // Serialized Float32Array

  // Confidence and decay
  confidence: real('confidence').notNull().default(0.5),  // 0.0 - 1.0

  // Provenance
  source: text('source'),  // What event created this?
  tags: text('tags', { mode: 'json' }).$type<string[]>(),  // ["typescript", "testing"]

  // Temporal tracking
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
  lastAccessed: integer('last_accessed', { mode: 'timestamp_ms' }).notNull(),
  accessCount: integer('access_count').notNull().default(0),

  // Lifecycle
  archived: integer('archived', { mode: 'boolean' }).default(false),
}, (table) => ({
  typeIdx: index('memories_type_idx').on(table.type),
  confidenceIdx: index('memories_confidence_idx').on(table.confidence),
  lastAccessedIdx: index('memories_last_accessed_idx').on(table.lastAccessed),
  // Partial index for active memories
  activeIdx: index('memories_active_idx').on(table.confidence, table.lastAccessed)
    .where(sql`archived = 0 AND confidence > 0.2`),
  // Plain index on the serialized tags JSON (exact-match only;
  // json_each-based tag queries would need an expression index)
  tagsIdx: index('memories_tags_idx').on(table.tags),
}));

export type Memory = typeof memories.$inferSelect;
export type NewMemory = typeof memories.$inferInsert;
```

Embedding storage details:

```typescript
// Helper functions for embedding serialization
export function serializeEmbedding(embedding: Float32Array): Buffer {
  return Buffer.from(embedding.buffer);
}

export function deserializeEmbedding(buffer: Buffer): Float32Array {
  return new Float32Array(
    buffer.buffer,
    buffer.byteOffset,
    buffer.byteLength / Float32Array.BYTES_PER_ELEMENT
  );
}

// Usage:
// const embedding = await embedText(memory.content);
// await db.insert(memories).values({
//   ...memory,
//   embedding: serializeEmbedding(embedding)
// });
```

2.3 Patterns Table

```typescript
export const patterns = sqliteTable('patterns', {
  // Identity
  id: text('id').primaryKey(),
  type: text('type', { enum: ['success', 'failure', 'approach'] }).notNull(),

  // Pattern definition
  trigger: text('trigger').notNull(),  // What situation activates this?
  pattern: text('pattern').notNull(),  // The pattern itself
  resolution: text('resolution'),      // What to do when triggered

  // Statistics
  frequency: integer('frequency').notNull().default(1),  // Times observed
  successRate: real('success_rate'),                     // How often this works (0.0 - 1.0)
  confidence: real('confidence').notNull().default(0.5),

  // Temporal
  lastSeen: integer('last_seen', { mode: 'timestamp_ms' }).notNull(),
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),

  // GEP Protocol fields
  priority: integer('priority').default(1),    // Gene priority
  mutations: integer('mutations').default(0),  // Evolution tracking
}, (table) => ({
  typeIdx: index('patterns_type_idx').on(table.type),
  triggerIdx: index('patterns_trigger_idx').on(table.trigger),
  frequencyIdx: index('patterns_frequency_idx').on(table.frequency),
  // Composite for pattern selection
  scoreIdx: index('patterns_score_idx').on(table.type, table.confidence, table.frequency),
}));

export type Pattern = typeof patterns.$inferSelect;
export type NewPattern = typeof patterns.$inferInsert;
```

2.4 Checkpoints Table

```typescript
export const checkpoints = sqliteTable('checkpoints', {
  // Identity
  id: text('id').primaryKey(),
  traceId: text('trace_id').notNull(),
  phase: text('phase').notNull(),

  // State snapshot (JSON-serialized phase output)
  state: text('state', { mode: 'json' }).notNull().$type<Record<string, unknown>>(),

  // Temporal
  timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(),

  // Metadata
  sizeBytes: integer('size_bytes'),  // Serialized state size
  isValid: integer('is_valid', { mode: 'boolean' }).default(true),
}, (table) => ({
  traceIdIdx: index('checkpoints_trace_id_idx').on(table.traceId),
  phaseIdx: index('checkpoints_phase_idx').on(table.phase),
  timestampIdx: index('checkpoints_timestamp_idx').on(table.timestamp),
}));

export type Checkpoint = typeof checkpoints.$inferSelect;
export type NewCheckpoint = typeof checkpoints.$inferInsert;
```

2.5 Runs Table (Pipeline Execution History)

```typescript
export const runs = sqliteTable('runs', {
  // Identity (same as traceId)
  id: text('id').primaryKey(),

  // Task description
  task: text('task').notNull(),

  // Status
  status: text('status', { enum: ['pending', 'running', 'completed', 'failed', 'cancelled'] }).notNull(),
  currentPhase: text('current_phase'),

  // Configuration snapshot
  config: text('config', { mode: 'json' }).$type<Record<string, unknown>>(),

  // Temporal
  startedAt: integer('started_at', { mode: 'timestamp_ms' }).notNull(),
  completedAt: integer('completed_at', { mode: 'timestamp_ms' }),

  // Cost tracking
  totalCostUsd: real('total_cost_usd').default(0),
  totalTokens: integer('total_tokens').default(0),

  // Outcome
  error: text('error'),           // If failed, why
  errorType: text('error_type'),  // Classification

  // Metadata
  humanInterventions: integer('human_interventions').default(0),
  bounces: integer('bounces').default(0),  // Review/test bounce count
}, (table) => ({
  statusIdx: index('runs_status_idx').on(table.status),
  startedAtIdx: index('runs_started_at_idx').on(table.startedAt),
  costIdx: index('runs_cost_idx').on(table.totalCostUsd),
  // Composite for active runs
  activeIdx: index('runs_active_idx').on(table.status, table.startedAt)
    .where(sql`status IN ('pending', 'running')`),
}));

export type Run = typeof runs.$inferSelect;
export type NewRun = typeof runs.$inferInsert;
```

2.6 Findings Table (Review/Test Issues)

```typescript
export const findings = sqliteTable('findings', {
  // Identity
  id: text('id').primaryKey(),
  runId: text('run_id').notNull().references(() => runs.id, { onDelete: 'cascade' }),

  // Source
  phase: text('phase', { enum: ['review', 'testing'] }).notNull(),
  analyzer: text('analyzer'),  // Which tool/agent found this

  // Severity
  severity: text('severity', { enum: ['info', 'warning', 'error', 'critical'] }).notNull(),
  category: text('category', { enum: ['style', 'security', 'correctness', 'performance', 'maintainability', 'test'] }).notNull(),

  // Location
  file: text('file'),
  line: integer('line'),
  column: integer('column'),

  // Content
  message: text('message').notNull(),
  detail: text('detail'),  // Extended explanation

  // Fix suggestion
  fixable: integer('fixable', { mode: 'boolean' }).default(false),
  fix: text('fix'),                // Suggested code change
  confidence: real('confidence'),  // 0.0 - 1.0

  // Human feedback
  dismissed: integer('dismissed', { mode: 'boolean' }).default(false),
  dismissedBy: text('dismissed_by'),  // Reason for dismissal
  dismissedAt: integer('dismissed_at', { mode: 'timestamp_ms' }),

  // Applied
  applied: integer('applied', { mode: 'boolean' }).default(false),
  appliedAt: integer('applied_at', { mode: 'timestamp_ms' }),

  // Temporal
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  runIdIdx: index('findings_run_id_idx').on(table.runId),
  severityIdx: index('findings_severity_idx').on(table.severity),
  categoryIdx: index('findings_category_idx').on(table.category),
  // Partial index for active findings (not dismissed)
  activeIdx: index('findings_active_idx').on(table.severity, table.runId)
    .where(sql`dismissed = 0`),
  // Composite for file-based queries
  fileIdx: index('findings_file_idx').on(table.file, table.line),
}));

export type Finding = typeof findings.$inferSelect;
export type NewFinding = typeof findings.$inferInsert;
```

3. Additional Tables

3.1 Sessions Table

```typescript
export const sessions = sqliteTable('sessions', {
  // Identity
  id: text('id').primaryKey(),

  // Association
  runId: text('run_id').references(() => runs.id, { onDelete: 'set null' }),

  // Context
  workingMemory: text('working_memory', { mode: 'json' }).$type<unknown>(),
  taskStack: text('task_stack', { mode: 'json' }).$type<string[]>(),

  // Temporal
  startedAt: integer('started_at', { mode: 'timestamp_ms' }).notNull(),
  lastActivityAt: integer('last_activity_at', { mode: 'timestamp_ms' }).notNull(),
  endedAt: integer('ended_at', { mode: 'timestamp_ms' }),

  // Status
  active: integer('active', { mode: 'boolean' }).default(true),
}, (table) => ({
  activeIdx: index('sessions_active_idx').on(table.active, table.lastActivityAt)
    .where(sql`active = 1`),
}));

export type Session = typeof sessions.$inferSelect;
export type NewSession = typeof sessions.$inferInsert;
```

3.2 Tool Executions Table

```typescript
export const toolExecutions = sqliteTable('tool_executions', {
  // Identity
  id: text('id').primaryKey(),
  eventId: text('event_id').references(() => events.id, { onDelete: 'cascade' }),
  traceId: text('trace_id').notNull(),

  // Tool info
  toolName: text('tool_name').notNull(),
  toolVersion: text('tool_version'),

  // Input/Output (hashed for deduplication)
  inputHash: text('input_hash').notNull(),
  outputHash: text('output_hash'),
  input: text('input', { mode: 'json' }).$type<unknown>(),
  output: text('output', { mode: 'json' }).$type<unknown>(),

  // Execution
  success: integer('success', { mode: 'boolean' }).notNull(),
  error: text('error'),
  durationMs: integer('duration_ms').notNull(),
  retryCount: integer('retry_count').default(0),

  // Cache
  cacheHit: integer('cache_hit', { mode: 'boolean' }).default(false),

  // Temporal
  executedAt: integer('executed_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  toolNameIdx: index('tool_executions_tool_name_idx').on(table.toolName),
  traceIdIdx: index('tool_executions_trace_id_idx').on(table.traceId),
  inputHashIdx: index('tool_executions_input_hash_idx').on(table.inputHash),
  cacheIdx: index('tool_executions_cache_idx').on(table.toolName, table.inputHash),
}));

export type ToolExecution = typeof toolExecutions.$inferSelect;
export type NewToolExecution = typeof toolExecutions.$inferInsert;
```

3.3 Cost Tracking Table

```typescript
export const costTracking = sqliteTable('cost_tracking', {
  // Identity
  id: text('id').primaryKey(),

  // Association
  traceId: text('trace_id'),
  runId: text('run_id').references(() => runs.id, { onDelete: 'cascade' }),
  eventId: text('event_id').references(() => events.id, { onDelete: 'cascade' }),

  // Agent/Model
  agentId: text('agent_id').notNull(),
  agentType: text('agent_type').notNull(),
  modelName: text('model_name').notNull(),

  // Usage
  tokensPrompt: integer('tokens_prompt').notNull(),
  tokensCompletion: integer('tokens_completion').notNull(),
  tokensTotal: integer('tokens_total').notNull(),

  // Cost
  costUsd: real('cost_usd').notNull(),

  // Temporal
  timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  runIdIdx: index('cost_tracking_run_id_idx').on(table.runId),
  timestampIdx: index('cost_tracking_timestamp_idx').on(table.timestamp),
  agentTypeIdx: index('cost_tracking_agent_type_idx').on(table.agentType),
  // Composite for cost aggregation
  aggregationIdx: index('cost_tracking_aggregation_idx')
    .on(table.timestamp, table.agentType, table.modelName),
}));

export type CostTracking = typeof costTracking.$inferSelect;
export type NewCostTracking = typeof costTracking.$inferInsert;
```

3.4 Human Decisions Table

```typescript
export const humanDecisions = sqliteTable('human_decisions', {
  // Identity
  id: text('id').primaryKey(),

  // Association
  runId: text('run_id').references(() => runs.id, { onDelete: 'cascade' }),

  // Gate
  gateType: text('gate_type', { enum: ['architecture_approval', 'production_deploy', 'security_findings', 'cost_overrun'] }).notNull(),
  phase: text('phase').notNull(),

  // Context
  contextSummary: text('context_summary').notNull(),
  riskScore: real('risk_score'),

  // Decision
  decision: text('decision', { enum: ['approved', 'rejected', 'changes_requested'] }).notNull(),
  feedback: text('feedback'),
  edits: text('edits'),  // What the human changed

  // Temporal
  requestedAt: integer('requested_at', { mode: 'timestamp_ms' }).notNull(),
  decidedAt: integer('decided_at', { mode: 'timestamp_ms' }).notNull(),
  durationMs: integer('duration_ms').notNull(),
}, (table) => ({
  runIdIdx: index('human_decisions_run_id_idx').on(table.runId),
  gateTypeIdx: index('human_decisions_gate_type_idx').on(table.gateType),
  decisionIdx: index('human_decisions_decision_idx').on(table.decision),
}));

export type HumanDecision = typeof humanDecisions.$inferSelect;
export type NewHumanDecision = typeof humanDecisions.$inferInsert;
```

3.5 Agent Configs Table

```typescript
export const agentConfigs = sqliteTable('agent_configs', {
  // Identity
  id: text('id').primaryKey(),
  agentType: text('agent_type').notNull().unique(),

  // Configuration
  config: text('config', { mode: 'json' }).notNull().$type<Record<string, unknown>>(),

  // Model selection
  modelName: text('model_name').notNull(),
  temperature: real('temperature').default(0.7),
  maxTokens: integer('max_tokens'),

  // Safety
  maxIterations: integer('max_iterations').default(10),
  maxCostUsd: real('max_cost_usd').default(5),

  // Metadata
  version: integer('version').notNull().default(1),
  active: integer('active', { mode: 'boolean' }).default(true),
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
  updatedAt: integer('updated_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  agentTypeIdx: index('agent_configs_agent_type_idx').on(table.agentType),
}));

export type AgentConfig = typeof agentConfigs.$inferSelect;
export type NewAgentConfig = typeof agentConfigs.$inferInsert;
```

4. Index Strategy

4.1 Single-Column Indexes

Purpose: Fast filtering on commonly queried columns.

| Table | Column | Rationale |
|-------|--------|-----------|
| events | trace_id | Replay all events for a run |
| events | type | Filter by event type |
| events | timestamp | Time-series queries |
| events | source | Filter by agent/component |
| events | phase | Filter by pipeline phase |
| memories | type | Filter by memory type |
| memories | confidence | Filter low-confidence memories |
| memories | last_accessed | Find stale memories |
| patterns | type | Filter by pattern type |
| patterns | trigger | Find patterns by trigger |
| runs | status | Find active/failed runs |
| runs | started_at | Time-series run history |
| findings | run_id | Get all findings for a run |
| findings | severity | Filter critical findings |

4.2 Composite Indexes

Purpose: Optimize multi-column filters and sorts.

```typescript
// events: cost queries by run
index('events_cost_idx').on(table.traceId, table.costUsd)

// memories: active memory queries
index('memories_active_idx')
  .on(table.confidence, table.lastAccessed)
  .where(sql`archived = 0 AND confidence > 0.2`)

// patterns: pattern selection scoring
index('patterns_score_idx')
  .on(table.type, table.confidence, table.frequency)

// runs: active run queries
index('runs_active_idx')
  .on(table.status, table.startedAt)
  .where(sql`status IN ('pending', 'running')`)

// findings: active findings by severity
index('findings_active_idx')
  .on(table.severity, table.runId)
  .where(sql`dismissed = 0`)

// cost_tracking: aggregation queries
index('cost_tracking_aggregation_idx')
  .on(table.timestamp, table.agentType, table.modelName)
```

4.3 Partial Indexes

Purpose: Index only relevant rows to save space and improve performance.

```sql
-- Only index errors, not all events
CREATE INDEX events_error_idx ON events(level, timestamp)
WHERE level IN ('ERROR', 'CRITICAL');

-- Only index active memories
CREATE INDEX memories_active_idx ON memories(confidence, last_accessed)
WHERE archived = 0 AND confidence > 0.2;

-- Only index non-dismissed findings
CREATE INDEX findings_active_idx ON findings(severity, run_id)
WHERE dismissed = 0;

-- Only index active runs
CREATE INDEX runs_active_idx ON runs(status, started_at)
WHERE status IN ('pending', 'running');
```

Benefit: Smaller indexes = faster queries, less disk space.

4.4 Index Maintenance

SQLite automatically maintains indexes on insert/update/delete. No manual REINDEX needed unless:

  • Database corruption (rare)
  • After bulk imports
  • After schema changes

Monitor index usage:

```sql
-- Check index usage
EXPLAIN QUERY PLAN
SELECT * FROM events WHERE type = 'plan.started';

-- Analyze table statistics
ANALYZE;

-- Get table sizes
SELECT name, SUM(pgsize) AS size
FROM dbstat
GROUP BY name;
```

5. Query Patterns

5.1 Insert Event

```typescript
import { ulid } from 'ulid';
import { events, type NewEvent } from './schema';
import { db } from './db';

async function insertEvent(event: Omit<NewEvent, 'id' | 'timestamp'>) {
  await db.insert(events).values({
    id: ulid(),
    timestamp: new Date(),
    ...event,
  });
}

// Usage
await insertEvent({
  traceId: 'run-123',
  source: 'planner-agent',
  type: 'plan.completed',
  phase: 'planning',
  payload: { storiesCount: 5 },
  tokensUsed: 1500,
  costUsd: 0.02,
  durationMs: 3500,
  level: 'INFO',
});
```

5.2 Recall Memories by Similarity

```typescript
import { memories, type Memory } from './schema';
import { db } from './db';
import { sql, desc } from 'drizzle-orm';
// deserializeEmbedding as defined in Section 2.2

async function recallSimilar(
  queryEmbedding: Float32Array,
  type?: 'episodic' | 'semantic' | 'procedural',
  limit = 10
): Promise<Memory[]> {
  // Build WHERE clause
  const conditions = [sql`archived = 0`, sql`confidence > 0.2`];
  if (type) {
    conditions.push(sql`type = ${type}`);
  }

  // Fetch candidate memories
  const candidates = await db
    .select()
    .from(memories)
    .where(sql.join(conditions, sql` AND `))
    .orderBy(desc(memories.confidence))
    .limit(limit * 10); // Fetch 10x for filtering

  // Calculate cosine similarity
  const scored = candidates.map(memory => {
    if (!memory.embedding) return { memory, score: 0 };
    const embedding = deserializeEmbedding(memory.embedding);
    const score = cosineSimilarity(queryEmbedding, embedding);
    return { memory, score };
  });

  // Sort by similarity and take top K
  scored.sort((a, b) => b.score - a.score);
  return scored.slice(0, limit).map(s => s.memory);
}

// Cosine similarity helper
function cosineSimilarity(a: Float32Array, b: Float32Array): number {
  if (a.length !== b.length) throw new Error('Dimension mismatch');
  let dotProduct = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dotProduct += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  return dotProduct / (Math.sqrt(normA) * Math.sqrt(normB));
}
```

Performance characteristics (rough figures; benchmark sketch after this list):

  • Brute-force similarity on ~100 memories: <10ms
  • ~1000 memories: ~50ms
  • ~10,000 memories: ~500ms
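These figures are hardware-dependent. A standalone sketch (not part of the plan's modules) to reproduce them locally, using random embeddings and the same cosine-similarity loop as above:

```typescript
// Micro-benchmark: brute-force cosine similarity over N random
// 1536-dimension embeddings. Absolute numbers vary by machine.
const DIM = 1536;

function randomEmbedding(): Float32Array {
  const v = new Float32Array(DIM);
  for (let i = 0; i < DIM; i++) v[i] = Math.random() - 0.5;
  return v;
}

function cosineSimilarity(a: Float32Array, b: Float32Array): number {
  let dot = 0, na = 0, nb = 0;
  for (let i = 0; i < a.length; i++) {
    dot += a[i] * b[i];
    na += a[i] * a[i];
    nb += b[i] * b[i];
  }
  return dot / (Math.sqrt(na) * Math.sqrt(nb));
}

for (const n of [100, 1_000, 10_000]) {
  const corpus = Array.from({ length: n }, randomEmbedding);
  const query = randomEmbedding();
  const start = performance.now();
  for (const e of corpus) cosineSimilarity(query, e);
  console.log(`${n} memories: ${(performance.now() - start).toFixed(1)}ms`);
}
```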

When to upgrade to vector DB:

  • Memory count > 100,000
  • Query latency > 200ms
  • Need approximate nearest neighbor (ANN) algorithms

5.3 Get Run History

```typescript
import { runs } from './schema';
import { db } from './db';
import { desc, eq, sql } from 'drizzle-orm';

async function getRunHistory(limit = 50) {
  return db
    .select({
      id: runs.id,
      task: runs.task,
      status: runs.status,
      startedAt: runs.startedAt,
      completedAt: runs.completedAt,
      totalCostUsd: runs.totalCostUsd,
      error: runs.error,
    })
    .from(runs)
    .orderBy(desc(runs.startedAt))
    .limit(limit);
}

async function getRunById(runId: string) {
  const [run] = await db
    .select()
    .from(runs)
    .where(eq(runs.id, runId))
    .limit(1);
  return run;
}

async function getActiveRuns() {
  return db
    .select()
    .from(runs)
    .where(sql`status IN ('pending', 'running')`)
    .orderBy(desc(runs.startedAt));
}
```

5.4 Find Patterns by Trigger

```typescript
import { patterns } from './schema';
import { db } from './db';
import { like, desc, eq } from 'drizzle-orm';

async function findPatternsByTrigger(triggerQuery: string) {
  return db
    .select()
    .from(patterns)
    .where(like(patterns.trigger, `%${triggerQuery}%`))
    .orderBy(desc(patterns.frequency), desc(patterns.confidence))
    .limit(20);
}

async function getBestPatternsForType(type: 'success' | 'failure' | 'approach') {
  return db
    .select()
    .from(patterns)
    .where(eq(patterns.type, type))
    .orderBy(
      desc(patterns.confidence),
      desc(patterns.frequency),
      desc(patterns.successRate)
    )
    .limit(10);
}
```

5.5 Checkpoint CRUD

```typescript
import { checkpoints } from './schema';
import { db } from './db';
import { eq, desc, sql } from 'drizzle-orm';
import { ulid } from 'ulid';

async function createCheckpoint(
  traceId: string,
  phase: string,
  state: Record<string, unknown>
): Promise<string> {
  const id = ulid();
  const stateStr = JSON.stringify(state);
  await db.insert(checkpoints).values({
    id,
    traceId,
    phase,
    state,
    timestamp: new Date(),
    sizeBytes: Buffer.byteLength(stateStr, 'utf8'),
    isValid: true,
  });
  return id;
}

async function getLatestCheckpoint(traceId: string) {
  const [checkpoint] = await db
    .select()
    .from(checkpoints)
    .where(eq(checkpoints.traceId, traceId))
    .orderBy(desc(checkpoints.timestamp))
    .limit(1);
  return checkpoint;
}

async function getCheckpointsByPhase(traceId: string, phase: string) {
  return db
    .select()
    .from(checkpoints)
    .where(sql`trace_id = ${traceId} AND phase = ${phase} AND is_valid = 1`)
    .orderBy(desc(checkpoints.timestamp));
}

async function invalidateCheckpoint(checkpointId: string) {
  await db
    .update(checkpoints)
    .set({ isValid: false })
    .where(eq(checkpoints.id, checkpointId));
}
```

5.6 Findings by Severity

```typescript
import { findings } from './schema';
import { db } from './db';
import { eq, sql, desc } from 'drizzle-orm';

async function getFindingsByRun(runId: string) {
  return db
    .select()
    .from(findings)
    .where(eq(findings.runId, runId))
    .orderBy(
      // Severity is text, so map it to a rank; a plain DESC would
      // sort alphabetically and put 'warning' before 'critical'
      sql`CASE severity WHEN 'critical' THEN 0 WHEN 'error' THEN 1 WHEN 'warning' THEN 2 ELSE 3 END`,
      findings.file,
      findings.line
    );
}

async function getCriticalFindings(runId?: string) {
  const conditions = [eq(findings.severity, 'critical'), eq(findings.dismissed, false)];
  if (runId) {
    conditions.push(eq(findings.runId, runId));
  }
  return db
    .select()
    .from(findings)
    .where(sql.join(conditions, sql` AND `))
    .orderBy(desc(findings.createdAt));
}

async function getFindingStats(runId: string) {
  const result = await db
    .select({
      severity: findings.severity,
      count: sql<number>`COUNT(*)`,
    })
    .from(findings)
    .where(sql`run_id = ${runId} AND dismissed = 0`)
    .groupBy(findings.severity);
  return Object.fromEntries(result.map(r => [r.severity, r.count]));
}

async function dismissFinding(findingId: string, reason: string) {
  await db
    .update(findings)
    .set({
      dismissed: true,
      dismissedBy: reason,
      dismissedAt: new Date(),
    })
    .where(eq(findings.id, findingId));
}
```

5.7 Cost Aggregation

```typescript
import { costTracking } from './schema';
import { db } from './db';
import { sql, between, eq, desc } from 'drizzle-orm';

async function getTotalCostByRun(runId: string) {
  const result = await db
    .select({
      totalCost: sql<number>`SUM(cost_usd)`,
      totalTokens: sql<number>`SUM(tokens_total)`,
    })
    .from(costTracking)
    .where(eq(costTracking.runId, runId));
  return result[0];
}

async function getCostByAgentType(startDate: Date, endDate: Date) {
  return db
    .select({
      agentType: costTracking.agentType,
      totalCost: sql<number>`SUM(cost_usd)`,
      totalTokens: sql<number>`SUM(tokens_total)`,
      operationCount: sql<number>`COUNT(*)`,
    })
    .from(costTracking)
    .where(between(costTracking.timestamp, startDate, endDate))
    .groupBy(costTracking.agentType)
    .orderBy(desc(sql`SUM(cost_usd)`));
}

async function getCostByModel(startDate: Date, endDate: Date) {
  return db
    .select({
      modelName: costTracking.modelName,
      totalCost: sql<number>`SUM(cost_usd)`,
      avgCostPerOperation: sql<number>`AVG(cost_usd)`,
      operationCount: sql<number>`COUNT(*)`,
    })
    .from(costTracking)
    .where(between(costTracking.timestamp, startDate, endDate))
    .groupBy(costTracking.modelName)
    .orderBy(desc(sql`SUM(cost_usd)`));
}

async function getDailyCostSummary(days = 30) {
  const startDate = new Date();
  startDate.setDate(startDate.getDate() - days);

  return db
    .select({
      date: sql<string>`DATE(timestamp / 1000, 'unixepoch')`,
      totalCost: sql<number>`SUM(cost_usd)`,
      runCount: sql<number>`COUNT(DISTINCT run_id)`,
    })
    .from(costTracking)
    .where(sql`timestamp >= ${startDate.getTime()}`)
    .groupBy(sql`DATE(timestamp / 1000, 'unixepoch')`)
    .orderBy(desc(sql`DATE(timestamp / 1000, 'unixepoch')`));
}
```

5.8 Event Replay

```typescript
import { events } from './schema';
import { db } from './db';
import { eq, sql } from 'drizzle-orm';

async function replayTrace(traceId: string) {
  return db
    .select()
    .from(events)
    .where(eq(events.traceId, traceId))
    .orderBy(events.timestamp);
}

async function getEventsByType(traceId: string, eventType: string) {
  return db
    .select()
    .from(events)
    .where(sql`trace_id = ${traceId} AND type = ${eventType}`)
    .orderBy(events.timestamp);
}

async function getErrorEvents(traceId: string) {
  return db
    .select()
    .from(events)
    .where(sql`trace_id = ${traceId} AND level IN ('ERROR', 'CRITICAL')`)
    .orderBy(events.timestamp);
}
```

6. Embedding Storage

6.1 Storage Format

Embeddings are stored as binary blobs (Float32Array serialized to Buffer):

```typescript
// Embedding dimension (depends on model)
// OpenAI text-embedding-3-small: 1536
// voyage-2: 1024
// Custom models: varies
const EMBEDDING_DIMENSION = 1536;

// Serialize embedding for storage
function serializeEmbedding(embedding: Float32Array): Buffer {
  if (embedding.length !== EMBEDDING_DIMENSION) {
    throw new Error(`Expected ${EMBEDDING_DIMENSION} dimensions, got ${embedding.length}`);
  }
  return Buffer.from(embedding.buffer);
}

// Deserialize from storage
function deserializeEmbedding(buffer: Buffer): Float32Array {
  return new Float32Array(
    buffer.buffer,
    buffer.byteOffset,
    buffer.byteLength / Float32Array.BYTES_PER_ELEMENT
  );
}
```

6.2 Similarity Search

Brute-force cosine similarity:

```typescript
import { sql, desc } from 'drizzle-orm';
import { db } from './db';
import { memories, type Memory } from './schema';

interface SimilaritySearchOptions {
  type?: 'episodic' | 'semantic' | 'procedural';
  minConfidence?: number;
  excludeArchived?: boolean;
  limit?: number;
}

async function similaritySearch(
  queryEmbedding: Float32Array,
  options: SimilaritySearchOptions = {}
): Promise<Array<{ memory: Memory; score: number }>> {
  const { type, minConfidence = 0.2, excludeArchived = true, limit = 10 } = options;

  // Build filter conditions
  const conditions = [sql`confidence >= ${minConfidence}`];
  if (excludeArchived) {
    conditions.push(sql`archived = 0`);
  }
  if (type) {
    conditions.push(sql`type = ${type}`);
  }

  // Fetch candidates
  const candidates = await db
    .select()
    .from(memories)
    .where(sql.join(conditions, sql` AND `))
    .orderBy(desc(memories.confidence));

  // Calculate similarities
  const results = candidates
    .filter(m => m.embedding !== null)
    .map(memory => ({
      memory,
      score: cosineSimilarity(
        queryEmbedding,
        deserializeEmbedding(memory.embedding!)
      ),
    }))
    .sort((a, b) => b.score - a.score)
    .slice(0, limit);

  return results;
}

function cosineSimilarity(a: Float32Array, b: Float32Array): number {
  let dotProduct = 0;
  let normA = 0;
  let normB = 0;
  for (let i = 0; i < a.length; i++) {
    dotProduct += a[i] * b[i];
    normA += a[i] * a[i];
    normB += b[i] * b[i];
  }
  const denominator = Math.sqrt(normA) * Math.sqrt(normB);
  return denominator === 0 ? 0 : dotProduct / denominator;
}
```

6.3 Performance Characteristics

| Memory Count | Search Time | Mitigation |
|--------------|-------------|------------|
| 100 | <10ms | None needed |
| 1,000 | ~50ms | Acceptable |
| 10,000 | ~500ms | Pre-filter by type/tags |
| 100,000 | ~5s | Upgrade to vector DB |

Optimizations:

  1. Pre-filter aggressively: Use SQL WHERE to reduce candidates
  2. Cache embeddings: Keep hot embeddings in memory
  3. Parallel search: Use workers for multiple queries
  4. Hybrid search: Combine keyword + vector search (see the sketch after this list)
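Item 4 deserves a concrete shape. A minimal hybrid-search sketch, reusing the Section 6 helpers, where a SQL `LIKE` pre-filter shrinks the candidate set before the O(n) vector pass (the `keyword` parameter and the candidate multiplier are assumptions):

```typescript
import { sql, desc } from 'drizzle-orm';
import { db } from './db';
import { memories, type Memory } from './schema';
// cosineSimilarity and deserializeEmbedding as defined in Section 6

async function hybridSearch(
  queryEmbedding: Float32Array,
  keyword: string,
  limit = 10
): Promise<Array<{ memory: Memory; score: number }>> {
  // Keyword pre-filter: cheap SQL scan before the expensive vector pass
  const pattern = `%${keyword}%`;
  const candidates = await db
    .select()
    .from(memories)
    .where(sql`archived = 0 AND (content LIKE ${pattern} OR context LIKE ${pattern})`)
    .orderBy(desc(memories.confidence))
    .limit(limit * 20);

  // Vector re-ranking of the surviving candidates
  return candidates
    .filter(m => m.embedding !== null)
    .map(memory => ({
      memory,
      score: cosineSimilarity(queryEmbedding, deserializeEmbedding(memory.embedding!)),
    }))
    .sort((a, b) => b.score - a.score)
    .slice(0, limit);
}
```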

6.4 When to Upgrade

Switch to a dedicated vector database when:

  • Memory count > 100,000
  • Query latency > 200ms
  • Need approximate nearest neighbor (ANN)
  • Need distributed search

Options:

  • Qdrant — Rust-based, fast, good TypeScript SDK
  • Weaviate — GraphQL API, good integrations
  • Pinecone — Managed, easy to use
  • pgvector — PostgreSQL extension (if switching DB)

7. Migration Strategy

7.1 Initial Schema Creation

```bash
# Generate initial migration
bun drizzle-kit generate:sqlite

# Review the migration
cat drizzle/migrations/0000_initial.sql

# Apply migration
bun drizzle-kit push:sqlite
```

7.2 Schema Evolution

Adding a new column:

```typescript
// schema.ts
export const runs = sqliteTable('runs', {
  // ... existing columns ...

  // NEW: Add automation level
  automationLevel: integer('automation_level').default(1),
});
```
```bash
# Generate migration
bun drizzle-kit generate:sqlite

# Result: drizzle/migrations/0001_add_automation_level.sql
# ALTER TABLE runs ADD COLUMN automation_level INTEGER DEFAULT 1;

# Apply
bun drizzle-kit push:sqlite
```

Renaming a column:

```typescript
// Don't rename directly — it breaks existing queries
// Instead: add new column, backfill, deprecate old
export const runs = sqliteTable('runs', {
  // OLD (deprecated)
  error: text('error'),

  // NEW
  errorMessage: text('error_message'),
  errorType: text('error_type'),
});
```

Backfill script:

```typescript
// scripts/backfill-error-fields.ts
import { db } from '../src/memory/db';
import { runs } from '../src/memory/schema';
import { eq, isNotNull } from 'drizzle-orm';

async function backfillErrorFields() {
  const runsWithError = await db
    .select()
    .from(runs)
    .where(isNotNull(runs.error));

  for (const run of runsWithError) {
    await db
      .update(runs)
      .set({
        errorMessage: run.error,
        errorType: 'unknown', // default
      })
      .where(eq(runs.id, run.id));
  }

  console.log(`Backfilled ${runsWithError.length} runs`);
}

backfillErrorFields();
```

7.3 Backwards Compatibility

Approach:

  1. Add new columns as nullable — old code still works
  2. Backfill data — migrate existing data
  3. Update application code — use new columns
  4. Deprecate old columns — mark as deprecated in comments
  5. Remove old columns — after grace period (major version bump)

Example:

```typescript
// v1.0.0
export const runs = sqliteTable('runs', {
  error: text('error'), // deprecated: use errorMessage + errorType
  errorMessage: text('error_message'),
  errorType: text('error_type'),
});

// v2.0.0
export const runs = sqliteTable('runs', {
  errorMessage: text('error_message'),
  errorType: text('error_type'),
  // error column removed
});
```

8. Data Lifecycle

8.1 Retention Policies

```typescript
interface RetentionPolicies {
  events: {
    DEBUG: '7 days';
    INFO: '30 days';
    WARN: '90 days';
    ERROR: '1 year';
    CRITICAL: 'permanent';
  };
  memories: {
    episodic: 'archive after 90 days if not accessed';
    semantic: 'keep if confidence > 0.3';
    procedural: 'keep if success_rate > 0.5';
  };
  patterns: {
    success: 'keep if frequency > 1';
    failure: 'keep if frequency > 1';
    approach: 'keep if success_rate > 0.4';
  };
  checkpoints: {
    valid: '30 days';
    invalid: '7 days';
  };
  runs: {
    completed: '1 year';
    failed: '1 year';
    cancelled: '30 days';
  };
  findings: {
    all: 'keep for run lifetime';
  };
}
```

8.2 Archival Job

```typescript
// src/memory/archive.ts
import { db } from './db';
import { events, memories, checkpoints } from './schema';
import { sql } from 'drizzle-orm';

export async function archiveOldData() {
  const now = Date.now();

  // Delete old DEBUG events (7 days)
  await db
    .delete(events)
    .where(sql`level = 'DEBUG' AND timestamp < ${now - 7 * 24 * 60 * 60 * 1000}`);

  // Delete old INFO events (30 days)
  await db
    .delete(events)
    .where(sql`level = 'INFO' AND timestamp < ${now - 30 * 24 * 60 * 60 * 1000}`);

  // Archive stale episodic memories (not accessed for 90 days)
  await db
    .update(memories)
    .set({ archived: true })
    .where(sql`type = 'episodic' AND last_accessed < ${now - 90 * 24 * 60 * 60 * 1000}`);

  // Archive low-confidence semantic memories
  await db
    .update(memories)
    .set({ archived: true })
    .where(sql`type = 'semantic' AND confidence < 0.3`);

  // Delete invalid checkpoints (7 days old)
  await db
    .delete(checkpoints)
    .where(sql`is_valid = 0 AND timestamp < ${now - 7 * 24 * 60 * 60 * 1000}`);

  // Delete old valid checkpoints (30 days)
  await db
    .delete(checkpoints)
    .where(sql`is_valid = 1 AND timestamp < ${now - 30 * 24 * 60 * 60 * 1000}`);

  console.log('Archival complete');
}
```

8.3 Memory Pruning

```typescript
// src/memory/prune.ts
import { db } from './db';
import { memories, patterns } from './schema';
import { sql } from 'drizzle-orm';

export async function pruneMemories() {
  // Delete archived memories with low confidence
  const deleted = await db
    .delete(memories)
    .where(sql`archived = 1 AND confidence < 0.2`);
  console.log(`Pruned ${deleted.changes} archived memories`);
}

export async function prunePatterns() {
  // Delete patterns with low frequency and low success rate
  const deleted = await db
    .delete(patterns)
    .where(sql`frequency = 1 AND (success_rate < 0.4 OR success_rate IS NULL)`);
  console.log(`Pruned ${deleted.changes} low-value patterns`);
}
```

8.4 Consolidation Job

```typescript
// src/memory/consolidate.ts
import { ulid } from 'ulid';
import { db } from './db';
import { memories, patterns, type Memory, type NewMemory } from './schema';
import { eq, sql } from 'drizzle-orm';

export async function consolidateMemories() {
  // Find similar episodic memories and merge them
  const episodic = await db
    .select()
    .from(memories)
    .where(sql`type = 'episodic' AND archived = 0`);

  // Group by similarity (simplified — use embedding similarity in production)
  const groups = groupBySimilarity(episodic);

  for (const group of groups) {
    if (group.length > 1) {
      // Merge into one semantic memory
      const merged = mergeMemories(group);

      // Store as semantic
      await db.insert(memories).values({
        ...merged,
        id: ulid(),
        type: 'semantic',
        confidence: Math.max(...group.map(m => m.confidence)),
      });

      // Archive originals
      for (const memory of group) {
        await db
          .update(memories)
          .set({ archived: true })
          .where(eq(memories.id, memory.id));
      }
    }
  }
}

export async function consolidatePatterns() {
  // Promote frequent, successful patterns to procedural memories
  const frequentPatterns = await db
    .select()
    .from(patterns)
    .where(sql`frequency >= 3 AND success_rate >= 0.7`);

  for (const pattern of frequentPatterns) {
    // Convert to procedural memory
    await db.insert(memories).values({
      id: ulid(),
      type: 'procedural',
      content: pattern.pattern,
      context: pattern.trigger,
      confidence: pattern.confidence,
      source: `pattern:${pattern.id}`,
      tags: ['promoted'],
      createdAt: new Date(),
      lastAccessed: new Date(),
      accessCount: pattern.frequency,
    });
  }
}

function groupBySimilarity(items: Memory[]): Memory[][] {
  // Simplified — use actual embedding similarity in production
  const groups: Memory[][] = [];
  const used = new Set<string>();

  for (const memory of items) {
    if (used.has(memory.id)) continue;
    const similar = items.filter(m =>
      !used.has(m.id) && isSimilar(memory.content, m.content)
    );
    if (similar.length > 0) {
      groups.push(similar);
      similar.forEach(m => used.add(m.id));
    }
  }
  return groups;
}

// Placeholder check — replace with embedding similarity in production
function isSimilar(a: string, b: string): boolean {
  return a.slice(0, 40) === b.slice(0, 40);
}

function mergeMemories(group: Memory[]): Omit<NewMemory, 'id'> {
  // Combine content, keep highest confidence
  return {
    type: 'semantic',
    content: group.map(m => m.content).join('\n'),
    context: group[0].context,
    confidence: Math.max(...group.map(m => m.confidence)),
    tags: Array.from(new Set(group.flatMap(m => m.tags || []))),
    source: `consolidated:${group.map(m => m.id).join(',')}`,
    createdAt: new Date(),
    lastAccessed: new Date(),
    accessCount: group.reduce((sum, m) => sum + m.accessCount, 0),
  };
}
```

Run consolidation daily:

```typescript
// src/memory/scheduler.ts
import { consolidateMemories, consolidatePatterns } from './consolidate';
import { archiveOldData } from './archive';
import { pruneMemories, prunePatterns } from './prune';

export function startMaintenanceJobs() {
  // Run once every 24 hours (setInterval has no fixed start time;
  // use a cron-style scheduler if maintenance must run at 2 AM)
  const DAILY = 24 * 60 * 60 * 1000;

  setInterval(async () => {
    console.log('Running daily maintenance...');
    await consolidateMemories();
    await consolidatePatterns();
    await archiveOldData();
    await pruneMemories();
    await prunePatterns();
    console.log('Maintenance complete');
  }, DAILY);
}
```

9. Connection Management

9.1 Single Connection vs Pool

SQLite is single-writer — only one write transaction at a time.

For Forge (single Bun process):

  • Use a single connection (exported singleton)
  • Enable WAL mode for concurrent reads
  • Set busy timeout for write contention

Why this works:

  • Forge orchestrator is single-threaded
  • Agents run sequentially (MVP)
  • WAL allows multiple readers

When to use a pool:

  • Multi-process deployment (workers)
  • HTTP API with concurrent requests
  • Parallel agent execution (post-MVP)

9.2 WAL Mode

Write-Ahead Logging benefits:

  • Readers don't block writers
  • Writers don't block readers
  • Better concurrency
```typescript
// Enabled in db.ts
sqlite.pragma('journal_mode = WAL');
```

Files created:

  • memory.db — main database
  • memory.db-wal — write-ahead log
  • memory.db-shm — shared memory

Checkpoint WAL periodically:

```typescript
import { sqlite } from './db'; // raw connection exported in Section 1.2

export function checkpointWAL() {
  sqlite.pragma('wal_checkpoint(TRUNCATE)');
}

// Run after large writes
```

9.3 Busy Timeout

Handle write contention:

```typescript
// Wait up to 30 seconds for a lock (set in db.ts)
sqlite.pragma('busy_timeout = 30000');
```

If timeout exceeded:

```typescript
try {
  await db.insert(events).values(...);
} catch (error: any) {
  if (error.code !== 'SQLITE_BUSY') throw error;
  // Database locked — retry with exponential backoff (helper sketched below)
  await retry(() => db.insert(events).values(...));
}
```
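The `retry` helper is not defined in this plan; a minimal exponential-backoff sketch (name, signature, and backoff constants are all assumptions):

```typescript
// Hypothetical retry helper: exponential backoff, retrying only on
// SQLITE_BUSY (lock contention), up to a fixed attempt cap.
async function retry<T>(
  fn: () => Promise<T> | T,
  maxAttempts = 5,
  baseDelayMs = 100
): Promise<T> {
  let lastError: unknown;
  for (let attempt = 0; attempt < maxAttempts; attempt++) {
    try {
      return await fn();
    } catch (error: any) {
      if (error?.code !== 'SQLITE_BUSY') throw error;
      lastError = error;
      // 100ms, 200ms, 400ms, ...
      await new Promise(resolve => setTimeout(resolve, baseDelayMs * 2 ** attempt));
    }
  }
  throw lastError;
}
```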

9.4 Concurrent Reads/Writes

Read pattern:

```typescript
// Multiple concurrent reads are fine with WAL
const [run1, run2, run3] = await Promise.all([
  db.select().from(runs).where(eq(runs.id, 'run-1')),
  db.select().from(runs).where(eq(runs.id, 'run-2')),
  db.select().from(runs).where(eq(runs.id, 'run-3')),
]);
```

Write pattern:

```typescript
// Writes are serialized automatically
await db.transaction(async (tx) => {
  await tx.insert(events).values(event1);
  await tx.insert(events).values(event2);
  await tx.update(runs).set({ status: 'completed' });
});
```

9.5 Graceful Shutdown

```typescript
import { sqlite } from './db'; // raw connection exported in Section 1.2

export function closeDatabase() {
  // Checkpoint WAL
  sqlite.pragma('wal_checkpoint(TRUNCATE)');

  // Close connection
  sqlite.close();
  console.log('Database closed gracefully');
}

// Register shutdown handler
process.on('SIGINT', () => {
  closeDatabase();
  process.exit(0);
});
```

10. Seed Data

10.1 Default Patterns

```typescript
// src/memory/seed.ts
import { db } from './db';
import { patterns, agentConfigs } from './schema';
import { ulid } from 'ulid';

export async function seedDefaultPatterns() {
  const defaultPatterns = [
    {
      id: ulid(),
      type: 'failure' as const,
      trigger: 'null pointer exception',
      pattern: 'Missing null check before property access',
      resolution: 'Add null/undefined check',
      frequency: 0,
      confidence: 0.8,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 1,
    },
    {
      id: ulid(),
      type: 'success' as const,
      trigger: 'test coverage low',
      pattern: 'Generated property-based tests',
      resolution: 'Use fast-check for property testing',
      frequency: 0,
      successRate: 0.85,
      confidence: 0.7,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 2,
    },
    {
      id: ulid(),
      type: 'approach' as const,
      trigger: 'circular dependency',
      pattern: 'Extract interface to break cycle',
      resolution: 'Create interface in separate file',
      frequency: 0,
      successRate: 0.9,
      confidence: 0.9,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 1,
    },
  ];

  await db.insert(patterns).values(defaultPatterns);
  console.log(`Seeded ${defaultPatterns.length} default patterns`);
}
```

10.2 Error Taxonomy

```typescript
export async function seedErrorTaxonomy() {
  const errorCategories = [
    {
      id: ulid(),
      type: 'failure' as const,
      trigger: 'syntax error',
      pattern: 'Code does not parse',
      resolution: 'Fix syntax before continuing',
      frequency: 0,
      confidence: 1.0,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 1,
    },
    {
      id: ulid(),
      type: 'failure' as const,
      trigger: 'type error',
      pattern: 'Type mismatch',
      resolution: 'Correct type annotations or cast',
      frequency: 0,
      confidence: 1.0,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 1,
    },
    {
      id: ulid(),
      type: 'failure' as const,
      trigger: 'import error',
      pattern: 'Module not found',
      resolution: 'Install dependency or fix import path',
      frequency: 0,
      confidence: 1.0,
      lastSeen: new Date(),
      createdAt: new Date(),
      priority: 1,
    },
  ];

  await db.insert(patterns).values(errorCategories);
  console.log(`Seeded ${errorCategories.length} error categories`);
}
```

10.3 Built-in Tool Definitions

```typescript
export async function seedToolDefinitions() {
  const tools = [
    {
      id: ulid(),
      agentType: 'implementer',
      config: {
        tools: [
          { name: 'read_file', description: 'Read file contents' },
          { name: 'write_file', description: 'Write file contents' },
          { name: 'run_command', description: 'Execute shell command' },
          { name: 'search_code', description: 'Search codebase' },
        ],
      },
      modelName: 'claude-sonnet-4-5-20250929',
      temperature: 0.7,
      maxIterations: 50,
      maxCostUsd: 10,
      version: 1,
      active: true,
      createdAt: new Date(),
      updatedAt: new Date(),
    },
    {
      id: ulid(),
      agentType: 'reviewer',
      config: {
        tools: [
          { name: 'run_linter', description: 'Run ESLint/Biome' },
          { name: 'run_security_scan', description: 'Scan for vulnerabilities' },
          { name: 'llm_review', description: 'AI code review' },
        ],
      },
      modelName: 'claude-haiku-4-5-20251001',
      temperature: 0.3,
      maxIterations: 5,
      maxCostUsd: 2,
      version: 1,
      active: true,
      createdAt: new Date(),
      updatedAt: new Date(),
    },
  ];

  await db.insert(agentConfigs).values(tools);
  console.log(`Seeded ${tools.length} agent configs`);
}
```

10.4 Seed Script

```typescript
// scripts/seed.ts
import {
  seedDefaultPatterns,
  seedErrorTaxonomy,
  seedToolDefinitions,
} from '../src/memory/seed';

async function seed() {
  console.log('Seeding database...');
  await seedDefaultPatterns();
  await seedErrorTaxonomy();
  await seedToolDefinitions();
  console.log('Seed complete');
}

seed();
```

Run on first setup:

```bash
bun run scripts/seed.ts
```

11. Testing

11.1 In-Memory SQLite for Tests

```typescript
// src/memory/db.test.ts
import { beforeEach, describe, expect, test } from 'bun:test';
import Database from 'better-sqlite3';
import { drizzle } from 'drizzle-orm/better-sqlite3';
import { eq } from 'drizzle-orm';
import * as schema from './schema';

describe('Database operations', () => {
  let db: ReturnType<typeof drizzle>;
  let sqlite: Database.Database;

  beforeEach(() => {
    // Create in-memory database
    sqlite = new Database(':memory:');
    db = drizzle(sqlite, { schema });

    // Run migrations (simplified for tests)
    sqlite.exec(`
      CREATE TABLE events (
        id TEXT PRIMARY KEY,
        trace_id TEXT NOT NULL,
        timestamp INTEGER NOT NULL,
        source TEXT NOT NULL,
        type TEXT NOT NULL,
        phase TEXT,
        payload TEXT,
        tokens_used INTEGER,
        cost_usd REAL,
        duration_ms INTEGER,
        level TEXT DEFAULT 'INFO'
      );
      CREATE INDEX events_trace_id_idx ON events(trace_id);
      CREATE INDEX events_type_idx ON events(type);
      CREATE INDEX events_timestamp_idx ON events(timestamp);
    `);
  });

  test('insert and retrieve event', async () => {
    await db.insert(schema.events).values({
      id: 'event-1',
      traceId: 'trace-1',
      timestamp: new Date(),
      source: 'test',
      type: 'test.event',
      phase: 'testing',
      payload: { foo: 'bar' },
      level: 'INFO',
    });

    const [event] = await db
      .select()
      .from(schema.events)
      .where(eq(schema.events.id, 'event-1'));

    expect(event).toBeDefined();
    expect(event.type).toBe('test.event');
    expect(event.payload).toEqual({ foo: 'bar' });
  });

  test('replay events by trace', async () => {
    // Insert multiple events
    await db.insert(schema.events).values([
      {
        id: 'event-1',
        traceId: 'trace-1',
        timestamp: new Date(1000),
        source: 'test',
        type: 'event.1',
        level: 'INFO',
      },
      {
        id: 'event-2',
        traceId: 'trace-1',
        timestamp: new Date(2000),
        source: 'test',
        type: 'event.2',
        level: 'INFO',
      },
    ]);

    // Replay
    const events = await db
      .select()
      .from(schema.events)
      .where(eq(schema.events.traceId, 'trace-1'))
      .orderBy(schema.events.timestamp);

    expect(events).toHaveLength(2);
    expect(events[0].type).toBe('event.1');
    expect(events[1].type).toBe('event.2');
  });
});
```

11.2 Fixture Factories

```typescript
// src/memory/fixtures.ts
import { ulid } from 'ulid';
import type { NewEvent, NewMemory, NewRun, NewFinding } from './schema';

export function createEventFixture(overrides?: Partial<NewEvent>): NewEvent {
  return {
    id: ulid(),
    traceId: `trace-${ulid()}`,
    timestamp: new Date(),
    source: 'test-agent',
    type: 'test.event',
    phase: 'testing',
    payload: { test: true },
    level: 'INFO',
    ...overrides,
  };
}

export function createMemoryFixture(overrides?: Partial<NewMemory>): NewMemory {
  return {
    id: ulid(),
    type: 'episodic',
    content: 'Test memory',
    context: 'test context',
    confidence: 0.5,
    createdAt: new Date(),
    lastAccessed: new Date(),
    accessCount: 0,
    ...overrides,
  };
}

export function createRunFixture(overrides?: Partial<NewRun>): NewRun {
  return {
    id: ulid(),
    task: 'Test task',
    status: 'pending',
    startedAt: new Date(),
    totalCostUsd: 0,
    totalTokens: 0,
    ...overrides,
  };
}

export function createFindingFixture(overrides?: Partial<NewFinding>): NewFinding {
  return {
    id: ulid(),
    runId: `run-${ulid()}`,
    phase: 'review',
    severity: 'warning',
    category: 'style',
    message: 'Test finding',
    createdAt: new Date(),
    ...overrides,
  };
}
```

11.3 Test Helpers

```typescript
// src/memory/test-helpers.ts
import type { BetterSQLite3Database } from 'drizzle-orm/better-sqlite3';
import { events, memories, runs, type NewRun } from './schema';
import { createEventFixture, createRunFixture } from './fixtures';

export async function insertTestEvents(
  db: BetterSQLite3Database,
  traceId: string,
  count: number
) {
  const testEvents = Array.from({ length: count }, (_, i) =>
    createEventFixture({
      traceId,
      type: `test.event.${i}`,
      timestamp: new Date(Date.now() + i * 1000),
    })
  );
  await db.insert(events).values(testEvents);
  return testEvents;
}

export async function createTestRun(
  db: BetterSQLite3Database,
  overrides?: Partial<NewRun>
) {
  const run = createRunFixture(overrides);
  await db.insert(runs).values(run);
  return run;
}

export async function clearTestData(db: BetterSQLite3Database) {
  await db.delete(events);
  await db.delete(memories);
  await db.delete(runs);
}
```

11.4 Integration Tests

```typescript
// src/memory/queries.test.ts
import { beforeEach, describe, expect, test } from 'bun:test';
import { ulid } from 'ulid';
import { createTestDatabase } from './test-db';
import { insertTestEvents, createTestRun } from './test-helpers';
import { costTracking } from './schema';
import { replayTrace, getTotalCostByRun } from './queries';

describe('Query integration tests', () => {
  let db: ReturnType<typeof createTestDatabase>;

  beforeEach(() => {
    db = createTestDatabase();
  });

  test('replay trace returns events in order', async () => {
    const traceId = 'trace-123';
    await insertTestEvents(db, traceId, 5);

    const events = await replayTrace(traceId);

    expect(events).toHaveLength(5);
    expect(events[0].type).toBe('test.event.0');
    expect(events[4].type).toBe('test.event.4');
  });

  test('cost aggregation sums correctly', async () => {
    const run = await createTestRun(db);

    await db.insert(costTracking).values([
      {
        id: ulid(),
        runId: run.id,
        agentId: 'agent-1',
        agentType: 'planner',
        modelName: 'claude-sonnet',
        tokensPrompt: 1000,
        tokensCompletion: 500,
        tokensTotal: 1500,
        costUsd: 0.02,
        timestamp: new Date(),
      },
      {
        id: ulid(),
        runId: run.id,
        agentId: 'agent-2',
        agentType: 'implementer',
        modelName: 'claude-sonnet',
        tokensPrompt: 2000,
        tokensCompletion: 1000,
        tokensTotal: 3000,
        costUsd: 0.04,
        timestamp: new Date(),
      },
    ]);

    const cost = await getTotalCostByRun(run.id);

    // Floating-point sums: compare with a tolerance, not exact equality
    expect(cost.totalCost).toBeCloseTo(0.06);
    expect(cost.totalTokens).toBe(4500);
  });
});
```

Complete Schema Export

src/memory/schema.ts (full file):

typescript
import { sqliteTable, text, integer, real, blob, index } from 'drizzle-orm/sqlite-core'; import { sql } from 'drizzle-orm'; // ─── Events ─────────────────────────────────────────────── export const events = sqliteTable('events', { id: text('id').primaryKey(), traceId: text('trace_id').notNull(), timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(), source: text('source').notNull(), type: text('type').notNull(), phase: text('phase'), payload: text('payload', { mode: 'json' }), tokensUsed: integer('tokens_used'), costUsd: real('cost_usd'), durationMs: integer('duration_ms'), level: text('level', { enum: ['DEBUG', 'INFO', 'WARN', 'ERROR', 'CRITICAL'] }).default('INFO'), }, (table) => ({ traceIdIdx: index('events_trace_id_idx').on(table.traceId), typeIdx: index('events_type_idx').on(table.type), timestampIdx: index('events_timestamp_idx').on(table.timestamp), sourceIdx: index('events_source_idx').on(table.source), phaseIdx: index('events_phase_idx').on(table.phase), costIdx: index('events_cost_idx').on(table.traceId, table.costUsd), errorIdx: index('events_error_idx').on(table.level, table.timestamp) .where(sql`level IN ('ERROR', 'CRITICAL')`), })); // ─── Memories ───────────────────────────────────────────── export const memories = sqliteTable('memories', { id: text('id').primaryKey(), type: text('type', { enum: ['episodic', 'semantic', 'procedural'] }).notNull(), content: text('content').notNull(), context: text('context').notNull(), embedding: blob('embedding', { mode: 'buffer' }), confidence: real('confidence').notNull().default(0.5), source: text('source'), tags: text('tags', { mode: 'json' }).$type<string[]>(), createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(), lastAccessed: integer('last_accessed', { mode: 'timestamp_ms' }).notNull(), accessCount: integer('access_count').notNull().default(0), archived: integer('archived', { mode: 'boolean' }).default(false), }, (table) => ({ typeIdx: index('memories_type_idx').on(table.type), confidenceIdx: index('memories_confidence_idx').on(table.confidence), lastAccessedIdx: index('memories_last_accessed_idx').on(table.lastAccessed), activeIdx: index('memories_active_idx').on(table.confidence, table.lastAccessed) .where(sql`archived = 0 AND confidence > 0.2`), })); // ─── Patterns ───────────────────────────────────────────── export const patterns = sqliteTable('patterns', { id: text('id').primaryKey(), type: text('type', { enum: ['success', 'failure', 'approach'] }).notNull(), trigger: text('trigger').notNull(), pattern: text('pattern').notNull(), resolution: text('resolution'), frequency: integer('frequency').notNull().default(1), successRate: real('success_rate'), confidence: real('confidence').notNull().default(0.5), lastSeen: integer('last_seen', { mode: 'timestamp_ms' }).notNull(), createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(), priority: integer('priority').default(1), mutations: integer('mutations').default(0), }, (table) => ({ typeIdx: index('patterns_type_idx').on(table.type), triggerIdx: index('patterns_trigger_idx').on(table.trigger), frequencyIdx: index('patterns_frequency_idx').on(table.frequency), scoreIdx: index('patterns_score_idx').on(table.type, table.confidence, table.frequency), })); // ─── Checkpoints ────────────────────────────────────────── export const checkpoints = sqliteTable('checkpoints', { id: text('id').primaryKey(), traceId: text('trace_id').notNull(), phase: text('phase').notNull(), state: text('state', { mode: 'json' }).notNull().$type<Record<string, 
unknown>>(),
  timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(),
  sizeBytes: integer('size_bytes'),
  isValid: integer('is_valid', { mode: 'boolean' }).default(true),
}, (table) => ({
  traceIdIdx: index('checkpoints_trace_id_idx').on(table.traceId),
  phaseIdx: index('checkpoints_phase_idx').on(table.phase),
  timestampIdx: index('checkpoints_timestamp_idx').on(table.timestamp),
}));

// ─── Runs ─────────────────────────────────────────────────

export const runs = sqliteTable('runs', {
  id: text('id').primaryKey(),
  task: text('task').notNull(),
  status: text('status', {
    enum: ['pending', 'running', 'completed', 'failed', 'cancelled'],
  }).notNull(),
  currentPhase: text('current_phase'),
  config: text('config', { mode: 'json' }).$type<Record<string, unknown>>(),
  startedAt: integer('started_at', { mode: 'timestamp_ms' }).notNull(),
  completedAt: integer('completed_at', { mode: 'timestamp_ms' }),
  totalCostUsd: real('total_cost_usd').default(0),
  totalTokens: integer('total_tokens').default(0),
  error: text('error'),
  errorType: text('error_type'),
  humanInterventions: integer('human_interventions').default(0),
  bounces: integer('bounces').default(0),
}, (table) => ({
  statusIdx: index('runs_status_idx').on(table.status),
  startedAtIdx: index('runs_started_at_idx').on(table.startedAt),
  costIdx: index('runs_cost_idx').on(table.totalCostUsd),
  activeIdx: index('runs_active_idx').on(table.status, table.startedAt)
    .where(sql`status IN ('pending', 'running')`),
}));

// ─── Findings ─────────────────────────────────────────────

export const findings = sqliteTable('findings', {
  id: text('id').primaryKey(),
  runId: text('run_id').notNull().references(() => runs.id, { onDelete: 'cascade' }),
  phase: text('phase', { enum: ['review', 'testing'] }).notNull(),
  analyzer: text('analyzer'),
  severity: text('severity', { enum: ['info', 'warning', 'error', 'critical'] }).notNull(),
  category: text('category', {
    enum: ['style', 'security', 'correctness', 'performance', 'maintainability', 'test'],
  }).notNull(),
  file: text('file'),
  line: integer('line'),
  column: integer('column'),
  message: text('message').notNull(),
  detail: text('detail'),
  fixable: integer('fixable', { mode: 'boolean' }).default(false),
  fix: text('fix'),
  confidence: real('confidence'),
  dismissed: integer('dismissed', { mode: 'boolean' }).default(false),
  dismissedBy: text('dismissed_by'),
  dismissedAt: integer('dismissed_at', { mode: 'timestamp_ms' }),
  applied: integer('applied', { mode: 'boolean' }).default(false),
  appliedAt: integer('applied_at', { mode: 'timestamp_ms' }),
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  runIdIdx: index('findings_run_id_idx').on(table.runId),
  severityIdx: index('findings_severity_idx').on(table.severity),
  categoryIdx: index('findings_category_idx').on(table.category),
  activeIdx: index('findings_active_idx').on(table.severity, table.runId)
    .where(sql`dismissed = 0`),
  fileIdx: index('findings_file_idx').on(table.file, table.line),
}));

// ─── Sessions ─────────────────────────────────────────────

export const sessions = sqliteTable('sessions', {
  id: text('id').primaryKey(),
  runId: text('run_id').references(() => runs.id, { onDelete: 'set null' }),
  workingMemory: text('working_memory', { mode: 'json' }).$type<unknown>(),
  taskStack: text('task_stack', { mode: 'json' }).$type<string[]>(),
  startedAt: integer('started_at', { mode: 'timestamp_ms' }).notNull(),
  lastActivityAt: integer('last_activity_at', { mode: 'timestamp_ms' }).notNull(),
  endedAt: integer('ended_at', { mode: 'timestamp_ms' }),
  active: integer('active', { mode: 'boolean' }).default(true),
}, (table) => ({
  activeIdx: index('sessions_active_idx').on(table.active, table.lastActivityAt)
    .where(sql`active = 1`),
}));

// ─── Tool Executions ──────────────────────────────────────

export const toolExecutions = sqliteTable('tool_executions', {
  id: text('id').primaryKey(),
  eventId: text('event_id').references(() => events.id, { onDelete: 'cascade' }),
  traceId: text('trace_id').notNull(),
  toolName: text('tool_name').notNull(),
  toolVersion: text('tool_version'),
  inputHash: text('input_hash').notNull(),
  outputHash: text('output_hash'),
  input: text('input', { mode: 'json' }).$type<unknown>(),
  output: text('output', { mode: 'json' }).$type<unknown>(),
  success: integer('success', { mode: 'boolean' }).notNull(),
  error: text('error'),
  durationMs: integer('duration_ms').notNull(),
  retryCount: integer('retry_count').default(0),
  cacheHit: integer('cache_hit', { mode: 'boolean' }).default(false),
  executedAt: integer('executed_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  toolNameIdx: index('tool_executions_tool_name_idx').on(table.toolName),
  traceIdIdx: index('tool_executions_trace_id_idx').on(table.traceId),
  inputHashIdx: index('tool_executions_input_hash_idx').on(table.inputHash),
  cacheIdx: index('tool_executions_cache_idx').on(table.toolName, table.inputHash),
}));

// ─── Cost Tracking ────────────────────────────────────────

export const costTracking = sqliteTable('cost_tracking', {
  id: text('id').primaryKey(),
  traceId: text('trace_id'),
  runId: text('run_id').references(() => runs.id, { onDelete: 'cascade' }),
  eventId: text('event_id').references(() => events.id, { onDelete: 'cascade' }),
  agentId: text('agent_id').notNull(),
  agentType: text('agent_type').notNull(),
  modelName: text('model_name').notNull(),
  tokensPrompt: integer('tokens_prompt').notNull(),
  tokensCompletion: integer('tokens_completion').notNull(),
  tokensTotal: integer('tokens_total').notNull(),
  costUsd: real('cost_usd').notNull(),
  timestamp: integer('timestamp', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  runIdIdx: index('cost_tracking_run_id_idx').on(table.runId),
  timestampIdx: index('cost_tracking_timestamp_idx').on(table.timestamp),
  agentTypeIdx: index('cost_tracking_agent_type_idx').on(table.agentType),
  aggregationIdx: index('cost_tracking_aggregation_idx')
    .on(table.timestamp, table.agentType, table.modelName),
}));

// ─── Human Decisions ──────────────────────────────────────

export const humanDecisions = sqliteTable('human_decisions', {
  id: text('id').primaryKey(),
  runId: text('run_id').references(() => runs.id, { onDelete: 'cascade' }),
  gateType: text('gate_type', {
    enum: ['architecture_approval', 'production_deploy', 'security_findings', 'cost_overrun'],
  }).notNull(),
  phase: text('phase').notNull(),
  contextSummary: text('context_summary').notNull(),
  riskScore: real('risk_score'),
  decision: text('decision', { enum: ['approved', 'rejected', 'changes_requested'] }).notNull(),
  feedback: text('feedback'),
  edits: text('edits'),
  requestedAt: integer('requested_at', { mode: 'timestamp_ms' }).notNull(),
  decidedAt: integer('decided_at', { mode: 'timestamp_ms' }).notNull(),
  durationMs: integer('duration_ms').notNull(),
}, (table) => ({
  runIdIdx: index('human_decisions_run_id_idx').on(table.runId),
  gateTypeIdx: index('human_decisions_gate_type_idx').on(table.gateType),
  decisionIdx: index('human_decisions_decision_idx').on(table.decision),
}));

// ─── Agent Configs ────────────────────────────────────────

export const agentConfigs = sqliteTable('agent_configs', {
  id: text('id').primaryKey(),
  agentType: text('agent_type').notNull().unique(),
  config: text('config', { mode: 'json' }).notNull().$type<Record<string, unknown>>(),
  modelName: text('model_name').notNull(),
  temperature: real('temperature').default(0.7),
  maxTokens: integer('max_tokens'),
  maxIterations: integer('max_iterations').default(10),
  maxCostUsd: real('max_cost_usd').default(5),
  version: integer('version').notNull().default(1),
  active: integer('active', { mode: 'boolean' }).default(true),
  createdAt: integer('created_at', { mode: 'timestamp_ms' }).notNull(),
  updatedAt: integer('updated_at', { mode: 'timestamp_ms' }).notNull(),
}, (table) => ({
  agentTypeIdx: index('agent_configs_agent_type_idx').on(table.agentType),
}));

// ─── Type Exports ─────────────────────────────────────────

export type Event = typeof events.$inferSelect;
export type NewEvent = typeof events.$inferInsert;

export type Memory = typeof memories.$inferSelect;
export type NewMemory = typeof memories.$inferInsert;

export type Pattern = typeof patterns.$inferSelect;
export type NewPattern = typeof patterns.$inferInsert;

export type Checkpoint = typeof checkpoints.$inferSelect;
export type NewCheckpoint = typeof checkpoints.$inferInsert;

export type Run = typeof runs.$inferSelect;
export type NewRun = typeof runs.$inferInsert;

export type Finding = typeof findings.$inferSelect;
export type NewFinding = typeof findings.$inferInsert;

export type Session = typeof sessions.$inferSelect;
export type NewSession = typeof sessions.$inferInsert;

export type ToolExecution = typeof toolExecutions.$inferSelect;
export type NewToolExecution = typeof toolExecutions.$inferInsert;

export type CostTracking = typeof costTracking.$inferSelect;
export type NewCostTracking = typeof costTracking.$inferInsert;

export type HumanDecision = typeof humanDecisions.$inferSelect;
export type NewHumanDecision = typeof humanDecisions.$inferInsert;

export type AgentConfig = typeof agentConfigs.$inferSelect;
export type NewAgentConfig = typeof agentConfigs.$inferInsert;
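These inferred types keep query code and schema in lockstep. A minimal sketch of the pattern, assuming the db singleton from db.ts (field values here are illustrative, not from the plan):

typescript
import { eq } from 'drizzle-orm';
import { db } from './db';
import { findings, type Finding, type NewFinding } from './schema';

// Inserts are checked against the table definition at compile time
const finding: NewFinding = {
  id: crypto.randomUUID(),
  runId: 'run-123', // must reference an existing run (FK with ON DELETE CASCADE)
  phase: 'review',
  severity: 'warning',
  category: 'style',
  message: 'Prefer const over let',
  createdAt: new Date(),
};
db.insert(findings).values(finding).run();

// Select results come back typed as the inferred row type
const open: Finding[] = db.select().from(findings)
  .where(eq(findings.dismissed, false))
  .all();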

Implementation Checklist

Week 1: Foundation

  • Install dependencies (drizzle-orm, better-sqlite3, drizzle-kit)
  • Create drizzle.config.ts
  • Implement schema.ts (all tables)
  • Create db.ts (connection management)
  • Write migration script (migrate.ts)
  • Generate and apply initial migration
  • Test in-memory database setup
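For the last item, a minimal sketch of the in-memory setup, reusing createDatabase from db.ts and the migrator from migrate.ts (better-sqlite3 accepts ':memory:' as a path; the import path assumes the test file lives outside src/):

typescript
import { migrate } from 'drizzle-orm/better-sqlite3/migrator';
import { createDatabase } from '../src/memory/db';

// Fresh, isolated database per test; migrations bring it to the current schema
const testDb = createDatabase(':memory:');
migrate(testDb, { migrationsFolder: './drizzle/migrations' });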

Week 2: Core Queries

  • Implement event insertion
  • Implement event replay
  • Implement memory recall (with similarity search)
  • Implement run CRUD (see the sketch after this list)
  • Implement checkpoint CRUD
  • Implement findings CRUD
  • Test all query patterns
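For the run CRUD item, a sketch against the runs table above (assumes the db singleton; Drizzle's better-sqlite3 queries execute synchronously via .run()/.all()):

typescript
import { desc, eq, inArray } from 'drizzle-orm';
import { db } from './db';
import { runs, type NewRun } from './schema';

export function createRun(run: NewRun): void {
  db.insert(runs).values(run).run();
}

// Served by runs_active_idx, the partial index on pending/running rows
export function getActiveRuns() {
  return db.select().from(runs)
    .where(inArray(runs.status, ['pending', 'running']))
    .orderBy(desc(runs.startedAt))
    .all();
}

export function completeRun(id: string, totalCostUsd: number): void {
  db.update(runs)
    .set({ status: 'completed', completedAt: new Date(), totalCostUsd })
    .where(eq(runs.id, id))
    .run();
}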

Week 3: Additional Features

  • Implement cost aggregation queries (see the sketch after this list)
  • Implement tool execution tracking
  • Implement human decision tracking
  • Create seed data script
  • Test seed data
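For the cost aggregation item, one sketch of a grouped rollup over cost_tracking; the composite cost_tracking_aggregation_idx covers this access pattern:

typescript
import { gte, sql } from 'drizzle-orm';
import { db } from './db';
import { costTracking } from './schema';

// Total spend and tokens since a cutoff, broken down by agent type and model
export function costByAgentSince(since: Date) {
  return db.select({
    agentType: costTracking.agentType,
    modelName: costTracking.modelName,
    totalCostUsd: sql<number>`sum(${costTracking.costUsd})`,
    totalTokens: sql<number>`sum(${costTracking.tokensTotal})`,
  })
    .from(costTracking)
    .where(gte(costTracking.timestamp, since))
    .groupBy(costTracking.agentType, costTracking.modelName)
    .all();
}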

Week 4: Maintenance Jobs

  • Implement archival job
  • Implement pruning job (see the sketch after this list)
  • Implement consolidation job
  • Schedule daily maintenance
  • Test lifecycle management
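For the pruning item, a sketch under an assumed retention policy (invalidated checkpoints older than 30 days; the exact policy is illustrative, not fixed by this plan):

typescript
import { and, eq, lt } from 'drizzle-orm';
import { db } from './db';
import { checkpoints } from './schema';

const THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000;

// Delete checkpoints that are both invalidated and past the retention window
export function pruneCheckpoints(now = new Date()): number {
  const cutoff = new Date(now.getTime() - THIRTY_DAYS_MS);
  const result = db.delete(checkpoints)
    .where(and(eq(checkpoints.isValid, false), lt(checkpoints.timestamp, cutoff)))
    .run();
  return result.changes; // rows removed
}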

Week 5: Testing

  • Create fixture factories (see the sketch after this list)
  • Write integration tests for all queries
  • Write tests for maintenance jobs
  • Test migration workflow
  • Performance testing (large datasets)
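For the fixture factories item, a sketch of the override pattern (default values are illustrative; the import path assumes a test directory beside src/):

typescript
import type { NewRun } from '../src/memory/schema';

// Each call yields a valid insertable row; tests override only what they assert on
export function makeRun(overrides: Partial<NewRun> = {}): NewRun {
  return {
    id: crypto.randomUUID(),
    task: 'example task',
    status: 'pending',
    startedAt: new Date(),
    ...overrides,
  };
}

// Usage: const failed = makeRun({ status: 'failed', error: 'timeout' });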

Next Steps

After implementing the data model:

  1. Build the event bus (Section 9) — connects to events table
  2. Build the memory system (Section 11) — uses memories, patterns tables
  3. Build agent base class — logs to events, checkpoints state
  4. Build orchestrator — manages runs, creates checkpoints
  5. Build feedback loops — learns from events, stores patterns

The data model is the foundation. Everything else reads from and writes to these tables.