Skip to content

Commit 5a7b8ce

Browse files
committed
feat(supervisor): add dequeue and warm start timing to wide event
Thread timing context from queue consumer through to the compute workload manager's wide event: - dequeueResponseMs: platform dequeue HTTP round-trip - pollingIntervalMs: which polling interval was active (idle vs active) - warmStartCheckMs: warm start check duration All fields are optional to avoid breaking existing consumers.
1 parent ac3dadf commit 5a7b8ce

File tree

7 files changed

+29
-7
lines changed

7 files changed

+29
-7
lines changed

apps/supervisor/src/index.ts

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -195,7 +195,7 @@ class ManagedSupervisor {
195195
this.workloadServer.notifyRun({ run });
196196
});
197197

198-
this.workerSession.on("runQueueMessage", async ({ time, message }) => {
198+
this.workerSession.on("runQueueMessage", async ({ time, message, dequeueResponseMs, pollingIntervalMs }) => {
199199
this.logger.log(`Received message with timestamp ${time.toLocaleString()}`, message);
200200

201201
if (message.completedWaitpoints.length > 0) {
@@ -244,7 +244,9 @@ class ManagedSupervisor {
244244

245245
this.logger.log("Scheduling run", { runId: message.run.id });
246246

247+
const warmStartStart = performance.now();
247248
const didWarmStart = await this.tryWarmStart(message);
249+
const warmStartCheckMs = Math.round(performance.now() - warmStartStart);
248250

249251
if (didWarmStart) {
250252
this.logger.log("Warm start successful", { runId: message.run.id });
@@ -260,6 +262,9 @@ class ManagedSupervisor {
260262

261263
await this.workloadManager.create({
262264
dequeuedAt: message.dequeuedAt,
265+
dequeueResponseMs,
266+
pollingIntervalMs,
267+
warmStartCheckMs,
263268
envId: message.environment.id,
264269
envType: message.environment.type,
265270
image: message.image,

apps/supervisor/src/workloadManager/compute.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ export class ComputeWorkloadManager implements WorkloadManager {
9393
machine: opts.machine.name,
9494
// Environment
9595
instanceName: env.TRIGGER_WORKER_INSTANCE_NAME,
96+
// Supervisor timing
97+
dequeueResponseMs: opts.dequeueResponseMs,
98+
pollingIntervalMs: opts.pollingIntervalMs,
99+
warmStartCheckMs: opts.warmStartCheckMs,
96100
// Request
97101
image: imageRef,
98102
url,

apps/supervisor/src/workloadManager/types.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ export interface WorkloadManagerCreateOptions {
2424
nextAttemptNumber?: number;
2525
dequeuedAt: Date;
2626
placementTags?: PlacementTag[];
27+
// Timing context (populated by supervisor handler, included in wide event)
28+
dequeueResponseMs?: number;
29+
pollingIntervalMs?: number;
30+
warmStartCheckMs?: number;
2731
// identifiers
2832
envId: string;
2933
envType: EnvironmentType;

packages/core/src/v3/runEngineWorker/supervisor/consumerPool.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -351,12 +351,12 @@ export class RunQueueConsumerPool {
351351

352352
const consumer = this.consumerFactory({
353353
...this.consumerOptions,
354-
onDequeue: async (messages) => {
354+
onDequeue: async (messages, timing) => {
355355
// Always update queue length, default to 0 for empty dequeues or missing value
356356
this.updateQueueLength(messages[0]?.workerQueueLength ?? 0);
357357

358358
// Forward to the original handler
359-
await this.consumerOptions.onDequeue(messages);
359+
await this.consumerOptions.onDequeue(messages, timing);
360360
},
361361
});
362362

packages/core/src/v3/runEngineWorker/supervisor/events.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ export type WorkerEvents = {
66
{
77
time: Date;
88
message: DequeuedMessage;
9+
dequeueResponseMs?: number;
10+
pollingIntervalMs?: number;
911
},
1012
];
1113
requestRunAttemptStart: [

packages/core/src/v3/runEngineWorker/supervisor/queueConsumer.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,22 @@ export type RunQueueConsumerOptions = {
1515
preDequeue?: PreDequeueFn;
1616
preSkip?: PreSkipFn;
1717
maxRunCount?: number;
18-
onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
18+
onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>;
1919
};
2020

2121
export class RunQueueConsumer implements QueueConsumer {
2222
private readonly client: SupervisorHttpClient;
2323
private readonly preDequeue?: PreDequeueFn;
2424
private readonly preSkip?: PreSkipFn;
2525
private readonly maxRunCount?: number;
26-
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody) => Promise<void>;
26+
private readonly onDequeue: (messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }) => Promise<void>;
2727

2828
private readonly logger = new SimpleStructuredLogger("queue-consumer");
2929

3030
private intervalMs: number;
3131
private idleIntervalMs: number;
3232
private isEnabled: boolean;
33+
private lastScheduledIntervalMs: number;
3334

3435
constructor(opts: RunQueueConsumerOptions) {
3536
this.isEnabled = false;
@@ -38,6 +39,7 @@ export class RunQueueConsumer implements QueueConsumer {
3839
this.preDequeue = opts.preDequeue;
3940
this.preSkip = opts.preSkip;
4041
this.maxRunCount = opts.maxRunCount;
42+
this.lastScheduledIntervalMs = opts.idleIntervalMs;
4143
this.onDequeue = opts.onDequeue;
4244
this.client = opts.client;
4345
}
@@ -111,16 +113,18 @@ export class RunQueueConsumer implements QueueConsumer {
111113
let nextIntervalMs = this.idleIntervalMs;
112114

113115
try {
116+
const dequeueStart = performance.now();
114117
const response = await this.client.dequeue({
115118
maxResources: preDequeueResult?.maxResources,
116119
maxRunCount: this.maxRunCount,
117120
});
121+
const dequeueResponseMs = Math.round(performance.now() - dequeueStart);
118122

119123
if (!response.success) {
120124
this.logger.error("Failed to dequeue", { error: response.error });
121125
} else {
122126
try {
123-
await this.onDequeue(response.data);
127+
await this.onDequeue(response.data, { dequeueResponseMs, pollingIntervalMs: this.lastScheduledIntervalMs });
124128

125129
if (response.data.length > 0) {
126130
nextIntervalMs = this.intervalMs;
@@ -141,6 +145,7 @@ export class RunQueueConsumer implements QueueConsumer {
141145
this.logger.verbose("scheduled dequeue with idle interval", { delayMs });
142146
}
143147

148+
this.lastScheduledIntervalMs = delayMs;
144149
setTimeout(this.dequeue.bind(this), delayMs);
145150
}
146151
}

packages/core/src/v3/runEngineWorker/supervisor/session.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,13 +80,15 @@ export class SupervisorSession extends EventEmitter<WorkerEvents> {
8080
});
8181
}
8282

83-
private async onDequeue(messages: WorkerApiDequeueResponseBody): Promise<void> {
83+
private async onDequeue(messages: WorkerApiDequeueResponseBody, timing?: { dequeueResponseMs: number; pollingIntervalMs: number }): Promise<void> {
8484
this.logger.verbose("Dequeued messages with contents", { count: messages.length, messages });
8585

8686
for (const message of messages) {
8787
this.emit("runQueueMessage", {
8888
time: new Date(),
8989
message,
90+
dequeueResponseMs: timing?.dequeueResponseMs,
91+
pollingIntervalMs: timing?.pollingIntervalMs,
9092
});
9193
}
9294
}

0 commit comments

Comments
 (0)