Skip to content

Commit 41facd2

Browse files
committed
fix(session-ingest): delay queue retries safely
1 parent 4820d06 commit 41facd2

4 files changed

Lines changed: 90 additions & 11 deletions

File tree

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { describe, expect, it, vi } from 'vitest';
2+
3+
// Stand-in for the `cloudflare:workers` runtime module: it exposes a bare
// `DurableObject` base class whose constructor accepts and ignores its two
// arguments, so modules that extend it can be imported in this test.
vi.mock('cloudflare:workers', () => {
  class DurableObject {
    constructor(_state: unknown, _env: unknown) {}
  }

  return { DurableObject };
});
8+
9+
import { ingestOrderCursor } from './SessionIngestDO';
10+
11+
describe('SessionIngestDO ingest ordering', () => {
  it('uses ingested_at with id only as a tie-breaker for cursor progression', () => {
    // A row with a timestamp: both fields are carried into the cursor.
    const timestamped = ingestOrderCursor({ ingested_at: 100, id: 7 });
    expect(timestamped).toEqual({ ingestedAt: 100, id: 7 });

    // A row without a timestamp: the null is preserved rather than defaulted.
    const untimestamped = ingestOrderCursor({ ingested_at: null, id: 3 });
    expect(untimestamped).toEqual({ ingestedAt: null, id: 3 });
  });
});

services/session-ingest/src/dos/SessionIngestDO.ts

