Skip to content

Commit 235076a

Browse files
committed
feat: add provider handoff compaction on model switches
Cherry-picked from upstream pingdotgg#1911
1 parent 7510b9f commit 235076a

28 files changed

Lines changed: 1877 additions & 96 deletions

apps/server/integration/TestProviderAdapter.integration.ts

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -472,6 +472,15 @@ export const makeTestProviderAdapterHarness = (options?: MakeTestProviderAdapter
472472
sessions.clear();
473473
});
474474

475+
const compactThread: ProviderAdapterShape<ProviderAdapterError>["compactThread"] = (threadId) =>
476+
readThread(threadId).pipe(
477+
Effect.map((snapshot) => {
478+
const latestTurn = snapshot.turns.at(-1);
479+
const latestItem = latestTurn?.items.at(-1);
480+
return typeof latestItem === "string" ? latestItem : null;
481+
}),
482+
);
483+
475484
const adapter: ProviderAdapterShape<ProviderAdapterError> = {
476485
provider,
477486
capabilities: {
@@ -486,6 +495,7 @@ export const makeTestProviderAdapterHarness = (options?: MakeTestProviderAdapter
486495
listSessions,
487496
hasSession,
488497
readThread,
498+
compactThread,
489499
rollbackThread,
490500
stopAll,
491501
get streamEvents() {

apps/server/src/codexAppServerManager.ts

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ export interface CodexThreadSnapshot {
138138
}
139139

140140
const CODEX_VERSION_CHECK_TIMEOUT_MS = 4_000;
141+
const CODEX_THREAD_COMPACTION_TIMEOUT_MS = 60_000;
141142

142143
const ANSI_ESCAPE_CHAR = String.fromCharCode(27);
143144
const ANSI_ESCAPE_REGEX = new RegExp(`${ANSI_ESCAPE_CHAR}\\[[0-9;]*m`, "g");
@@ -795,6 +796,60 @@ export class CodexAppServerManager extends EventEmitter<CodexAppServerManagerEve
795796
return this.parseThreadSnapshot("thread/read", response);
796797
}
797798

799+
async compactThread(threadId: ThreadId): Promise<CodexThreadSnapshot> {
800+
const context = this.requireSession(threadId);
801+
const providerThreadId = readResumeThreadId({
802+
threadId: context.session.threadId,
803+
runtimeMode: context.session.runtimeMode,
804+
resumeCursor: context.session.resumeCursor,
805+
});
806+
if (!providerThreadId) {
807+
throw new Error("Session is missing a provider resume thread id.");
808+
}
809+
810+
const compactionWait = {
811+
cleanup: () => undefined,
812+
};
813+
const waitForCompaction = new Promise<void>((resolve, reject) => {
814+
const timeout = setTimeout(() => {
815+
compactionWait.cleanup();
816+
reject(new Error("Timed out waiting for Codex thread compaction."));
817+
}, CODEX_THREAD_COMPACTION_TIMEOUT_MS);
818+
819+
const onEvent = (event: ProviderEvent) => {
820+
if (
821+
event.provider !== "codex" ||
822+
event.threadId !== context.session.threadId ||
823+
event.kind !== "notification"
824+
) {
825+
return;
826+
}
827+
if (event.method === "thread/compacted") {
828+
compactionWait.cleanup();
829+
resolve();
830+
}
831+
};
832+
833+
compactionWait.cleanup = () => {
834+
clearTimeout(timeout);
835+
this.off("event", onEvent);
836+
};
837+
838+
this.on("event", onEvent);
839+
});
840+
841+
try {
842+
await this.sendRequest(context, "thread/compact/start", {
843+
threadId: providerThreadId,
844+
});
845+
await waitForCompaction;
846+
} catch (error) {
847+
compactionWait.cleanup();
848+
throw error;
849+
}
850+
return this.readThread(threadId);
851+
}
852+
798853
async rollbackThread(threadId: ThreadId, numTurns: number): Promise<CodexThreadSnapshot> {
799854
const context = this.requireSession(threadId);
800855
const providerThreadId = readResumeThreadId({

apps/server/src/orchestration/Layers/CheckpointReactor.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,8 +95,10 @@ function createProviderServiceHarness(
9595
respondToRequest: () => unsupported(),
9696
respondToUserInput: () => unsupported(),
9797
stopSession: () => unsupported(),
98+
stopSessionForProvider: () => unsupported(),
9899
listSessions,
99100
getCapabilities: () => Effect.succeed({ sessionModelSwitch: "in-session" }),
101+
compactThread: () => unsupported(),
100102
rollbackConversation,
101103
get streamEvents() {
102104
return Stream.fromPubSub(runtimeEventPubSub);

apps/server/src/orchestration/Layers/ProviderCommandReactor.test.ts

Lines changed: 128 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ describe("ProviderCommandReactor", () => {
110110
const runtimeEventPubSub = Effect.runSync(PubSub.unbounded<ProviderRuntimeEvent>());
111111
let nextSessionIndex = 1;
112112
const runtimeSessions: Array<ProviderSession> = [];
113-
const modelSelection = input?.threadModelSelection ?? {
113+
const defaultModelSelection = input?.threadModelSelection ?? {
114114
provider: "codex",
115115
model: "gpt-5-codex",
116116
};
@@ -127,8 +127,25 @@ describe("ProviderCommandReactor", () => {
127127
typeof input.threadId === "string"
128128
? ThreadId.make(input.threadId)
129129
: ThreadId.make(`thread-${sessionIndex}`);
130+
const provider =
131+
typeof input === "object" &&
132+
input !== null &&
133+
"provider" in input &&
134+
(input.provider === "codex" || input.provider === "claudeAgent")
135+
? input.provider
136+
: defaultModelSelection.provider;
137+
const model =
138+
typeof input === "object" &&
139+
input !== null &&
140+
"modelSelection" in input &&
141+
typeof input.modelSelection === "object" &&
142+
input.modelSelection !== null &&
143+
"model" in input.modelSelection &&
144+
typeof input.modelSelection.model === "string"
145+
? input.modelSelection.model
146+
: defaultModelSelection.model;
130147
const session: ProviderSession = {
131-
provider: modelSelection.provider,
148+
provider,
132149
status: "ready" as const,
133150
runtimeMode:
134151
typeof input === "object" &&
@@ -137,13 +154,21 @@ describe("ProviderCommandReactor", () => {
137154
(input.runtimeMode === "approval-required" || input.runtimeMode === "full-access")
138155
? input.runtimeMode
139156
: "full-access",
140-
...(modelSelection.model !== undefined ? { model: modelSelection.model } : {}),
157+
...(model ? { model } : {}),
141158
threadId,
142159
resumeCursor: resumeCursor ?? { opaque: `resume-${sessionIndex}` },
143160
createdAt: now,
144161
updatedAt: now,
145162
};
146-
runtimeSessions.push(session);
163+
const existingIndex = runtimeSessions.findIndex(
164+
(runtimeSession) =>
165+
runtimeSession.threadId === threadId && runtimeSession.provider === provider,
166+
);
167+
if (existingIndex >= 0) {
168+
runtimeSessions.splice(existingIndex, 1, session);
169+
} else {
170+
runtimeSessions.push(session);
171+
}
147172
return Effect.succeed(session);
148173
});
149174
const sendTurn = vi.fn((_: unknown) =>
@@ -155,6 +180,13 @@ describe("ProviderCommandReactor", () => {
155180
const interruptTurn = vi.fn((_: unknown) => Effect.void);
156181
const respondToRequest = vi.fn<ProviderServiceShape["respondToRequest"]>(() => Effect.void);
157182
const respondToUserInput = vi.fn<ProviderServiceShape["respondToUserInput"]>(() => Effect.void);
183+
const compactThread = vi.fn((input: unknown) =>
184+
Effect.succeed(
185+
typeof input === "object" && input !== null && "threadId" in input
186+
? `Summary for ${String(input.threadId)}`
187+
: "Summary",
188+
),
189+
);
158190
const stopSession = vi.fn((input: unknown) =>
159191
Effect.sync(() => {
160192
const threadId =
@@ -164,7 +196,18 @@ describe("ProviderCommandReactor", () => {
164196
if (!threadId) {
165197
return;
166198
}
167-
const index = runtimeSessions.findIndex((session) => session.threadId === threadId);
199+
const provider =
200+
typeof input === "object" &&
201+
input !== null &&
202+
"provider" in input &&
203+
(input.provider === "codex" || input.provider === "claudeAgent")
204+
? input.provider
205+
: undefined;
206+
const index = runtimeSessions.findIndex(
207+
(session) =>
208+
session.threadId === threadId &&
209+
(provider === undefined || session.provider === provider),
210+
);
168211
if (index >= 0) {
169212
runtimeSessions.splice(index, 1);
170213
}
@@ -223,7 +266,9 @@ describe("ProviderCommandReactor", () => {
223266
interruptTurn: interruptTurn as ProviderServiceShape["interruptTurn"],
224267
respondToRequest: respondToRequest as ProviderServiceShape["respondToRequest"],
225268
respondToUserInput: respondToUserInput as ProviderServiceShape["respondToUserInput"],
269+
compactThread: compactThread as ProviderServiceShape["compactThread"],
226270
stopSession: stopSession as ProviderServiceShape["stopSession"],
271+
stopSessionForProvider: stopSession as ProviderServiceShape["stopSessionForProvider"],
227272
listSessions: () => Effect.succeed(runtimeSessions),
228273
getCapabilities: (_provider) =>
229274
Effect.succeed({
@@ -281,7 +326,7 @@ describe("ProviderCommandReactor", () => {
281326
projectId: asProjectId("project-1"),
282327
title: "Provider Project",
283328
workspaceRoot: "/tmp/provider-project",
284-
defaultModelSelection: modelSelection,
329+
defaultModelSelection: defaultModelSelection,
285330
createdAt: now,
286331
}),
287332
);
@@ -292,7 +337,7 @@ describe("ProviderCommandReactor", () => {
292337
threadId: ThreadId.make("thread-1"),
293338
projectId: asProjectId("project-1"),
294339
title: "Thread",
295-
modelSelection: modelSelection,
340+
modelSelection: defaultModelSelection,
296341
interactionMode: DEFAULT_PROVIDER_INTERACTION_MODE,
297342
runtimeMode: "approval-required",
298343
branch: null,
@@ -308,6 +353,7 @@ describe("ProviderCommandReactor", () => {
308353
interruptTurn,
309354
respondToRequest,
310355
respondToUserInput,
356+
compactThread,
311357
stopSession,
312358
renameBranch,
313359
refreshStatus,
@@ -750,7 +796,7 @@ describe("ProviderCommandReactor", () => {
750796
});
751797
});
752798

753-
it("rejects a first turn when requested provider conflicts with the thread model", async () => {
799+
it("starts the requested provider on the first turn even when the thread default differs", async () => {
754800
const harness = await createHarness({
755801
threadModelSelection: { provider: "codex", model: "gpt-5-codex" },
756802
});
@@ -777,29 +823,28 @@ describe("ProviderCommandReactor", () => {
777823
}),
778824
);
779825

780-
await waitFor(async () => {
781-
const readModel = await Effect.runPromise(harness.engine.getReadModel());
782-
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
783-
return (
784-
thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ??
785-
false
786-
);
787-
});
788-
789-
expect(harness.startSession).not.toHaveBeenCalled();
790-
expect(harness.sendTurn).not.toHaveBeenCalled();
826+
await waitFor(() => harness.startSession.mock.calls.length === 1);
827+
await waitFor(() => harness.sendTurn.mock.calls.length === 1);
791828

792829
const readModel = await Effect.runPromise(harness.engine.getReadModel());
793830
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
794-
expect(thread?.session).toBeNull();
795-
expect(
796-
thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"),
797-
).toMatchObject({
798-
summary: "Provider turn start failed",
799-
payload: {
800-
detail: expect.stringContaining("cannot switch to 'claudeAgent'"),
831+
expect(harness.compactThread).not.toHaveBeenCalled();
832+
expect(harness.startSession.mock.calls[0]?.[1]).toMatchObject({
833+
provider: "claudeAgent",
834+
modelSelection: {
835+
provider: "claudeAgent",
836+
model: "claude-opus-4-6",
801837
},
802838
});
839+
expect(harness.sendTurn.mock.calls[0]?.[0]).toMatchObject({
840+
threadId: ThreadId.make("thread-1"),
841+
input: "hello claude",
842+
modelSelection: {
843+
provider: "claudeAgent",
844+
model: "claude-opus-4-6",
845+
},
846+
});
847+
expect(thread?.session?.providerName).toBe("claudeAgent");
803848
});
804849

805850
it("preserves the active session model when in-session model switching is unsupported", async () => {
@@ -1100,7 +1145,7 @@ describe("ProviderCommandReactor", () => {
11001145
});
11011146
});
11021147

1103-
it("rejects provider changes after a thread is already bound to a session provider", async () => {
1148+
it("compacts and hands off context when switching providers mid-thread", async () => {
11041149
const harness = await createHarness();
11051150
const now = new Date().toISOString();
11061151

@@ -1145,31 +1190,69 @@ describe("ProviderCommandReactor", () => {
11451190
}),
11461191
);
11471192

1148-
await waitFor(async () => {
1149-
const readModel = await Effect.runPromise(harness.engine.getReadModel());
1150-
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
1151-
return (
1152-
thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed") ??
1153-
false
1154-
);
1155-
});
1193+
await waitFor(() => harness.startSession.mock.calls.length === 2);
1194+
await waitFor(() => harness.sendTurn.mock.calls.length === 2);
11561195

1157-
expect(harness.startSession.mock.calls.length).toBe(1);
1158-
expect(harness.sendTurn.mock.calls.length).toBe(1);
1159-
expect(harness.stopSession.mock.calls.length).toBe(0);
1196+
expect(harness.compactThread.mock.calls.length).toBe(1);
1197+
expect(harness.compactThread.mock.calls[0]?.[0]).toMatchObject({
1198+
threadId: ThreadId.make("thread-1"),
1199+
});
1200+
expect(harness.startSession.mock.calls[1]?.[1]).toMatchObject({
1201+
provider: "claudeAgent",
1202+
modelSelection: {
1203+
provider: "claudeAgent",
1204+
model: "claude-opus-4-6",
1205+
},
1206+
});
1207+
expect(harness.sendTurn.mock.calls[1]?.[0]).toMatchObject({
1208+
threadId: ThreadId.make("thread-1"),
1209+
modelSelection: {
1210+
provider: "claudeAgent",
1211+
model: "claude-opus-4-6",
1212+
},
1213+
});
1214+
const switchedTurnInput = harness.sendTurn.mock.calls[1]?.[0] as { input?: string } | undefined;
1215+
expect(switchedTurnInput?.input).toContain("<provider_handoff>");
1216+
expect(switchedTurnInput?.input).toContain("Summary for thread-1");
1217+
expect(switchedTurnInput?.input).toContain("second");
1218+
expect(harness.stopSession.mock.calls.length).toBe(1);
1219+
expect(harness.stopSession.mock.calls[0]?.[0]).toMatchObject({
1220+
threadId: ThreadId.make("thread-1"),
1221+
provider: "codex",
1222+
});
11601223

11611224
const readModel = await Effect.runPromise(harness.engine.getReadModel());
11621225
const thread = readModel.threads.find((entry) => entry.id === ThreadId.make("thread-1"));
11631226
expect(thread?.session?.threadId).toBe("thread-1");
1164-
expect(thread?.session?.providerName).toBe("codex");
1227+
expect(thread?.session?.providerName).toBe("claudeAgent");
11651228
expect(thread?.session?.runtimeMode).toBe("approval-required");
11661229
expect(
1167-
thread?.activities.find((activity) => activity.kind === "provider.turn.start.failed"),
1168-
).toMatchObject({
1169-
payload: {
1170-
detail: expect.stringContaining("cannot switch to 'claudeAgent'"),
1171-
},
1172-
});
1230+
thread?.activities.filter((activity) => activity.kind.startsWith("provider.handoff.")),
1231+
).toEqual(
1232+
expect.arrayContaining([
1233+
expect.objectContaining({
1234+
kind: "provider.handoff.compacting",
1235+
summary: "Handing off thread",
1236+
turnId: null,
1237+
payload: expect.objectContaining({
1238+
sourceProvider: "codex",
1239+
targetProvider: "claudeAgent",
1240+
}),
1241+
}),
1242+
expect.objectContaining({
1243+
kind: "provider.handoff.completed",
1244+
summary: "Handed off thread",
1245+
turnId: null,
1246+
payload: expect.objectContaining({
1247+
sourceProvider: "codex",
1248+
targetProvider: "claudeAgent",
1249+
}),
1250+
}),
1251+
]),
1252+
);
1253+
expect(
1254+
thread?.activities.some((activity) => activity.kind === "provider.turn.start.failed"),
1255+
).toBe(false);
11731256
});
11741257

11751258
it("does not stop the active session when restart fails before rebind", async () => {

0 commit comments

Comments
 (0)