Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
166 changes: 166 additions & 0 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2230,6 +2230,172 @@ describe("ProviderRuntimeIngestion", () => {
expect(secondMessage?.text).toBe("second turn response");
});

it("ignores cursor assistant item replay without a turn id while a new turn is active", async () => {
const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } });
const now = "2026-01-01T00:00:00.000Z";
const replayedItemId = asItemId("assistant:cursor-session:segment:0");
const secondItemId = asItemId("assistant:cursor-session:segment:1");

harness.emit({
type: "turn.started",
eventId: asEventId("evt-cursor-replay-turn-1-started"),
provider: ProviderDriverKind.make("cursor"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-cursor-replay-1"),
});
harness.emit({
type: "content.delta",
eventId: asEventId("evt-cursor-replay-turn-1-delta"),
provider: ProviderDriverKind.make("cursor"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-cursor-replay-1"),
itemId: replayedItemId,
payload: {
streamKind: "assistant_text",
delta: "first turn response",
},
});
harness.emit({
type: "item.completed",
eventId: asEventId("evt-cursor-replay-turn-1-complete"),
provider: ProviderDriverKind.make("cursor"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-cursor-replay-1"),
itemId: replayedItemId,
payload: {
itemType: "assistant_message",
status: "completed",
},
});
harness.emit({
type: "turn.completed",
eventId: asEventId("evt-cursor-replay-turn-1-turn-complete"),
provider: ProviderDriverKind.make("cursor"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-cursor-replay-1"),
status: "completed",
});

await waitForThread(
harness.readModel,
(thread) =>
thread.session?.status === "ready" &&
thread.messages.some(
(message: ProviderRuntimeTestMessage) => message.text === "first turn response",
),
);

harness.emit({
type: "turn.started",
eventId: asEventId("evt-cursor-replay-turn-2-started"),
provider: ProviderDriverKind.make("cursor"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-cursor-replay-2"),
});
await waitForThread(
harness.readModel,
(thread) =>
thread.session?.status === "running" &&
thread.session.activeTurnId === "turn-cursor-replay-2",
);

// Cursor can replay the previous assistant segment after an ACP session
// resume without attaching a turn id, then report that same item completed
// under the active turn. Neither event belongs to the new turn.
harness.emit({
type: "item.started",
eventId: asEventId("evt-cursor-replay-stale-started"),
provider: ProviderDriverKind.make("cursor"),
createdAt: now,
threadId: asThreadId("thread-1"),
itemId: replayedItemId,
payload: {
itemType: "assistant_message",
status: "inProgress",
},
});
harness.emit({
type: "content.delta",
eventId: asEventId("evt-cursor-replay-stale-delta"),
provider: ProviderDriverKind.make("cursor"),
createdAt: now,
threadId: asThreadId("thread-1"),
itemId: replayedItemId,
payload: {
streamKind: "assistant_text",
delta: "first turn response",
},
});
harness.emit({
type: "item.completed",
eventId: asEventId("evt-cursor-replay-stale-complete"),
provider: ProviderDriverKind.make("cursor"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-cursor-replay-2"),
itemId: replayedItemId,
payload: {
itemType: "assistant_message",
status: "completed",
},
});

harness.emit({
type: "content.delta",
eventId: asEventId("evt-cursor-replay-turn-2-delta"),
provider: ProviderDriverKind.make("cursor"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-cursor-replay-2"),
itemId: secondItemId,
payload: {
streamKind: "assistant_text",
delta: "second turn response",
},
});
harness.emit({
type: "item.completed",
eventId: asEventId("evt-cursor-replay-turn-2-complete"),
provider: ProviderDriverKind.make("cursor"),
createdAt: now,
threadId: asThreadId("thread-1"),
turnId: asTurnId("turn-cursor-replay-2"),
itemId: secondItemId,
payload: {
itemType: "assistant_message",
status: "completed",
},
});

const thread = await waitForThread(harness.readModel, (entry) =>
entry.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id === turnScopedAssistantMessageId("turn-cursor-replay-2", String(secondItemId)),
),
);
expect(thread.messages.map((message: ProviderRuntimeTestMessage) => message.text)).toEqual([
"first turn response",
"second turn response",
]);
expect(
thread.messages.some(
(message: ProviderRuntimeTestMessage) => message.id === `assistant:${replayedItemId}`,
),
).toBe(false);
expect(
thread.messages.some(
(message: ProviderRuntimeTestMessage) =>
message.id ===
turnScopedAssistantMessageId("turn-cursor-replay-2", String(replayedItemId)),
),
).toBe(false);
});

it("streams assistant deltas when thread.turn.start requests streaming mode", async () => {
const harness = await createHarness({ serverSettings: { enableAssistantStreaming: true } });
const now = "2026-01-01T00:00:00.000Z";
Expand Down
92 changes: 92 additions & 0 deletions apps/server/src/orchestration/Layers/ProviderRuntimeIngestion.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ type GoalActivityState = Pick<OrchestrationThreadGoal, "objective" | "status">;

const TURN_MESSAGE_IDS_BY_TURN_CACHE_CAPACITY = 10_000;
const TURN_MESSAGE_IDS_BY_TURN_TTL = Duration.minutes(120);
const STALE_REPLAY_ITEM_IDS_BY_TURN_CACHE_CAPACITY = 10_000;
const STALE_REPLAY_ITEM_IDS_BY_TURN_TTL = Duration.minutes(120);
const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_CACHE_CAPACITY = 20_000;
const BUFFERED_MESSAGE_TEXT_BY_MESSAGE_ID_TTL = Duration.minutes(120);
const BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY = 10_000;
Expand Down Expand Up @@ -258,6 +260,34 @@ function runtimeAssistantMessageIdFromEvent(
): MessageId {
return assistantSegmentMessageId(assistantSegmentBaseKeyFromEvent(event, turnId), 0);
}

function isTurnOutputRuntimeEvent(event: ProviderRuntimeEvent): boolean {
switch (event.type) {
case "content.delta":
case "item.started":
case "item.updated":
case "item.completed":
case "request.opened":
case "request.resolved":
case "runtime.warning":
case "task.started":
case "task.progress":
case "task.completed":
case "thread.state.changed":
case "thread.token-usage.updated":
case "tool.denied":
case "turn.completed":
case "turn.diff.updated":
case "turn.plan.updated":
case "turn.proposed.completed":
case "turn.proposed.delta":
case "user-input.requested":
case "user-input.resolved":
return true;
default:
return false;
}
}
function buildContextWindowActivityPayload(
event: ProviderRuntimeEvent,
): ThreadTokenUsageSnapshot | undefined {
Expand Down Expand Up @@ -761,6 +791,12 @@ const make = Effect.gen(function* () {
),
});

const staleReplayItemIdsByTurnKey = yield* Cache.make<string, Set<string>>({
capacity: STALE_REPLAY_ITEM_IDS_BY_TURN_CACHE_CAPACITY,
timeToLive: STALE_REPLAY_ITEM_IDS_BY_TURN_TTL,
lookup: () => Effect.succeed(new Set<string>()),
});

const bufferedProposedPlanById = yield* Cache.make<string, { text: string; createdAt: string }>({
capacity: BUFFERED_PROPOSED_PLAN_BY_ID_CACHE_CAPACITY,
timeToLive: BUFFERED_PROPOSED_PLAN_BY_ID_TTL,
Expand Down Expand Up @@ -842,6 +878,37 @@ const make = Effect.gen(function* () {
const clearAssistantSegmentStateForTurn = (threadId: ThreadId, turnId: TurnId) =>
Cache.invalidate(assistantSegmentStateByTurnKey, providerTurnKey(threadId, turnId));

const rememberStaleReplayItemForTurn = (threadId: ThreadId, turnId: TurnId, itemId: string) =>
Cache.getOption(staleReplayItemIdsByTurnKey, providerTurnKey(threadId, turnId)).pipe(
Effect.flatMap((existingIds) =>
Cache.set(
staleReplayItemIdsByTurnKey,
providerTurnKey(threadId, turnId),
Option.match(existingIds, {
onNone: () => new Set([itemId]),
onSome: (ids) => {
const nextIds = new Set(ids);
nextIds.add(itemId);
return nextIds;
},
}),
),
),
);

const hasStaleReplayItemForTurn = (threadId: ThreadId, turnId: TurnId, itemId: string) =>
Cache.getOption(staleReplayItemIdsByTurnKey, providerTurnKey(threadId, turnId)).pipe(
Effect.map((existingIds) =>
Option.match(existingIds, {
onNone: () => false,
onSome: (ids) => ids.has(itemId),
}),
),
);

const clearStaleReplayItemsForTurn = (threadId: ThreadId, turnId: TurnId) =>
Cache.invalidate(staleReplayItemIdsByTurnKey, providerTurnKey(threadId, turnId));

const getActiveAssistantMessageIdForTurn = (threadId: ThreadId, turnId: TurnId) =>
getAssistantSegmentStateForTurn(threadId, turnId).pipe(
Effect.map((state) =>
Expand Down Expand Up @@ -1196,6 +1263,7 @@ const make = Effect.gen(function* () {
const proposedPlanPrefix = `plan:${threadId}:`;
const turnKeys = Array.from(yield* Cache.keys(turnMessageIdsByTurnKey));
const assistantSegmentKeys = Array.from(yield* Cache.keys(assistantSegmentStateByTurnKey));
const staleReplayKeys = Array.from(yield* Cache.keys(staleReplayItemIdsByTurnKey));
const proposedPlanKeys = Array.from(yield* Cache.keys(bufferedProposedPlanById));
yield* Effect.forEach(
turnKeys,
Expand Down Expand Up @@ -1224,6 +1292,12 @@ const make = Effect.gen(function* () {
: Effect.void,
{ concurrency: 1 },
).pipe(Effect.asVoid);
yield* Effect.forEach(
staleReplayKeys,
(key) =>
key.startsWith(prefix) ? Cache.invalidate(staleReplayItemIdsByTurnKey, key) : Effect.void,
{ concurrency: 1 },
).pipe(Effect.asVoid);
yield* Effect.forEach(
proposedPlanKeys,
(key) =>
Expand Down Expand Up @@ -1465,6 +1539,23 @@ const make = Effect.gen(function* () {
}
}

const eventItemId = event.itemId === undefined ? undefined : String(event.itemId);
const staleReplayItemForActiveTurn =
activeTurnId !== null && eventItemId !== undefined
? yield* hasStaleReplayItemForTurn(thread.id, activeTurnId, eventItemId)
: false;
const shouldSkipRuntimeOutput =
STRICT_PROVIDER_LIFECYCLE_GUARD &&
isTurnOutputRuntimeEvent(event) &&
(conflictsWithActiveTurn || missingTurnForActiveTurn || staleReplayItemForActiveTurn);

if (shouldSkipRuntimeOutput) {
if (missingTurnForActiveTurn && activeTurnId !== null && eventItemId !== undefined) {
yield* rememberStaleReplayItemForTurn(thread.id, activeTurnId, eventItemId);
}
return;
}

const assistantDelta =
event.type === "content.delta" && event.payload.streamKind === "assistant_text"
? event.payload.delta
Expand Down Expand Up @@ -1668,6 +1759,7 @@ const make = Effect.gen(function* () {
).pipe(Effect.asVoid);
yield* clearAssistantMessageIdsForTurn(thread.id, turnId);
yield* clearAssistantSegmentStateForTurn(thread.id, turnId);
yield* clearStaleReplayItemsForTurn(thread.id, turnId);

yield* finalizeBufferedProposedPlan({
event,
Expand Down
Loading