Skip to content

Commit 400ff14

Browse files
committed
🤖 refactor: dedupe startup barrier replay logic
--- _Generated with `mux` • Model: `openai:gpt-5.4` • Thinking: `xhigh` • Cost: `$44.22`_ <!-- mux-attribution: model=openai:gpt-5.4 thinking=xhigh costs=44.22 -->
1 parent a5871a5 commit 400ff14

File tree

7 files changed

+268
-112
lines changed

7 files changed

+268
-112
lines changed

‎src/browser/components/ProjectSidebar/ProjectSidebar.test.tsx‎

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -425,6 +425,19 @@ function installProjectSidebarTestDoubles() {
425425
() =>
426426
({
427427
getWorkspaceMetadata: () => undefined,
428+
getWorkspaceSidebarState: () => ({
429+
canInterrupt: false,
430+
isStarting: false,
431+
awaitingUserQuestion: false,
432+
lastAbortReason: null,
433+
currentModel: null,
434+
recencyTimestamp: null,
435+
loadedSkills: [],
436+
skillLoadErrors: [],
437+
agentStatus: undefined,
438+
terminalActiveCount: 0,
439+
terminalSessionCount: 0,
440+
}),
428441
getAggregator: () => undefined,
429442
subscribeKey: () => () => undefined,
430443
}) as unknown as ReturnType<typeof WorkspaceStoreModule.useWorkspaceStoreRaw>

‎src/browser/components/ProjectSidebar/ProjectSidebar.tsx‎

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,13 @@ function getWorkspaceAttentionSignal(
155155
}
156156
}
157157

