diff --git a/packages/api/src/routes/sync.test.ts b/packages/api/src/routes/sync.test.ts index 257289e..3adacaf 100644 --- a/packages/api/src/routes/sync.test.ts +++ b/packages/api/src/routes/sync.test.ts @@ -7,6 +7,7 @@ import { productTypeHandler, stockItemHandler, tenantHandler, + type SeedFile, } from "@heim/domain"; import { LocalKeyManagementService } from "../crypto/kms.ts"; import { encryptPayload } from "../crypto/payload-encryption.ts"; @@ -467,3 +468,186 @@ describe("POST /api/sync/commands", () => { ); }); }); + +describe("POST /api/sync/commands/batch", () => { + it("returns 401 when no session", async () => { + const client = makeClient(); + const pool = makePool(client); + const app = makeApp(null, pool); + + const body: SeedFile = { + version: 1, + commands: [ + { + streamId: "pt-1", + streamType: "ProductType", + type: "CreateProductType", + payload: { name: "Oil" }, + }, + ], + }; + + const res = await supertest(app).post("/api/sync/commands/batch").send(body); + + expect(res.status).toBe(401); + expect(res.body).toMatchObject({ error: "not_authenticated" }); + }); + + it("returns 400 for empty commands array", async () => { + const client = makeClient(); + const pool = makePool(client); + const app = makeApp(SESSION, pool); + + const res = await supertest(app) + .post("/api/sync/commands/batch") + .send({ version: 1, commands: [] }); + + expect(res.status).toBe(400); + expect(res.body).toMatchObject({ error: "empty_commands" }); + }); + + it("returns 400 for unsupported version", async () => { + const client = makeClient(); + const pool = makePool(client); + const app = makeApp(SESSION, pool); + + const res = await supertest(app) + .post("/api/sync/commands/batch") + .send({ + version: 99, + commands: [ + { + streamId: "pt-1", + streamType: "ProductType", + type: "CreateProductType", + payload: { name: "Oil" }, + }, + ], + }); + + expect(res.status).toBe(400); + expect(res.body).toMatchObject({ error: "unsupported_version" }); + }); + + it("returns 400 for unknown stream type", async () => { + const client = makeClient(); + const pool = makePool(client); + const app = makeApp(SESSION, pool); + + const res = await supertest(app) + .post("/api/sync/commands/batch") + .send({ + version: 1, + commands: [{ streamId: "x-1", streamType: "UnknownType", type: "Create", payload: {} }], + }); + + expect(res.status).toBe(400); + expect(res.body).toMatchObject({ + error: "unknown_stream_type", + index: 0, + streamType: "UnknownType", + }); + }); + + it("processes batch successfully with multiple commands", async () => { + const client = makeClient(); + const pool = makePool(client); + const query = vi.mocked(client.query); + + const app = makeApp(SESSION, pool); + + const res = await supertest(app) + .post("/api/sync/commands/batch") + .send({ + version: 1, + commands: [ + { + streamId: "pt-1", + streamType: "ProductType", + type: "CreateProductType", + payload: { name: "Olive Oil", category: "pantry" }, + }, + { + streamId: "pt-2", + streamType: "ProductType", + type: "CreateProductType", + payload: { name: "Butter", category: "dairy" }, + }, + ], + }); + + expect(res.status).toBe(200); + expect(res.body).toMatchObject({ ok: true, imported: 2 }); + + // Verify transaction boundaries + const calls = query.mock.calls.map(([sql]) => { + const s = (sql as string).trim(); + if (s === "BEGIN" || s === "COMMIT" || s === "ROLLBACK") return s; + if (s.startsWith("SELECT")) return "SELECT"; + if (s.startsWith("INSERT")) return "INSERT"; + return s; + }); + expect(calls[0]).toBe("BEGIN"); + expect(calls[calls.length - 1]).toBe("COMMIT"); + }); + + it("rolls back entire batch when a command is rejected", async () => { + const client = makeClient(); + const pool = makePool(client); + const query = vi.mocked(client.query); + + // BEGIN + query.mockResolvedValueOnce({ rows: [] } as never); + // loadStreamEvents for first command — already has a ProductType + query.mockResolvedValueOnce({ + rows: [ + { + id: "evt-existing", + tenant_id: "tenant-1", + stream_id: "pt-1", + stream_type: "ProductType", + stream_position: 1, + event_type: "ProductTypeCreated", + correlation_id: "corr-0", + causation_id: "command:corr-0", + acting_principal_id: "principal-1", + effective_principal_id: null, + payload: { name: "Existing", category: null }, + metadata: {}, + actual_time: new Date(), + }, + ], + } as never); + // ROLLBACK + query.mockResolvedValueOnce({ rows: [] } as never); + + const app = makeApp(SESSION, pool); + + // CreateProductType on existing stream → rejected + const res = await supertest(app) + .post("/api/sync/commands/batch") + .send({ + version: 1, + commands: [ + { + streamId: "pt-1", + streamType: "ProductType", + type: "CreateProductType", + payload: { name: "Duplicate" }, + }, + ], + }); + + expect(res.status).toBe(422); + expect(res.body).toMatchObject({ + error: "command_rejected", + index: 0, + reason: "Product type already exists", + }); + + // Verify ROLLBACK was called, not COMMIT + const calls = query.mock.calls.map(([sql]) => (sql as string).trim()); + expect(calls).toContain("ROLLBACK"); + expect(calls).not.toContain("COMMIT"); + }); +}); diff --git a/packages/api/src/routes/sync.ts b/packages/api/src/routes/sync.ts index 820ef00..123146d 100644 --- a/packages/api/src/routes/sync.ts +++ b/packages/api/src/routes/sync.ts @@ -1,3 +1,4 @@ +import { randomUUID } from "node:crypto"; import { Router } from "express"; import type { Pool } from "pg"; import { @@ -5,6 +6,7 @@ import { buildAggregate, type Command, type CommandHandlerRegistry, + type SeedFile, } from "@heim/domain"; import type { KeyManagementService } from "../crypto/kms.ts"; import { appendEvents } from "../event-store/append-events.ts"; @@ -151,5 +153,92 @@ export function createSyncRouter( } }); + router.post("/commands/batch", async (req, res) => { + if (!req.session) { + res.status(401).json({ error: "not_authenticated" }); + return; + } + + const body = req.body as SeedFile; + + if (body.version !== 1) { + res.status(400).json({ error: "unsupported_version" }); + return; + } + + if (!Array.isArray(body.commands) || body.commands.length === 0) { + res.status(400).json({ error: "empty_commands" }); + return; + } + + const { tenantId, principalId } = req.session; + const correlationId = randomUUID(); + + const client = await pool.connect(); + try { + await client.query("BEGIN"); + + let imported = 0; + + for (let i = 0; i < body.commands.length; i++) { + const seedCmd = body.commands[i]!; + + const config = AGGREGATE_REGISTRY[seedCmd.streamType]; + if (!config) { + await client.query("ROLLBACK"); + res.status(400).json({ + error: "unknown_stream_type", + index: i, + streamType: seedCmd.streamType, + }); + return; + } + + const streamEvents = await loadStreamEvents(client, tenantId, seedCmd.streamId); + const aggregate = buildAggregate(config.initial, streamEvents, config.apply); + + const commandId = randomUUID(); + const command: Command = { + commandId, + correlationId, + causationId: `batch:${correlationId}`, + streamId: seedCmd.streamId, + streamType: seedCmd.streamType, + type: seedCmd.type, + payload: seedCmd.payload, + expectedVersion: aggregate.version, + actualTime: seedCmd.actualTime ? new Date(seedCmd.actualTime) : new Date(), + tenantId, + actingPrincipalId: principalId, + effectivePrincipalId: null, + }; + + const result = commandRegistry.handle(aggregate.state, command, config); + + if (!result.ok) { + await client.query("ROLLBACK"); + res.status(422).json({ + error: "command_rejected", + index: i, + reason: result.reason, + }); + return; + } + + await appendEvents(client, [...result.events]); + await projectorRegistry.apply(client, result.events); + imported++; + } + + await client.query("COMMIT"); + res.json({ ok: true, imported }); + } catch { + await client.query("ROLLBACK").catch(() => {}); + res.status(500).json({ error: "internal_error" }); + } finally { + client.release(); + } + }); + return router; } diff --git a/packages/domain/src/index.ts b/packages/domain/src/index.ts index 6acefef..3103217 100644 --- a/packages/domain/src/index.ts +++ b/packages/domain/src/index.ts @@ -72,4 +72,11 @@ export { type TenantMember, type TenantState, } from "./tenant/index.ts"; +export { + registerSeedConverter, + snapshotsToSeedFile, + type SeedCommand, + type SeedFile, + type SnapshotToSeedOptions, +} from "./seed.ts"; export { applyUserEvent, INITIAL_USER_STATE, type UserState } from "./user/index.ts"; diff --git a/packages/domain/src/seed.test.ts b/packages/domain/src/seed.test.ts new file mode 100644 index 0000000..f00680d --- /dev/null +++ b/packages/domain/src/seed.test.ts @@ -0,0 +1,149 @@ +import { describe, expect, it } from "vitest"; +import { snapshotsToSeedFile } from "./seed.ts"; + +function makeSnapshot(streamType: string, streamId: string, state: Record) { + return { streamId, streamType, state }; +} + +describe("snapshotsToSeedFile", () => { + it("converts ProductType snapshot to CreateProductType command", () => { + const snapshots = [makeSnapshot("ProductType", "pt-1", { name: "Widget", category: "pantry" })]; + + const result = snapshotsToSeedFile(snapshots); + + expect(result.version).toBe(1); + expect(result.commands).toHaveLength(1); + expect(result.commands[0]).toEqual({ + streamId: "pt-1", + streamType: "ProductType", + type: "CreateProductType", + payload: { name: "Widget", category: "pantry" }, + }); + }); + + it("converts StockItem snapshot to AddStockItem command", () => { + const snapshots = [ + makeSnapshot("StockItem", "si-1", { + productTypeId: "pt-1", + level: "full", + exactCount: 10, + expiryDate: "2026-06-01", + purchaseDate: "2026-01-15", + }), + ]; + + const result = snapshotsToSeedFile(snapshots); + + expect(result.commands).toHaveLength(1); + expect(result.commands[0]).toEqual({ + streamId: "si-1", + streamType: "StockItem", + type: "AddStockItem", + payload: { + productTypeId: "pt-1", + level: "full", + exactCount: 10, + expiryDate: "2026-06-01", + purchaseDate: "2026-01-15", + }, + }); + }); + + it("omits null optional fields from StockItem payload", () => { + const snapshots = [ + makeSnapshot("StockItem", "si-1", { + productTypeId: "pt-1", + level: "full", + exactCount: null, + expiryDate: null, + purchaseDate: null, + }), + ]; + + const result = snapshotsToSeedFile(snapshots); + + expect(result.commands[0]!.payload).toEqual({ + productTypeId: "pt-1", + level: "full", + }); + }); + + it("orders ProductType before StockItem by priority", () => { + const snapshots = [ + makeSnapshot("StockItem", "si-1", { productTypeId: "pt-1", level: "full" }), + makeSnapshot("ProductType", "pt-1", { name: "Widget" }), + ]; + + const result = snapshotsToSeedFile(snapshots); + + expect(result.commands).toHaveLength(2); + expect(result.commands[0]!.streamType).toBe("ProductType"); + expect(result.commands[1]!.streamType).toBe("StockItem"); + }); + + it("filters by streamTypes option", () => { + const snapshots = [ + makeSnapshot("ProductType", "pt-1", { name: "Widget" }), + makeSnapshot("StockItem", "si-1", { productTypeId: "pt-1", level: "full" }), + ]; + + const result = snapshotsToSeedFile(snapshots, { streamTypes: ["ProductType"] }); + + expect(result.commands).toHaveLength(1); + expect(result.commands[0]!.streamType).toBe("ProductType"); + }); + + it("excludes Tenant snapshots", () => { + const snapshots = [ + makeSnapshot("Tenant", "t-1", { name: "Acme" }), + makeSnapshot("ProductType", "pt-1", { name: "Widget" }), + ]; + + const result = snapshotsToSeedFile(snapshots); + + expect(result.commands).toHaveLength(1); + expect(result.commands[0]!.streamType).toBe("ProductType"); + }); + + it("excludes User snapshots", () => { + const snapshots = [ + makeSnapshot("User", "u-1", { email: "alice@example.com" }), + makeSnapshot("ProductType", "pt-1", { name: "Widget" }), + ]; + + const result = snapshotsToSeedFile(snapshots); + + expect(result.commands).toHaveLength(1); + expect(result.commands[0]!.streamType).toBe("ProductType"); + }); + + it("skips unknown stream types with no converter", () => { + const snapshots = [ + makeSnapshot("UnknownAggregate", "x-1", { foo: "bar" }), + makeSnapshot("ProductType", "pt-1", { name: "Widget" }), + ]; + + const result = snapshotsToSeedFile(snapshots); + + expect(result.commands).toHaveLength(1); + expect(result.commands[0]!.streamType).toBe("ProductType"); + }); + + it("returns empty commands for empty snapshots", () => { + const result = snapshotsToSeedFile([]); + + expect(result.version).toBe(1); + expect(result.commands).toEqual([]); + }); + + it("returns empty commands when all snapshots are excluded types", () => { + const snapshots = [ + makeSnapshot("Tenant", "t-1", { name: "Acme" }), + makeSnapshot("User", "u-1", { email: "alice@example.com" }), + ]; + + const result = snapshotsToSeedFile(snapshots); + + expect(result.commands).toEqual([]); + }); +}); diff --git a/packages/domain/src/seed.ts b/packages/domain/src/seed.ts new file mode 100644 index 0000000..dda3ae4 --- /dev/null +++ b/packages/domain/src/seed.ts @@ -0,0 +1,99 @@ +export interface SeedCommand { + readonly streamId: string; + readonly streamType: string; + readonly type: string; + readonly payload: Record; + readonly actualTime?: string; +} + +export interface SeedFile { + readonly version: 1; + readonly commands: readonly SeedCommand[]; +} + +export interface SnapshotToSeedOptions { + readonly streamTypes?: readonly string[]; +} + +interface AggregateSnapshot { + readonly streamId: string; + readonly streamType: string; + readonly state: Record; +} + +type SnapshotConverter = (snapshot: AggregateSnapshot) => SeedCommand; + +const EXCLUDED_STREAM_TYPES = new Set(["Tenant", "User"]); + +const CONVERTER_REGISTRY = new Map(); + +const STREAM_TYPE_PRIORITY = new Map(); + +export function registerSeedConverter( + streamType: string, + converter: SnapshotConverter, + priority: number, +): void { + CONVERTER_REGISTRY.set(streamType, converter); + STREAM_TYPE_PRIORITY.set(streamType, priority); +} + +export function snapshotsToSeedFile( + snapshots: readonly AggregateSnapshot[], + options?: SnapshotToSeedOptions, +): SeedFile { + const allowedTypes = options?.streamTypes ? new Set(options.streamTypes) : null; + + const commands: SeedCommand[] = []; + + for (const snapshot of snapshots) { + if (EXCLUDED_STREAM_TYPES.has(snapshot.streamType)) continue; + if (allowedTypes && !allowedTypes.has(snapshot.streamType)) continue; + + const converter = CONVERTER_REGISTRY.get(snapshot.streamType); + if (!converter) continue; + + commands.push(converter(snapshot)); + } + + commands.sort((a, b) => { + const pa = STREAM_TYPE_PRIORITY.get(a.streamType) ?? Number.MAX_SAFE_INTEGER; + const pb = STREAM_TYPE_PRIORITY.get(b.streamType) ?? Number.MAX_SAFE_INTEGER; + return pa - pb; + }); + + return { version: 1, commands }; +} + +// Built-in converters for inventory aggregates + +registerSeedConverter( + "ProductType", + (snap) => ({ + streamId: snap.streamId, + streamType: "ProductType", + type: "CreateProductType", + payload: { + name: snap.state.name as string, + ...(snap.state.category != null ? { category: snap.state.category } : {}), + }, + }), + 10, +); + +registerSeedConverter( + "StockItem", + (snap) => ({ + streamId: snap.streamId, + streamType: "StockItem", + type: "AddStockItem", + payload: { + productTypeId: snap.state.productTypeId as string, + level: snap.state.level as string, + ...(snap.state.exactCount != null ? { exactCount: snap.state.exactCount } : {}), + ...(snap.state.expiryDate != null ? { expiryDate: snap.state.expiryDate } : {}), + ...(snap.state.purchaseDate != null ? { purchaseDate: snap.state.purchaseDate } : {}), + }, + }), + 20, +);