Skip to content

Commit 3ba5674

Browse files
committed
feat: verify warm-start delivery and cold-start silently lost dispatches
Firestarter's didWarmStart: true means the response was written to a socket, not that the runner received it. A silently dead poller (no FIN, e.g. a VM torn down mid-poll) leaves the dispatched run stuck in PENDING_EXECUTING until the run engine's heartbeat redrive minutes later, burning a queue redelivery toward TASK_RUN_DEQUEUED_MAX_RETRIES each time. After a warm-start hit the supervisor now retains the DequeuedMessage, waits TRIGGER_WARM_START_VERIFY_DELAY_MS (default 10s), then asks the platform for the run's latest snapshot. If it is still the exact snapshot that was dequeued, no runner ever started the attempt - the run falls through to the regular cold-create path. Double-starts are prevented by the engine: startRunAttempt runs under a per-run lock and rejects stale snapshot ids, so a reviving runner and the fallback workload can't both execute. On probe errors nothing happens - during platform brownouts healthy runners legitimately act late, and falling back on uncertainty would stampede duplicates; the heartbeat redrive stays as the backstop. Off by default; enable with TRIGGER_WARM_START_VERIFY_ENABLED. When disabled the code path is a no-op. Emits warmstart.verify wide events (outcome: delivered / fallback / probe_error). Resolves TRI-10659.
1 parent 4783419 commit 3ba5674

4 files changed

Lines changed: 404 additions & 47 deletions

File tree

apps/supervisor/src/env.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,16 @@ const Env = z
7979
TRIGGER_CHECKPOINT_URL: z.string().optional(),
8080
TRIGGER_METADATA_URL: z.string().optional(),
8181

82+
// Warm-start delivery verification: after a warm-start hit, probe the
83+
// platform and cold-start the run if no runner acted on the dispatch
84+
TRIGGER_WARM_START_VERIFY_ENABLED: BoolEnv.default(false),
85+
TRIGGER_WARM_START_VERIFY_DELAY_MS: z.coerce
86+
.number()
87+
.int()
88+
.min(1_000)
89+
.max(60_000)
90+
.default(10_000),
91+
8292
// Used by the resource monitor
8393
RESOURCE_MONITOR_ENABLED: BoolEnv.default(false),
8494
RESOURCE_MONITOR_OVERRIDE_CPU_TOTAL: z.coerce.number().optional(),

apps/supervisor/src/index.ts

Lines changed: 89 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,10 @@ import { PodCleaner } from "./services/podCleaner.js";
2727
import { FailedPodHandler } from "./services/failedPodHandler.js";
2828
import { getWorkerToken } from "./workerToken.js";
2929
import { OtlpTraceService } from "./services/otlpTraceService.js";
30+
import {
31+
WarmStartVerificationService,
32+
type WarmStartTimings,
33+
} from "./services/warmStartVerificationService.js";
3034
import { extractTraceparent, getRestoreRunnerId } from "./util.js";
3135
import { Redis } from "ioredis";
3236
import { BackpressureMonitor } from "./backpressure/backpressureMonitor.js";
@@ -54,6 +58,7 @@ class ManagedSupervisor {
5458
private readonly logger = new SimpleStructuredLogger("managed-supervisor");
5559
private readonly resourceMonitor: ResourceMonitor;
5660
private readonly checkpointClient?: CheckpointClient;
61+
private readonly warmStartVerifier?: WarmStartVerificationService;
5762

5863
private readonly podCleaner?: PodCleaner;
5964
private readonly failedPodHandler?: FailedPodHandler;
@@ -299,6 +304,19 @@ class ManagedSupervisor {
299304
});
300305
}
301306

