Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions services/session-ingest/drizzle/0002_watery_venus.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
CREATE INDEX `ingest_items_ingested_at_id_idx` ON `ingest_items` (`ingested_at`,`id`);
127 changes: 127 additions & 0 deletions services/session-ingest/drizzle/meta/0002_snapshot.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
{
"version": "6",
"dialect": "sqlite",
"id": "7f8bb7f8-414e-4875-bab2-3e6ce4de7a40",
"prevId": "878ab46c-3f3b-4b31-949b-88ef03ae6244",
"tables": {
"ingest_items": {
"name": "ingest_items",
"columns": {
"id": {
"name": "id",
"type": "integer",
"primaryKey": true,
"notNull": true,
"autoincrement": true
},
"item_id": {
"name": "item_id",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"item_type": {
"name": "item_type",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"item_data": {
"name": "item_data",
"type": "text",
"primaryKey": false,
"notNull": true,
"autoincrement": false
},
"item_data_r2_key": {
"name": "item_data_r2_key",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
},
"ingested_at": {
"name": "ingested_at",
"type": "integer",
"primaryKey": false,
"notNull": false,
"autoincrement": false
}
},
"indexes": {
"ingest_items_item_id_unique": {
"name": "ingest_items_item_id_unique",
"columns": [
"item_id"
],
"isUnique": true
},
"ingest_items_ingested_at_id_idx": {
"name": "ingest_items_ingested_at_id_idx",
"columns": [
"ingested_at",
"id"
],
"isUnique": false
}
},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"ingest_meta": {
"name": "ingest_meta",
"columns": {
"key": {
"name": "key",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
},
"value": {
"name": "value",
"type": "text",
"primaryKey": false,
"notNull": false,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
},
"sessions": {
"name": "sessions",
"columns": {
"session_id": {
"name": "session_id",
"type": "text",
"primaryKey": true,
"notNull": true,
"autoincrement": false
}
},
"indexes": {},
"foreignKeys": {},
"compositePrimaryKeys": {},
"uniqueConstraints": {},
"checkConstraints": {}
}
},
"views": {},
"enums": {},
"_meta": {
"schemas": {},
"tables": {},
"columns": {}
},
"internal": {
"indexes": {}
}
}
7 changes: 7 additions & 0 deletions services/session-ingest/drizzle/meta/_journal.json
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,13 @@
"when": 1772799661151,
"tag": "0001_common_blackheart",
"breakpoints": true
},
{
"idx": 2,
"version": "6",
"when": 1780499751344,
"tag": "0002_watery_venus",
"breakpoints": true
}
]
}
2 changes: 2 additions & 0 deletions services/session-ingest/drizzle/migrations.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import journal from './meta/_journal.json';
import m0000 from './0000_redundant_slyde.sql';
import m0001 from './0001_common_blackheart.sql';
import m0002 from './0002_watery_venus.sql';

// Bundles the migration journal together with every imported SQL migration,
// keyed as m0000, m0001, ... in the same order as the imports above.
// NOTE(review): presumably consumed by drizzle's durable-sqlite `migrate` — confirm against the caller.
export default {
journal,
migrations: {
m0000,
m0001,
m0002,
},
};
22 changes: 13 additions & 9 deletions services/session-ingest/src/db/sqlite-schema.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,17 @@
import { sqliteTable, text, integer } from 'drizzle-orm/sqlite-core';
import { index, sqliteTable, text, integer } from 'drizzle-orm/sqlite-core';

export const ingestItems = sqliteTable('ingest_items', {
id: integer('id').primaryKey({ autoIncrement: true }),
item_id: text('item_id').notNull().unique(),
item_type: text('item_type').notNull(),
item_data: text('item_data').notNull(),
item_data_r2_key: text('item_data_r2_key'),
ingested_at: integer('ingested_at'),
});
/**
 * Drizzle schema for the `ingest_items` table.
 *
 * `id` is an auto-incrementing integer primary key and `item_id` is unique.
 * `item_data_r2_key` and `ingested_at` carry no `notNull()` and are therefore nullable.
 */
export const ingestItems = sqliteTable(
'ingest_items',
{
id: integer('id').primaryKey({ autoIncrement: true }),
item_id: text('item_id').notNull().unique(),
item_type: text('item_type').notNull(),
item_data: text('item_data').notNull(),
item_data_r2_key: text('item_data_r2_key'),
ingested_at: integer('ingested_at'),
},
// Composite, non-unique index over (ingested_at, id).
table => [index('ingest_items_ingested_at_id_idx').on(table.ingested_at, table.id)]
);

export const ingestMeta = sqliteTable('ingest_meta', {
key: text('key').primaryKey(),
Expand Down
16 changes: 16 additions & 0 deletions services/session-ingest/src/dos/SessionIngestDO.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { describe, expect, it, vi } from 'vitest';

// Replace the `cloudflare:workers` module with a stub whose `DurableObject`
// base class has a no-op constructor, so `./SessionIngestDO` (imported below)
// can be loaded by this test.
// NOTE(review): presumably needed because `cloudflare:workers` is unavailable under vitest — confirm.
vi.mock('cloudflare:workers', () => ({
DurableObject: class DurableObject {
constructor(_state: unknown, _env: unknown) {}
},
}));

import { ingestOrderCursor } from './SessionIngestDO';

// Covers the row -> cursor mapping performed by `ingestOrderCursor`, for both a
// numeric and a null `ingested_at`.
// NOTE(review): this only checks the field mapping; no query is executed here, so the
// actual (ingested_at, id) ordering and `afterIngestOrderCursor` are not exercised.
describe('SessionIngestDO ingest ordering', () => {
it('uses ingested_at with id only as a tie-breaker for cursor progression', () => {
expect(ingestOrderCursor({ ingested_at: 100, id: 7 })).toEqual({ ingestedAt: 100, id: 7 });
expect(ingestOrderCursor({ ingested_at: null, id: 3 })).toEqual({ ingestedAt: null, id: 3 });
});
});
50 changes: 40 additions & 10 deletions services/session-ingest/src/dos/SessionIngestDO.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { DurableObject } from 'cloudflare:workers';
import { eq, ne, gt, gte, lt, and, inArray, isNotNull } from 'drizzle-orm';
import { eq, ne, gt, gte, lt, and, or, inArray, isNull, isNotNull } from 'drizzle-orm';
import { drizzle, type DrizzleSqliteDODatabase } from 'drizzle-orm/durable-sqlite';
import { migrate } from 'drizzle-orm/durable-sqlite/migrator';

Expand Down Expand Up @@ -86,6 +86,29 @@ const INGEST_META_EXTRACTORS: Array<{

type Changes = Array<{ name: ExtractableMetaKey; value: string | null }>;

/**
 * Pagination position expressed as an (`ingestedAt`, `id`) pair.
 *
 * `ingestedAt` is `null` when the row the cursor was taken from has no
 * `ingested_at` value; `id` disambiguates rows that share the same `ingestedAt`.
 */
export type IngestOrderCursor = { ingestedAt: number | null; id: number };

/**
 * Builds the filter that selects `ingest_items` rows positioned strictly after
 * `cursor` in (`ingested_at`, `id`) order.
 *
 * Rows whose `ingested_at` is NULL are treated as preceding every row that has
 * a timestamp: a cursor with a null `ingestedAt` is followed by the remaining
 * NULL rows with a larger `id`, and then by every row with a non-NULL
 * `ingested_at`.
 *
 * @param cursor Position of the last row already consumed.
 * @returns The combined drizzle condition for "comes after `cursor`".
 */
export function afterIngestOrderCursor(cursor: IngestOrderCursor) {
  const { ingestedAt, id } = cursor;
  const hasLargerId = gt(ingestItems.id, id);

  if (ingestedAt !== null) {
    // Either a strictly later timestamp, or the same timestamp with `id` as tie-breaker.
    const hasLaterTimestamp = gt(ingestItems.ingested_at, ingestedAt);
    const hasSameTimestamp = eq(ingestItems.ingested_at, ingestedAt);
    return or(hasLaterTimestamp, and(hasSameTimestamp, hasLargerId));
  }

  // Cursor sits among the NULL-timestamp rows: continue through them by `id`,
  // then move on to every row that does have a timestamp.
  const isLaterNullRow = and(isNull(ingestItems.ingested_at), hasLargerId);
  return or(isLaterNullRow, isNotNull(ingestItems.ingested_at));
}

/**
 * Converts a selected row into the cursor that identifies its position.
 *
 * @param row Row fields needed for the cursor: `ingested_at` (may be null) and `id`.
 * @returns A cursor carrying the row's `ingested_at` as `ingestedAt` together with its `id`.
 */
export function ingestOrderCursor(row: {
  ingested_at: number | null;
  id: number;
}): IngestOrderCursor {
  const { ingested_at: ingestedAt, id } = row;
  return { ingestedAt, id };
}

export class SessionIngestDO extends DurableObject<Env> {
private db: DrizzleSqliteDODatabase;

Expand Down Expand Up @@ -295,25 +318,31 @@ export class SessionIngestDO extends DurableObject<Env> {
// --- messages ---
const CURSOR_BATCH = 10;
controller.enqueue(encoder.encode(',"messages":['));
let msgCursor = 0;
let msgCursor: IngestOrderCursor | undefined;
let firstMsg = true;

while (true) {
const msgBatch = db
.select({
id: ingestItems.id,
ingested_at: ingestItems.ingested_at,
item_id: ingestItems.item_id,
item_data: ingestItems.item_data,
item_data_r2_key: ingestItems.item_data_r2_key,
})
.from(ingestItems)
.where(and(eq(ingestItems.item_type, 'message'), gt(ingestItems.id, msgCursor)))
.orderBy(ingestItems.id)
.where(
and(
eq(ingestItems.item_type, 'message'),
msgCursor ? afterIngestOrderCursor(msgCursor) : undefined
)
)
.orderBy(ingestItems.ingested_at, ingestItems.id)
Comment thread
johnnyeric marked this conversation as resolved.
.limit(CURSOR_BATCH)
.all();

if (msgBatch.length === 0) break;
msgCursor = msgBatch[msgBatch.length - 1].id;
msgCursor = ingestOrderCursor(msgBatch[msgBatch.length - 1]);

for (const msgRow of msgBatch) {
if (!firstMsg) controller.enqueue(encoder.encode(','));
Expand All @@ -327,13 +356,14 @@ export class SessionIngestDO extends DurableObject<Env> {
const msgId = msgRow.item_id.slice('message/'.length);
const partRange = getPartItemIdentityRange(msgId);
controller.enqueue(encoder.encode(',"parts":['));
let partCursor = 0;
let partCursor: IngestOrderCursor | undefined;
let firstPart = true;

while (true) {
const partBatch = db
.select({
id: ingestItems.id,
ingested_at: ingestItems.ingested_at,
item_data: ingestItems.item_data,
item_data_r2_key: ingestItems.item_data_r2_key,
})
Expand All @@ -343,15 +373,15 @@ export class SessionIngestDO extends DurableObject<Env> {
eq(ingestItems.item_type, 'part'),
gte(ingestItems.item_id, partRange.start),
lt(ingestItems.item_id, partRange.end),
gt(ingestItems.id, partCursor)
partCursor ? afterIngestOrderCursor(partCursor) : undefined
)
)
.orderBy(ingestItems.id)
.orderBy(ingestItems.ingested_at, ingestItems.id)
.limit(CURSOR_BATCH)
.all();

if (partBatch.length === 0) break;
partCursor = partBatch[partBatch.length - 1].id;
partCursor = ingestOrderCursor(partBatch[partBatch.length - 1]);

for (const partRow of partBatch) {
if (!firstPart) controller.enqueue(encoder.encode(','));
Expand Down Expand Up @@ -404,7 +434,7 @@ export class SessionIngestDO extends DurableObject<Env> {
})
.from(ingestItems)
.where(ne(ingestItems.item_type, 'session_diff'))
.orderBy(ingestItems.id)
.orderBy(ingestItems.ingested_at, ingestItems.id)
.all();

if (rows.length === 0) {
Expand Down
36 changes: 36 additions & 0 deletions services/session-ingest/src/queue-consumer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,42 @@ describe('createItemExtractor', () => {
});

describe('queue', () => {
// Verifies that a queue message whose processing fails is retried with a
// 60-second delay instead of being acked or retried immediately.
it('delays failed queue message retries to avoid immediately hammering hot DOs', async () => {
// Stub the worker DB so the select().from().where().limit() chain resolves to one session row.
const limit = vi.fn(async () => [{ session_id: 'ses_retry' }]);
const where = vi.fn(() => ({ limit }));
const from = vi.fn(() => ({ where }));
vi.mocked(getWorkerDb).mockReturnValue({ select: vi.fn(() => ({ from })) } as never);
const env = {
HYPERDRIVE: { connectionString: 'postgres://unused' },
// R2 lookups always resolve to null.
// NOTE(review): presumably this missing object is what makes processing fail — confirm against queue().
SESSION_INGEST_R2: { get: vi.fn(async () => null) },
} as never;
const ack = vi.fn();
const retry = vi.fn();

await queue(
{
messages: [
{
body: {
r2Key: 'ingest/retry-missing',
kiloUserId: 'usr_retry',
sessionId: 'ses_retry',
ingestVersion: 1,
ingestedAt: 1,
},
ack,
retry,
},
],
} as never,
env,
{ waitUntil: vi.fn() } as unknown as ExecutionContext
);

// The message must not be acknowledged, and the retry must carry the delay.
expect(ack).not.toHaveBeenCalled();
expect(retry).toHaveBeenCalledWith({ delaySeconds: 60 });
});

it('passes full parsed oversized message data and its R2 reference into ingest', async () => {
const ingest = vi.fn(async () => ({ changes: [] }));
vi.mocked(getSessionIngestDO).mockReturnValue({ ingest } as never);
Expand Down
2 changes: 1 addition & 1 deletion services/session-ingest/src/queue-consumer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ export async function queue(
sessionId: msg.body.sessionId,
error: err instanceof Error ? err.message : String(err),
});
msg.retry();
msg.retry({ delaySeconds: 60 });
}
}
}