Skip to content

Commit 6c67e95

Browse files
dahliaclaude-sonnet-4-6
andcommitted
Add per-row monotonic tie-breaker to enqueueMany()
MySQL evaluates NOW(6) once per statement, so all rows in a bulk INSERT share the same deliver_after timestamp. When messages share an orderingKey, dequeueing (which ORDER BY deliver_after) then selects among them non-deterministically. Fix by using delayMs * 1000 + index microseconds for each row's interval, so row i gets a deliver_after exactly i µs later than row 0. This ensures the insertion order is preserved under any orderingKey without requiring a new column or schema change. Add a regression test that enqueues five messages under the same ordering key via enqueueMany() and asserts they are delivered in order. #599 (comment) Co-Authored-By: claude-sonnet-4-6 <claude-sonnet-4-6@anthropic.com>
1 parent d005d1e commit 6c67e95

2 files changed

Lines changed: 49 additions & 2 deletions

File tree

packages/mysql/src/mq.test.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,6 +575,48 @@ test(
575575
},
576576
);
577577

578+
test(
579+
"MysqlMessageQueue.enqueueMany() preserves insertion order for same ordering key",
580+
{ skip: dbUrl == null },
581+
async () => {
582+
if (dbUrl == null) return; // Bun does not support skip option
583+
const pool = mysql.createPool(dbUrl!);
584+
const tableName = randomTableName("bord");
585+
const mq = new MysqlMessageQueue(pool, {
586+
tableName,
587+
pollInterval: { milliseconds: 50 },
588+
});
589+
const controller = new AbortController();
590+
const received: number[] = [];
591+
try {
592+
const listening = mq.listen(
593+
(msg: number) => {
594+
received.push(msg);
595+
},
596+
{ signal: controller.signal },
597+
);
598+
599+
// All 5 messages share the same ordering key. Without per-row
600+
// tie-breaker timestamps, all deliver_after values would be identical
601+
// (NOW(6) is constant within a single SQL statement) and dequeue order
602+
// would be nondeterministic.
603+
await mq.enqueueMany([1, 2, 3, 4, 5], { orderingKey: "order-test" });
604+
605+
await waitFor(() => received.length >= 5, 10_000);
606+
assert.deepStrictEqual(
607+
received,
608+
[1, 2, 3, 4, 5],
609+
"enqueueMany() must deliver messages in insertion order for the same ordering key",
610+
);
611+
controller.abort();
612+
await listening;
613+
} finally {
614+
await mq.drop();
615+
await pool.end();
616+
}
617+
},
618+
);
619+
578620
// ---------------------------------------------------------------------------
579621
// Handler error survival
580622
// ---------------------------------------------------------------------------

packages/mysql/src/mq.ts

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -245,9 +245,14 @@ export class MysqlMessageQueue implements MessageQueue {
245245
const placeholders = messages.map(() =>
246246
"(UUID(), CAST(? AS JSON), DATE_ADD(NOW(6), INTERVAL ? MICROSECOND), ?)"
247247
).join(", ");
248-
const values = messages.flatMap((message) => [
248+
// Each row gets a monotonically increasing interval so that messages
249+
// sharing the same orderingKey are assigned distinct deliver_after values
250+
// within a single INSERT statement. Without this tie-breaker, all rows
251+
// would receive the same NOW(6) timestamp (MySQL evaluates NOW(6) once per
252+
// statement), making dequeue order nondeterministic for ordered keys.
253+
const values = messages.flatMap((message, index) => [
249254
JSON.stringify(message),
250-
delayMs * 1000,
255+
delayMs * 1000 + index,
251256
orderingKey,
252257
]);
253258
let conn: PoolConnection | undefined;

0 commit comments

Comments
 (0)