158+
function isWorkspaceWorkingForSidebar(
159+
workspaceStore: WorkspaceStore,
160+
workspaceId: string
161+
): boolean {
162+
return getWorkspaceAttentionSignal(workspaceStore, workspaceId)?.isWorking ?? false;
163+
}
164+
158165
function didWorkspaceAttentionSignalChange(
159166
prev: WorkspaceAttentionSignal | undefined,
160167
next: WorkspaceAttentionSignal
@@ -990,12 +997,7 @@ const ProjectSidebarInner: React.FC<ProjectSidebarProps> = ({
990997
if (result.success && result.data?.kind === "confirm-lossy-untracked-files") {
991998
const metadata = workspaceStore.getWorkspaceMetadata(workspaceId);
992999
const displayTitle = metadata?.title ?? metadata?.name ?? workspaceId;
993-
const aggregator = workspaceStore.getAggregator(workspaceId);
994-
const hasActiveStreams = (aggregator?.getActiveStreams().length ?? 0) > 0;
995-
const pendingStreamStartTime = aggregator?.getPendingStreamStartTime();
996-
const isStarting = pendingStreamStartTime != null && !hasActiveStreams;
997-
const awaitingUserQuestion = aggregator?.hasAwaitingUserQuestion() ?? false;
998-
const isStreaming = (hasActiveStreams || isStarting) && !awaitingUserQuestion;
1000+
const isStreaming = isWorkspaceWorkingForSidebar(workspaceStore, workspaceId);
9991001
setArchiveConfirmation({
10001002
workspaceId,
10011003
displayTitle,
@@ -1025,15 +1027,7 @@ const ProjectSidebarInner: React.FC<ProjectSidebarProps> = ({
10251027
displayTitle,
10261028
buttonElement,
10271029
untrackedPaths: preflight.data.paths,
1028-
isStreaming: (() => {
1029-
const aggregator = workspaceStore.getAggregator(workspaceId);
1030-
if (!aggregator) return false;
1031-
const hasActiveStreams = aggregator.getActiveStreams().length > 0;
1032-
const isStarting =
1033-
aggregator.getPendingStreamStartTime() !== null && !hasActiveStreams;
1034-
const awaitingUserQuestion = aggregator.hasAwaitingUserQuestion();
1035-
return (hasActiveStreams || isStarting) && !awaitingUserQuestion;
1036-
})(),
1030+
isStreaming: isWorkspaceWorkingForSidebar(workspaceStore, workspaceId),
10371031
});
10381032
return;
10391033
}
@@ -1060,26 +1054,17 @@ const ProjectSidebarInner: React.FC<ProjectSidebarProps> = ({
10601054
);
10611055

10621056
const hasActiveStream = useCallback(
1063-
(workspaceId: string) => {
1064-
const aggregator = workspaceStore.getAggregator(workspaceId);
1065-
if (!aggregator) return false;
1066-
const hasActiveStreams = aggregator.getActiveStreams().length > 0;
1067-
const isStarting = aggregator.getPendingStreamStartTime() !== null && !hasActiveStreams;
1068-
const awaitingUserQuestion = aggregator.hasAwaitingUserQuestion();
1069-
return (hasActiveStreams || isStarting) && !awaitingUserQuestion;
1070-
},
1057+
(workspaceId: string) => isWorkspaceWorkingForSidebar(workspaceStore, workspaceId),
10711058
[workspaceStore]
10721059
);
10731060

10741061
const workspaceHasAttention = useCallback(
10751062
(workspace: FrontendWorkspaceMetadata) => {
10761063
const workspaceId = workspace.id;
1077-
const aggregator = workspaceStore.getAggregator(workspaceId);
1078-
const hasActiveStreams = aggregator ? aggregator.getActiveStreams().length > 0 : false;
1079-
const isStarting = aggregator?.getPendingStreamStartTime() != null && !hasActiveStreams;
1080-
const awaitingUserQuestion = aggregator?.hasAwaitingUserQuestion() ?? false;
1081-
const isWorking = (hasActiveStreams || isStarting) && !awaitingUserQuestion;
1082-
const hasError = aggregator?.getLastAbortReason()?.reason === "system";
1064+
const attentionSignal = getWorkspaceAttentionSignal(workspaceStore, workspaceId);
1065+
const isWorking = attentionSignal?.isWorking ?? false;
1066+
const awaitingUserQuestion = attentionSignal?.awaitingUserQuestion ?? false;
1067+
const hasError = attentionSignal?.hasSystemError ?? false;
10831068
const isRemoving = workspace.isRemoving === true;
10841069
const isArchiving = archivingWorkspaceIds.has(workspaceId);
10851070
const isInitializing = workspace.isInitializing === true;

‎src/browser/stores/WorkspaceStore.ts‎

Lines changed: 37 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3399,8 +3399,9 @@ export class WorkspaceStore {
33993399
}
34003400

34013401
/**
3402-
* Check if data is a buffered event type by checking the handler map.
3403-
* This ensures isStreamEvent() and processStreamEvent() can never fall out of sync.
3402+
* Check if data is one of the standard buffered event types backed by WorkspaceStore handlers.
3403+
* Replayed stream-error opts into the same buffering path through getBufferedReplayEventBehavior()
3404+
* so live errors can still bypass buffering and surface immediately.
34043405
*/
34053406
private isBufferedEvent(data: WorkspaceChatMessage): boolean {
34063407
if (!("type" in data)) {
@@ -3416,6 +3417,31 @@ export class WorkspaceStore {
34163417
);
34173418
}
34183419

3420+
private getBufferedReplayEventBehavior(
3421+
data: WorkspaceChatMessage
3422+
): { previewDuringReplay: boolean } | null {
3423+
if (isStreamError(data)) {
3424+
return data.replay === true ? { previewDuringReplay: true } : null;
3425+
}
3426+
3427+
if (!this.isBufferedEvent(data)) {
3428+
return null;
3429+
}
3430+
3431+
return {
3432+
previewDuringReplay: isStreamLifecycle(data) || isStreamAbort(data) || isRuntimeStatus(data),
3433+
};
3434+
}
3435+
3436+
private previewBufferedEventDuringReplay(
3437+
workspaceId: string,
3438+
aggregator: StreamingMessageAggregator,
3439+
data: WorkspaceChatMessage
3440+
): void {
3441+
applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects: false });
3442+
this.states.bump(workspaceId);
3443+
}
3444+
34193445
private handleChatMessage(workspaceId: string, data: WorkspaceChatMessage): void {
34203446
// Aggregator must exist - workspaces are initialized in addWorkspace() before subscriptions run.
34213447
const aggregator = this.assertGet(workspaceId);
@@ -3561,20 +3587,15 @@ export class WorkspaceStore {
35613587
//
35623588
// This is especially important for workspaces with long histories (100+ messages),
35633589
// where unbuffered rendering would cause visible lag and UI stutter.
3564-
if (!transient.caughtUp && isStreamError(data) && data.replay === true) {
3565-
// Show replayed terminal errors immediately so reconnect UIs preserve the same
3566-
// failure classification/copy as the live session, then replay them again after
3567-
// history loads so full-replay replacement does not wipe the error back out.
3568-
applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects: false });
3569-
this.states.bump(workspaceId);
3570-
transient.pendingStreamEvents.push(data);
3571-
return;
3572-
}
3573-
3574-
if (!transient.caughtUp && this.isBufferedEvent(data)) {
3575-
if (isStreamLifecycle(data) || isStreamAbort(data) || isRuntimeStatus(data)) {
3576-
applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects: false });
3577-
this.states.bump(workspaceId);
3590+
const bufferedReplayEventBehavior = !transient.caughtUp
3591+
? this.getBufferedReplayEventBehavior(data)
3592+
: null;
3593+
if (bufferedReplayEventBehavior) {
3594+
if (bufferedReplayEventBehavior.previewDuringReplay) {
3595+
// Preview replayed startup/terminal state immediately so reconnect UI preserves the
3596+
// live session's barrier/error classification until buffered events are replayed again
3597+
// after transcript hydration completes.
3598+
this.previewBufferedEventDuringReplay(workspaceId, aggregator, data);
35783599
}
35793600

35803601
transient.pendingStreamEvents.push(data);

‎src/browser/utils/messages/StreamingMessageAggregator.ts‎

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import type {
88
import { createMuxMessage, getCompactionFollowUpContent } from "@/common/types/message";
99

1010
import {
11+
copyRuntimeStatusEvent,
1112
copyStreamLifecycleSnapshot,
13+
hasInFlightStreamLifecycle,
14+
isTerminalRuntimeStatusPhase,
1215
type StreamStartEvent,
1316
type StreamDeltaEvent,
1417
type UsageDeltaEvent,
@@ -1209,11 +1212,7 @@ export class StreamingMessageAggregator {
12091212
}
12101213

12111214
private clearInFlightStreamLifecycle(): void {
1212-
if (
1213-
this.streamLifecycle?.phase === "preparing" ||
1214-
this.streamLifecycle?.phase === "streaming" ||
1215-
this.streamLifecycle?.phase === "completing"
1216-
) {
1215+
if (hasInFlightStreamLifecycle(this.streamLifecycle)) {
12171216
this.streamLifecycle = null;
12181217
}
12191218
}
@@ -1224,12 +1223,12 @@ export class StreamingMessageAggregator {
12241223
*/
12251224
handleRuntimeStatus(status: RuntimeStatusEvent): void {
12261225
// Keep stream lifecycle code focused on when runtime status becomes irrelevant.
1227-
if (status.phase === "ready" || status.phase === "error") {
1226+
if (isTerminalRuntimeStatusPhase(status.phase)) {
12281227
this.clearRuntimeStatus();
12291228
return;
12301229
}
12311230

1232-
this.runtimeStatus = status;
1231+
this.runtimeStatus = copyRuntimeStatusEvent(status);
12331232
}
12341233

12351234
private clearRuntimeStatus(): void {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { describe, expect, test } from "bun:test";
2+
3+
import {
4+
areRuntimeStatusEventsEqual,
5+
areStreamLifecycleSnapshotsEqual,
6+
copyRuntimeStatusEvent,
7+
hasInFlightStreamLifecycle,
8+
isTerminalRuntimeStatusPhase,
9+
} from "./stream";
10+
11+
describe("stream shared helpers", () => {
12+
test("compare stream lifecycle snapshots with nullish abort reasons", () => {
13+
expect(
14+
areStreamLifecycleSnapshotsEqual(
15+
{ phase: "failed", hadAnyOutput: false },
16+
{ phase: "failed", hadAnyOutput: false }
17+
)
18+
).toBe(true);
19+
20+
expect(
21+
areStreamLifecycleSnapshotsEqual(
22+
{ phase: "failed", hadAnyOutput: false, abortReason: "user" },
23+
{ phase: "failed", hadAnyOutput: false }
24+
)
25+
).toBe(false);
26+
});
27+
28+
test("copy runtime-status events and compare optional fields nullishly", () => {
29+
const copied = copyRuntimeStatusEvent({
30+
type: "runtime-status",
31+
workspaceId: "ws-1",
32+
phase: "starting",
33+
runtimeType: "ssh",
34+
detail: "Checking workspace runtime...",
35+
});
36+
37+
expect(copied).toEqual({
38+
type: "runtime-status",
39+
workspaceId: "ws-1",
40+
phase: "starting",
41+
runtimeType: "ssh",
42+
detail: "Checking workspace runtime...",
43+
});
44+
expect(copied).not.toBe(copyRuntimeStatusEvent(copied));
45+
expect("source" in copied).toBe(false);
46+
47+
expect(
48+
areRuntimeStatusEventsEqual(copied, {
49+
phase: "starting",
50+
runtimeType: "ssh",
51+
detail: "Checking workspace runtime...",
52+
})
53+
).toBe(true);
54+
expect(
55+
areRuntimeStatusEventsEqual(copied, {
56+
phase: "starting",
57+
runtimeType: "ssh",
58+
detail: "Loading tools...",
59+
})
60+
).toBe(false);
61+
});
62+
63+
test("share terminal runtime-status and in-flight lifecycle semantics", () => {
64+
expect(isTerminalRuntimeStatusPhase("ready")).toBe(true);
65+
expect(isTerminalRuntimeStatusPhase("error")).toBe(true);
66+
expect(isTerminalRuntimeStatusPhase("starting")).toBe(false);
67+
68+
expect(hasInFlightStreamLifecycle({ phase: "preparing" })).toBe(true);
69+
expect(hasInFlightStreamLifecycle({ phase: "streaming" })).toBe(true);
70+
expect(hasInFlightStreamLifecycle({ phase: "completing" })).toBe(true);
71+
expect(hasInFlightStreamLifecycle({ phase: "failed" })).toBe(false);
72+
expect(hasInFlightStreamLifecycle({ phase: "interrupted" })).toBe(false);
73+
expect(hasInFlightStreamLifecycle(null)).toBe(false);
74+
});
75+
});

‎src/common/types/stream.ts‎

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,30 @@ export function copyStreamLifecycleSnapshot(
5454
};
5555
}
5656

57+
export function areStreamLifecycleSnapshotsEqual(
58+
left: Pick<StreamLifecycleSnapshot, "phase" | "hadAnyOutput" | "abortReason"> | null,
59+
right: Pick<StreamLifecycleSnapshot, "phase" | "hadAnyOutput" | "abortReason"> | null
60+
): boolean {
61+
return (
62+
left === right ||
63+
(left !== null &&
64+
right !== null &&
65+
left.phase === right.phase &&
66+
left.hadAnyOutput === right.hadAnyOutput &&
67+
(left.abortReason ?? null) === (right.abortReason ?? null))
68+
);
69+
}
70+
71+
export function isInFlightStreamLifecyclePhase(phase: StreamLifecyclePhase): boolean {
72+
return phase === "preparing" || phase === "streaming" || phase === "completing";
73+
}
74+
75+
export function hasInFlightStreamLifecycle(
76+
snapshot: Pick<StreamLifecycleSnapshot, "phase"> | null | undefined
77+
): boolean {
78+
return snapshot != null && isInFlightStreamLifecyclePhase(snapshot.phase);
79+
}
80+
5781
export interface StreamAbortReasonSnapshot {
5882
reason: StreamAbortReason;
5983
at: number;
@@ -89,3 +113,42 @@ export type AutoRetryAbandonedEvent = z.infer<typeof AutoRetryAbandonedEventSche
89113
* Used for both runtime readiness and generic startup breadcrumbs in the barrier UI.
90114
*/
91115
export type RuntimeStatusEvent = z.infer<typeof RuntimeStatusEventSchema>;
116+
117+
/**
118+
* Shared runtime-status helpers used by both the backend session replay path and the
119+
* renderer aggregator so startup breadcrumbs follow the same lifecycle semantics.
120+
*/
121+
export function copyRuntimeStatusEvent(
122+
status: Pick<
123+
RuntimeStatusEvent,
124+
"type" | "workspaceId" | "phase" | "runtimeType" | "source" | "detail"
125+
>
126+
): RuntimeStatusEvent {
127+
return {
128+
type: status.type,
129+
workspaceId: status.workspaceId,
130+
phase: status.phase,
131+
runtimeType: status.runtimeType,
132+
...(status.source != null ? { source: status.source } : {}),
133+
...(status.detail != null ? { detail: status.detail } : {}),
134+
};
135+
}
136+
137+
export function areRuntimeStatusEventsEqual(
138+
left: Pick<RuntimeStatusEvent, "phase" | "runtimeType" | "source" | "detail"> | null,
139+
right: Pick<RuntimeStatusEvent, "phase" | "runtimeType" | "source" | "detail"> | null
140+
): boolean {
141+
return (
142+
left === right ||
143+
(left !== null &&
144+
right !== null &&
145+
left.phase === right.phase &&
146+
left.runtimeType === right.runtimeType &&
147+
(left.source ?? null) === (right.source ?? null) &&
148+
(left.detail ?? null) === (right.detail ?? null))
149+
);
150+
}
151+
152+
export function isTerminalRuntimeStatusPhase(phase: RuntimeStatusEvent["phase"]): boolean {
153+
return phase === "ready" || phase === "error";
154+
}

0 commit comments

Comments
 (0)