Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 130 additions & 0 deletions apps/supervisor/src/services/computeSnapshotService.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
import { describe, expect, it, vi } from "vitest";
import { setTimeout as sleep } from "node:timers/promises";
import { ComputeSnapshotService } from "./computeSnapshotService.js";
import type { ComputeWorkloadManager } from "../workloadManager/compute.js";
import type { SupervisorHttpClient } from "@trigger.dev/core/v3/workers";

// The TimerWheel ticks every 100ms, so a 200ms delay dispatches within ~300ms.
const DELAY_MS = 200;
// Long enough that a pending snapshot would certainly have dispatched.
const SETTLE_MS = 600;

function createService() {
const snapshot = vi.fn(async (_opts: { runnerId: string; metadata: Record<string, string> }) => true);

const computeManager = {
snapshotDelayMs: DELAY_MS,
snapshotDispatchLimit: 1,
snapshot,
} as unknown as ComputeWorkloadManager;

const service = new ComputeSnapshotService({
computeManager,
workerClient: {} as SupervisorHttpClient,
wideEventOpts: { service: "supervisor-test", env: {}, enabled: false },
});

return { service, snapshot };
}

function delayedSnapshot(runnerId = "runner-1") {
return {
runnerId,
runFriendlyId: "run_1",
snapshotFriendlyId: "snapshot_1",
};
}

describe("ComputeSnapshotService", () => {
it("dispatches a scheduled snapshot after the delay", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot());

await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 });
expect(snapshot).toHaveBeenCalledWith({
runnerId: "runner-1",
metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_1" },
});
} finally {
service.stop();
}
});

it("cancel before the delay expires prevents the dispatch", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot());

expect(service.cancel("run_1")).toBe(true);

await sleep(SETTLE_MS);
expect(snapshot).not.toHaveBeenCalled();
} finally {
service.stop();
}
});

it("cancel returns false when nothing is pending", () => {
const { service } = createService();
try {
expect(service.cancel("run_1")).toBe(false);
} finally {
service.stop();
}
});

it("cancel with a matching runnerId cancels the pending snapshot", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot("runner-a"));

expect(service.cancel("run_1", "runner-a")).toBe(true);

await sleep(SETTLE_MS);
expect(snapshot).not.toHaveBeenCalled();
} finally {
service.stop();
}
});

it("cancel with a different runnerId leaves the pending snapshot alone", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot("runner-a"));

// A stale runner for a reassigned run must not cancel the new runner's snapshot.
expect(service.cancel("run_1", "runner-b")).toBe(false);

await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 });
expect(snapshot).toHaveBeenCalledWith(
expect.objectContaining({ runnerId: "runner-a" })
);
} finally {
service.stop();
}
});

it("re-scheduling the same run replaces the pending snapshot", async () => {
const { service, snapshot } = createService();
try {
service.schedule("run_1", delayedSnapshot());
service.schedule("run_1", {
runnerId: "runner-1",
runFriendlyId: "run_1",
snapshotFriendlyId: "snapshot_2",
});

await vi.waitFor(() => expect(snapshot).toHaveBeenCalledTimes(1), { timeout: 2_000 });
await sleep(SETTLE_MS);

expect(snapshot).toHaveBeenCalledTimes(1);
expect(snapshot).toHaveBeenCalledWith({
runnerId: "runner-1",
metadata: { runId: "run_1", snapshotFriendlyId: "snapshot_2" },
});
} finally {
service.stop();
}
});
});
15 changes: 13 additions & 2 deletions apps/supervisor/src/services/computeSnapshotService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,19 @@ export class ComputeSnapshotService {
});
}

