Skip to content

Commit 8940163

Browse files
committed
fix(provider): remove persisted binding on session stop for clean restar
- Prevents stale resumeCursor from crashed turns being reused - stopSession now deletes the binding instead of marking as "stopped" - sendTurn after stop now fails validation (no binding exists) - Next start must explicitly call startSession, ensuring truly fresh state
1 parent e20a374 commit 8940163

7 files changed

Lines changed: 81 additions & 78 deletions

File tree

apps/server/src/orchestration/Layers/ProviderCommandReactor.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -746,8 +746,19 @@ const make = Effect.gen(function* () {
746746
}
747747

748748
const now = event.payload.createdAt;
749-
if (thread.session && thread.session.status !== "stopped") {
750-
yield* providerService.stopSession({ threadId: thread.id });
749+
if (thread.session) {
750+
// Always call stopSession when a stop is requested — even if the orchestration
751+
// session is already marked "stopped". The provider-side DB binding can still
752+
// hold a stale resumeCursor from a crashed turn; calling stopSession clears it
753+
// so the next turn starts truly fresh instead of re-resuming a dead session id.
754+
yield* providerService.stopSession({ threadId: thread.id }).pipe(
755+
Effect.catchCause((cause) =>
756+
Effect.logWarning("provider command reactor skipped provider session stop", {
757+
threadId: thread.id,
758+
cause: Cause.pretty(cause),
759+
}),
760+
),
761+
);
751762
}
752763

753764
yield* setThreadSession({

apps/server/src/provider/Layers/CodexAdapter.test.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,7 @@ const providerSessionDirectoryTestLayer = Layer.succeed(ProviderSessionDirectory
144144
getProvider: () =>
145145
Effect.die(new Error("ProviderSessionDirectory.getProvider is not used in test")),
146146
getBinding: () => Effect.succeed(Option.none()),
147+
remove: () => Effect.void,
147148
listThreadIds: () => Effect.succeed([]),
148149
listBindings: () => Effect.succeed([]),
149150
});

apps/server/src/provider/Layers/ProviderService.test.ts

Lines changed: 52 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -567,31 +567,24 @@ routing.layer("ProviderServiceLive routing", (it) => {
567567
});
568568

569569
yield* provider.stopSession({ threadId: session.threadId });
570-
routing.codex.startSession.mockClear();
571-
routing.codex.sendTurn.mockClear();
572-
573-
yield* provider.sendTurn({
574-
threadId: session.threadId,
575-
input: "after-stop",
576-
attachments: [],
577-
});
578-
579-
assert.equal(routing.codex.startSession.mock.calls.length, 1);
580-
const resumedStartInput = routing.codex.startSession.mock.calls[0]?.[0];
581-
assert.equal(typeof resumedStartInput === "object" && resumedStartInput !== null, true);
582-
if (resumedStartInput && typeof resumedStartInput === "object") {
583-
const startPayload = resumedStartInput as {
584-
provider?: string;
585-
cwd?: string;
586-
resumeCursor?: unknown;
587-
threadId?: string;
588-
};
589-
assert.equal(startPayload.provider, "codex");
590-
assert.equal(startPayload.cwd, "/tmp/project");
591-
assert.deepEqual(startPayload.resumeCursor, session.resumeCursor);
592-
assert.equal(startPayload.threadId, session.threadId);
593-
}
594-
assert.equal(routing.codex.sendTurn.mock.calls.length, 1);
570+
const sendAfterStop = yield* Effect.result(
571+
provider.sendTurn({
572+
threadId: session.threadId,
573+
input: "after-stop",
574+
attachments: [],
575+
}),
576+
);
577+
assert.equal(sendAfterStop._tag, "Failure");
578+
if (sendAfterStop._tag !== "Failure") return;
579+
assert.equal(sendAfterStop.failure._tag, "ProviderValidationError");
580+
if (sendAfterStop.failure._tag !== "ProviderValidationError") return;
581+
assert.equal(sendAfterStop.failure.operation, "ProviderService.sendTurn");
582+
assert.equal(
583+
sendAfterStop.failure.issue.includes(
584+
`Cannot route thread '${session.threadId}' because no persisted provider binding exists.`,
585+
),
586+
true,
587+
);
595588
}),
596589
);
597590

@@ -635,55 +628,46 @@ routing.layer("ProviderServiceLive routing", (it) => {
635628
}),
636629
);
637630