Lines changed: 37 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { DurableObject } from 'cloudflare:workers';
2-
import { eq, ne, gt, gte, lt, and, inArray, isNotNull } from 'drizzle-orm';
2+
import { eq, ne, gt, gte, lt, and, or, inArray, isNull, isNotNull } from 'drizzle-orm';
33
import { drizzle, type DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite';
44
import { migrate } from 'drizzle-orm/durable-sqlite/migrator';
55

@@ -86,6 +86,26 @@ const INGEST_META_EXTRACTORS: Array<{
8686

8787
type Changes = Array<{ name: ExtractableMetaKey; value: string | null }>;
8888

89+
/**
 * Position within an `(ingested_at, id)` ordered scan of ingest items.
 * `ingestedAt` is `null` when the row the cursor was taken from has no
 * ingest timestamp; `id` is that row's id.
 */
export type IngestOrderCursor = { ingestedAt: number | null; id: number };
90+
91+
/**
 * Builds the filter that selects ingest items positioned strictly after
 * `cursor` when rows are ordered by `ingested_at` and then `id`.
 *
 * Rows whose `ingested_at` is NULL are treated as coming before every row
 * that has a timestamp:
 * - for a cursor without a timestamp, the remaining rows are the NULL rows
 *   with a larger id plus every timestamped row;
 * - for a cursor with a timestamp, the remaining rows are those with a later
 *   timestamp, or the same timestamp and a larger id.
 *
 * @param cursor Position of the last row already consumed.
 * @returns The combined condition produced by `or(...)`.
 */
export function afterIngestOrderCursor(cursor: IngestOrderCursor) {
  const { ingestedAt, id } = cursor;
  const hasLargerId = gt(ingestItems.id, id);

  if (ingestedAt !== null) {
    const hasLaterTimestamp = gt(ingestItems.ingested_at, ingestedAt);
    const sameTimestampLargerId = and(eq(ingestItems.ingested_at, ingestedAt), hasLargerId);
    return or(hasLaterTimestamp, sameTimestampLargerId);
  }

  // Cursor sits among the NULL-timestamp rows: finish those by id, then let
  // every timestamped row through.
  const nullTimestampLargerId = and(isNull(ingestItems.ingested_at), hasLargerId);
  return or(nullTimestampLargerId, isNotNull(ingestItems.ingested_at));
}
104+
105+
/**
 * Derives the scan cursor for a row by copying its `ingested_at` and `id`.
 *
 * @param row Row (or row subset) carrying the two ordering columns.
 * @returns A cursor with `ingestedAt` set to the row's `ingested_at`
 *   (null is preserved) and `id` set to the row's id.
 */
export function ingestOrderCursor(row: { ingested_at: number | null; id: number }): IngestOrderCursor {
  const { ingested_at: ingestedAt, id } = row;
  return { ingestedAt, id };
}
108+
89109
export class SessionIngestDO extends DurableObject<Env> {
90110
private db: DrizzleSqliteDODatabase;
91111

@@ -295,25 +315,31 @@ export class SessionIngestDO extends DurableObject<Env> {
295315
// --- messages ---
296316
const CURSOR_BATCH = 10;
297317
controller.enqueue(encoder.encode(',"messages":['));
298-
let msgCursor = 0;
318+
let msgCursor: IngestOrderCursor | undefined;
299319
let firstMsg = true;
300320

301321
while (true) {
302322
const msgBatch = db
303323
.select({
304324
id: ingestItems.id,
325+
ingested_at: ingestItems.ingested_at,
305326
item_id: ingestItems.item_id,
306327
item_data: ingestItems.item_data,
307328
item_data_r2_key: ingestItems.item_data_r2_key,
308329
})
309330
.from(ingestItems)
310-
.where(and(eq(ingestItems.item_type, 'message'), gt(ingestItems.id, msgCursor)))
311-
.orderBy(ingestItems.id)
331+
.where(
332+
and(
333+
eq(ingestItems.item_type, 'message'),
334+
msgCursor ? afterIngestOrderCursor(msgCursor) : undefined
335+
)
336+
)
337+
.orderBy(ingestItems.ingested_at, ingestItems.id)
312338
.limit(CURSOR_BATCH)
313339
.all();
314340

315341
if (msgBatch.length === 0) break;
316-
msgCursor = msgBatch[msgBatch.length - 1].id;
342+
msgCursor = ingestOrderCursor(msgBatch[msgBatch.length - 1]);
317343

318344
for (const msgRow of msgBatch) {
319345
if (!firstMsg) controller.enqueue(encoder.encode(','));
@@ -327,13 +353,14 @@ export class SessionIngestDO extends DurableObject<Env> {
327353
const msgId = msgRow.item_id.slice('message/'.length);
328354
const partRange = getPartItemIdentityRange(msgId);
329355
controller.enqueue(encoder.encode(',"parts":['));
330-
let partCursor = 0;
356+
let partCursor: IngestOrderCursor | undefined;
331357
let firstPart = true;
332358

333359
while (true) {
334360
const partBatch = db
335361
.select({
336362
id: ingestItems.id,
363+
ingested_at: ingestItems.ingested_at,
337364
item_data: ingestItems.item_data,
338365
item_data_r2_key: ingestItems.item_data_r2_key,
339366
})
@@ -343,15 +370,15 @@ export class SessionIngestDO extends DurableObject<Env> {
343370
eq(ingestItems.item_type, 'part'),
344371
gte(ingestItems.item_id, partRange.start),
345372
lt(ingestItems.item_id, partRange.end),
346-
gt(ingestItems.id, partCursor)
373+
partCursor ? afterIngestOrderCursor(partCursor) : undefined
347374
)
348375
)
349-
.orderBy(ingestItems.id)
376+
.orderBy(ingestItems.ingested_at, ingestItems.id)
350377
.limit(CURSOR_BATCH)
351378
.all();
352379

353380
if (partBatch.length === 0) break;
354-
partCursor = partBatch[partBatch.length - 1].id;
381+
partCursor = ingestOrderCursor(partBatch[partBatch.length - 1]);
355382

356383
for (const partRow of partBatch) {
357384
if (!firstPart) controller.enqueue(encoder.encode(','));
@@ -404,7 +431,7 @@ export class SessionIngestDO extends DurableObject<Env> {
404431
})
405432
.from(ingestItems)
406433
.where(ne(ingestItems.item_type, 'session_diff'))
407-
.orderBy(ingestItems.id)
434+
.orderBy(ingestItems.ingested_at, ingestItems.id)
408435
.all();
409436

410437
if (rows.length === 0) {

services/session-ingest/src/queue-consumer.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,42 @@ describe('createItemExtractor', () => {
143143
});
144144

145145
describe('queue', () => {
146+
it('delays failed queue message retries to avoid immediately hammering hot DOs', async () => {
  // Fake worker DB: select().from().where().limit() resolves to one session row.
  const limitMock = vi.fn(async () => [{ session_id: 'ses_retry' }]);
  const whereMock = vi.fn(() => ({ limit: limitMock }));
  const fromMock = vi.fn(() => ({ where: whereMock }));
  const selectMock = vi.fn(() => ({ from: fromMock }));
  vi.mocked(getWorkerDb).mockReturnValue({ select: selectMock } as never);

  // R2 lookups always come back empty.
  // NOTE(review): presumably the missing object is what makes processing fail — confirm against queue().
  const env = {
    HYPERDRIVE: { connectionString: 'postgres://unused' },
    SESSION_INGEST_R2: { get: vi.fn(async () => null) },
  } as never;

  const ack = vi.fn();
  const retry = vi.fn();
  const message = {
    body: {
      r2Key: 'ingest/retry-missing',
      kiloUserId: 'usr_retry',
      sessionId: 'ses_retry',
      ingestVersion: 1,
      ingestedAt: 1,
    },
    ack,
    retry,
  };
  const batch = { messages: [message] } as never;
  const ctx = { waitUntil: vi.fn() } as unknown as ExecutionContext;

  await queue(batch, env, ctx);

  // The message must not be acknowledged, and the retry must carry a 60s delay.
  expect(ack).not.toHaveBeenCalled();
  expect(retry).toHaveBeenCalledWith({ delaySeconds: 60 });
});
181+
146182
it('passes full parsed oversized message data and its R2 reference into ingest', async () => {
147183
const ingest = vi.fn(async () => ({ changes: [] }));
148184
vi.mocked(getSessionIngestDO).mockReturnValue({ ingest } as never);

services/session-ingest/src/queue-consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ export async function queue(
540540
sessionId: msg.body.sessionId,
541541
error: err instanceof Error ? err.message : String(err),
542542
});
543-
msg.retry();
543+
msg.retry({ delaySeconds: 60 });
544544
}
545545
}
546546
}

0 commit comments

Comments
 (0)