|
| 1 | +/** |
| 2 | + * D1ExperimentStore — Cloudflare D1-backed `ExperimentStore`. |
| 3 | + * |
| 4 | + * Workers-safe (uses only the `D1Database` binding the runtime injects). Two |
| 5 | + * tables, no joins, no migrations beyond `ensureSchema()`. Schema designed so |
| 6 | + * a Worker route can both write the row at run start and update it at run end |
| 7 | + * without losing the original config — the row's lifecycle mirrors the |
| 8 | + * `Run.status` field one-to-one. |
| 9 | + * |
| 10 | + * Why this lives next to `InMemoryExperimentStore`: |
| 11 | + * - bad-app, legal-agent, gtm-agent, film-agent all run as Workers |
| 12 | + * - Workers cannot use `node:fs`, so `FileSystemExperimentStore` doesn't apply |
| 13 | + * - Hand-rolling D1 SQL in every consumer is exactly the duplication this |
| 14 | + * module exists to prevent |
| 15 | + * |
| 16 | + * Schema versioning: the `meta` table records `schema_version` so a future |
| 17 | + * column addition can be detected and migrated additively. Today's schema is |
| 18 | + * v1; bump only on breaking shape changes. |
| 19 | + */ |
| 20 | + |
| 21 | +import type { Experiment, ExperimentStore, Run } from './experiment-tracker' |
| 22 | + |
| 23 | +/** |
| 24 | + * Minimal `D1Database` shape we depend on. Avoids pulling in |
| 25 | + * `@cloudflare/workers-types` as a hard dep — consumers that already have |
| 26 | + * those types installed can pass the binding directly. |
| 27 | + */ |
| 28 | +export interface D1Like { |
| 29 | + prepare(query: string): D1PreparedStatementLike |
| 30 | + batch?(statements: D1PreparedStatementLike[]): Promise<unknown[]> |
| 31 | + exec(query: string): Promise<unknown> |
| 32 | +} |
| 33 | + |
| 34 | +export interface D1PreparedStatementLike { |
| 35 | + bind(...values: unknown[]): D1PreparedStatementLike |
| 36 | + first<T = Record<string, unknown>>(): Promise<T | null> |
| 37 | + all<T = Record<string, unknown>>(): Promise<{ results: T[] }> |
| 38 | + run(): Promise<unknown> |
| 39 | +} |
| 40 | + |
| 41 | +export interface D1ExperimentStoreOptions { |
| 42 | + /** D1 binding from `env`. */ |
| 43 | + db: D1Like |
| 44 | + /** |
| 45 | + * Optional table-name prefix so multiple ExperimentStores can share a DB |
| 46 | + * without colliding (e.g. `tax_eval_experiments` vs `legal_eval_experiments`). |
| 47 | + * Default: `agent_eval_`. |
| 48 | + */ |
| 49 | + tablePrefix?: string |
| 50 | +} |
| 51 | + |
| 52 | +const SCHEMA_VERSION = 1 |
| 53 | + |
| 54 | +export class D1ExperimentStore implements ExperimentStore { |
| 55 | + private readonly db: D1Like |
| 56 | + private readonly experimentsTable: string |
| 57 | + private readonly runsTable: string |
| 58 | + private readonly metaTable: string |
| 59 | + private schemaReady = false |
| 60 | + |
| 61 | + constructor(options: D1ExperimentStoreOptions) { |
| 62 | + this.db = options.db |
| 63 | + const prefix = options.tablePrefix ?? 'agent_eval_' |
| 64 | + this.experimentsTable = `${prefix}experiments` |
| 65 | + this.runsTable = `${prefix}runs` |
| 66 | + this.metaTable = `${prefix}meta` |
| 67 | + } |
| 68 | + |
| 69 | + /** |
| 70 | + * Idempotent schema setup. Safe to call before every operation; the second |
| 71 | + * call short-circuits via `schemaReady`. Most consumers will call it once |
| 72 | + * during Worker bootstrap. |
| 73 | + */ |
| 74 | + async ensureSchema(): Promise<void> { |
| 75 | + if (this.schemaReady) return |
| 76 | + // Single `exec` so D1 batches the DDL. |
| 77 | + const ddl = ` |
| 78 | + CREATE TABLE IF NOT EXISTS ${this.experimentsTable} ( |
| 79 | + id TEXT PRIMARY KEY, |
| 80 | + name TEXT NOT NULL, |
| 81 | + created_at TEXT NOT NULL, |
| 82 | + metadata_json TEXT |
| 83 | + ); |
| 84 | + CREATE TABLE IF NOT EXISTS ${this.runsTable} ( |
| 85 | + id TEXT PRIMARY KEY, |
| 86 | + experiment_id TEXT NOT NULL, |
| 87 | + name TEXT, |
| 88 | + status TEXT NOT NULL, |
| 89 | + started_at TEXT NOT NULL, |
| 90 | + completed_at TEXT, |
| 91 | + config_json TEXT NOT NULL, |
| 92 | + report_json TEXT, |
| 93 | + error TEXT |
| 94 | + ); |
| 95 | + CREATE INDEX IF NOT EXISTS idx_${this.runsTable}_experiment ON ${this.runsTable}(experiment_id); |
| 96 | + CREATE INDEX IF NOT EXISTS idx_${this.runsTable}_started ON ${this.runsTable}(started_at); |
| 97 | + CREATE TABLE IF NOT EXISTS ${this.metaTable} ( |
| 98 | + key TEXT PRIMARY KEY, |
| 99 | + value TEXT NOT NULL |
| 100 | + ); |
| 101 | + INSERT OR REPLACE INTO ${this.metaTable}(key, value) VALUES ('schema_version', '${SCHEMA_VERSION}'); |
| 102 | + ` |
| 103 | + await this.db.exec(ddl.trim().replace(/\s+/g, ' ')) |
| 104 | + this.schemaReady = true |
| 105 | + } |
| 106 | + |
| 107 | + async saveExperiment(exp: Experiment): Promise<void> { |
| 108 | + await this.ensureSchema() |
| 109 | + await this.db |
| 110 | + .prepare( |
| 111 | + `INSERT INTO ${this.experimentsTable}(id, name, created_at, metadata_json) |
| 112 | + VALUES (?1, ?2, ?3, ?4) |
| 113 | + ON CONFLICT(id) DO UPDATE SET |
| 114 | + name = excluded.name, |
| 115 | + created_at = excluded.created_at, |
| 116 | + metadata_json = excluded.metadata_json`, |
| 117 | + ) |
| 118 | + .bind(exp.id, exp.name, exp.createdAt, exp.metadata ? JSON.stringify(exp.metadata) : null) |
| 119 | + .run() |
| 120 | + } |
| 121 | + |
| 122 | + async getExperiment(id: string): Promise<Experiment | null> { |
| 123 | + await this.ensureSchema() |
| 124 | + const row = await this.db |
| 125 | + .prepare( |
| 126 | + `SELECT id, name, created_at, metadata_json |
| 127 | + FROM ${this.experimentsTable} |
| 128 | + WHERE id = ?1`, |
| 129 | + ) |
| 130 | + .bind(id) |
| 131 | + .first<ExperimentRow>() |
| 132 | + return row ? rowToExperiment(row) : null |
| 133 | + } |
| 134 | + |
| 135 | + async listExperiments(): Promise<Experiment[]> { |
| 136 | + await this.ensureSchema() |
| 137 | + const { results } = await this.db |
| 138 | + .prepare( |
| 139 | + `SELECT id, name, created_at, metadata_json |
| 140 | + FROM ${this.experimentsTable} |
| 141 | + ORDER BY created_at DESC`, |
| 142 | + ) |
| 143 | + .all<ExperimentRow>() |
| 144 | + return results.map(rowToExperiment) |
| 145 | + } |
| 146 | + |
| 147 | + async saveRun(run: Run): Promise<void> { |
| 148 | + await this.ensureSchema() |
| 149 | + await this.db |
| 150 | + .prepare( |
| 151 | + `INSERT INTO ${this.runsTable}(id, experiment_id, name, status, started_at, completed_at, config_json, report_json, error) |
| 152 | + VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9) |
| 153 | + ON CONFLICT(id) DO UPDATE SET |
| 154 | + experiment_id = excluded.experiment_id, |
| 155 | + name = excluded.name, |
| 156 | + status = excluded.status, |
| 157 | + started_at = excluded.started_at, |
| 158 | + completed_at = excluded.completed_at, |
| 159 | + config_json = excluded.config_json, |
| 160 | + report_json = excluded.report_json, |
| 161 | + error = excluded.error`, |
| 162 | + ) |
| 163 | + .bind( |
| 164 | + run.id, |
| 165 | + run.experimentId, |
| 166 | + run.name ?? null, |
| 167 | + run.status, |
| 168 | + run.startedAt, |
| 169 | + run.completedAt ?? null, |
| 170 | + JSON.stringify(run.config), |
| 171 | + run.report ? JSON.stringify(run.report) : null, |
| 172 | + run.error ?? null, |
| 173 | + ) |
| 174 | + .run() |
| 175 | + } |
| 176 | + |
| 177 | + async getRun(id: string): Promise<Run | null> { |
| 178 | + await this.ensureSchema() |
| 179 | + const row = await this.db |
| 180 | + .prepare( |
| 181 | + `SELECT id, experiment_id, name, status, started_at, completed_at, config_json, report_json, error |
| 182 | + FROM ${this.runsTable} |
| 183 | + WHERE id = ?1`, |
| 184 | + ) |
| 185 | + .bind(id) |
| 186 | + .first<RunRow>() |
| 187 | + return row ? rowToRun(row) : null |
| 188 | + } |
| 189 | + |
| 190 | + async listRuns(experimentId: string): Promise<Run[]> { |
| 191 | + await this.ensureSchema() |
| 192 | + const { results } = await this.db |
| 193 | + .prepare( |
| 194 | + `SELECT id, experiment_id, name, status, started_at, completed_at, config_json, report_json, error |
| 195 | + FROM ${this.runsTable} |
| 196 | + WHERE experiment_id = ?1 |
| 197 | + ORDER BY started_at DESC`, |
| 198 | + ) |
| 199 | + .bind(experimentId) |
| 200 | + .all<RunRow>() |
| 201 | + return results.map(rowToRun) |
| 202 | + } |
| 203 | +} |
| 204 | + |
| 205 | +interface ExperimentRow { |
| 206 | + id: string |
| 207 | + name: string |
| 208 | + created_at: string |
| 209 | + metadata_json: string | null |
| 210 | +} |
| 211 | + |
| 212 | +interface RunRow { |
| 213 | + id: string |
| 214 | + experiment_id: string |
| 215 | + name: string | null |
| 216 | + status: string |
| 217 | + started_at: string |
| 218 | + completed_at: string | null |
| 219 | + config_json: string |
| 220 | + report_json: string | null |
| 221 | + error: string | null |
| 222 | +} |
| 223 | + |
| 224 | +function rowToExperiment(row: ExperimentRow): Experiment { |
| 225 | + return { |
| 226 | + id: row.id, |
| 227 | + name: row.name, |
| 228 | + createdAt: row.created_at, |
| 229 | + ...(row.metadata_json ? { metadata: JSON.parse(row.metadata_json) as Record<string, unknown> } : {}), |
| 230 | + } |
| 231 | +} |
| 232 | + |
| 233 | +function rowToRun(row: RunRow): Run { |
| 234 | + return { |
| 235 | + id: row.id, |
| 236 | + experimentId: row.experiment_id, |
| 237 | + ...(row.name ? { name: row.name } : {}), |
| 238 | + status: row.status as Run['status'], |
| 239 | + startedAt: row.started_at, |
| 240 | + ...(row.completed_at ? { completedAt: row.completed_at } : {}), |
| 241 | + config: JSON.parse(row.config_json), |
| 242 | + ...(row.report_json ? { report: JSON.parse(row.report_json) } : {}), |
| 243 | + ...(row.error ? { error: row.error } : {}), |
| 244 | + } |
| 245 | +} |
0 commit comments