diff --git a/services/session-ingest/drizzle/0002_watery_venus.sql b/services/session-ingest/drizzle/0002_watery_venus.sql new file mode 100644 index 0000000000..237cf2c390 --- /dev/null +++ b/services/session-ingest/drizzle/0002_watery_venus.sql @@ -0,0 +1 @@ +CREATE INDEX `ingest_items_ingested_at_id_idx` ON `ingest_items` (`ingested_at`,`id`); \ No newline at end of file diff --git a/services/session-ingest/drizzle/meta/0002_snapshot.json b/services/session-ingest/drizzle/meta/0002_snapshot.json new file mode 100644 index 0000000000..55d799d863 --- /dev/null +++ b/services/session-ingest/drizzle/meta/0002_snapshot.json @@ -0,0 +1,127 @@ +{ + "version": "6", + "dialect": "sqlite", + "id": "7f8bb7f8-414e-4875-bab2-3e6ce4de7a40", + "prevId": "878ab46c-3f3b-4b31-949b-88ef03ae6244", + "tables": { + "ingest_items": { + "name": "ingest_items", + "columns": { + "id": { + "name": "id", + "type": "integer", + "primaryKey": true, + "notNull": true, + "autoincrement": true + }, + "item_id": { + "name": "item_id", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "item_type": { + "name": "item_type", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "item_data": { + "name": "item_data", + "type": "text", + "primaryKey": false, + "notNull": true, + "autoincrement": false + }, + "item_data_r2_key": { + "name": "item_data_r2_key", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + }, + "ingested_at": { + "name": "ingested_at", + "type": "integer", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": { + "ingest_items_item_id_unique": { + "name": "ingest_items_item_id_unique", + "columns": [ + "item_id" + ], + "isUnique": true + }, + "ingest_items_ingested_at_id_idx": { + "name": "ingest_items_ingested_at_id_idx", + "columns": [ + "ingested_at", + "id" + ], + "isUnique": false + } + }, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "ingest_meta": { + "name": "ingest_meta", + "columns": { + "key": { + "name": "key", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + }, + "value": { + "name": "value", + "type": "text", + "primaryKey": false, + "notNull": false, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + }, + "sessions": { + "name": "sessions", + "columns": { + "session_id": { + "name": "session_id", + "type": "text", + "primaryKey": true, + "notNull": true, + "autoincrement": false + } + }, + "indexes": {}, + "foreignKeys": {}, + "compositePrimaryKeys": {}, + "uniqueConstraints": {}, + "checkConstraints": {} + } + }, + "views": {}, + "enums": {}, + "_meta": { + "schemas": {}, + "tables": {}, + "columns": {} + }, + "internal": { + "indexes": {} + } +} \ No newline at end of file diff --git a/services/session-ingest/drizzle/meta/_journal.json b/services/session-ingest/drizzle/meta/_journal.json index c92b7d621c..15e9d70a69 100644 --- a/services/session-ingest/drizzle/meta/_journal.json +++ b/services/session-ingest/drizzle/meta/_journal.json @@ -15,6 +15,13 @@ "when": 1772799661151, "tag": "0001_common_blackheart", "breakpoints": true + }, + { + "idx": 2, + "version": "6", + "when": 1780499751344, + "tag": "0002_watery_venus", + "breakpoints": true } ] } \ No newline at end of file diff --git a/services/session-ingest/drizzle/migrations.js b/services/session-ingest/drizzle/migrations.js index 50ac0c2e5c..36e171cb3f 100644 --- a/services/session-ingest/drizzle/migrations.js +++ b/services/session-ingest/drizzle/migrations.js @@ -1,11 +1,13 @@ import journal from './meta/_journal.json'; import m0000 from './0000_redundant_slyde.sql'; import m0001 from './0001_common_blackheart.sql'; +import m0002 from './0002_watery_venus.sql'; export default { journal, migrations: { m0000, m0001, + m0002, }, }; diff --git a/services/session-ingest/src/db/sqlite-schema.ts b/services/session-ingest/src/db/sqlite-schema.ts index 540101a293..2b4e145805 100644 --- a/services/session-ingest/src/db/sqlite-schema.ts +++ b/services/session-ingest/src/db/sqlite-schema.ts @@ -1,13 +1,17 @@ -import { sqliteTable, text, integer } from 'drizzle-orm/sqlite-core'; +import { index, sqliteTable, text, integer } from 'drizzle-orm/sqlite-core'; -export const ingestItems = sqliteTable('ingest_items', { - id: integer('id').primaryKey({ autoIncrement: true }), - item_id: text('item_id').notNull().unique(), - item_type: text('item_type').notNull(), - item_data: text('item_data').notNull(), - item_data_r2_key: text('item_data_r2_key'), - ingested_at: integer('ingested_at'), -}); +export const ingestItems = sqliteTable( + 'ingest_items', + { + id: integer('id').primaryKey({ autoIncrement: true }), + item_id: text('item_id').notNull().unique(), + item_type: text('item_type').notNull(), + item_data: text('item_data').notNull(), + item_data_r2_key: text('item_data_r2_key'), + ingested_at: integer('ingested_at'), + }, + table => [index('ingest_items_ingested_at_id_idx').on(table.ingested_at, table.id)] +); export const ingestMeta = sqliteTable('ingest_meta', { key: text('key').primaryKey(), diff --git a/services/session-ingest/src/dos/SessionIngestDO.test.ts b/services/session-ingest/src/dos/SessionIngestDO.test.ts new file mode 100644 index 0000000000..6f7e327bd3 --- /dev/null +++ b/services/session-ingest/src/dos/SessionIngestDO.test.ts @@ -0,0 +1,16 @@ +import { describe, expect, it, vi } from 'vitest'; + +vi.mock('cloudflare:workers', () => ({ + DurableObject: class DurableObject { + constructor(_state: unknown, _env: unknown) {} + }, +})); + +import { ingestOrderCursor } from './SessionIngestDO'; + +describe('SessionIngestDO ingest ordering', () => { + it('uses ingested_at with id only as a tie-breaker for cursor progression', () => { + expect(ingestOrderCursor({ ingested_at: 100, id: 7 })).toEqual({ ingestedAt: 100, id: 7 }); + expect(ingestOrderCursor({ ingested_at: null, id: 3 })).toEqual({ ingestedAt: null, id: 3 }); + }); +}); diff --git a/services/session-ingest/src/dos/SessionIngestDO.ts b/services/session-ingest/src/dos/SessionIngestDO.ts index 6128905999..fa65f98493 100644 --- a/services/session-ingest/src/dos/SessionIngestDO.ts +++ b/services/session-ingest/src/dos/SessionIngestDO.ts @@ -1,5 +1,5 @@ import { DurableObject } from 'cloudflare:workers'; -import { eq, ne, gt, gte, lt, and, inArray, isNotNull } from 'drizzle-orm'; +import { eq, ne, gt, gte, lt, and, or, inArray, isNull, isNotNull } from 'drizzle-orm'; import { drizzle, type DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite'; import { migrate } from 'drizzle-orm/durable-sqlite/migrator'; @@ -86,6 +86,29 @@ const INGEST_META_EXTRACTORS: Array<{ type Changes = Array<{ name: ExtractableMetaKey; value: string | null }>; +export type IngestOrderCursor = { ingestedAt: number | null; id: number }; + +export function afterIngestOrderCursor(cursor: IngestOrderCursor) { + if (cursor.ingestedAt === null) { + return or( + and(isNull(ingestItems.ingested_at), gt(ingestItems.id, cursor.id)), + isNotNull(ingestItems.ingested_at) + ); + } + + return or( + gt(ingestItems.ingested_at, cursor.ingestedAt), + and(eq(ingestItems.ingested_at, cursor.ingestedAt), gt(ingestItems.id, cursor.id)) + ); +} + +export function ingestOrderCursor(row: { + ingested_at: number | null; + id: number; +}): IngestOrderCursor { + return { ingestedAt: row.ingested_at, id: row.id }; +} + export class SessionIngestDO extends DurableObject { private db: DrizzleSqliteDODatabase; @@ -295,25 +318,31 @@ export class SessionIngestDO extends DurableObject { // --- messages --- const CURSOR_BATCH = 10; controller.enqueue(encoder.encode(',"messages":[')); - let msgCursor = 0; + let msgCursor: IngestOrderCursor | undefined; let firstMsg = true; while (true) { const msgBatch = db .select({ id: ingestItems.id, + ingested_at: ingestItems.ingested_at, item_id: ingestItems.item_id, item_data: ingestItems.item_data, item_data_r2_key: ingestItems.item_data_r2_key, }) .from(ingestItems) - .where(and(eq(ingestItems.item_type, 'message'), gt(ingestItems.id, msgCursor))) - .orderBy(ingestItems.id) + .where( + and( + eq(ingestItems.item_type, 'message'), + msgCursor ? afterIngestOrderCursor(msgCursor) : undefined + ) + ) + .orderBy(ingestItems.ingested_at, ingestItems.id) .limit(CURSOR_BATCH) .all(); if (msgBatch.length === 0) break; - msgCursor = msgBatch[msgBatch.length - 1].id; + msgCursor = ingestOrderCursor(msgBatch[msgBatch.length - 1]); for (const msgRow of msgBatch) { if (!firstMsg) controller.enqueue(encoder.encode(',')); @@ -327,13 +356,14 @@ export class SessionIngestDO extends DurableObject { const msgId = msgRow.item_id.slice('message/'.length); const partRange = getPartItemIdentityRange(msgId); controller.enqueue(encoder.encode(',"parts":[')); - let partCursor = 0; + let partCursor: IngestOrderCursor | undefined; let firstPart = true; while (true) { const partBatch = db .select({ id: ingestItems.id, + ingested_at: ingestItems.ingested_at, item_data: ingestItems.item_data, item_data_r2_key: ingestItems.item_data_r2_key, }) @@ -343,15 +373,15 @@ export class SessionIngestDO extends DurableObject { eq(ingestItems.item_type, 'part'), gte(ingestItems.item_id, partRange.start), lt(ingestItems.item_id, partRange.end), - gt(ingestItems.id, partCursor) + partCursor ? afterIngestOrderCursor(partCursor) : undefined ) ) - .orderBy(ingestItems.id) + .orderBy(ingestItems.ingested_at, ingestItems.id) .limit(CURSOR_BATCH) .all(); if (partBatch.length === 0) break; - partCursor = partBatch[partBatch.length - 1].id; + partCursor = ingestOrderCursor(partBatch[partBatch.length - 1]); for (const partRow of partBatch) { if (!firstPart) controller.enqueue(encoder.encode(',')); @@ -404,7 +434,7 @@ export class SessionIngestDO extends DurableObject { }) .from(ingestItems) .where(ne(ingestItems.item_type, 'session_diff')) - .orderBy(ingestItems.id) + .orderBy(ingestItems.ingested_at, ingestItems.id) .all(); if (rows.length === 0) { diff --git a/services/session-ingest/src/queue-consumer.test.ts b/services/session-ingest/src/queue-consumer.test.ts index 531ba289b8..3dbc6419f0 100644 --- a/services/session-ingest/src/queue-consumer.test.ts +++ b/services/session-ingest/src/queue-consumer.test.ts @@ -143,6 +143,42 @@ describe('createItemExtractor', () => { }); describe('queue', () => { + it('delays failed queue message retries to avoid immediately hammering hot DOs', async () => { + const limit = vi.fn(async () => [{ session_id: 'ses_retry' }]); + const where = vi.fn(() => ({ limit })); + const from = vi.fn(() => ({ where })); + vi.mocked(getWorkerDb).mockReturnValue({ select: vi.fn(() => ({ from })) } as never); + const env = { + HYPERDRIVE: { connectionString: 'postgres://unused' }, + SESSION_INGEST_R2: { get: vi.fn(async () => null) }, + } as never; + const ack = vi.fn(); + const retry = vi.fn(); + + await queue( + { + messages: [ + { + body: { + r2Key: 'ingest/retry-missing', + kiloUserId: 'usr_retry', + sessionId: 'ses_retry', + ingestVersion: 1, + ingestedAt: 1, + }, + ack, + retry, + }, + ], + } as never, + env, + { waitUntil: vi.fn() } as unknown as ExecutionContext + ); + + expect(ack).not.toHaveBeenCalled(); + expect(retry).toHaveBeenCalledWith({ delaySeconds: 60 }); + }); + it('passes full parsed oversized message data and its R2 reference into ingest', async () => { const ingest = vi.fn(async () => ({ changes: [] })); vi.mocked(getSessionIngestDO).mockReturnValue({ ingest } as never); diff --git a/services/session-ingest/src/queue-consumer.ts b/services/session-ingest/src/queue-consumer.ts index becbebb223..bed1fa9b5c 100644 --- a/services/session-ingest/src/queue-consumer.ts +++ b/services/session-ingest/src/queue-consumer.ts @@ -540,7 +540,7 @@ export async function queue( sessionId: msg.body.sessionId, error: err instanceof Error ? err.message : String(err), }); - msg.retry(); + msg.retry({ delaySeconds: 60 }); } } }