Skip to content

Commit d162dcf

Browse files
committed
fix(webapp): update the queue metrics simulator for cumulative counters
Counter events now emit per queue and op odometer readings with a seeded zero baseline, matching the production emitter, so throughput and started counts reconstruct from simulated data instead of reading zero. Scenario switches prune the previous scenario's queues, a --project flag seeds each scenario into its own project for side-by-side design review, and a new many-queues scenario covers pagination and relevance ranking with one runaway queue, a busy head, a bursty middle, and a sparse tail. Adds --help.
1 parent efdd64f commit d162dcf

1 file changed

Lines changed: 176 additions & 19 deletions

File tree

apps/webapp/seed-queue-metrics.mts

Lines changed: 176 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,39 @@ const scenarios: Record<string, (totalBuckets: number, bucketSec: number) => Sce
185185
],
186186
}),
187187

188+
// Pagination + relevance-ranking design surface: one runaway queue, a busy-but-healthy
189+
// head, a bursty middle, and a long sparse tail across 61 queues (the list pages at 25).
190+
"many-queues": () => ({
191+
description:
192+
"61 queues: one runaway, busy head, bursty middle, long sparse tail (pagination + ranking)",
193+
envLimit: () => 150,
194+
queues: [
195+
{ name: "imports", limit: () => 8, arrivals: (_b, r) => poisson(14, r), waitBaseMs: 80 },
196+
...["checkout", "notifications", "emails"].map((name, i) => ({
197+
name,
198+
limit: () => 15,
199+
arrivals: (_b: number, r: Rng) => poisson(7 + i, r),
200+
waitBaseMs: 60,
201+
})),
202+
...Array.from({ length: 12 }, (_v, i) =>
203+
bursty(`service-${String(i + 1).padStart(2, "0")}`, 10, 2)
204+
),
205+
...Array.from({ length: 20 }, (_v, i) => ({
206+
name: `job-${String(i + 1).padStart(2, "0")}`,
207+
limit: () => 5,
208+
arrivals: (_b: number, r: Rng) => poisson(1, r),
209+
waitBaseMs: 40,
210+
})),
211+
...Array.from({ length: 25 }, (_v, i) => ({
212+
name: `tenant-${String(i + 1).padStart(2, "0")}`,
213+
limit: () => 3,
214+
arrivals: (_b: number, r: Rng) => (r() < 0.05 ? poisson(2, r) : 0),
215+
waitBaseMs: 30,
216+
sparse: true,
217+
})),
218+
],
219+
}),
220+
188221
// Default: one env with a variety of queue behaviours + occasional env saturation.
189222
mixed: (totalBuckets) => ({
190223
description: "variety of queue profiles in one env, with occasional env saturation",
@@ -219,15 +252,61 @@ const WAIT_SIGMA = 0.6;
219252
const NACK_RATE = 0.02;
220253
const DLQ_RATE = 0.004;
221254

255+
type CounterOp = "enqueue" | "started" | "ack" | "nack" | "dlq";
256+
// Per-(queue, op) odometers, mirroring the production emitter: cumulative readings with a
257+
// cum=0 baseline on the first one, so deltaSumTimestamp captures the 0->1 delta.
258+
type CounterState = Record<CounterOp, number>[];
259+
260+
function counterRows(
261+
counters: CounterState,
262+
q: number,
263+
ids: Ids,
264+
queueName: string,
265+
eventTime: string,
266+
orderKey: () => number,
267+
op: CounterOp,
268+
wait_ms?: number
269+
): QueueMetricsRawV1Input[] {
270+
const rows: QueueMetricsRawV1Input[] = [];
271+
if (counters[q][op] === 0) {
272+
rows.push({
273+
...ids,
274+
queue_name: queueName,
275+
event_time: eventTime,
276+
op,
277+
cumulative: 0,
278+
order_key: orderKey(),
279+
});
280+
}
281+
counters[q][op] += 1;
282+
rows.push({
283+
...ids,
284+
queue_name: queueName,
285+
event_time: eventTime,
286+
op,
287+
cumulative: counters[q][op],
288+
order_key: orderKey(),
289+
...(wait_ms !== undefined ? { wait_ms } : {}),
290+
});
291+
return rows;
292+
}
293+
294+
function newCounterState(n: number): CounterState {
295+
return Array.from({ length: n }, () => ({ enqueue: 0, started: 0, ack: 0, nack: 0, dlq: 0 }));
296+
}
297+
222298
// Advance one bucket of the simulation for every queue, returning the raw rows to insert.
223-
// `backlog` is mutated in place so state carries across buckets (and into live mode).
299+
// `backlog` and `counters` are mutated in place so state carries across buckets (and into
300+
// live mode).
224301
function simulateBucket(
225302
scenario: Scenario,
226303
bucket: number,
227304
bucketSec: number,
228305
eventTime: string,
306+
bucketEpochSec: number,
229307
ids: Ids,
230308
backlog: number[],
309+
counters: CounterState,
231310
rng: Rng
232311
): QueueMetricsRawV1Input[] {
233312
const envLimit = scenario.envLimit(bucket);
@@ -259,6 +338,11 @@ function simulateBucket(
259338
envQueued += queued[q];
260339
}
261340

341+
// Order keys are time-based (like the production stream ids) so appended runs and live
342+
// mode stay monotonic; the per-bucket sequence keeps them unique within a bucket.
343+
let bucketSeq = 0;
344+
const orderKey = () => bucketEpochSec * 1_000_000 + bucketSeq++;
345+
262346
const rows: QueueMetricsRawV1Input[] = [];
263347
for (let q = 0; q < n; q++) {
264348
const profile = scenario.queues[q];
@@ -287,21 +371,26 @@ function simulateBucket(
287371
rows.push(gauge);
288372

289373
for (let a = 0; a < arrivals; a++) {
290-
rows.push({ ...ids, queue_name: profile.name, event_time: eventTime, op: "enqueue" });
374+
rows.push(...counterRows(counters, q, ids, profile.name, eventTime, orderKey, "enqueue"));
291375
}
292376

293377
const medianWait = profile.waitBaseMs + (prior / Math.max(limit[q], 1)) * bucketSec * 1000;
294378
for (let s = 0; s < started; s++) {
295-
rows.push({
296-
...ids,
297-
queue_name: profile.name,
298-
event_time: eventTime,
299-
op: "started",
300-
wait_ms: Math.round(lognormal(medianWait, WAIT_SIGMA, rng)),
301-
});
379+
rows.push(
380+
...counterRows(
381+
counters,
382+
q,
383+
ids,
384+
profile.name,
385+
eventTime,
386+
orderKey,
387+
"started",
388+
Math.round(lognormal(medianWait, WAIT_SIGMA, rng))
389+
)
390+
);
302391
const roll = rng();
303-
const op = roll < DLQ_RATE ? "dlq" : roll < DLQ_RATE + NACK_RATE ? "nack" : "ack";
304-
rows.push({ ...ids, queue_name: profile.name, event_time: eventTime, op });
392+
const op: CounterOp = roll < DLQ_RATE ? "dlq" : roll < DLQ_RATE + NACK_RATE ? "nack" : "ack";
393+
rows.push(...counterRows(counters, q, ids, profile.name, eventTime, orderKey, op));
305394
}
306395
}
307396
return rows;
@@ -412,11 +501,55 @@ async function ensureTaskQueues(
412501
update: { concurrencyLimit },
413502
});
414503
}
415-
console.log(`Ensured ${scenario.queues.length} task queues in Postgres.`);
504+
505+
// Drop queues left over from a previously seeded scenario so switching scenarios
506+
// does not leave metric-less rows in the list.
507+
const { count: pruned } = await prisma.taskQueue.deleteMany({
508+
where: {
509+
runtimeEnvironmentId,
510+
name: { notIn: scenario.queues.map((q) => q.name) },
511+
},
512+
});
513+
console.log(
514+
`Ensured ${scenario.queues.length} task queues in Postgres${pruned > 0 ? `, pruned ${pruned} stale` : ""}.`
515+
);
516+
}
517+
518+
function printHelp() {
519+
const lines = Object.entries(scenarios).map(
520+
([name, build]) => ` ${name.padEnd(28)}${build(720, 10).description}`
521+
);
522+
console.log(`Queue metrics simulator: seeds a synthetic tenant with realistic queue metrics.
523+
524+
Usage: pnpm --filter webapp run db:seed:queue-metrics -- [flags]
525+
526+
Flags:
527+
--scenario <name> which scenario to seed (default: mixed)
528+
--project <name> project to seed into (default: ${PROJECT_NAME}); use one
529+
project per scenario to browse them side by side
530+
--window <dur> how much history to backfill, e.g. 30m, 6h, 1d (default: 2h)
531+
--bucket <sec> seconds per simulated bucket (default: 10)
532+
--seed <n> RNG seed for reproducible data (default: 1)
533+
--live after backfilling, keep appending one bucket per interval
534+
--reset clear this environment's metrics before seeding
535+
--reset-only clear and exit without seeding
536+
--help this text
537+
538+
Scenarios:
539+
${lines.join("\n")}
540+
541+
Example designer setup (one project per scenario):
542+
pnpm --filter webapp run db:seed:queue-metrics -- --scenario mixed --reset
543+
pnpm --filter webapp run db:seed:queue-metrics -- --scenario many-queues --project qm-many-queues --reset
544+
pnpm --filter webapp run db:seed:queue-metrics -- --scenario throttled-backlog --project qm-throttled --reset`);
416545
}
417546

418547
async function main() {
419548
const flags = parseArgs(process.argv.slice(2));
549+
if (flags.help === "true") {
550+
printHelp();
551+
process.exit(0);
552+
}
420553
const scenarioName = flags.scenario ?? "mixed";
421554
const build = scenarios[scenarioName];
422555
if (!build) {
@@ -453,13 +586,14 @@ async function main() {
453586
if (!org)
454587
org = await createOrganization({ title: ORG_TITLE, userId: user.id, companySize: "1-10" });
455588

589+
const projectName = flags.project ?? PROJECT_NAME;
456590
let project = await prisma.project.findFirst({
457-
where: { name: PROJECT_NAME, organizationId: org.id },
591+
where: { name: projectName, organizationId: org.id },
458592
});
459593
if (!project) {
460594
project = await createProject({
461595
organizationSlug: org.slug,
462-
name: PROJECT_NAME,
596+
name: projectName,
463597
userId: user.id,
464598
version: "v3",
465599
});
@@ -502,10 +636,24 @@ async function main() {
502636
// Backfill: buckets from (now - window) up to now, aligned to the bucket grid.
503637
const nowBucket = Math.floor(Date.now() / 1000 / bucketSec) * bucketSec;
504638
const startBucket = nowBucket - totalBuckets * bucketSec;
639+
const counters = newCounterState(scenario.queues.length);
505640
const rows: QueueMetricsRawV1Input[] = [];
506641
for (let b = 0; b < totalBuckets; b++) {
507-
const eventTime = formatChDateTime(new Date((startBucket + b * bucketSec) * 1000));
508-
rows.push(...simulateBucket(scenario, b, bucketSec, eventTime, ids, backlog, rng));
642+
const bucketEpochSec = startBucket + b * bucketSec;
643+
const eventTime = formatChDateTime(new Date(bucketEpochSec * 1000));
644+
rows.push(
645+
...simulateBucket(
646+
scenario,
647+
b,
648+
bucketSec,
649+
eventTime,
650+
bucketEpochSec,
651+
ids,
652+
backlog,
653+
counters,
654+
rng
655+
)
656+
);
509657
}
510658
await insertBatched(ch, rows, nonce);
511659
console.log(`Inserted ${rows.length} raw rows.`);
@@ -530,10 +678,19 @@ async function main() {
530678
// eslint-disable-next-line no-constant-condition
531679
while (true) {
532680
await new Promise((r) => setTimeout(r, bucketSec * 1000));
533-
const eventTime = formatChDateTime(
534-
new Date(Math.floor(Date.now() / 1000 / bucketSec) * bucketSec * 1000)
681+
const bucketEpochSec = Math.floor(Date.now() / 1000 / bucketSec) * bucketSec;
682+
const eventTime = formatChDateTime(new Date(bucketEpochSec * 1000));
683+
const liveRows = simulateBucket(
684+
scenario,
685+
b,
686+
bucketSec,
687+
eventTime,
688+
bucketEpochSec,
689+
ids,
690+
backlog,
691+
counters,
692+
rng
535693
);
536-
const liveRows = simulateBucket(scenario, b, bucketSec, eventTime, ids, backlog, rng);
537694
await insertBatched(ch, liveRows, `${nonce}:live:${b}`);
538695
console.log(`bucket ${b}: ${liveRows.length} rows @ ${eventTime}`);
539696
b++;

0 commit comments

Comments
 (0)