Skip to content

Commit 0d55a42

Browse files
fix(web): ignore stale runtime projection snapshots (#2301)
1 parent 0ee302e commit 0d55a42

2 files changed

Lines changed: 229 additions & 1 deletion

File tree

apps/web/src/environments/runtime/service.test.ts

Lines changed: 108 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,10 @@
11
import { describe, expect, it } from "vitest";
22

3-
import { shouldApplyTerminalEvent } from "./service";
3+
import {
4+
shouldApplyProjectionEvent,
5+
shouldApplyProjectionSnapshot,
6+
shouldApplyTerminalEvent,
7+
} from "./service";
48

59
describe("shouldApplyTerminalEvent", () => {
610
it("applies terminal events for draft-only threads", () => {
@@ -39,3 +43,106 @@ describe("shouldApplyTerminalEvent", () => {
3943
).toBe(true);
4044
});
4145
});
46+
47+
describe("shouldApplyProjectionSnapshot", () => {
48+
it("accepts the first snapshot for an environment", () => {
49+
expect(
50+
shouldApplyProjectionSnapshot({
51+
current: null,
52+
next: {
53+
snapshotSequence: 1,
54+
updatedAt: "2026-04-22T10:00:00.000Z",
55+
},
56+
}),
57+
).toBe(true);
58+
});
59+
60+
it("drops snapshots with an older sequence", () => {
61+
expect(
62+
shouldApplyProjectionSnapshot({
63+
current: {
64+
sequence: 5,
65+
updatedAt: "2026-04-22T10:05:00.000Z",
66+
},
67+
next: {
68+
snapshotSequence: 4,
69+
updatedAt: "2026-04-22T10:06:00.000Z",
70+
},
71+
}),
72+
).toBe(false);
73+
});
74+
75+
it("drops snapshots with the same sequence and older timestamp", () => {
76+
expect(
77+
shouldApplyProjectionSnapshot({
78+
current: {
79+
sequence: 5,
80+
updatedAt: "2026-04-22T10:05:00.000Z",
81+
},
82+
next: {
83+
snapshotSequence: 5,
84+
updatedAt: "2026-04-22T10:04:59.000Z",
85+
},
86+
}),
87+
).toBe(false);
88+
});
89+
90+
it("accepts snapshots with the same sequence and a newer timestamp", () => {
91+
expect(
92+
shouldApplyProjectionSnapshot({
93+
current: {
94+
sequence: 5,
95+
updatedAt: "2026-04-22T10:05:00.000Z",
96+
},
97+
next: {
98+
snapshotSequence: 5,
99+
updatedAt: "2026-04-22T10:05:01.000Z",
100+
},
101+
}),
102+
).toBe(true);
103+
});
104+
});
105+
106+
describe("shouldApplyProjectionEvent", () => {
107+
it("accepts the first event for an environment", () => {
108+
expect(
109+
shouldApplyProjectionEvent({
110+
current: null,
111+
sequence: 1,
112+
}),
113+
).toBe(true);
114+
});
115+
116+
it("drops stale or duplicate events", () => {
117+
expect(
118+
shouldApplyProjectionEvent({
119+
current: {
120+
sequence: 5,
121+
updatedAt: "2026-04-22T10:05:00.000Z",
122+
},
123+
sequence: 5,
124+
}),
125+
).toBe(false);
126+
expect(
127+
shouldApplyProjectionEvent({
128+
current: {
129+
sequence: 5,
130+
updatedAt: "2026-04-22T10:05:00.000Z",
131+
},
132+
sequence: 4,
133+
}),
134+
).toBe(false);
135+
});
136+
137+
it("accepts newer events", () => {
138+
expect(
139+
shouldApplyProjectionEvent({
140+
current: {
141+
sequence: 5,
142+
updatedAt: "2026-04-22T10:05:00.000Z",
143+
},
144+
sequence: 6,
145+
}),
146+
).toBe(true);
147+
});
148+
});

apps/web/src/environments/runtime/service.ts

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,13 @@ type ThreadDetailSubscriptionEntry = {
8787
const environmentConnections = new Map<EnvironmentId, EnvironmentConnection>();
8888
const environmentConnectionListeners = new Set<() => void>();
8989
const threadDetailSubscriptions = new Map<string, ThreadDetailSubscriptionEntry>();
90+
const lastAppliedProjectionVersionByEnvironment = new Map<
91+
EnvironmentId,
92+
{
93+
readonly sequence: number;
94+
readonly updatedAt: string | null;
95+
}
96+
>();
9097

9198
let activeService: EnvironmentServiceState | null = null;
9299
let needsProviderInvalidation = false;
@@ -102,6 +109,98 @@ const THREAD_DETAIL_SUBSCRIPTION_IDLE_EVICTION_MS = 15 * 60 * 1000;
102109
const MAX_CACHED_THREAD_DETAIL_SUBSCRIPTIONS = 32;
103110
const NOOP = () => undefined;
104111

112+
function compareAppliedProjectionVersion(
113+
left: { readonly sequence: number; readonly updatedAt: string | null },
114+
right: { readonly sequence: number; readonly updatedAt: string | null },
115+
): number {
116+
if (left.sequence !== right.sequence) {
117+
return left.sequence - right.sequence;
118+
}
119+
120+
const leftUpdatedAt = left.updatedAt ?? "";
121+
const rightUpdatedAt = right.updatedAt ?? "";
122+
if (leftUpdatedAt === rightUpdatedAt) {
123+
return 0;
124+
}
125+
126+
return leftUpdatedAt < rightUpdatedAt ? -1 : 1;
127+
}
128+
129+
function toAppliedProjectionVersion(
130+
snapshot: Pick<OrchestrationShellSnapshot, "snapshotSequence" | "updatedAt">,
131+
): {
132+
readonly sequence: number;
133+
readonly updatedAt: string;
134+
} {
135+
return {
136+
sequence: snapshot.snapshotSequence,
137+
updatedAt: snapshot.updatedAt,
138+
};
139+
}
140+
141+
export function shouldApplyProjectionSnapshot(input: {
142+
readonly current: {
143+
readonly sequence: number;
144+
readonly updatedAt: string | null;
145+
} | null;
146+
readonly next: Pick<OrchestrationShellSnapshot, "snapshotSequence" | "updatedAt">;
147+
}): boolean {
148+
if (input.current === null) {
149+
return true;
150+
}
151+
152+
return compareAppliedProjectionVersion(input.current, toAppliedProjectionVersion(input.next)) < 0;
153+
}
154+
155+
export function shouldApplyProjectionEvent(input: {
156+
readonly current: {
157+
readonly sequence: number;
158+
readonly updatedAt: string | null;
159+
} | null;
160+
readonly sequence: number;
161+
}): boolean {
162+
if (input.current === null) {
163+
return true;
164+
}
165+
166+
return input.sequence > input.current.sequence;
167+
}
168+
169+
function readLastAppliedProjectionVersion(environmentId: EnvironmentId): {
170+
readonly sequence: number;
171+
readonly updatedAt: string | null;
172+
} | null {
173+
return lastAppliedProjectionVersionByEnvironment.get(environmentId) ?? null;
174+
}
175+
176+
function markAppliedProjectionSnapshot(
177+
environmentId: EnvironmentId,
178+
snapshot: Pick<OrchestrationShellSnapshot, "snapshotSequence" | "updatedAt">,
179+
): void {
180+
const nextVersion = toAppliedProjectionVersion(snapshot);
181+
const currentVersion = readLastAppliedProjectionVersion(environmentId);
182+
if (
183+
currentVersion !== null &&
184+
compareAppliedProjectionVersion(currentVersion, nextVersion) >= 0
185+
) {
186+
return;
187+
}
188+
189+
lastAppliedProjectionVersionByEnvironment.set(environmentId, nextVersion);
190+
}
191+
192+
function markAppliedProjectionEvent(environmentId: EnvironmentId, sequence: number): void {
193+
const currentVersion = readLastAppliedProjectionVersion(environmentId);
194+
if (currentVersion !== null && sequence <= currentVersion.sequence) {
195+
return;
196+
}
197+
198+
lastAppliedProjectionVersionByEnvironment.set(environmentId, {
199+
sequence,
200+
updatedAt: currentVersion?.updatedAt ?? null,
201+
});
202+
}
203+
105204
function getThreadDetailSubscriptionKey(environmentId: EnvironmentId, threadId: ThreadId): string {
106205
return scopedThreadKey(scopeThreadRef(environmentId, threadId));
107206
}
@@ -600,6 +699,15 @@ export function applyEnvironmentThreadDetailEvent(
600699
}
601700

602701
function applyShellEvent(event: OrchestrationShellStreamEvent, environmentId: EnvironmentId) {
702+
if (
703+
!shouldApplyProjectionEvent({
704+
current: readLastAppliedProjectionVersion(environmentId),
705+
sequence: event.sequence,
706+
})
707+
) {
708+
return;
709+
}
710+
603711
const threadId =
604712
event.kind === "thread-upserted"
605713
? event.thread.id
@@ -610,6 +718,7 @@ function applyShellEvent(event: OrchestrationShellStreamEvent, environmentId: En
610718
const previousThread = threadRef ? selectThreadByRef(useStore.getState(), threadRef) : undefined;
611719

612720
useStore.getState().applyShellEvent(event, environmentId);
721+
markAppliedProjectionEvent(environmentId, event.sequence);
613722

614723
switch (event.kind) {
615724
case "project-upserted":
@@ -643,7 +752,17 @@ function createEnvironmentConnectionHandlers() {
643752
return {
644753
applyShellEvent,
645754
syncShellSnapshot: (snapshot: OrchestrationShellSnapshot, environmentId: EnvironmentId) => {
755+
if (
756+
!shouldApplyProjectionSnapshot({
757+
current: readLastAppliedProjectionVersion(environmentId),
758+
next: snapshot,
759+
})
760+
) {
761+
return;
762+
}
763+
646764
useStore.getState().syncServerShellSnapshot(snapshot, environmentId);
765+
markAppliedProjectionSnapshot(environmentId, snapshot);
647766
reconcileThreadDetailSubscriptionsForEnvironment(
648767
environmentId,
649768
snapshot.threads.map((thread) => thread.id),
@@ -758,6 +877,7 @@ async function removeConnection(environmentId: EnvironmentId): Promise<boolean>
758877
}
759878

760879
disposeThreadDetailSubscriptionsForEnvironment(environmentId);
880+
lastAppliedProjectionVersionByEnvironment.delete(environmentId);
761881
environmentConnections.delete(environmentId);
762882
emitEnvironmentConnectionRegistryChange();
763883
await connection.dispose();
@@ -1086,6 +1206,7 @@ export function startEnvironmentConnectionService(queryClient: QueryClient): ()
10861206

10871207
export async function resetEnvironmentServiceForTests(): Promise<void> {
10881208
stopActiveService();
1209+
lastAppliedProjectionVersionByEnvironment.clear();
10891210
for (const key of Array.from(threadDetailSubscriptions.keys())) {
10901211
disposeThreadDetailSubscriptionByKey(key);
10911212
}

0 commit comments

Comments
 (0)