diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts index 65805bc6a41..2c54d4156e5 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts @@ -2230,6 +2230,172 @@ describe("ProviderRuntimeIngestion", () => { expect(secondMessage?.text).toBe("second turn response"); }); + it("ignores cursor assistant item replay without a turn id while a new turn is active", async () => { + const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } }); + const now = "2026-01-01T00:00:00.000Z"; + const replayedItemId = asItemId("assistant:cursor-session:segment:0"); + const secondItemId = asItemId("assistant:cursor-session:segment:1"); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-cursor-replay-turn-1-started"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-cursor-replay-1"), + }); + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-cursor-replay-turn-1-delta"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-cursor-replay-1"), + itemId: replayedItemId, + payload: { + streamKind: "assistant_text", + delta: "first turn response", + }, + }); + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-cursor-replay-turn-1-complete"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-cursor-replay-1"), + itemId: replayedItemId, + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + harness.emit({ + type: "turn.completed", + eventId: asEventId("evt-cursor-replay-turn-1-turn-complete"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-cursor-replay-1"), + status: "completed", + }); + + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "ready" && + thread.messages.some( + (message: ProviderRuntimeTestMessage) => message.text === "first turn response", + ), + ); + + harness.emit({ + type: "turn.started", + eventId: asEventId("evt-cursor-replay-turn-2-started"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-cursor-replay-2"), + }); + await waitForThread( + harness.readModel, + (thread) => + thread.session?.status === "running" && + thread.session.activeTurnId === "turn-cursor-replay-2", + ); + + // Cursor can replay the previous assistant segment after an ACP session + // resume without attaching a turn id, then report that same item completed + // under the active turn. Neither event belongs to the new turn. + harness.emit({ + type: "item.started", + eventId: asEventId("evt-cursor-replay-stale-started"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + itemId: replayedItemId, + payload: { + itemType: "assistant_message", + status: "inProgress", + }, + }); + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-cursor-replay-stale-delta"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + itemId: replayedItemId, + payload: { + streamKind: "assistant_text", + delta: "first turn response", + }, + }); + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-cursor-replay-stale-complete"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-cursor-replay-2"), + itemId: replayedItemId, + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + + harness.emit({ + type: "content.delta", + eventId: asEventId("evt-cursor-replay-turn-2-delta"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-cursor-replay-2"), + itemId: secondItemId, + payload: { + streamKind: "assistant_text", + delta: "second turn response", + }, + }); + harness.emit({ + type: "item.completed", + eventId: asEventId("evt-cursor-replay-turn-2-complete"), + provider: ProviderDriverKind.make("cursor"), + createdAt: now, + threadId: asThreadId("thread-1"), + turnId: asTurnId("turn-cursor-replay-2"), + itemId: secondItemId, + payload: { + itemType: "assistant_message", + status: "completed", + }, + }); + + const thread = await waitForThread(harness.readModel, (entry) => + entry.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === turnScopedAssistantMessageId("turn-cursor-replay-2", String(secondItemId)), + ), + ); + expect(thread.messages.map((message: ProviderRuntimeTestMessage) => message.text)).toEqual([ + "first turn response", + "second turn response", + ]); + expect( + thread.messages.some( + (message: ProviderRuntimeTestMessage) => message.id === `assistant:${replayedItemId}`, + ), + ).toBe(false); + expect( + thread.messages.some( + (message: ProviderRuntimeTestMessage) => + message.id === + turnScopedAssistantMessageId("turn-cursor-replay-2", String(replayedItemId)), + ), + ).toBe(false); + }); + it("streams assistant deltas when thread.turn.start requests streaming mode", async () => { const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } }); const now = "2026-01-01T00:00:00.000Z"; diff --git a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts index 5e8d88f2f82..9ffd67a00fb 100644 --- a/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts +++ b/apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts @@ -53,6 +53,8 @@ type GoalActivityState = Pick; const TURN_MESSAGE_IDS_BY_TURN_CACHE_CAPACITY = 10_000; const TURN_MESSAGE_IDS_BY_TURN_TTL = Duration.minutes(120); +const STALE_REPLAY_ITEM_IDS_BY_TURN_CACHE_CAPACITY = 10_000; +const STALE_REPLAY_ITEM_IDS_BY_TURN_TTL = Duration.minutes(120); const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000; const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120); const BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY = 10_000; @@ -258,6 +260,34 @@ function runtimeAssistantMessageIdFromEvent( ): MessageId { return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(event, turnId), 0); } + +function isTurnOutputRuntimeEvent(event: ProviderRuntimeEvent): boolean { + switch (event.type) { + case "content.delta": + case "item.started": + case "item.updated": + case "item.completed": + case "request.opened": + case "request.resolved": + case "runtime.warning": + case "task.started": + case "task.progress": + case "task.completed": + case "thread.state.changed": + case "thread.token-usage.updated": + case "tool.denied": + case "turn.completed": + case "turn.diff.updated": + case "turn.plan.updated": + case "turn.proposed.completed": + case "turn.proposed.delta": + case "user-input.requested": + case "user-input.resolved": + return true; + default: + return false; + } +} function buildContextWindowActivityPayload( event: ProviderRuntimeEvent, ): ThreadTokenUsageSnapshot | undefined { @@ -761,6 +791,12 @@ const make = Effect.gen(function* () { ), }); + const staleReplayItemIdsByTurnKey = yield* Cache.make>({ + capacity: STALE_REPLAY_ITEM_IDS_BY_TURN_CACHE_CAPACITY, + timeToLive: STALE_REPLAY_ITEM_IDS_BY_TURN_TTL, + lookup: () => Effect.succeed(new Set()), + }); + const bufferedProposedPlanById = yield* Cache.make({ capacity: BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY, timeToLive: BUFFERED_PROPOSED_PLAN_BY_ID_TTL, @@ -842,6 +878,37 @@ const make = Effect.gen(function* () { const clearAssistantSegmentStateForTurn = (threadId: ThreadId, turnId: TurnId) => Cache.invalidate(assistantSegmentStateByTurnKey, providerTurnKey(threadId, turnId)); + const rememberStaleReplayItemForTurn = (threadId: ThreadId, turnId: TurnId, itemId: string) => + Cache.getOption(staleReplayItemIdsByTurnKey, providerTurnKey(threadId, turnId)).pipe( + Effect.flatMap((existingIds) => + Cache.set( + staleReplayItemIdsByTurnKey, + providerTurnKey(threadId, turnId), + Option.match(existingIds, { + onNone: () => new Set([itemId]), + onSome: (ids) => { + const nextIds = new Set(ids); + nextIds.add(itemId); + return nextIds; + }, + }), + ), + ), + ); + + const hasStaleReplayItemForTurn = (threadId: ThreadId, turnId: TurnId, itemId: string) => + Cache.getOption(staleReplayItemIdsByTurnKey, providerTurnKey(threadId, turnId)).pipe( + Effect.map((existingIds) => + Option.match(existingIds, { + onNone: () => false, + onSome: (ids) => ids.has(itemId), + }), + ), + ); + + const clearStaleReplayItemsForTurn = (threadId: ThreadId, turnId: TurnId) => + Cache.invalidate(staleReplayItemIdsByTurnKey, providerTurnKey(threadId, turnId)); + const getActiveAssistantMessageIdForTurn = (threadId: ThreadId, turnId: TurnId) => getAssistantSegmentStateForTurn(threadId, turnId).pipe( Effect.map((state) => @@ -1196,6 +1263,7 @@ const make = Effect.gen(function* () { const proposedPlanPrefix = `plan:${threadId}:`; const turnKeys = Array.from(yield* Cache.keys(turnMessageIdsByTurnKey)); const assistantSegmentKeys = Array.from(yield* Cache.keys(assistantSegmentStateByTurnKey)); + const staleReplayKeys = Array.from(yield* Cache.keys(staleReplayItemIdsByTurnKey)); const proposedPlanKeys = Array.from(yield* Cache.keys(bufferedProposedPlanById)); yield* Effect.forEach( turnKeys, @@ -1224,6 +1292,12 @@ const make = Effect.gen(function* () { : Effect.void, { concurrency: 1 }, ).pipe(Effect.asVoid); + yield* Effect.forEach( + staleReplayKeys, + (key) => + key.startsWith(prefix) ? Cache.invalidate(staleReplayItemIdsByTurnKey, key) : Effect.void, + { concurrency: 1 }, + ).pipe(Effect.asVoid); yield* Effect.forEach( proposedPlanKeys, (key) => @@ -1465,6 +1539,23 @@ const make = Effect.gen(function* () { } } + const eventItemId = event.itemId === undefined ? undefined : String(event.itemId); + const staleReplayItemForActiveTurn = + activeTurnId !== null && eventItemId !== undefined + ? yield* hasStaleReplayItemForTurn(thread.id, activeTurnId, eventItemId) + : false; + const shouldSkipRuntimeOutput = + STRICT_PROVIDER_LIFECYCLE_GUARD && + isTurnOutputRuntimeEvent(event) && + (conflictsWithActiveTurn || missingTurnForActiveTurn || staleReplayItemForActiveTurn); + + if (shouldSkipRuntimeOutput) { + if (missingTurnForActiveTurn && activeTurnId !== null && eventItemId !== undefined) { + yield* rememberStaleReplayItemForTurn(thread.id, activeTurnId, eventItemId); + } + return; + } + const assistantDelta = event.type === "content.delta" && event.payload.streamKind === "assistant_text" ? event.payload.delta @@ -1668,6 +1759,7 @@ const make = Effect.gen(function* () { ).pipe(Effect.asVoid); yield* clearAssistantMessageIdsForTurn(thread.id, turnId); yield* clearAssistantSegmentStateForTurn(thread.id, turnId); + yield* clearStaleReplayItemsForTurn(thread.id, turnId); yield* finalizeBufferedProposedPlan({ event,