Skip to content

Commit 41be1d8

Browse files
committed
fix(session-ingest): delay queue retries safely
1 parent 4820d06 commit 41be1d8

4 files changed

Lines changed: 93 additions & 11 deletions

File tree

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { describe, expect, it, vi } from 'vitest';
2+
3+
// Replace the 'cloudflare:workers' module with a stub whose DurableObject is a
// plain class with a no-op constructor. Presumably this is what lets
// './SessionIngestDO' (which extends DurableObject) be imported under vitest —
// confirm against the test runner configuration.
vi.mock('cloudflare:workers', () => ({
4+
DurableObject: class DurableObject {
5+
constructor(_state: unknown, _env: unknown) {}
6+
},
7+
}));
8+
9+
import { ingestOrderCursor } from './SessionIngestDO';
10+
11+
// Checks that ingestOrderCursor copies a row's ingested_at / id into the
// { ingestedAt, id } cursor shape, including a null ingested_at.
// NOTE(review): the test title talks about tie-breaker ordering, but only the
// field mapping is asserted here; the predicate built by
// afterIngestOrderCursor is not exercised by this test.
describe('SessionIngestDO ingest ordering', () => {
12+
it('uses ingested_at with id only as a tie-breaker for cursor progression', () => {
13+
expect(ingestOrderCursor({ ingested_at: 100, id: 7 })).toEqual({ ingestedAt: 100, id: 7 });
14+
expect(ingestOrderCursor({ ingested_at: null, id: 3 })).toEqual({ ingestedAt: null, id: 3 });
15+
});
16+
});

services/session-ingest/src/dos/SessionIngestDO.ts

