Skip to content

Commit acd1abb

Browse files
Add provider handoff compaction on model switches
- Compact the active thread before switching providers - Preserve context with a handoff summary in the next turn - Update orchestration and adapter tests for compaction
1 parent a3dadf3 commit acd1abb

26 files changed

Lines changed: 1140 additions & 97 deletions

apps/server/integration/TestProviderAdapter.integration.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,15 @@ export const makeTestProviderAdapterHarness = (options?: MakeTestProviderAdapter
472472
sessions.clear();
473473
});
474474

475+
const compactThread: ProviderAdapterShape<ProviderAdapterError>["compactThread"] = (threadId) =>
476+
readThread(threadId).pipe(
477+
Effect.map((snapshot) => {
478+
const latestTurn = snapshot.turns.at(-1);
479+
const latestItem = latestTurn?.items.at(-1);
480+
return typeof latestItem === "string" ? latestItem : null;
481+
}),
482+
);
483+
475484
const adapter: ProviderAdapterShape<ProviderAdapterError> = {
476485
provider,
477486
capabilities: {
@@ -486,6 +495,7 @@ export const makeTestProviderAdapterHarness = (options?: MakeTestProviderAdapter
486495
listSessions,
487496
hasSession,
488497
readThread,
498+
compactThread,
489499
rollbackThread,
490500
stopAll,
491501
streamEvents: Stream.fromQueue(runtimeEvents),

apps/server/src/codexAppServerManager.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ export interface CodexThreadSnapshot {
138138
}
139139

140140
const CODEX_VERSION_CHECK_TIMEOUT_MS = 4_000;
141+
const CODEX_THREAD_COMPACTION_TIMEOUT_MS = 60_000;
141142

142143
const ANSI_ESCAPE_CHAR = String.fromCharCode(27);
143144
const ANSI_ESCAPE_REGEX = new RegExp(`${ANSI_ESCAPE_CHAR}\\[[0-9;]*m`, "g");
@@ -795,6 +796,60 @@ export class CodexAppServerManager extends EventEmitter<CodexAppServerManagerEve
795796
return this.parseThreadSnapshot("thread/read", response);
796797
}
797798

799+
async compactThread(threadId: ThreadId): Promise<CodexThreadSnapshot> {
800+
const context = this.requireSession(threadId);
801+
const providerThreadId = readResumeThreadId({
802+
threadId: context.session.threadId,
803+
runtimeMode: context.session.runtimeMode,
804+
resumeCursor: context.session.resumeCursor,
805+
});
806+
if (!providerThreadId) {
807+
throw new Error("Session is missing a provider resume thread id.");
808+
}
809+
810+
const compactionWait = {
811+
cleanup: () => undefined,
812+
};
813+
const waitForCompaction = new Promise<void>((resolve, reject) => {
814+
const timeout = setTimeout(() => {
815+
compactionWait.cleanup();
816+
reject(new Error("Timed out waiting for Codex thread compaction."));
817+
}, CODEX_THREAD_COMPACTION_TIMEOUT_MS);
818+
819+
const onEvent = (event: ProviderEvent) => {
820+
if (
821+
event.provider !== "codex" ||
822+
event.threadId !== context.session.threadId ||
823+
event.kind !== "notification"
824+
) {
825+
return;
826+
}
827+
if (event.method === "thread/compacted") {
828+
compactionWait.cleanup();
829+
resolve();
830+
}
831+
};
832+
833+
compactionWait.cleanup = () => {
834+
clearTimeout(timeout);
835+
this.off("event", onEvent);
836+
};
837+
838+
this.on("event", onEvent);
839+
});
840+
841+
try {
842+
await this.sendRequest(context, "thread/compact/start", {
843+
threadId: providerThreadId,
844+
});
845+
await waitForCompaction;
846+
} catch (error) {
847+
compactionWait.cleanup();
848+
throw error;
849+
}
850+
return this.readThread(threadId);
851+
}
852+
798853
async rollbackThread(threadId: ThreadId, numTurns: number): Promise<CodexThreadSnapshot> {
799854
const context = this.requireSession(threadId);
800855
const providerThreadId = readResumeThreadId({

apps/server/src/orchestration/Layers/CheckpointReactor.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,10 @@ function createProviderServiceHarness(
9595
respondToRequest: () => unsupported(),
9696
respondToUserInput: () => unsupported(),
9797
stopSession: () => unsupported(),
98+
stopSessionForProvider: () => unsupported(),
9899
listSessions,
99100
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
101+
compactThread: () => unsupported(),
100102
rollbackConversation,
101103
get streamEvents() {
102104
return Stream.fromPubSub(runtimeEventPubSub);

apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts

Lines changed: 128 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ describe("ProviderCommandReactor", () => {
106106
const runtimeEventPubSub = Effect.runSync(PubSub.unbounded<ProviderRuntimeEvent>());
107107
let nextSessionIndex = 1;
108108
const runtimeSessions: Array<ProviderSession> = [];
109-
const modelSelection = input?.threadModelSelection ?? {
109+
const defaultModelSelection = input?.threadModelSelection ?? {
110110
provider: "codex",
111111
model: "gpt-5-codex",
112112
};
@@ -123,8 +123,25 @@ describe("ProviderCommandReactor", () => {
123123
typeof input.threadId === "string"
124124
? ThreadId.make(input.threadId)
125125
: ThreadId.make(`thread-${sessionIndex}`);
126+
const provider =
127+
typeof input === "object" &&
128+
input !== null &&
129+
"provider" in input &&
130+
(input.provider === "codex" || input.provider === "claudeAgent")
131+
? input.provider
132+
: defaultModelSelection.provider;
133+
const model =
134+
typeof input === "object" &&
135+
input !== null &&
136+
"modelSelection" in input &&
137+
typeof input.modelSelection === "object" &&
138+
input.modelSelection !== null &&
139+
"model" in input.modelSelection &&
140+
typeof input.modelSelection.model === "string"
141+
? input.modelSelection.model
142+
: defaultModelSelection.model;
126143
const session: ProviderSession = {
127-
provider: modelSelection.provider,
144+
provider,
128145
status: "ready" as const,
129146
runtimeMode:
130147
typeof input === "object" &&
@@ -133,13 +150,21 @@ describe("ProviderCommandReactor", () => {
133150
(input.runtimeMode === "approval-required" || input.runtimeMode === "full-access")
134151
? input.runtimeMode
135152
: "full-access",
136-
...(modelSelection.model !== undefined ? { model: modelSelection.model } : {}),
153+
...(model ? { model } : {}),
137154
threadId,
138155
resumeCursor: resumeCursor ?? { opaque: `resume-${sessionIndex}` },
139156
createdAt: now,
140157
updatedAt: now,
141158
};
142-
runtimeSessions.push(session);
159+
const existingIndex = runtimeSessions.findIndex(
160+
(runtimeSession) =>
161+
runtimeSession.threadId === threadId && runtimeSession.provider === provider,
162+
);
163+
if (existingIndex >= 0) {
164+
runtimeSessions.splice(existingIndex, 1, session);
165+
} else {
166+
runtimeSessions.push(session);
167+
}
143168
return Effect.succeed(session);
144169
});
145170
const sendTurn = vi.fn((_: unknown) =>
@@ -151,6 +176,13 @@ describe("ProviderCommandReactor", () => {
151176
const interruptTurn = vi.fn((_: unknown) => Effect.void);
152177
const respondToRequest = vi.fn<ProviderServiceShape["respondToRequest"]>(() => Effect.void);
153178
const respondToUserInput = vi.fn<ProviderServiceShape["respondToUserInput"]>(() => Effect.void);
179+
const compactThread = vi.fn((input: unknown) =>
180+
Effect.succeed(
181+
typeof input === "object" && input !== null && "threadId" in input
182+
? `Summary for ${String(input.threadId)}`
183+
: "Summary",
184+
),
185+
);
154186
const stopSession = vi.fn((input: unknown) =>
155187
Effect.sync(() => {
156188
const threadId =
@@ -160,7 +192,18 @@ describe("ProviderCommandReactor", () => {
160192
if (!threadId) {
161193
return;
162194
}
163-
const index = runtimeSessions.findIndex((session) => session.threadId === threadId);
195+
const provider =
196+
typeof input === "object" &&
197+
input !== null &&
198+
"provider" in input &&
199+
(input.provider === "codex" || input.provider === "claudeAgent")
200+
? input.provider
201+
: undefined;
202+
const index = runtimeSessions.findIndex(
203+
(session) =>
204+
session.threadId === threadId &&
205+
(provider === undefined || session.provider === provider),
206+
);
164207
if (index >= 0) {
165208
runtimeSessions.splice(index, 1);
166209
}
@@ -201,7 +244,9 @@ describe("ProviderCommandReactor", () => {
201244
interruptTurn: interruptTurn as ProviderServiceShape["interruptTurn"],
202245
respondToRequest: respondToRequest as ProviderServiceShape["respondToRequest"],
203246
respondToUserInput: respondToUserInput as ProviderServiceShape["respondToUserInput"],
247+
compactThread: compactThread as ProviderServiceShape["compactThread"],
204248
stopSession: stopSession as ProviderServiceShape["stopSession"],
249+
stopSessionForProvider: stopSession as ProviderServiceShape["stopSessionForProvider"],
205250
listSessions: () => Effect.succeed(runtimeSessions),
206251
getCapabilities: (_provider) =>
207252
Effect.succeed({
@@ -250,7 +295,7 @@ describe("ProviderCommandReactor", () => {
250295
projectId: asProjectId("project-1"),
251296
title: "Provider Project",
252297
workspaceRoot: "/tmp/provider-project",
253-
defaultModelSelection: modelSelection,
298+
defaultModelSelection: defaultModelSelection,
254299
createdAt: now,
255300
}),
256301
);
@@ -261,7 +306,7 @@ describe("ProviderCommandReactor", () => {
261306
threadId: ThreadId.make("thread-1"),
262307
projectId: asProjectId("project-1"),
263308
title: "Thread",
264-
modelSelection: modelSelection,
309+
modelSelection: defaultModelSelection,
265310
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
266311
runtimeMode: "approval-required",
267312
branch: null,
@@ -277,6 +322,7 @@ describe("ProviderCommandReactor", () => {
277322
interruptTurn,
278323
respondToRequest,
279324
respondToUserInput,
325+
compactThread,
280326
stopSession,
281327
renameBranch,
282328
generateBranchName,
@@ -716,7 +762,7 @@ describe("ProviderCommandReactor", () => {
716762
});
717763
});
718764

719-
it("rejects a first turn when requested provider conflicts with the thread model", async () => {
765+
it("starts the requested provider on the first turn even when the thread default differs", async () => {
720766
const harness = await createHarness({
721767
threadModelSelection: { provider: "codex", model: "gpt-5-codex" },
722768
});
@@ -743,29 +789,28 @@ describe("ProviderCommandReactor", () => {
743789
}),
744790
);
745791

746-
await waitFor(async () => {
747-
const readModel = await Effect.runPromise(harness.engine.getReadModel());
748-
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
749-
return (
750-
thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ??
751-
false
752-
);
753-
});
754-
755-
expect(harness.startSession).not.toHaveBeenCalled();
756-
expect(harness.sendTurn).not.toHaveBeenCalled();
792+
await waitFor(() => harness.startSession.mock.calls.length === 1);
793+
await waitFor(() => harness.sendTurn.mock.calls.length === 1);
757794

758795
const readModel = await Effect.runPromise(harness.engine.getReadModel());
759796
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
760-
expect(thread?.session).toBeNull();
761-
expect(
762-
thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"),
763-
).toMatchObject({
764-
summary: "Provider turn start failed",
765-
payload: {
766-
detail: expect.stringContaining("cannot switch to 'claudeAgent'"),
797+
expect(harness.compactThread).not.toHaveBeenCalled();
798+
expect(harness.startSession.mock.calls[0]?.[1]).toMatchObject({
799+
provider: "claudeAgent",
800+
modelSelection: {
801+
provider: "claudeAgent",
802+
model: "claude-opus-4-6",
767803
},
768804
});
805+
expect(harness.sendTurn.mock.calls[0]?.[0]).toMatchObject({
806+
threadId: ThreadId.make("thread-1"),
807+
input: "hello claude",
808+
modelSelection: {
809+
provider: "claudeAgent",
810+
model: "claude-opus-4-6",
811+
},
812+
});
813+
expect(thread?.session?.providerName).toBe("claudeAgent");
769814
});
770815

771816
it("preserves the active session model when in-session model switching is unsupported", async () => {
@@ -1066,7 +1111,7 @@ describe("ProviderCommandReactor", () => {
10661111
});
10671112
});
10681113

1069-
it("rejects provider changes after a thread is already bound to a session provider", async () => {
1114+
it("compacts and hands off context when switching providers mid-thread", async () => {
10701115
const harness = await createHarness();
10711116
const now = new Date().toISOString();
10721117

@@ -1111,31 +1156,69 @@ describe("ProviderCommandReactor", () => {
11111156
}),
11121157
);
11131158

1114-
await waitFor(async () => {
1115-
const readModel = await Effect.runPromise(harness.engine.getReadModel());
1116-
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
1117-
return (
1118-
thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ??
1119-
false
1120-
);
1121-
});
1159+
await waitFor(() => harness.startSession.mock.calls.length === 2);
1160+
await waitFor(() => harness.sendTurn.mock.calls.length === 2);
11221161

1123-
expect(harness.startSession.mock.calls.length).toBe(1);
1124-
expect(harness.sendTurn.mock.calls.length).toBe(1);
1125-
expect(harness.stopSession.mock.calls.length).toBe(0);
1162+
expect(harness.compactThread.mock.calls.length).toBe(1);
1163+
expect(harness.compactThread.mock.calls[0]?.[0]).toMatchObject({
1164+
threadId: ThreadId.make("thread-1"),
1165+
});
1166+
expect(harness.startSession.mock.calls[1]?.[1]).toMatchObject({
1167+
provider: "claudeAgent",
1168+
modelSelection: {
1169+
provider: "claudeAgent",
1170+
model: "claude-opus-4-6",
1171+
},
1172+
});
1173+
expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({
1174+
threadId: ThreadId.make("thread-1"),
1175+
modelSelection: {
1176+
provider: "claudeAgent",
1177+
model: "claude-opus-4-6",
1178+
},
1179+
});
1180+
const switchedTurnInput = harness.sendTurn.mock.calls[1]?.[0] as { input?: string } | undefined;
1181+
expect(switchedTurnInput?.input).toContain("<provider_handoff>");
1182+
expect(switchedTurnInput?.input).toContain("Summary for thread-1");
1183+
expect(switchedTurnInput?.input).toContain("second");
1184+
expect(harness.stopSession.mock.calls.length).toBe(1);
1185+
expect(harness.stopSession.mock.calls[0]?.[0]).toMatchObject({
1186+
threadId: ThreadId.make("thread-1"),
1187+
provider: "codex",
1188+
});
11261189

11271190
const readModel = await Effect.runPromise(harness.engine.getReadModel());
11281191
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
11291192
expect(thread?.session?.threadId).toBe("thread-1");
1130-
expect(thread?.session?.providerName).toBe("codex");
1193+
expect(thread?.session?.providerName).toBe("claudeAgent");
11311194
expect(thread?.session?.runtimeMode).toBe("approval-required");
11321195
expect(
1133-
thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"),
1134-
).toMatchObject({
1135-
payload: {
1136-
detail: expect.stringContaining("cannot switch to 'claudeAgent'"),
1137-
},
1138-
});
1196+
thread?.activities.filter((activity) => activity.kind.startsWith("provider.handoff.")),
1197+
).toEqual(
1198+
expect.arrayContaining([
1199+
expect.objectContaining({
1200+
kind: "provider.handoff.compacting",
1201+
summary: "Handing off thread",
1202+
turnId: null,
1203+
payload: expect.objectContaining({
1204+
sourceProvider: "codex",
1205+
targetProvider: "claudeAgent",
1206+
}),
1207+
}),
1208+
expect.objectContaining({
1209+
kind: "provider.handoff.completed",
1210+
summary: "Handed off thread",
1211+
turnId: null,
1212+
payload: expect.objectContaining({
1213+
sourceProvider: "codex",
1214+
targetProvider: "claudeAgent",
1215+
}),
1216+
}),
1217+
]),
1218+
);
1219+
expect(
1220+
thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed"),
1221+
).toBe(false);
11391222
});
11401223

11411224
it("does not stop the active session when restart fails before rebind", async () => {

0 commit comments

Comments
 (0)