Skip to content

Commit 5329dcc

Browse files
committed
feat(provider): add suspendSession to preserve resumeCursor for idle cle
Introduces suspendSession alongside stopSession with distinct semantics: - stopSession: Intentional termination, removes persisted binding including resumeCursor - suspendSession: Idle cleanup, preserves resumeCursor so next startSession resumes ProviderSessionReaper now calls suspendSession instead of stopSession so inactive sessions can resume their conversation state when revisited, rather than losing it.
1 parent ba5290b commit 5329dcc

8 files changed

Lines changed: 148 additions & 21 deletions

File tree

apps/server/src/orchestration/Layers/CheckpointReactor.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ function createProviderServiceHarness(
9595
respondToRequest: () => unsupported(),
9696
respondToUserInput: () => unsupported(),
9797
stopSession: () => unsupported(),
98+
suspendSession: () => unsupported(),
9899
listSessions,
99100
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
100101
rollbackConversation,

apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,7 @@ describe("ProviderCommandReactor", () => {
202202
respondToRequest: respondToRequest as ProviderServiceShape["respondToRequest"],
203203
respondToUserInput: respondToUserInput as ProviderServiceShape["respondToUserInput"],
204204
stopSession: stopSession as ProviderServiceShape["stopSession"],
205+
suspendSession: () => unsupported(),
205206
listSessions: () => Effect.succeed(runtimeSessions),
206207
getCapabilities: (_provider) =>
207208
Effect.succeed({

apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ function createProviderServiceHarness() {
9797
respondToRequest: () => unsupported(),
9898
respondToUserInput: () => unsupported(),
9999
stopSession: () => unsupported(),
100+
suspendSession: () => unsupported(),
100101
listSessions: () => Effect.succeed([...runtimeSessions]),
101102
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
102103
rollbackConversation: () => unsupported(),

apps/server/src/provider/Layers/ProviderService.test.ts

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -685,6 +685,56 @@ routing.layer("ProviderServiceLive routing", (it) => {
685685
}),
686686
);
687687

688+
it.effect("suspendSession preserves the persisted resumeCursor so the next start resumes", () =>
689+
Effect.gen(function* () {
690+
const provider = yield* ProviderService;
691+
const directory = yield* ProviderSessionDirectory;
692+
const threadId = asThreadId("thread-suspend-preserves-resume");
693+
694+
const initial = yield* provider.startSession(threadId, {
695+
provider: "codex",
696+
threadId,
697+
cwd: "/tmp/project-suspend",
698+
runtimeMode: "full-access",
699+
});
700+
701+
routing.codex.stopSession.mockClear();
702+
routing.codex.startSession.mockClear();
703+
704+
yield* provider.suspendSession({ threadId });
705+
706+
assert.deepEqual(routing.codex.stopSession.mock.calls, [[threadId]]);
707+
708+
const bindingAfterSuspend = yield* directory.getBinding(threadId);
709+
assert.equal(Option.isSome(bindingAfterSuspend), true);
710+
if (Option.isSome(bindingAfterSuspend)) {
711+
assert.equal(bindingAfterSuspend.value.status, "stopped");
712+
assert.deepEqual(bindingAfterSuspend.value.resumeCursor, initial.resumeCursor);
713+
}
714+
715+
yield* provider.startSession(threadId, {
716+
provider: "codex",
717+
threadId,
718+
cwd: "/tmp/project-suspend",
719+
runtimeMode: "full-access",
720+
});
721+
722+
assert.equal(routing.codex.startSession.mock.calls.length, 1);
723+
const resumedStartInput = routing.codex.startSession.mock.calls[0]?.[0];
724+
assert.equal(typeof resumedStartInput === "object" && resumedStartInput !== null, true);
725+
if (resumedStartInput && typeof resumedStartInput === "object") {
726+
const startPayload = resumedStartInput as {
727+
provider?: string;
728+
resumeCursor?: unknown;
729+
threadId?: string;
730+
};
731+
assert.equal(startPayload.provider, "codex");
732+
assert.equal(startPayload.threadId, threadId);
733+
assert.deepEqual(startPayload.resumeCursor, initial.resumeCursor);
734+
}
735+
}),
736+
);
737+
688738
it.effect("recovers stale sessions for sendTurn using persisted cwd", () =>
689739
Effect.gen(function* () {
690740
const provider = yield* ProviderService;

apps/server/src/provider/Layers/ProviderService.ts

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -599,6 +599,51 @@ const makeProviderService = Effect.fn("makeProviderService")(function* (
599599
},
600600
);
601601

602+
const suspendSession: ProviderServiceShape["suspendSession"] = Effect.fn("suspendSession")(
603+
function* (rawInput) {
604+
const input = yield* decodeInputOrValidationError({
605+
operation: "ProviderService.suspendSession",
606+
schema: ProviderStopSessionInput,
607+
payload: rawInput,
608+
});
609+
let metricProvider = "unknown";
610+
return yield* Effect.gen(function* () {
611+
const routed = yield* resolveRoutableSession({
612+
threadId: input.threadId,
613+
operation: "ProviderService.suspendSession",
614+
allowRecovery: false,
615+
});
616+
metricProvider = routed.adapter.provider;
617+
yield* Effect.annotateCurrentSpan({
618+
"provider.operation": "suspend-session",
619+
"provider.kind": routed.adapter.provider,
620+
"provider.thread_id": input.threadId,
621+
});
622+
if (routed.isActive) {
623+
yield* routed.adapter.stopSession(routed.threadId);
624+
}
625+
yield* directory.upsert({
626+
threadId: input.threadId,
627+
provider: routed.adapter.provider,
628+
status: "stopped",
629+
runtimePayload: {
630+
activeTurnId: null,
631+
lastRuntimeEvent: "provider.suspendSession",
632+
lastRuntimeEventAt: new Date().toISOString(),
633+
},
634+
});
635+
}).pipe(
636+
withMetrics({
637+
counter: providerSessionsTotal,
638+
outcomeAttributes: () =>
639+
providerMetricAttributes(metricProvider, {
640+
operation: "suspend",
641+
}),
642+
}),
643+
);
644+
},
645+
);
646+
602647
const listSessions: ProviderServiceShape["listSessions"] = Effect.fn("listSessions")(
603648
function* () {
604649
const sessionsByProvider = yield* Effect.forEach(adapters, (adapter) =>
@@ -730,6 +775,7 @@ const makeProviderService = Effect.fn("makeProviderService")(function* (
730775
respondToRequest,
731776
respondToUserInput,
732777
stopSession,
778+
suspendSession,
733779
listSessions,
734780
getCapabilities,
735781
rollbackConversation,

apps/server/src/provider/Layers/ProviderSessionReaper.test.ts

Lines changed: 33 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -120,18 +120,24 @@ describe("ProviderSessionReaper", () => {
120120

121121
async function createHarness(input: {
122122
readonly readModel: ReturnType<typeof makeReadModel>;
123-
readonly stopSessionImplementation?: (input: {
123+
readonly suspendSessionImplementation?: (input: {
124124
readonly threadId: ThreadId;
125-
}) => ReturnType<ProviderServiceShape["stopSession"]>;
125+
}) => ReturnType<ProviderServiceShape["suspendSession"]>;
126126
}) {
127-
const stoppedThreadIds = new Set<ThreadId>();
128-
const stopSession = vi.fn<ProviderServiceShape["stopSession"]>(
127+
const suspendedThreadIds = new Set<ThreadId>();
128+
const suspendSession = vi.fn<ProviderServiceShape["suspendSession"]>(
129129
(request) =>
130-
(input.stopSessionImplementation
131-
? input.stopSessionImplementation(request)
130+
(input.suspendSessionImplementation
131+
? input.suspendSessionImplementation(request)
132132
: Effect.sync(() => {
133-
stoppedThreadIds.add(request.threadId);
134-
})) as ReturnType<ProviderServiceShape["stopSession"]>,
133+
suspendedThreadIds.add(request.threadId);
134+
})) as ReturnType<ProviderServiceShape["suspendSession"]>,
135+
);
136+
const stopSession = vi.fn<ProviderServiceShape["stopSession"]>(
137+
() =>
138+
Effect.die(
139+
new Error("Reaper must call suspendSession, not stopSession (would wipe resumeCursor)."),
140+
) as ReturnType<ProviderServiceShape["stopSession"]>,
135141
);
136142

137143
const providerService: ProviderServiceShape = {
@@ -141,6 +147,7 @@ describe("ProviderSessionReaper", () => {
141147
respondToRequest: () => unsupported(),
142148
respondToUserInput: () => unsupported(),
143149
stopSession,
150+
suspendSession,
144151
listSessions: () => Effect.succeed([]),
145152
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
146153
rollbackConversation: () => unsupported(),
@@ -172,7 +179,7 @@ describe("ProviderSessionReaper", () => {
172179
);
173180

174181
runtime = ManagedRuntime.make(layer);
175-
return { stopSession, stoppedThreadIds };
182+
return { suspendSession, suspendedThreadIds, stopSession };
176183
}
177184

178185
it("reaps stale persisted sessions without active turns", async () => {
@@ -216,10 +223,11 @@ describe("ProviderSessionReaper", () => {
216223
scope = await Effect.runPromise(Scope.make("sequential"));
217224
await Effect.runPromise(reaper.start().pipe(Scope.provide(scope)));
218225

219-
await waitFor(() => harness.stopSession.mock.calls.length === 1);
226+
await waitFor(() => harness.suspendSession.mock.calls.length === 1);
220227

221-
expect(harness.stopSession.mock.calls[0]?.[0]).toEqual({ threadId });
222-
expect(harness.stoppedThreadIds.has(threadId)).toBe(true);
228+
expect(harness.suspendSession.mock.calls[0]?.[0]).toEqual({ threadId });
229+
expect(harness.suspendedThreadIds.has(threadId)).toBe(true);
230+
expect(harness.stopSession).not.toHaveBeenCalled();
223231
});
224232

225233
it("skips stale sessions when the thread still has an active turn", async () => {
@@ -265,6 +273,7 @@ describe("ProviderSessionReaper", () => {
265273
await Effect.runPromise(reaper.start().pipe(Scope.provide(scope)));
266274
await new Promise((resolve) => setTimeout(resolve, 50));
267275

276+
expect(harness.suspendSession).not.toHaveBeenCalled();
268277
expect(harness.stopSession).not.toHaveBeenCalled();
269278
const remaining = await runtime!.runPromise(repository.getByThreadId({ threadId }));
270279
expect(Option.isSome(remaining)).toBe(true);
@@ -312,6 +321,7 @@ describe("ProviderSessionReaper", () => {
312321
await Effect.runPromise(reaper.start().pipe(Scope.provide(scope)));
313322
await new Promise((resolve) => setTimeout(resolve, 50));
314323

324+
expect(harness.suspendSession).not.toHaveBeenCalled();
315325
expect(harness.stopSession).not.toHaveBeenCalled();
316326
const remaining = await runtime!.runPromise(repository.getByThreadId({ threadId }));
317327
expect(Option.isSome(remaining)).toBe(true);
@@ -359,6 +369,7 @@ describe("ProviderSessionReaper", () => {
359369
await Effect.runPromise(reaper.start().pipe(Scope.provide(scope)));
360370
await new Promise((resolve) => setTimeout(resolve, 50));
361371

372+
expect(harness.suspendSession).not.toHaveBeenCalled();
362373
expect(harness.stopSession).not.toHaveBeenCalled();
363374
const remaining = await runtime!.runPromise(repository.getByThreadId({ threadId }));
364375
expect(Option.isSome(remaining)).toBe(true);
@@ -397,12 +408,12 @@ describe("ProviderSessionReaper", () => {
397408
},
398409
},
399410
]),
400-
stopSessionImplementation: (request) =>
411+
suspendSessionImplementation: (request) =>
401412
request.threadId === failedThreadId
402413
? Effect.fail(
403414
new ProviderValidationError({
404415
operation: "ProviderSessionReaper.test",
405-
issue: "simulated stop failure",
416+
issue: "simulated suspend failure",
406417
}),
407418
)
408419
: Effect.void,
@@ -442,12 +453,13 @@ describe("ProviderSessionReaper", () => {
442453
scope = await Effect.runPromise(Scope.make("sequential"));
443454
await Effect.runPromise(reaper.start().pipe(Scope.provide(scope)));
444455

445-
await waitFor(() => harness.stopSession.mock.calls.length === 2);
456+
await waitFor(() => harness.suspendSession.mock.calls.length === 2);
446457

447-
expect(harness.stopSession.mock.calls.map(([request]) => request.threadId)).toEqual([
458+
expect(harness.suspendSession.mock.calls.map(([request]) => request.threadId)).toEqual([
448459
failedThreadId,
449460
reapedThreadId,
450461
]);
462+
expect(harness.stopSession).not.toHaveBeenCalled();
451463
});
452464

453465
it("continues reaping other sessions when one stop attempt defects", async () => {
@@ -483,9 +495,9 @@ describe("ProviderSessionReaper", () => {
483495
},
484496
},
485497
]),
486-
stopSessionImplementation: (request) =>
498+
suspendSessionImplementation: (request) =>
487499
request.threadId === defectThreadId
488-
? Effect.die(new Error("simulated stop defect"))
500+
? Effect.die(new Error("simulated suspend defect"))
489501
: Effect.void,
490502
});
491503
const repository = await runtime!.runPromise(Effect.service(ProviderSessionRuntimeRepository));
@@ -523,11 +535,12 @@ describe("ProviderSessionReaper", () => {
523535
scope = await Effect.runPromise(Scope.make("sequential"));
524536
await Effect.runPromise(reaper.start().pipe(Scope.provide(scope)));
525537

526-
await waitFor(() => harness.stopSession.mock.calls.length === 2);
538+
await waitFor(() => harness.suspendSession.mock.calls.length === 2);
527539

528-
expect(harness.stopSession.mock.calls.map(([request]) => request.threadId)).toEqual([
540+
expect(harness.suspendSession.mock.calls.map(([request]) => request.threadId)).toEqual([
529541
defectThreadId,
530542
reapedThreadId,
531543
]);
544+
expect(harness.stopSession).not.toHaveBeenCalled();
532545
});
533546
});

apps/server/src/provider/Layers/ProviderSessionReaper.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ const makeProviderSessionReaper = (options?: ProviderSessionReaperLiveOptions) =
6565
continue;
6666
}
6767

68-
const reaped = yield* providerService.stopSession({ threadId: binding.threadId }).pipe(
68+
const reaped = yield* providerService.suspendSession({ threadId: binding.threadId }).pipe(
6969
Effect.tap(() =>
7070
Effect.logInfo("provider.session.reaped", {
7171
threadId: binding.threadId,

apps/server/src/provider/Services/ProviderService.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,11 +72,26 @@ export interface ProviderServiceShape {
7272

7373
/**
7474
* Stop a provider session.
75+
*
76+
* Intentional termination: tears down the adapter session and removes the
77+
* persisted binding (including `resumeCursor`). Use for user-driven stops
78+
* and thread deletions.
7579
*/
7680
readonly stopSession: (
7781
input: ProviderStopSessionInput,
7882
) => Effect.Effect<void, ProviderServiceError>;
7983

84+
/**
85+
* Suspend a provider session.
86+
*
87+
* Idle cleanup: tears down the adapter session but preserves the persisted
88+
* `resumeCursor` so the next `startSession` for this thread resumes the
89+
* conversation. Use for reaper-style inactivity sweeps.
90+
*/
91+
readonly suspendSession: (
92+
input: ProviderStopSessionInput,
93+
) => Effect.Effect<void, ProviderServiceError>;
94+
8095
/**
8196
* List active provider sessions.
8297
*

0 commit comments

Comments
 (0)