Skip to content

Commit 7a5e4c8

Browse files
committed
fix: invalidate streaming-stats cache on stream-error
Codex P2 on PR #3219: streamingStatsStore was bumped on stream-end and stream-abort but not on stream-error. Subscribers (TypewriterMarkdown via useWorkspaceStreamingStats) could keep returning the failed stream's TPS/charsPerSec until the next delta arrived, leaking stale rates into the next stream's early renders. Mirror the stream-end / stream-abort terminal-cleanup pattern in the stream-error path: cancel any pending coalesced bumps, then bump streamingStatsStore so consumers re-read and the snapshot collapses to null (getActiveStreamMessageId is already undefined post-error). Adds a regression test that drives stream-start + stream-delta + caught-up to populate the cache, then asserts both that subscribers are notified and that the post-error snapshot is null.
1 parent ff12fce commit 7a5e4c8

2 files changed

Lines changed: 79 additions & 0 deletions

File tree

src/browser/stores/WorkspaceStore.test.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1875,6 +1875,77 @@ describe("WorkspaceStore", () => {
18751875
expect(clearedState.canInterrupt).toBe(false);
18761876
});
18771877

1878+
it("invalidates streaming-stats cache on stream-error so subscribers don't see stale TPS", async () => {
1879+
const workspaceId = "stream-error-invalidates-stats";
1880+
const streamModel = "anthropic:claude-opus-4-6";
1881+
1882+
mockOnChat.mockImplementation(async function* (
1883+
_input?: { workspaceId: string; mode?: unknown },
1884+
options?: { signal?: AbortSignal }
1885+
): AsyncGenerator<WorkspaceChatMessage, void, unknown> {
1886+
if (options?.signal?.aborted) {
1887+
yield { type: "caught-up" };
1888+
}
1889+
await waitForAbortSignal(options?.signal);
1890+
});
1891+
1892+
recreateStore();
1893+
await new Promise((resolve) => setTimeout(resolve, 0));
1894+
createAndAddWorkspace(store, workspaceId);
1895+
1896+
const rawStore = store as unknown as {
1897+
handleChatMessage: (workspaceId: string, data: WorkspaceChatMessage) => void;
1898+
};
1899+
1900+
// Open a stream and feed a delta so streaming stats become non-null.
1901+
// stream events are buffered until a caught-up event flushes them onto
1902+
// the aggregator, so we send caught-up before reading the live stats.
1903+
const messageId = "stream-error-message";
1904+
rawStore.handleChatMessage(workspaceId, {
1905+
type: "stream-start",
1906+
workspaceId,
1907+
messageId,
1908+
model: streamModel,
1909+
historySequence: 1,
1910+
startTime: 1_000,
1911+
});
1912+
rawStore.handleChatMessage(workspaceId, {
1913+
type: "stream-delta",
1914+
workspaceId,
1915+
messageId,
1916+
delta: "hello world ",
1917+
tokens: 3,
1918+
timestamp: 1_500,
1919+
});
1920+
rawStore.handleChatMessage(workspaceId, { type: "caught-up" });
1921+
1922+
// Subscribe so stale-cache regression would surface as a missed bump.
1923+
let notifications = 0;
1924+
const unsubscribe = store.subscribeStreamingStats(workspaceId, () => {
1925+
notifications += 1;
1926+
});
1927+
1928+
const before = store.getWorkspaceStreamingStats(workspaceId);
1929+
expect(before).not.toBeNull();
1930+
1931+
rawStore.handleChatMessage(workspaceId, {
1932+
type: "stream-error",
1933+
messageId,
1934+
error: "Mock provider failure",
1935+
errorType: "network",
1936+
});
1937+
1938+
// The terminal stream-error must bump streamingStatsStore so listeners
1939+
// re-read; once recomputed, getActiveStreamMessageId returns undefined
1940+
// and the snapshot collapses to null. Without the bump, the cache would
1941+
// keep returning `before` (stale TPS leaking into the next stream).
1942+
expect(notifications).toBeGreaterThanOrEqual(1);
1943+
const after = store.getWorkspaceStreamingStats(workspaceId);
1944+
expect(after).toBeNull();
1945+
1946+
unsubscribe();
1947+
});
1948+
18781949
it("prefers buffered stream-start state over stale non-streaming activity during hydration", async () => {
18791950
const workspaceId = "buffered-stream-start-over-activity";
18801951
const staleActivityModel = "openai:gpt-4o-mini";

src/browser/stores/WorkspaceStore.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3830,7 +3830,15 @@ export class WorkspaceStore {
38303830

38313831
applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects });
38323832

3833+
// stream-error is a terminal stream event, just like stream-end/stream-abort.
3834+
// Mirror their cleanup so subscribers don't see stale streaming stats from the
3835+
// failed stream — applyWorkspaceChatEventToAggregator already cleared token
3836+
// state; we now flush any pending coalesced bump and invalidate the
3837+
// streamingStatsStore cache so useWorkspaceStreamingStats returns null.
3838+
this.cancelPendingIdleBump(workspaceId);
3839+
this.cancelPendingStreamingBump(workspaceId);
38333840
this.states.bump(workspaceId);
3841+
this.streamingStatsStore.bump(workspaceId);
38343842
return;
38353843
}
38363844

0 commit comments

Comments
 (0)