Skip to content

Commit 4a743ee

Browse files
committed
🤖 refactor: share startup replay status helpers
--- _Generated with `mux` • Model: `openai:gpt-5.4` • Thinking: `xhigh` • Cost: `$18.38`_ <!-- mux-attribution: model=openai:gpt-5.4 thinking=xhigh costs=18.38 -->
1 parent a5871a5 commit 4a743ee

5 files changed

Lines changed: 160 additions & 40 deletions

File tree

‎src/browser/stores/WorkspaceStore.ts‎

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3416,6 +3416,10 @@ export class WorkspaceStore {
34163416
);
34173417
}
34183418

3419+
private shouldApplyBufferedEventImmediatelyDuringReplay(data: WorkspaceChatMessage): boolean {
3420+
return isStreamLifecycle(data) || isStreamAbort(data) || isRuntimeStatus(data);
3421+
}
3422+
34193423
private handleChatMessage(workspaceId: string, data: WorkspaceChatMessage): void {
34203424
// Aggregator must exist - workspaces are initialized in addWorkspace() before subscriptions run.
34213425
const aggregator = this.assertGet(workspaceId);
@@ -3572,7 +3576,7 @@ export class WorkspaceStore {
35723576
}
35733577

35743578
if (!transient.caughtUp && this.isBufferedEvent(data)) {
3575-
if (isStreamLifecycle(data) || isStreamAbort(data) || isRuntimeStatus(data)) {
3579+
if (this.shouldApplyBufferedEventImmediatelyDuringReplay(data)) {
35763580
applyWorkspaceChatEventToAggregator(aggregator, data, { allowSideEffects: false });
35773581
this.states.bump(workspaceId);
35783582
}

‎src/browser/utils/messages/StreamingMessageAggregator.ts‎

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ import type {
88
import { createMuxMessage, getCompactionFollowUpContent } from "@/common/types/message";
99

1010
import {
11+
copyRuntimeStatusEvent,
1112
copyStreamLifecycleSnapshot,
13+
hasInFlightStreamLifecycle,
14+
isTerminalRuntimeStatusPhase,
1215
type StreamStartEvent,
1316
type StreamDeltaEvent,
1417
type UsageDeltaEvent,
@@ -1209,11 +1212,7 @@ export class StreamingMessageAggregator {
12091212
}
12101213

12111214
private clearInFlightStreamLifecycle(): void {
1212-
if (
1213-
this.streamLifecycle?.phase === "preparing" ||
1214-
this.streamLifecycle?.phase === "streaming" ||
1215-
this.streamLifecycle?.phase === "completing"
1216-
) {
1215+
if (hasInFlightStreamLifecycle(this.streamLifecycle)) {
12171216
this.streamLifecycle = null;
12181217
}
12191218
}
@@ -1224,12 +1223,12 @@ export class StreamingMessageAggregator {
12241223
*/
12251224
handleRuntimeStatus(status: RuntimeStatusEvent): void {
12261225
// Keep stream lifecycle code focused on when runtime status becomes irrelevant.
1227-
if (status.phase === "ready" || status.phase === "error") {
1226+
if (isTerminalRuntimeStatusPhase(status.phase)) {
12281227
this.clearRuntimeStatus();
12291228
return;
12301229
}
12311230

1232-
this.runtimeStatus = status;
1231+
this.runtimeStatus = copyRuntimeStatusEvent(status);
12331232
}
12341233

12351234
private clearRuntimeStatus(): void {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
import { describe, expect, test } from "bun:test";
2+
3+
import {
4+
areRuntimeStatusEventsEqual,
5+
areStreamLifecycleSnapshotsEqual,
6+
copyRuntimeStatusEvent,
7+
hasInFlightStreamLifecycle,
8+
isTerminalRuntimeStatusPhase,
9+
} from "./stream";
10+
11+
describe("stream shared helpers", () => {
12+
test("compare stream lifecycle snapshots with nullish abort reasons", () => {
13+
expect(
14+
areStreamLifecycleSnapshotsEqual(
15+
{ phase: "failed", hadAnyOutput: false },
16+
{ phase: "failed", hadAnyOutput: false }
17+
)
18+
).toBe(true);
19+
20+
expect(
21+
areStreamLifecycleSnapshotsEqual(
22+
{ phase: "failed", hadAnyOutput: false, abortReason: "user" },
23+
{ phase: "failed", hadAnyOutput: false }
24+
)
25+
).toBe(false);
26+
});
27+
28+
test("copy runtime-status events and compare optional fields nullishly", () => {
29+
const copied = copyRuntimeStatusEvent({
30+
type: "runtime-status",
31+
workspaceId: "ws-1",
32+
phase: "starting",
33+
runtimeType: "ssh",
34+
detail: "Checking workspace runtime...",
35+
});
36+
37+
expect(copied).toEqual({
38+
type: "runtime-status",
39+
workspaceId: "ws-1",
40+
phase: "starting",
41+
runtimeType: "ssh",
42+
detail: "Checking workspace runtime...",
43+
});
44+
expect(copied).not.toBe(copyRuntimeStatusEvent(copied));
45+
expect("source" in copied).toBe(false);
46+
47+
expect(
48+
areRuntimeStatusEventsEqual(copied, {
49+
phase: "starting",
50+
runtimeType: "ssh",
51+
detail: "Checking workspace runtime...",
52+
})
53+
).toBe(true);
54+
expect(
55+
areRuntimeStatusEventsEqual(copied, {
56+
phase: "starting",
57+
runtimeType: "ssh",
58+
detail: "Loading tools...",
59+
})
60+
).toBe(false);
61+
});
62+
63+
test("share terminal runtime-status and in-flight lifecycle semantics", () => {
64+
expect(isTerminalRuntimeStatusPhase("ready")).toBe(true);
65+
expect(isTerminalRuntimeStatusPhase("error")).toBe(true);
66+
expect(isTerminalRuntimeStatusPhase("starting")).toBe(false);
67+
68+
expect(hasInFlightStreamLifecycle({ phase: "preparing" })).toBe(true);
69+
expect(hasInFlightStreamLifecycle({ phase: "streaming" })).toBe(true);
70+
expect(hasInFlightStreamLifecycle({ phase: "completing" })).toBe(true);
71+
expect(hasInFlightStreamLifecycle({ phase: "failed" })).toBe(false);
72+
expect(hasInFlightStreamLifecycle({ phase: "interrupted" })).toBe(false);
73+
expect(hasInFlightStreamLifecycle(null)).toBe(false);
74+
});
75+
});

‎src/common/types/stream.ts‎

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,30 @@ export function copyStreamLifecycleSnapshot(
5454
};
5555
}
5656

57+
export function areStreamLifecycleSnapshotsEqual(
58+
left: Pick<StreamLifecycleSnapshot, "phase" | "hadAnyOutput" | "abortReason"> | null,
59+
right: Pick<StreamLifecycleSnapshot, "phase" | "hadAnyOutput" | "abortReason"> | null
60+
): boolean {
61+
return (
62+
left === right ||
63+
(left !== null &&
64+
right !== null &&
65+
left.phase === right.phase &&
66+
left.hadAnyOutput === right.hadAnyOutput &&
67+
(left.abortReason ?? null) === (right.abortReason ?? null))
68+
);
69+
}
70+
71+
export function isInFlightStreamLifecyclePhase(phase: StreamLifecyclePhase): boolean {
72+
return phase === "preparing" || phase === "streaming" || phase === "completing";
73+
}
74+
75+
export function hasInFlightStreamLifecycle(
76+
snapshot: Pick<StreamLifecycleSnapshot, "phase"> | null | undefined
77+
): boolean {
78+
return snapshot != null && isInFlightStreamLifecyclePhase(snapshot.phase);
79+
}
80+
5781
export interface StreamAbortReasonSnapshot {
5882
reason: StreamAbortReason;
5983
at: number;
@@ -89,3 +113,42 @@ export type AutoRetryAbandonedEvent = z.infer<typeof AutoRetryAbandonedEventSche
89113
* Used for both runtime readiness and generic startup breadcrumbs in the barrier UI.
90114
*/
91115
export type RuntimeStatusEvent = z.infer<typeof RuntimeStatusEventSchema>;
116+
117+
/**
118+
* Shared runtime-status helpers used by both the backend session replay path and the
119+
* renderer aggregator so startup breadcrumbs follow the same lifecycle semantics.
120+
*/
121+
export function copyRuntimeStatusEvent(
122+
status: Pick<
123+
RuntimeStatusEvent,
124+
"type" | "workspaceId" | "phase" | "runtimeType" | "source" | "detail"
125+
>
126+
): RuntimeStatusEvent {
127+
return {
128+
type: status.type,
129+
workspaceId: status.workspaceId,
130+
phase: status.phase,
131+
runtimeType: status.runtimeType,
132+
...(status.source != null ? { source: status.source } : {}),
133+
...(status.detail != null ? { detail: status.detail } : {}),
134+
};
135+
}
136+
137+
export function areRuntimeStatusEventsEqual(
138+
left: Pick<RuntimeStatusEvent, "phase" | "runtimeType" | "source" | "detail"> | null,
139+
right: Pick<RuntimeStatusEvent, "phase" | "runtimeType" | "source" | "detail"> | null
140+
): boolean {
141+
return (
142+
left === right ||
143+
(left !== null &&
144+
right !== null &&
145+
left.phase === right.phase &&
146+
left.runtimeType === right.runtimeType &&
147+
(left.source ?? null) === (right.source ?? null) &&
148+
(left.detail ?? null) === (right.detail ?? null))
149+
);
150+
}
151+
152+
export function isTerminalRuntimeStatusPhase(phase: RuntimeStatusEvent["phase"]): boolean {
153+
return phase === "ready" || phase === "error";
154+
}

‎src/node/services/agentSession.ts‎

Lines changed: 11 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,11 @@ import { isAgentEffectivelyDisabled } from "@/node/services/agentDefinitions/age
7575
import { resolveAgentInheritanceChain } from "@/node/services/agentDefinitions/resolveAgentInheritanceChain";
7676
import { MessageQueue } from "./messageQueue";
7777
import {
78+
areRuntimeStatusEventsEqual,
79+
areStreamLifecycleSnapshotsEqual,
80+
copyRuntimeStatusEvent,
7881
copyStreamLifecycleSnapshot,
82+
isTerminalRuntimeStatusPhase,
7983
type RuntimeStatusEvent,
8084
type StreamAbortReason,
8185
type StreamEndEvent,
@@ -620,38 +624,13 @@ export class AgentSession {
620624
return this.terminalStreamLifecycle ?? { phase: "idle", hadAnyOutput: false };
621625
}
622626

623-
private hasSameStreamLifecycle(
624-
left: StreamLifecycleSnapshot | null,
625-
right: StreamLifecycleSnapshot
626-
): boolean {
627-
return (
628-
left !== null &&
629-
left.phase === right.phase &&
630-
left.hadAnyOutput === right.hadAnyOutput &&
631-
(left.abortReason ?? null) === (right.abortReason ?? null)
632-
);
633-
}
634-
635-
private hasSameRuntimeStatus(
636-
left: RuntimeStatusEvent | null,
637-
right: RuntimeStatusEvent
638-
): boolean {
639-
return (
640-
left !== null &&
641-
left.phase === right.phase &&
642-
left.runtimeType === right.runtimeType &&
643-
(left.source ?? null) === (right.source ?? null) &&
644-
(left.detail ?? null) === (right.detail ?? null)
645-
);
646-
}
647-
648627
private emitStreamLifecycleIfChanged(): void {
649628
if (this.disposed) {
650629
return;
651630
}
652631

653632
const snapshot = this.getCurrentStreamLifecycleSnapshot();
654-
if (this.hasSameStreamLifecycle(this.lastEmittedStreamLifecycle, snapshot)) {
633+
if (areStreamLifecycleSnapshotsEqual(this.lastEmittedStreamLifecycle, snapshot)) {
655634
return;
656635
}
657636

@@ -684,12 +663,12 @@ export class AgentSession {
684663
}
685664

686665
private updatePreparingRuntimeStatus(status: RuntimeStatusEvent): void {
687-
if (status.phase === "ready" || status.phase === "error") {
666+
if (isTerminalRuntimeStatusPhase(status.phase)) {
688667
this.clearPreparingRuntimeStatus();
689668
return;
690669
}
691670

692-
this.preparingRuntimeStatus = status;
671+
this.preparingRuntimeStatus = copyRuntimeStatusEvent(status);
693672
}
694673

695674
private clearPreparingRuntimeStatus(): void {
@@ -1607,7 +1586,7 @@ export class AgentSession {
16071586
}
16081587

16091588
const lifecycle = this.getCurrentStreamLifecycleSnapshot();
1610-
if (!this.hasSameStreamLifecycle(replayedStreamLifecycle, lifecycle)) {
1589+
if (!areStreamLifecycleSnapshotsEqual(replayedStreamLifecycle, lifecycle)) {
16111590
replayedStreamLifecycle = copyStreamLifecycleSnapshot(lifecycle);
16121591
emitReplayStatusMessage({
16131592
type: "stream-lifecycle",
@@ -1617,9 +1596,9 @@ export class AgentSession {
16171596
}
16181597

16191598
const runtimeStatus = this.preparingRuntimeStatus;
1620-
if (runtimeStatus && !this.hasSameRuntimeStatus(replayedRuntimeStatus, runtimeStatus)) {
1621-
replayedRuntimeStatus = { ...runtimeStatus };
1622-
emitReplayStatusMessage(runtimeStatus);
1599+
if (runtimeStatus && !areRuntimeStatusEventsEqual(replayedRuntimeStatus, runtimeStatus)) {
1600+
replayedRuntimeStatus = copyRuntimeStatusEvent(runtimeStatus);
1601+
emitReplayStatusMessage(replayedRuntimeStatus);
16231602
}
16241603
};
16251604

0 commit comments

Comments
 (0)