638-
it.effect("preserves the persisted binding when stopping a session", () =>
639-
Effect.gen(function* () {
640-
const provider = yield* ProviderService;
641-
const runtimeRepository = yield* ProviderSessionRuntimeRepository;
631+
it.effect(
632+
"removes the persisted binding when stopping a session so the next start is fresh",
633+
() =>
634+
Effect.gen(function* () {
635+
const provider = yield* ProviderService;
636+
const runtimeRepository = yield* ProviderSessionRuntimeRepository;
642637

643-
const initial = yield* provider.startSession(asThreadId("thread-reap-preserve"), {
644-
provider: "codex",
645-
threadId: asThreadId("thread-reap-preserve"),
646-
cwd: "/tmp/project-reap-preserve",
647-
runtimeMode: "full-access",
648-
});
638+
const initial = yield* provider.startSession(asThreadId("thread-stop-removes"), {
639+
provider: "codex",
640+
threadId: asThreadId("thread-stop-removes"),
641+
cwd: "/tmp/project-stop-removes",
642+
runtimeMode: "full-access",
643+
});
649644

650-
yield* provider.stopSession({ threadId: initial.threadId });
645+
yield* provider.stopSession({ threadId: initial.threadId });
651646

652-
const persistedAfterStop = yield* runtimeRepository.getByThreadId({
653-
threadId: initial.threadId,
654-
});
655-
assert.equal(Option.isSome(persistedAfterStop), true);
656-
if (Option.isSome(persistedAfterStop)) {
657-
assert.equal(persistedAfterStop.value.status, "stopped");
658-
assert.deepEqual(persistedAfterStop.value.resumeCursor, initial.resumeCursor);
659-
}
647+
const persistedAfterStop = yield* runtimeRepository.getByThreadId({
648+
threadId: initial.threadId,
649+
});
650+
assert.equal(Option.isNone(persistedAfterStop), true);
660651

661-
routing.codex.startSession.mockClear();
662-
routing.codex.sendTurn.mockClear();
652+
routing.codex.startSession.mockClear();
663653

664-
yield* provider.sendTurn({
665-
threadId: initial.threadId,
666-
input: "resume after reap",
667-
attachments: [],
668-
});
654+
const restarted = yield* provider.startSession(initial.threadId, {
655+
provider: "codex",
656+
threadId: initial.threadId,
657+
cwd: "/tmp/project-stop-removes",
658+
runtimeMode: "full-access",
659+
});
669660

670-
assert.equal(routing.codex.startSession.mock.calls.length, 1);
671-
const resumedStartInput = routing.codex.startSession.mock.calls[0]?.[0];
672-
assert.equal(typeof resumedStartInput === "object" && resumedStartInput !== null, true);
673-
if (resumedStartInput && typeof resumedStartInput === "object") {
674-
const startPayload = resumedStartInput as {
675-
provider?: string;
676-
cwd?: string;
677-
resumeCursor?: unknown;
678-
threadId?: string;
679-
};
680-
assert.equal(startPayload.provider, "codex");
681-
assert.equal(startPayload.cwd, "/tmp/project-reap-preserve");
682-
assert.deepEqual(startPayload.resumeCursor, initial.resumeCursor);
683-
assert.equal(startPayload.threadId, initial.threadId);
684-
}
685-
assert.equal(routing.codex.sendTurn.mock.calls.length, 1);
686-
}),
661+
assert.equal(restarted.provider, "codex");
662+
assert.equal(routing.codex.startSession.mock.calls.length, 1);
663+
const restartedInput = routing.codex.startSession.mock.calls[0]?.[0];
664+
if (restartedInput && typeof restartedInput === "object") {
665+
const startPayload = restartedInput as {
666+
resumeCursor?: unknown;
667+
};
668+
assert.equal(startPayload.resumeCursor, undefined);
669+
}
670+
}),
687671
);
688672

689673
it.effect("routes explicit claudeAgent provider session starts to the claude adapter", () =>

apps/server/src/provider/Layers/ProviderService.ts

Lines changed: 1 addition & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -586,14 +586,7 @@ const makeProviderService = Effect.fn("makeProviderService")(function* (
586586
if (routed.isActive) {
587587
yield* routed.adapter.stopSession(routed.threadId);
588588
}
589-
yield* directory.upsert({
590-
threadId: input.threadId,
591-
provider: routed.adapter.provider,
592-
status: "stopped",
593-
runtimePayload: {
594-
activeTurnId: null,
595-
},
596-
});
589+
yield* directory.remove(input.threadId);
597590
}).pipe(
598591
withMetrics({
599592
counter: providerSessionsTotal,

apps/server/src/provider/Layers/ProviderSessionDirectory.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,13 @@ const makeProviderSessionDirectory = Effect.gen(function* () {
145145
),
146146
);
147147

148+
const remove: ProviderSessionDirectoryShape["remove"] = (threadId) =>
149+
repository
150+
.deleteByThreadId({ threadId })
151+
.pipe(
152+
Effect.mapError(toPersistenceError("ProviderSessionDirectory.remove:deleteByThreadId")),
153+
);
154+
148155
const listThreadIds: ProviderSessionDirectoryShape["listThreadIds"] = () =>
149156
repository.list().pipe(
150157
Effect.mapError(toPersistenceError("ProviderSessionDirectory.listThreadIds:list")),
@@ -167,6 +174,7 @@ const makeProviderSessionDirectory = Effect.gen(function* () {
167174
upsert,
168175
getProvider,
169176
getBinding,
177+
remove,
170178
listThreadIds,
171179
listBindings,
172180
} satisfies ProviderSessionDirectoryShape;

apps/server/src/provider/Services/ProviderSessionDirectory.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,10 @@ export interface ProviderSessionDirectoryShape {
4545
threadId: ThreadId,
4646
) => Effect.Effect<Option.Option<ProviderRuntimeBinding>, ProviderSessionDirectoryReadError>;
4747

48+
readonly remove: (
49+
threadId: ThreadId,
50+
) => Effect.Effect<void, ProviderSessionDirectoryPersistenceError>;
51+
4852
readonly listThreadIds: () => Effect.Effect<
4953
ReadonlyArray<ThreadId>,
5054
ProviderSessionDirectoryPersistenceError

apps/web/src/components/ChatView.browser.tsx

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ import {
4949
} from "../lib/terminalContext";
5050
import { isMacPlatform } from "../lib/utils";
5151
import { __resetLocalApiForTests } from "../localApi";
52+
import { __resetNativeApiForTests } from "../nativeApi";
5253
import { AppAtomRegistryProvider } from "../rpc/atomRegistry";
5354
import { getServerConfig } from "../rpc/serverState";
5455
import { getRouter } from "../router";
@@ -1761,6 +1762,7 @@ describe("ChatView timeline estimator parity (full app)", () => {
17611762
},
17621763
});
17631764
await __resetLocalApiForTests();
1765+
await __resetNativeApiForTests();
17641766
await setViewport(DEFAULT_VIEWPORT);
17651767
localStorage.clear();
17661768
document.body.innerHTML = "";

0 commit comments

Comments
 (0)