/** Cancel a pending delayed snapshot. Returns true if one was cancelled. */
cancel(runFriendlyId: string): boolean {
/**
* Cancel a pending delayed snapshot. Returns true if one was cancelled.
* When `runnerId` is given, only a snapshot scheduled for that same runner
* is cancelled - a stale runner for a run that has since been reassigned
* must not cancel the new runner's pending snapshot.
*/
cancel(runFriendlyId: string, runnerId?: string): boolean {
if (runnerId) {
const pending = this.timerWheel.peek(runFriendlyId);
if (pending && pending.data.runnerId !== runnerId) {
return false;
}
}
const cancelled = this.timerWheel.cancel(runFriendlyId);
if (cancelled) {
emitOneShot({
Expand Down
17 changes: 17 additions & 0 deletions apps/supervisor/src/services/timerWheel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,23 @@ describe("TimerWheel", () => {
wheel.stop();
});

it("peek returns the pending item without removing it", () => {
const wheel = new TimerWheel<string>({ delayMs: 3000, onExpire: () => {} });

wheel.start();
wheel.submit("run-1", "data");

expect(wheel.peek("run-1")).toEqual({ key: "run-1", data: "data" });
expect(wheel.size).toBe(1);
expect(wheel.peek("run-2")).toBeUndefined();

// Dispatched items are no longer peekable
vi.advanceTimersByTime(3100);
expect(wheel.peek("run-1")).toBeUndefined();

wheel.stop();
});

it("cancel returns false for unknown key", () => {
const wheel = new TimerWheel<string>({
delayMs: 3000,
Expand Down
6 changes: 6 additions & 0 deletions apps/supervisor/src/services/timerWheel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,12 @@ export class TimerWheel<T> {
return true;
}

/** Look up a pending item without removing it. */
peek(key: string): TimerWheelItem<T> | undefined {
const entry = this.entries.get(key);
return entry ? { key, data: entry.data } : undefined;
}

/** Number of pending items in the wheel. */
get size(): number {
return this.entries.size;
Expand Down
27 changes: 26 additions & 1 deletion apps/supervisor/src/workloadServer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -287,13 +287,25 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
"POST",
async () => {
const { req, reply, params, body } = ctx;
const runnerId = this.runnerIdFromRequest(req);
const completeResponse = await this.workerClient.completeRunAttempt(
params.runFriendlyId,
params.snapshotFriendlyId,
body,
this.runnerIdFromRequest(req)
runnerId
);

// A completion attempt invalidates any pending delayed snapshot
// regardless of outcome: the runner has finished executing, so the
// suspended state the snapshot was scheduled to capture no longer
// exists. Without this, the snapshot fires up to snapshotDelayMs
// later and pauses a VM that has long moved on - and on a transient
// completion failure the runner retries, so waiting for success
// would leave the stale snapshot armed in the meantime. The
// runnerId guard keeps a stale duplicate runner's failed completion
// from cancelling a fresh runner's snapshot.
this.snapshotService?.cancel(params.runFriendlyId, runnerId);
Comment thread
devin-ai-integration[bot] marked this conversation as resolved.
Outdated

if (!completeResponse.success) {
this.logger.error("Failed to complete run", {
params,
Expand Down Expand Up @@ -728,6 +740,19 @@ export class WorkloadServer extends EventEmitter<WorkloadServerEvents> {
const runDisconnected = (friendlyId: string, reason: string) => {
socketLogger.debug("runDisconnected", { ...getSocketMetadata() });

// The run is gone from this runner (crash, exit, or replaced by a new
// run), so a pending delayed snapshot for it is stale. Genuine
// waitpoint suspensions keep the socket connected, so this doesn't
// cancel a snapshot that's still wanted; the runnerId match guards
// against a stale duplicate runner cancelling a fresh runner's
// snapshot after the run was reassigned. Caveat: socket.data.runnerId
// is frozen at the websocket handshake, so after a same-supervisor
// restore (new runner id, socket not recreated) this guard refuses
// the cancel - a missed cancel, never a wrong one. The
// attempt.complete cancel uses the runner's current HTTP header id
// and is unaffected.
this.snapshotService?.cancel(friendlyId, socket.data.runnerId);

this.runSockets.delete(friendlyId);
this.emit("runDisconnected", { run: { friendlyId } });
socket.data.runFriendlyId = undefined;
Expand Down