Skip to content

Commit 436e6b6

Browse files
committed
Fix Cursor ACP session replay on resume
1 parent e408b01 commit 436e6b6

5 files changed

Lines changed: 308 additions & 2 deletions

File tree

FORK.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ This repository is a fork of `pingdotgg/t3code`. Keep this file focused on fork
2929
### Provider Launch Environment
3030

3131
- Provider sessions use a shared launch environment pipeline instead of ad hoc environment assembly.
32+
- Cursor ACP load replay is reconciled against already-projected assistant text so restarts do not duplicate seen output while still allowing unseen external assistant output to appear.
3233

3334
### Base Path And Remote URLs
3435

apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2396,6 +2396,152 @@ describe("ProviderRuntimeIngestion", () => {
23962396
).toBe(false);
23972397
});
23982398

2399+
it("reconciles cursor load replay against already projected assistant text before the next turn", async () => {
2400+
const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } });
2401+
const now = "2026-01-01T00:00:00.000Z";
2402+
const replayedItemId = asItemId("cursor-replay-before-turn");
2403+
2404+
harness.emit({
2405+
type: "turn.started",
2406+
eventId: asEventId("evt-cursor-load-reconcile-turn-1-started"),
2407+
provider: ProviderDriverKind.make("cursor"),
2408+
createdAt: now,
2409+
threadId: asThreadId("thread-1"),
2410+
turnId: asTurnId("turn-cursor-load-reconcile-1"),
2411+
});
2412+
harness.emit({
2413+
type: "content.delta",
2414+
eventId: asEventId("evt-cursor-load-reconcile-turn-1-delta"),
2415+
provider: ProviderDriverKind.make("cursor"),
2416+
createdAt: now,
2417+
threadId: asThreadId("thread-1"),
2418+
turnId: asTurnId("turn-cursor-load-reconcile-1"),
2419+
itemId: replayedItemId,
2420+
payload: {
2421+
streamKind: "assistant_text",
2422+
delta: "already seen",
2423+
},
2424+
});
2425+
harness.emit({
2426+
type: "turn.completed",
2427+
eventId: asEventId("evt-cursor-load-reconcile-turn-1-completed"),
2428+
provider: ProviderDriverKind.make("cursor"),
2429+
createdAt: now,
2430+
threadId: asThreadId("thread-1"),
2431+
turnId: asTurnId("turn-cursor-load-reconcile-1"),
2432+
status: "completed",
2433+
});
2434+
2435+
await waitForThread(
2436+
harness.readModel,
2437+
(thread) =>
2438+
thread.session?.status === "ready" &&
2439+
thread.messages.some(
2440+
(message: ProviderRuntimeTestMessage) => message.text === "already seen",
2441+
),
2442+
);
2443+
2444+
harness.emit({
2445+
type: "content.delta",
2446+
eventId: asEventId("evt-cursor-load-reconcile-replayed-delta"),
2447+
provider: ProviderDriverKind.make("cursor"),
2448+
createdAt: now,
2449+
threadId: asThreadId("thread-1"),
2450+
itemId: replayedItemId,
2451+
payload: {
2452+
streamKind: "assistant_text",
2453+
delta: "already seen",
2454+
},
2455+
});
2456+
await harness.drain();
2457+
2458+
const snapshot = await harness.readModel();
2459+
const thread = snapshot.threads.find((entry) => entry.id === asThreadId("thread-1"));
2460+
expect(thread?.messages.map((message: ProviderRuntimeTestMessage) => message.text)).toEqual([
2461+
"already seen",
2462+
]);
2463+
});
2464+
2465+
it("projects unseen cursor load replay assistant text before the next turn", async () => {
2466+
const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } });
2467+
const now = "2026-01-01T00:00:00.000Z";
2468+
const replayedItemId = asItemId("cursor-replay-with-new-text");
2469+
2470+
harness.emit({
2471+
type: "turn.started",
2472+
eventId: asEventId("evt-cursor-load-reconcile-new-turn-1-started"),
2473+
provider: ProviderDriverKind.make("cursor"),
2474+
createdAt: now,
2475+
threadId: asThreadId("thread-1"),
2476+
turnId: asTurnId("turn-cursor-load-reconcile-new-1"),
2477+
});
2478+
harness.emit({
2479+
type: "content.delta",
2480+
eventId: asEventId("evt-cursor-load-reconcile-new-turn-1-delta"),
2481+
provider: ProviderDriverKind.make("cursor"),
2482+
createdAt: now,
2483+
threadId: asThreadId("thread-1"),
2484+
turnId: asTurnId("turn-cursor-load-reconcile-new-1"),
2485+
itemId: replayedItemId,
2486+
payload: {
2487+
streamKind: "assistant_text",
2488+
delta: "already seen",
2489+
},
2490+
});
2491+
harness.emit({
2492+
type: "turn.completed",
2493+
eventId: asEventId("evt-cursor-load-reconcile-new-turn-1-completed"),
2494+
provider: ProviderDriverKind.make("cursor"),
2495+
createdAt: now,
2496+
threadId: asThreadId("thread-1"),
2497+
turnId: asTurnId("turn-cursor-load-reconcile-new-1"),
2498+
status: "completed",
2499+
});
2500+
2501+
await waitForThread(
2502+
harness.readModel,
2503+
(thread) =>
2504+
thread.session?.status === "ready" &&
2505+
thread.messages.some(
2506+
(message: ProviderRuntimeTestMessage) => message.text === "already seen",
2507+
),
2508+
);
2509+
2510+
harness.emit({
2511+
type: "content.delta",
2512+
eventId: asEventId("evt-cursor-load-reconcile-new-replayed-delta"),
2513+
provider: ProviderDriverKind.make("cursor"),
2514+
createdAt: now,
2515+
threadId: asThreadId("thread-1"),
2516+
itemId: replayedItemId,
2517+
payload: {
2518+
streamKind: "assistant_text",
2519+
delta: "already seen",
2520+
},
2521+
});
2522+
harness.emit({
2523+
type: "content.delta",
2524+
eventId: asEventId("evt-cursor-load-reconcile-new-unseen-delta"),
2525+
provider: ProviderDriverKind.make("cursor"),
2526+
createdAt: now,
2527+
threadId: asThreadId("thread-1"),
2528+
itemId: replayedItemId,
2529+
payload: {
2530+
streamKind: "assistant_text",
2531+
delta: "outside change",
2532+
},
2533+
});
2534+
2535+
const thread = await waitForThread(harness.readModel, (entry) =>
2536+
entry.messages.some(
2537+
(message: ProviderRuntimeTestMessage) => message.text === "outside change",
2538+
),
2539+
);
2540+
expect(
2541+
thread.messages.map((message: ProviderRuntimeTestMessage) => message.text).sort(),
2542+
).toEqual(["already seen", "outside change"]);
2543+
});
2544+
23992545
it("streams assistant deltas when thread.turn.start requests streaming mode", async () => {
24002546
const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } });
24012547
const now = "2026-01-01T00:00:00.000Z";

apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts

Lines changed: 78 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ const TURN_MESSAGE_IDS_BY_TURN_CACHE_CAPACITY = 10_000;
5555
const TURN_MESSAGE_IDS_BY_TURN_TTL = Duration.minutes(120);
5656
const STALE_REPLAY_ITEM_IDS_BY_TURN_CACHE_CAPACITY = 10_000;
5757
const STALE_REPLAY_ITEM_IDS_BY_TURN_TTL = Duration.minutes(120);
58+
const CURSOR_REPLAY_REMAINING_ASSISTANT_TEXT_BY_THREAD_CACHE_CAPACITY = 10_000;
59+
const CURSOR_REPLAY_REMAINING_ASSISTANT_TEXT_BY_THREAD_TTL = Duration.minutes(120);
5860
const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000;
5961
const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120);
6062
const BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY = 10_000;
@@ -228,6 +230,26 @@ function hasRenderableAssistantText(text: string | undefined): boolean {
228230
return (text?.trim().length ?? 0) > 0;
229231
}
230232

233+
function joinedAssistantMessageText(messages: ReadonlyArray<OrchestrationMessage>): string {
234+
let text = "";
235+
for (let index = 0; index < messages.length; index += 1) {
236+
const message = messages[index];
237+
if (message?.role === "assistant" && message.text.length > 0) {
238+
text += message.text;
239+
}
240+
}
241+
return text;
242+
}
243+
244+
function isCursorTurnlessAssistantDelta(event: ProviderRuntimeEvent): boolean {
245+
return (
246+
String(event.provider) === "cursor" &&
247+
event.type === "content.delta" &&
248+
event.turnId === undefined &&
249+
event.payload.streamKind === "assistant_text"
250+
);
251+
}
252+
231253
function proposedPlanIdForTurn(threadId: ThreadId, turnId: TurnId): string {
232254
return `plan:${threadId}:turn:${turnId}`;
233255
}
@@ -797,6 +819,12 @@ const make = Effect.gen(function* () {
797819
lookup: () => Effect.succeed(new Set<string>()),
798820
});
799821

