Skip to content

Commit b272842

Browse files
committed
fix(session-ingest): delay queue retries safely
1 parent 4820d06 commit b272842

4 files changed

Lines changed: 79 additions & 11 deletions

File tree

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
import { describe, expect, it, vi } from 'vitest';
2+
3+
vi.mock('cloudflare:workers', () => ({
4+
DurableObject: class DurableObject {
5+
constructor(_state: unknown, _env: unknown) {}
6+
},
7+
}));
8+
9+
import { ingestOrderCursor } from './SessionIngestDO';
10+
11+
describe('SessionIngestDO ingest ordering', () => {
12+
it('uses ingested_at with id only as a tie-breaker for cursor progression', () => {
13+
expect(ingestOrderCursor({ ingested_at: 100, id: 7 })).toEqual({ ingestedAt: 100, id: 7 });
14+
expect(ingestOrderCursor({ ingested_at: null, id: 3 })).toEqual({ ingestedAt: 0, id: 3 });
15+
});
16+
});

services/session-ingest/src/dos/SessionIngestDO.ts

Lines changed: 26 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { DurableObject } from 'cloudflare:workers';
2-
import { eq, ne, gt, gte, lt, and, inArray, isNotNull } from 'drizzle-orm';
2+
import { eq, ne, gte, lt, and, inArray, isNotNull, sql } from 'drizzle-orm';
33
import { drizzle, type DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite';
44
import { migrate } from 'drizzle-orm/durable-sqlite/migrator';
55

@@ -86,6 +86,20 @@ const INGEST_META_EXTRACTORS: Array<{
8686

8787
type Changes = Array<{ name: ExtractableMetaKey; value: string | null }>;
8888

89+
export type IngestOrderCursor = { ingestedAt: number; id: number };
90+
91+
export function coalescedIngestedAt() {
92+
return sql<number>`coalesce(${ingestItems.ingested_at}, 0)`;
93+
}
94+
95+
export function afterIngestOrderCursor(cursor: IngestOrderCursor) {
96+
return sql`(${coalescedIngestedAt()} > ${cursor.ingestedAt} OR (${coalescedIngestedAt()} = ${cursor.ingestedAt} AND ${ingestItems.id} > ${cursor.id}))`;
97+
}
98+
99+
export function ingestOrderCursor(row: { ingested_at: number | null; id: number }): IngestOrderCursor {
100+
return { ingestedAt: row.ingested_at ?? 0, id: row.id };
101+
}
102+
89103
export class SessionIngestDO extends DurableObject<Env> {
90104
private db: DrizzleSqliteDODatabase;
91105

@@ -295,25 +309,26 @@ export class SessionIngestDO extends DurableObject<Env> {
295309
// --- messages ---
296310
const CURSOR_BATCH = 10;
297311
controller.enqueue(encoder.encode(',"messages":['));
298-
let msgCursor = 0;
312+
let msgCursor: IngestOrderCursor = { ingestedAt: -1, id: 0 };
299313
let firstMsg = true;
300314

301315
while (true) {
302316
const msgBatch = db
303317
.select({
304318
id: ingestItems.id,
319+
ingested_at: ingestItems.ingested_at,
305320
item_id: ingestItems.item_id,
306321
item_data: ingestItems.item_data,
307322
item_data_r2_key: ingestItems.item_data_r2_key,
308323
})
309324
.from(ingestItems)
310-
.where(and(eq(ingestItems.item_type, 'message'), gt(ingestItems.id, msgCursor)))
311-
.orderBy(ingestItems.id)
325+
.where(and(eq(ingestItems.item_type, 'message'), afterIngestOrderCursor(msgCursor)))
326+
.orderBy(coalescedIngestedAt(), ingestItems.id)
312327
.limit(CURSOR_BATCH)
313328
.all();
314329

315330
if (msgBatch.length === 0) break;
316-
msgCursor = msgBatch[msgBatch.length - 1].id;
331+
msgCursor = ingestOrderCursor(msgBatch[msgBatch.length - 1]);
317332

318333
for (const msgRow of msgBatch) {
319334
if (!firstMsg) controller.enqueue(encoder.encode(','));
@@ -327,13 +342,14 @@ export class SessionIngestDO extends DurableObject<Env> {
327342
const msgId = msgRow.item_id.slice('message/'.length);
328343
const partRange = getPartItemIdentityRange(msgId);
329344
controller.enqueue(encoder.encode(',"parts":['));
330-
let partCursor = 0;
345+
let partCursor: IngestOrderCursor = { ingestedAt: -1, id: 0 };
331346
let firstPart = true;
332347

333348
while (true) {
334349
const partBatch = db
335350
.select({
336351
id: ingestItems.id,
352+
ingested_at: ingestItems.ingested_at,
337353
item_data: ingestItems.item_data,
338354
item_data_r2_key: ingestItems.item_data_r2_key,
339355
})
@@ -343,15 +359,15 @@ export class SessionIngestDO extends DurableObject<Env> {
343359
eq(ingestItems.item_type, 'part'),
344360
gte(ingestItems.item_id, partRange.start),
345361
lt(ingestItems.item_id, partRange.end),
346-
gt(ingestItems.id, partCursor)
362+
afterIngestOrderCursor(partCursor)
347363
)
348364
)
349-
.orderBy(ingestItems.id)
365+
.orderBy(coalescedIngestedAt(), ingestItems.id)
350366
.limit(CURSOR_BATCH)
351367
.all();
352368

353369
if (partBatch.length === 0) break;
354-
partCursor = partBatch[partBatch.length - 1].id;
370+
partCursor = ingestOrderCursor(partBatch[partBatch.length - 1]);
355371

356372
for (const partRow of partBatch) {
357373
if (!firstPart) controller.enqueue(encoder.encode(','));
@@ -404,7 +420,7 @@ export class SessionIngestDO extends DurableObject<Env> {
404420
})
405421
.from(ingestItems)
406422
.where(ne(ingestItems.item_type, 'session_diff'))
407-
.orderBy(ingestItems.id)
423+
.orderBy(coalescedIngestedAt(), ingestItems.id)
408424
.all();
409425

410426
if (rows.length === 0) {

services/session-ingest/src/queue-consumer.test.ts

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,6 +143,42 @@ describe('createItemExtractor', () => {
143143
});
144144

145145
describe('queue', () => {
146+
it('delays failed queue message retries to avoid immediately hammering hot DOs', async () => {
147+
const limit = vi.fn(async () => [{ session_id: 'ses_retry' }]);
148+
const where = vi.fn(() => ({ limit }));
149+
const from = vi.fn(() => ({ where }));
150+
vi.mocked(getWorkerDb).mockReturnValue({ select: vi.fn(() => ({ from })) } as never);
151+
const env = {
152+
HYPERDRIVE: { connectionString: 'postgres://unused' },
153+
SESSION_INGEST_R2: { get: vi.fn(async () => null) },
154+
} as never;
155+
const ack = vi.fn();
156+
const retry = vi.fn();
157+
158+
await queue(
159+
{
160+
messages: [
161+
{
162+
body: {
163+
r2Key: 'ingest/retry-missing',
164+
kiloUserId: 'usr_retry',
165+
sessionId: 'ses_retry',
166+
ingestVersion: 1,
167+
ingestedAt: 1,
168+
},
169+
ack,
170+
retry,
171+
},
172+
],
173+
} as never,
174+
env,
175+
{ waitUntil: vi.fn() } as unknown as ExecutionContext
176+
);
177+
178+
expect(ack).not.toHaveBeenCalled();
179+
expect(retry).toHaveBeenCalledWith({ delaySeconds: 60 });
180+
});
181+
146182
it('passes full parsed oversized message data and its R2 reference into ingest', async () => {
147183
const ingest = vi.fn(async () => ({ changes: [] }));
148184
vi.mocked(getSessionIngestDO).mockReturnValue({ ingest } as never);

services/session-ingest/src/queue-consumer.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -540,7 +540,7 @@ export async function queue(
540540
sessionId: msg.body.sessionId,
541541
error: err instanceof Error ? err.message : String(err),
542542
});
543-
msg.retry();
543+
msg.retry({ delaySeconds: 60 });
544544
}
545545
}
546546
}

0 commit comments

Comments
 (0)