Lines changed: 40 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { DurableObject } from 'cloudflare:workers';
2-
import { eq, ne, gt, gte, lt, and, inArray, isNotNull } from 'drizzle-orm';
2+
import { eq, ne, gt, gte, lt, and, or, inArray, isNull, isNotNull } from 'drizzle-orm';
33
import { drizzle, type DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite';
44
import { migrate } from 'drizzle-orm/durable-sqlite/migrator';
55

@@ -86,6 +86,29 @@ const INGEST_META_EXTRACTORS: Array<{
8686

8787
type Changes = Array<{ name: ExtractableMetaKey; value: string | null }>;
8888

89+
/**
 * Pagination cursor over ingest items: the `ingested_at` value (which may be
 * null) and the `id` of the last row already consumed.
 */
export type IngestOrderCursor = { ingestedAt: number | null; id: number };
90+
91+
/**
 * Builds the drizzle condition that matches ingest items positioned after
 * `cursor` when rows are ordered by (ingested_at, id).
 *
 * @param cursor - ingested_at / id of the last row already consumed.
 * @returns the condition produced by `or(...)`, for use inside a `where(...)`.
 */
export function afterIngestOrderCursor(cursor: IngestOrderCursor) {
92+
if (cursor.ingestedAt === null) {
93+
// Cursor row has no ingested_at: later rows are the remaining null rows with
// a larger id, plus every row that has an ingested_at at all.
// NOTE(review): this matches an ascending ORDER BY only if NULLs sort before
// non-null values (SQLite's default) — confirm for the backing store.
return or(
94+
and(isNull(ingestItems.ingested_at), gt(ingestItems.id, cursor.id)),
95+
isNotNull(ingestItems.ingested_at)
96+
);
97+
}
98+
99+
// Cursor row has an ingested_at: later rows have a strictly greater
// ingested_at, or the same ingested_at with a greater id (tie-breaker).
return or(
100+
gt(ingestItems.ingested_at, cursor.ingestedAt),
101+
and(eq(ingestItems.ingested_at, cursor.ingestedAt), gt(ingestItems.id, cursor.id))
102+
);
103+
}
104+
105+
/**
 * Converts a selected row into an {@link IngestOrderCursor}.
 *
 * @param row - object carrying the row's `ingested_at` (may be null) and `id`.
 * @returns `{ ingestedAt, id }` copied from the row without modification.
 */
export function ingestOrderCursor(row: {
106+
ingested_at: number | null;
107+
id: number;
108+
}): IngestOrderCursor {
109+
return { ingestedAt: row.ingested_at, id: row.id };
110+
}
111+
89112
export class SessionIngestDO extends DurableObject<Env> {
90113
private db: DrizzleSqliteDODatabase;
91114

@@ -295,25 +318,31 @@ export class SessionIngestDO extends DurableObject<Env> {
295318
// --- messages ---
296319
const CURSOR_BATCH = 10;
297320
controller.enqueue(encoder.encode(',"messages":['));
298-
let msgCursor = 0;
321+
let msgCursor: IngestOrderCursor | undefined;
299322
let firstMsg = true;
300323

301324
while (true) {
302325
const msgBatch = db
303326
.select({
304327
id: ingestItems.id,
328+
ingested_at: ingestItems.ingested_at,
305329
item_id: ingestItems.item_id,
306330
item_data: ingestItems.item_data,
307331
item_data_r2_key: ingestItems.item_data_r2_key,
308332
})
309333
.from(ingestItems)
310-
.where(and(eq(ingestItems.item_type, 'message'), gt(ingestItems.id, msgCursor)))
311-
.orderBy(ingestItems.id)
334+
.where(
335+
and(
336+
eq(ingestItems.item_type, 'message'),
337+
msgCursor ? afterIngestOrderCursor(msgCursor) : undefined
338+
)
339+
)
340+
.orderBy(ingestItems.ingested_at, ingestItems.id)
312341
.limit(CURSOR_BATCH)
313342
.all();
314343

315344
if (msgBatch.length === 0) break;
316-
msgCursor = msgBatch[msgBatch.length - 1].id;
345+
msgCursor = ingestOrderCursor(msgBatch[msgBatch.length - 1]);
317346

318347
for (const msgRow of msgBatch) {
319348
if (!firstMsg) controller.enqueue(encoder.encode(','));
@@ -327,13 +356,14 @@ export class SessionIngestDO extends DurableObject<Env> {
327356
const msgId = msgRow.item_id.slice('message/'.length);
328357
const partRange = getPartItemIdentityRange(msgId);
329358
controller.enqueue(encoder.encode(',"parts":['));
330-
let partCursor = 0;
359+
let partCursor: IngestOrderCursor | undefined;
331360
let firstPart = true;
332361

333362
while (true) {
334363
const partBatch = db
335364
.select({
336365
id: ingestItems.id,
366+
ingested_at: ingestItems.ingested_at,
337367
item_data: ingestItems.item_data,
338368
item_data_r2_key: ingestItems.item_data_r2_key,
339369
})
@@ -343,15 +373,15 @@ export class SessionIngestDO extends DurableObject<Env> {
343373
eq(ingestItems.item_type, 'part'),
344374
gte(ingestItems.item_id, partRange.start),
345375
lt(ingestItems.item_id, partRange.end),
346-
gt(ingestItems.id, partCursor)
376+
partCursor ? afterIngestOrderCursor(partCursor) : undefined
347377
)
348378
)
349-
.orderBy(ingestItems.id)
379+
.orderBy(ingestItems.ingested_at, ingestItems.id)
350380
.limit(CURSOR_BATCH)
351381
.all();
352382

353383
if (partBatch.length === 0) break;
354-
partCursor = partBatch[partBatch.length - 1].id;
384+
partCursor = ingestOrderCursor(partBatch[partBatch.length - 1]);
355385

356386
for (const partRow of partBatch) {
357387
if (!firstPart) controller.enqueue(encoder.encode(','));
@@ -404,7 +434,7 @@ export class SessionIngestDO extends DurableObject<Env> {
404434
})
405435
.from(ingestItems)
406436
.where(ne(ingestItems.item_type, 'session_diff'))
407-
.orderBy(ingestItems.id)
437+
.orderBy(ingestItems.ingested_at, ingestItems.id)
408438
.all();
409439

410440
if (rows.length === 0) {

services/session-ingest/src/queue-consumer.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,42 @@ describe('createItemExtractor', () => {
143143
});
144144

145145
describe('queue', () => {
146+
// Verifies that a queue message which fails processing is retried with a
// 60-second delay and is not acknowledged.
it('delays failed queue message retries to avoid immediately hammering hot DOs', async () => {
147+
// Stub the worker DB so select().from().where().limit() resolves to a single
// row for session 'ses_retry'.
const limit = vi.fn(async () => [{ session_id: 'ses_retry' }]);
148+
const where = vi.fn(() => ({ limit }));
149+
const from = vi.fn(() => ({ where }));
150+
vi.mocked(getWorkerDb).mockReturnValue({ select: vi.fn(() => ({ from })) } as never);
151+
const env = {
152+
HYPERDRIVE: { connectionString: 'postgres://unused' },
153+
// R2 lookup resolves to null. Presumably this missing object is what makes
// processing fail — the failure path itself is not visible here; verify
// against queue().
SESSION_INGEST_R2: { get: vi.fn(async () => null) },
154+
} as never;
155+
const ack = vi.fn();
156+
const retry = vi.fn();
157+
158+
await queue(
159+
{
160+
messages: [
161+
{
162+
body: {
163+
r2Key: 'ingest/retry-missing',
164+
kiloUserId: 'usr_retry',
165+
sessionId: 'ses_retry',
166+
ingestVersion: 1,
167+
ingestedAt: 1,
168+
},
169+
ack,
170+
retry,
171+
},
172+
],
173+
} as never,
174+
env,
175+
{ waitUntil: vi.fn() } as unknown as ExecutionContext
176+
);
177+
178+
// The message must not be acked, and the retry must carry the delay option.
expect(ack).not.toHaveBeenCalled();
179+
expect(retry).toHaveBeenCalledWith({ delaySeconds: 60 });
180+
});
181+
146182
it('passes full parsed oversized message data and its R2 reference into ingest', async () => {
147183
const ingest = vi.fn(async () => ({ changes: [] }));
148184
vi.mocked(getSessionIngestDO).mockReturnValue({ ingest } as never);

services/session-ingest/src/queue-consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ export async function queue(
540540
sessionId: msg.body.sessionId,
541541
error: err instanceof Error ? err.message : String(err),
542542
});
543-
msg.retry();
543+
msg.retry({ delaySeconds: 60 });
544544
}
545545
}
546546
}

0 commit comments

Comments
 (0)