822+
const cursorReplayRemainingAssistantTextByThreadId = yield* Cache.make<ThreadId, string>({
823+
capacity: CURSOR_REPLAY_REMAINING_ASSISTANT_TEXT_BY_THREAD_CACHE_CAPACITY,
824+
timeToLive: CURSOR_REPLAY_REMAINING_ASSISTANT_TEXT_BY_THREAD_TTL,
825+
lookup: () => Effect.succeed(""),
826+
});
827+
800828
const bufferedProposedPlanById = yield* Cache.make<string, { text: string; createdAt: string }>({
801829
capacity: BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY,
802830
timeToLive: BUFFERED_PROPOSED_PLAN_BY_ID_TTL,
@@ -909,6 +937,39 @@ const make = Effect.gen(function* () {
909937
const clearStaleReplayItemsForTurn = (threadId: ThreadId, turnId: TurnId) =>
910938
Cache.invalidate(staleReplayItemIdsByTurnKey, providerTurnKey(threadId, turnId));
911939

940+
const reconcileCursorTurnlessAssistantDelta = (input: {
941+
threadId: ThreadId;
942+
messages: ReadonlyArray<OrchestrationMessage>;
943+
delta: string;
944+
}) =>
945+
Cache.getOption(cursorReplayRemainingAssistantTextByThreadId, input.threadId).pipe(
946+
Effect.flatMap((cachedRemaining) =>
947+
Effect.gen(function* () {
948+
const remainingAlreadySeenText = Option.getOrElse(cachedRemaining, () =>
949+
joinedAssistantMessageText(input.messages),
950+
);
951+
if (remainingAlreadySeenText.length === 0) {
952+
yield* Cache.set(cursorReplayRemainingAssistantTextByThreadId, input.threadId, "");
953+
return input.delta;
954+
}
955+
if (remainingAlreadySeenText.startsWith(input.delta)) {
956+
yield* Cache.set(
957+
cursorReplayRemainingAssistantTextByThreadId,
958+
input.threadId,
959+
remainingAlreadySeenText.slice(input.delta.length),
960+
);
961+
return "";
962+
}
963+
if (input.delta.startsWith(remainingAlreadySeenText)) {
964+
yield* Cache.set(cursorReplayRemainingAssistantTextByThreadId, input.threadId, "");
965+
return input.delta.slice(remainingAlreadySeenText.length);
966+
}
967+
yield* Cache.set(cursorReplayRemainingAssistantTextByThreadId, input.threadId, "");
968+
return input.delta;
969+
}),
970+
),
971+
);
972+
912973
const getActiveAssistantMessageIdForTurn = (threadId: ThreadId, turnId: TurnId) =>
913974
getAssistantSegmentStateForTurn(threadId, turnId).pipe(
914975
Effect.map((state) =>
@@ -1565,6 +1626,16 @@ const make = Effect.gen(function* () {
15651626

15661627
if (assistantDelta && assistantDelta.length > 0) {
15671628
const turnId = toTurnId(event.turnId);
1629+
const reconciledAssistantDelta = isCursorTurnlessAssistantDelta(event)
1630+
? yield* reconcileCursorTurnlessAssistantDelta({
1631+
threadId: thread.id,
1632+
messages: (yield* getLoadedThreadDetail())?.messages ?? [],
1633+
delta: assistantDelta,
1634+
})
1635+
: assistantDelta;
1636+
if (reconciledAssistantDelta.length === 0) {
1637+
return;
1638+
}
15681639
const assistantMessageId = yield* getOrCreateAssistantMessageId({
15691640
threadId: thread.id,
15701641
event,
@@ -1579,7 +1650,10 @@ const make = Effect.gen(function* () {
15791650
(settings) => (settings.enableAssistantStreaming ? "streaming" : "buffered"),
15801651
);
15811652
if (assistantDeliveryMode === "buffered") {
1582-
const spillChunk = yield* appendBufferedAssistantText(assistantMessageId, assistantDelta);
1653+
const spillChunk = yield* appendBufferedAssistantText(
1654+
assistantMessageId,
1655+
reconciledAssistantDelta,
1656+
);
15831657
if (spillChunk.length > 0) {
15841658
yield* orchestrationEngine.dispatch({
15851659
type: "thread.message.assistant.delta",
@@ -1597,7 +1671,7 @@ const make = Effect.gen(function* () {
15971671
commandId: yield* providerCommandId(event, "assistant-delta"),
15981672
threadId: thread.id,
15991673
messageId: assistantMessageId,
1600-
delta: assistantDelta,
1674+
delta: reconciledAssistantDelta,
16011675
...(turnId ? { turnId } : {}),
16021676
createdAt: now,
16031677
});
@@ -1760,6 +1834,7 @@ const make = Effect.gen(function* () {
17601834
yield* clearAssistantMessageIdsForTurn(thread.id, turnId);
17611835
yield* clearAssistantSegmentStateForTurn(thread.id, turnId);
17621836
yield* clearStaleReplayItemsForTurn(thread.id, turnId);
1837+
yield* Cache.invalidate(cursorReplayRemainingAssistantTextByThreadId, thread.id);
17631838

17641839
yield* finalizeBufferedProposedPlan({
17651840
event,
@@ -1774,6 +1849,7 @@ const make = Effect.gen(function* () {
17741849

17751850
if (event.type === "session.exited") {
17761851
yield* clearTurnStateForSession(thread.id);
1852+
yield* Cache.invalidate(cursorReplayRemainingAssistantTextByThreadId, thread.id);
17771853
}
17781854

17791855
if (event.type === "runtime.error") {
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import { assert, it } from "@effect/vitest";
2+
import * as Effect from "effect/Effect";
3+
import * as Queue from "effect/Queue";
4+
import * as Ref from "effect/Ref";
5+
import type * as EffectAcpSchema from "effect-acp/schema";
6+
7+
import { handleSessionUpdateForTest } from "./AcpSessionRuntime.ts";
8+
import type {
9+
AcpParsedSessionEvent,
10+
AcpSessionModeState,
11+
AcpToolCallState,
12+
} from "./AcpRuntimeModel.ts";
13+
14+
it.effect("emits loaded-session updates for downstream reconciliation", () =>
15+
Effect.gen(function* () {
16+
const queue = yield* Queue.unbounded<AcpParsedSessionEvent>();
17+
const modeStateRef = yield* Ref.make<AcpSessionModeState | undefined>({
18+
currentModeId: "ask",
19+
availableModes: [
20+
{ id: "ask", name: "Ask" },
21+
{ id: "code", name: "Code" },
22+
],
23+
});
24+
const toolCallsRef = yield* Ref.make(new Map<string, AcpToolCallState>());
25+
const assistantSegmentRef = yield* Ref.make({ nextSegmentIndex: 0 });
26+
27+
const handle = (params: EffectAcpSchema.SessionNotification) =>
28+
handleSessionUpdateForTest({
29+
queue,
30+
modeStateRef,
31+
toolCallsRef,
32+
assistantSegmentRef,
33+
params,
34+
});
35+
36+
yield* handle({
37+
sessionId: "cursor-session",
38+
update: {
39+
sessionUpdate: "current_mode_update",
40+
currentModeId: "code",
41+
},
42+
});
43+
yield* handle({
44+
sessionId: "cursor-session",
45+
update: {
46+
sessionUpdate: "plan",
47+
entries: [{ content: "Old replayed plan", priority: "high", status: "completed" }],
48+
},
49+
});
50+
yield* handle({
51+
sessionId: "cursor-session",
52+
update: {
53+
sessionUpdate: "user_message_chunk",
54+
content: { type: "text", text: "old replayed user prompt" },
55+
},
56+
});
57+
yield* handle({
58+
sessionId: "cursor-session",
59+
update: {
60+
sessionUpdate: "agent_message_chunk",
61+
content: { type: "text", text: "old replayed assistant text" },
62+
},
63+
});
64+
65+
assert.equal(yield* Queue.size(queue), 4);
66+
assert.equal((yield* Ref.get(modeStateRef))?.currentModeId, "code");
67+
68+
const mode = yield* Queue.take(queue);
69+
const plan = yield* Queue.take(queue);
70+
const started = yield* Queue.take(queue);
71+
const delta = yield* Queue.take(queue);
72+
assert.equal(mode._tag, "ModeChanged");
73+
assert.equal(plan._tag, "PlanUpdated");
74+
assert.equal(started._tag, "AssistantItemStarted");
75+
assert.equal(delta._tag, "ContentDelta");
76+
if (delta._tag === "ContentDelta") {
77+
assert.equal(delta.text, "old replayed assistant text");
78+
}
79+
assert.equal(yield* Queue.size(queue), 0);
80+
}),
81+
);

apps/server/src/provider/acp/AcpSessionRuntime.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -672,6 +672,8 @@ const handleSessionUpdate = ({
672672
}
673673
});
674674

675+
export const handleSessionUpdateForTest = handleSessionUpdate;
676+
675677
function updateModeState(modeState: AcpSessionModeState, nextModeId: string): AcpSessionModeState {
676678
const normalized = nextModeId.trim();
677679
if (!normalized) {

0 commit comments

Comments
 (0)