307+
if (env.TRIGGER_WARM_START_VERIFY_ENABLED && this.warmStartUrl) {
308+
this.logger.log("Warm-start delivery verification enabled", {
309+
delayMs: env.TRIGGER_WARM_START_VERIFY_DELAY_MS,
310+
});
311+
312+
this.warmStartVerifier = new WarmStartVerificationService({
313+
workerClient: this.workerSession.httpClient,
314+
delayMs: env.TRIGGER_WARM_START_VERIFY_DELAY_MS,
315+
createWorkload: (message, timings) => this.createWorkload(message, timings),
316+
wideEventOpts: this.wideEventOpts,
317+
});
318+
}
319+
302320
this.workerSession.on("runNotification", async ({ time, run }) => {
303321
this.logger.verbose("runNotification", { time, run });
304322

@@ -455,58 +473,24 @@ class ManagedSupervisor {
455473
if (didWarmStart) {
456474
setExtra(fromContext(), "path_taken", "warm_start");
457475
this.logger.debug("Warm start successful", { runId: message.run.id });
458-
return;
459-
}
460-
461-
setExtra(fromContext(), "path_taken", "cold_create");
462-
463-
const createStart = performance.now();
464-
try {
465-
if (!message.deployment.friendlyId) {
466-
// mostly a type guard, deployments always exists for deployed environments
467-
// a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments.
468-
throw new Error("Deployment is missing");
469-
}
470-
471-
await this.workloadManager.create({
472-
dequeuedAt: message.dequeuedAt,
476+
// A hit only means the response was written to the long-poll
477+
// socket, not that the runner received it. Schedule a delivery
478+
// verification that cold-starts the run if nobody acts on it.
479+
this.warmStartVerifier?.schedule(message, {
473480
dequeueResponseMs,
474481
pollingIntervalMs,
475482
warmStartCheckMs,
476-
envId: message.environment.id,
477-
envType: message.environment.type,
478-
image: message.image,
479-
machine: message.run.machine,
480-
orgId: message.organization.id,
481-
projectId: message.project.id,
482-
deploymentFriendlyId: message.deployment.friendlyId,
483-
deploymentVersion: message.backgroundWorker.version,
484-
runId: message.run.id,
485-
runFriendlyId: message.run.friendlyId,
486-
version: message.version,
487-
nextAttemptNumber: message.run.attemptNumber,
488-
snapshotId: message.snapshot.id,
489-
snapshotFriendlyId: message.snapshot.friendlyId,
490-
placementTags: message.placementTags,
491-
traceContext: message.run.traceContext,
492-
annotations: message.run.annotations,
493-
hasPrivateLink: message.organization.hasPrivateLink,
494483
});
495-
recordPhaseSince("workload_create", createStart, undefined);
496-
497-
// Disabled for now
498-
// this.resourceMonitor.blockResources({
499-
// cpu: message.run.machine.cpu,
500-
// memory: message.run.machine.memory,
501-
// });
502-
} catch (error) {
503-
recordPhaseSince(
504-
"workload_create",
505-
createStart,
506-
error instanceof Error ? error : new Error(String(error))
507-
);
508-
this.logger.error("Failed to create workload", { error });
484+
return;
509485
}
486+
487+
setExtra(fromContext(), "path_taken", "cold_create");
488+
489+
await this.createWorkload(message, {
490+
dequeueResponseMs,
491+
pollingIntervalMs,
492+
warmStartCheckMs,
493+
});
510494
}
511495
);
512496
}
@@ -541,6 +525,8 @@ class ManagedSupervisor {
541525

542526
async onRunConnected({ run }: { run: { friendlyId: string } }) {
543527
this.logger.debug("Run connected", { run });
528+
// The dispatched run reached a runner on this node - no fallback needed.
529+
this.warmStartVerifier?.cancel(run.friendlyId);
544530
this.workerSession.subscribeToRunNotifications([run.friendlyId]);
545531
}
546532

@@ -549,6 +535,61 @@ class ManagedSupervisor {
549535
this.workerSession.unsubscribeFromRunNotifications([run.friendlyId]);
550536
}
551537

538+
private async createWorkload(message: DequeuedMessage, timings: WarmStartTimings) {
539+
const createStart = performance.now();
540+
try {
541+
if (!message.deployment.friendlyId) {
542+
// mostly a type guard, deployments always exists for deployed environments
543+
// a proper fix would be to use a discriminated union schema to differentiate between dequeued runs in dev and in deployed environments.
544+
throw new Error("Deployment is missing");
545+
}
546+
547+
if (!message.image) {
548+
// same type-guard situation as deployment above
549+
throw new Error("Image is missing");
550+
}
551+
552+
await this.workloadManager.create({
553+
dequeuedAt: message.dequeuedAt,
554+
dequeueResponseMs: timings.dequeueResponseMs,
555+
pollingIntervalMs: timings.pollingIntervalMs,
556+
warmStartCheckMs: timings.warmStartCheckMs,
557+
envId: message.environment.id,
558+
envType: message.environment.type,
559+
image: message.image,
560+
machine: message.run.machine,
561+
orgId: message.organization.id,
562+
projectId: message.project.id,
563+
deploymentFriendlyId: message.deployment.friendlyId,
564+
deploymentVersion: message.backgroundWorker.version,
565+
runId: message.run.id,
566+
runFriendlyId: message.run.friendlyId,
567+
version: message.version,
568+
nextAttemptNumber: message.run.attemptNumber,
569+
snapshotId: message.snapshot.id,
570+
snapshotFriendlyId: message.snapshot.friendlyId,
571+
placementTags: message.placementTags,
572+
traceContext: message.run.traceContext,
573+
annotations: message.run.annotations,
574+
hasPrivateLink: message.organization.hasPrivateLink,
575+
});
576+
recordPhaseSince("workload_create", createStart, undefined);
577+
578+
// Disabled for now
579+
// this.resourceMonitor.blockResources({
580+
// cpu: message.run.machine.cpu,
581+
// memory: message.run.machine.memory,
582+
// });
583+
} catch (error) {
584+
recordPhaseSince(
585+
"workload_create",
586+
createStart,
587+
error instanceof Error ? error : new Error(String(error))
588+
);
589+
this.logger.error("Failed to create workload", { error });
590+
}
591+
}
592+
552593
private async tryWarmStart(
553594
dequeuedMessage: DequeuedMessage,
554595
traceparent: string | undefined
@@ -632,6 +673,7 @@ class ManagedSupervisor {
632673
this.logger.log("Shutting down");
633674
await this.workloadServer.stop();
634675
await this.workerSession.stop();
676+
this.warmStartVerifier?.stop();
635677

636678
// Optional services
637679
this.backpressureMonitor?.stop();
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
import { setTimeout as sleep } from "node:timers/promises";
3+
import { WarmStartVerificationService } from "./warmStartVerificationService.js";
4+
import type { DequeuedMessage } from "@trigger.dev/core/v3";
5+
import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers";
6+
7+
// The TimerWheel ticks every 100ms, so a 1000ms delay (the env minimum)
8+
// fires within ~1.1s.
9+
const DELAY_MS = 1_000;
10+
// Long enough that a pending verification would certainly have fired.
11+
const SETTLE_MS = 1_600;
12+
13+
const DEQUEUED_SNAPSHOT_ID = "snapshot_dequeued";
14+
15+
function makeMessage(runFriendlyId = "run_1"): DequeuedMessage {
16+
return {
17+
run: { friendlyId: runFriendlyId },
18+
snapshot: { friendlyId: DEQUEUED_SNAPSHOT_ID },
19+
} as unknown as DequeuedMessage;
20+
}
21+
22+
function createService(opts: {
23+
latestSnapshotId?: string;
24+
probeError?: boolean;
25+
}) {
26+
const getLatestSnapshot = vi.fn(async (_runId: string) =>
27+
opts.probeError
28+
? { success: false as const, error: "connection refused" }
29+
: {
30+
success: true as const,
31+
data: { execution: { snapshot: { friendlyId: opts.latestSnapshotId } } },
32+
}
33+
);
34+
35+
const createWorkload = vi.fn(async () => {});
36+
37+
const service = new WarmStartVerificationService({
38+
workerClient: { getLatestSnapshot } as unknown as SupervisorHttpClient,
39+
delayMs: DELAY_MS,
40+
createWorkload,
41+
wideEventOpts: { service: "supervisor-test", env: {}, enabled: false },
42+
});
43+
44+
return { service, getLatestSnapshot, createWorkload };
45+
}
46+
47+
describe("WarmStartVerificationService", () => {
48+
it("falls back to a cold create when the snapshot is unchanged", async () => {
49+
const { service, createWorkload } = createService({
50+
latestSnapshotId: DEQUEUED_SNAPSHOT_ID,
51+
});
52+
try {
53+
const message = makeMessage();
54+
const timings = { warmStartCheckMs: 12 };
55+
service.schedule(message, timings);
56+
57+
await vi.waitFor(() => expect(createWorkload).toHaveBeenCalledTimes(1), {
58+
timeout: 3_000,
59+
});
60+
expect(createWorkload).toHaveBeenCalledWith(message, timings);
61+
} finally {
62+
service.stop();
63+
}
64+
});
65+
66+
it("does nothing when the snapshot has moved on (delivered)", async () => {
67+
const { service, getLatestSnapshot, createWorkload } = createService({
68+
latestSnapshotId: "snapshot_executing",
69+
});
70+
try {
71+
service.schedule(makeMessage(), { warmStartCheckMs: 12 });
72+
73+
await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), {
74+
timeout: 3_000,
75+
});
76+
await sleep(100);
77+
expect(createWorkload).not.toHaveBeenCalled();
78+
} finally {
79+
service.stop();
80+
}
81+
});
82+
83+
it("never falls back when the probe errors", async () => {
84+
const { service, getLatestSnapshot, createWorkload } = createService({ probeError: true });
85+
try {
86+
service.schedule(makeMessage(), { warmStartCheckMs: 12 });
87+
88+
await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), {
89+
timeout: 3_000,
90+
});
91+
await sleep(100);
92+
expect(createWorkload).not.toHaveBeenCalled();
93+
} finally {
94+
service.stop();
95+
}
96+
});
97+
98+
it("cancel before the delay prevents the probe entirely", async () => {
99+
const { service, getLatestSnapshot, createWorkload } = createService({
100+
latestSnapshotId: DEQUEUED_SNAPSHOT_ID,
101+
});
102+
try {
103+
service.schedule(makeMessage(), { warmStartCheckMs: 12 });
104+
105+
expect(service.cancel("run_1")).toBe(true);
106+
107+
await sleep(SETTLE_MS);
108+
expect(getLatestSnapshot).not.toHaveBeenCalled();
109+
expect(createWorkload).not.toHaveBeenCalled();
110+
} finally {
111+
service.stop();
112+
}
113+
});
114+
115+
it("re-scheduling the same run replaces the pending verification", async () => {
116+
const { service, getLatestSnapshot } = createService({
117+
latestSnapshotId: "snapshot_executing",
118+
});
119+
try {
120+
service.schedule(makeMessage(), { warmStartCheckMs: 1 });
121+
service.schedule(makeMessage(), { warmStartCheckMs: 2 });
122+
123+
await vi.waitFor(() => expect(getLatestSnapshot).toHaveBeenCalledTimes(1), {
124+
timeout: 3_000,
125+
});
126+
await sleep(SETTLE_MS);
127+
expect(getLatestSnapshot).toHaveBeenCalledTimes(1);
128+
} finally {
129+
service.stop();
130+
}
131+
});
132+
});

0 commit comments

Comments
 (0)