-
Notifications
You must be signed in to change notification settings - Fork 100
Expand file tree
/
Copy pathStreamingMessageAggregator.ts
More file actions
3079 lines (2704 loc) · 111 KB
/
StreamingMessageAggregator.ts
File metadata and controls
3079 lines (2704 loc) · 111 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import type {
MuxMessage,
MuxMetadata,
MuxFilePart,
DisplayedMessage,
CompactionRequestData,
} from "@/common/types/message";
import { createMuxMessage, getCompactionFollowUpContent } from "@/common/types/message";
import {
copyRuntimeStatusEvent,
copyStreamLifecycleSnapshot,
hasInFlightStreamLifecycle,
isTerminalRuntimeStatusPhase,
type StreamStartEvent,
type StreamDeltaEvent,
type UsageDeltaEvent,
type StreamEndEvent,
type StreamAbortEvent,
type StreamAbortReasonSnapshot,
type ToolCallStartEvent,
type ToolCallDeltaEvent,
type ToolCallEndEvent,
type ReasoningDeltaEvent,
type ReasoningEndEvent,
type RuntimeStatusEvent,
type StreamLifecycleEvent,
type StreamLifecycleSnapshot,
} from "@/common/types/stream";
import type { LanguageModelV2Usage } from "@ai-sdk/provider";
import type { TodoItem, StatusSetToolResult, NotifyToolResult } from "@/common/types/tools";
import { completeInProgressTodoItems } from "@/common/utils/todoList";
import { getToolOutputUiOnly } from "@/common/utils/tools/toolOutputUiOnly";
import { computePriorHistoryFingerprint } from "@/common/orpc/onChatCursorFingerprint";
import type {
WorkspaceChatMessage,
StreamErrorMessage,
DeleteMessage,
OnChatCursor,
} from "@/common/orpc/types";
import { isInitStart, isInitOutput, isInitEnd, isMuxMessage } from "@/common/orpc/types";
import {
buildAggregateResponseCompleteMetadata,
buildResponseCompleteMetadata,
type ResponseCompleteHandler,
} from "./responseCompletionMetadata";
import { showBrowserNotification } from "@/browser/utils/ui/showBrowserNotification";
import type {
DynamicToolPart,
DynamicToolPartPending,
DynamicToolPartAvailable,
} from "@/common/types/toolParts";
import type { AgentSkillDescriptor, AgentSkillScope } from "@/common/types/agentSkill";
import { INIT_HOOK_MAX_LINES } from "@/common/constants/toolLimits";
import { isDynamicToolPart } from "@/common/types/toolParts";
import { z } from "zod";
import { createDeltaStorage, type DeltaRecordStorage } from "./StreamingTPSCalculator";
import { buildTranscriptTruncationPlan } from "./transcriptTruncationPlan";
import { computeRecencyTimestamp } from "./recency";
import { assert } from "@/common/utils/assert";
import { getStatusStateKey } from "@/common/constants/storage";
import { getFollowUpContentText } from "@/browser/utils/compaction/format";
// Maximum number of messages to display in the DOM for performance
// Full history is still maintained internally for token counting and stats
const AgentStatusSchema = z.object({
emoji: z.string(),
message: z.string(),
url: z.string().optional(),
});
// Synthetic agent-skill snapshot messages include metadata.agentSkillSnapshot.
// We use this to keep the SkillIndicator in sync for /{skillName} invocations.
const AgentSkillSnapshotMetadataSchema = z.object({
skillName: z.string().min(1),
scope: z.enum(["project", "global", "built-in"]),
sha256: z.string().optional(),
frontmatterYaml: z.string().optional(),
});
/** Re-export for consumers that need the loaded skill type */
export type LoadedSkill = AgentSkillDescriptor;
/** A runtime skill load failure (agent_skill_read returned { success: false }) */
export interface SkillLoadError {
/** Skill name that was requested */
name: string;
/** Error message from the backend */
error: string;
}
type AgentStatus = z.infer<typeof AgentStatusSchema>;
/**
* Maximum number of DisplayedMessages to render before truncation kicks in.
* We keep all user prompts and structural markers, while allowing older assistant
* content to collapse behind history-hidden markers for faster initial paint.
*/
const MAX_DISPLAYED_MESSAGES = 64;
/**
* Message types that are always preserved even in truncated history.
* Older assistant/tool/reasoning rows may be omitted until the user clicks “Load all”.
*/
const ALWAYS_KEEP_MESSAGE_TYPES = new Set<DisplayedMessage["type"]>([
"user",
"stream-error",
"compaction-boundary",
"plan-display",
"workspace-init",
]);
interface StreamingContext {
/** Backend timestamp when stream started (Date.now()) */
serverStartTime: number;
/**
* Offset to translate backend timestamps into the renderer clock.
* Computed as: `Date.now() - lastServerTimestamp`.
*/
clockOffsetMs: number;
/** Most recent backend timestamp observed for this stream */
lastServerTimestamp: number;
isComplete: boolean;
isCompacting: boolean;
// Idle compaction is background maintenance, not a user-visible completion, so
// completion notifications must stay suppressed even for the currently selected workspace.
isIdleCompaction: boolean;
hasCompactionContinue: boolean;
// Track the last known queued-follow-up state on the active stream itself so
// background activity completion can still suppress intermediate notifications
// after the workspace loses its live queued-message subscription.
hasQueuedFollowUp: boolean;
isReplay: boolean;
model: string;
routedThroughGateway?: boolean;
routeProvider?: string;
/** Timestamp of first content token (text or reasoning delta) - backend Date.now() */
serverFirstTokenTime: number | null;
/** Accumulated tool execution time in ms */
toolExecutionMs: number;
/** Map of tool call start times for in-progress tool calls (backend timestamps) */
pendingToolStarts: Map<string, number>;
/** Mode (plan/exec) */
mode?: string;
/** Effective thinking level after model policy clamping */
thinkingLevel?: string;
}
interface PendingCompactionRequest {
parsed: CompactionRequestData;
source?: "idle-compaction" | "auto-compaction";
}
/**
* Check if a tool result indicates success (for tools that return { success: boolean })
*/
function hasSuccessResult(result: unknown): boolean {
return (
typeof result === "object" && result !== null && "success" in result && result.success === true
);
}
/**
* Check if a tool result indicates failure.
* Handles both explicit failure ({ success: false }) and implicit failure ({ error: "..." })
*/
function hasFailureResult(result: unknown): boolean {
if (typeof result !== "object" || result === null) return false;
// Explicit failure
if ("success" in result && result.success === false) return true;
// Implicit failure - error field present
if ("error" in result && result.error) return true;
return false;
}
function resolveRouteProvider(
routeProvider: string | undefined,
routedThroughGateway: boolean | undefined
): string | undefined {
return routeProvider ?? (routedThroughGateway === true ? "mux-gateway" : undefined);
}
function normalizeMessageRouteProvider(message: MuxMessage): MuxMessage {
const routeProvider = resolveRouteProvider(
message.metadata?.routeProvider,
message.metadata?.routedThroughGateway
);
if (!message.metadata || routeProvider === message.metadata.routeProvider) {
return message;
}
return {
...message,
metadata: {
...message.metadata,
routeProvider,
},
};
}
/**
* Merge adjacent text/reasoning parts using array accumulation + join().
* Avoids O(n²) string allocations from repeated concatenation.
* Tool parts are preserved as-is between merged text/reasoning runs.
*/
function mergeAdjacentParts(parts: MuxMessage["parts"]): MuxMessage["parts"] {
if (parts.length <= 1) return parts;
const merged: MuxMessage["parts"] = [];
let pendingTexts: string[] = [];
let pendingTextTimestamp: number | undefined;
let pendingReasonings: string[] = [];
let pendingReasoningTimestamp: number | undefined;
const flushText = () => {
if (pendingTexts.length > 0) {
merged.push({
type: "text",
text: pendingTexts.join(""),
timestamp: pendingTextTimestamp,
});
pendingTexts = [];
pendingTextTimestamp = undefined;
}
};
const flushReasoning = () => {
if (pendingReasonings.length > 0) {
merged.push({
type: "reasoning",
text: pendingReasonings.join(""),
timestamp: pendingReasoningTimestamp,
});
pendingReasonings = [];
pendingReasoningTimestamp = undefined;
}
};
for (const part of parts) {
if (part.type === "text") {
flushReasoning();
pendingTexts.push(part.text);
pendingTextTimestamp ??= part.timestamp;
} else if (part.type === "reasoning") {
flushText();
pendingReasonings.push(part.text);
pendingReasoningTimestamp ??= part.timestamp;
} else {
// Tool part - flush and keep as-is
flushText();
flushReasoning();
merged.push(part);
}
}
flushText();
flushReasoning();
return merged;
}
function extractAgentSkillSnapshotBody(snapshotText: string): string | null {
assert(typeof snapshotText === "string", "extractAgentSkillSnapshotBody requires snapshotText");
// Expected format (backend):
// <agent-skill ...>\n{body}\n</agent-skill>
if (!snapshotText.startsWith("<agent-skill")) {
return null;
}
const openTagEnd = snapshotText.indexOf(">\n");
if (openTagEnd === -1) {
return null;
}
const closeTag = "\n</agent-skill>";
const closeTagStart = snapshotText.lastIndexOf(closeTag);
if (closeTagStart === -1) {
return null;
}
const bodyStart = openTagEnd + ">\n".length;
if (closeTagStart < bodyStart) {
return null;
}
// Be strict about trailing content: if we can't confidently extract the body,
// avoid showing a misleading preview.
const trailing = snapshotText.slice(closeTagStart + closeTag.length);
if (trailing.trim().length > 0) {
return null;
}
return snapshotText.slice(bodyStart, closeTagStart);
}
interface AgentSkillSnapshotContent {
sha256?: string;
frontmatterYaml?: string;
body: string;
}
function getTextPartContent(parts: ReadonlyArray<MuxMessage["parts"][number]>): string {
const content: string[] = [];
for (const part of parts) {
if (part.type === "text") {
content.push(part.text);
}
}
return content.join("");
}
function getAgentSkillSnapshotKey(scope: AgentSkillScope, skillName: string): string {
return `${scope}:${skillName}`;
}
function maybeCollectAgentSkillSnapshot(
message: MuxMessage,
snapshots: Map<string, AgentSkillSnapshotContent>
): void {
const snapshotMeta = message.metadata?.agentSkillSnapshot;
if (!snapshotMeta) {
return;
}
const parsed = AgentSkillSnapshotMetadataSchema.safeParse(snapshotMeta);
if (!parsed.success) {
return;
}
const body = extractAgentSkillSnapshotBody(getTextPartContent(message.parts));
if (body === null) {
return;
}
snapshots.set(getAgentSkillSnapshotKey(parsed.data.scope, parsed.data.skillName), {
sha256: parsed.data.sha256,
frontmatterYaml: parsed.data.frontmatterYaml,
body,
});
}
export class StreamingMessageAggregator {
private messages = new Map<string, MuxMessage>();
private activeStreams = new Map<string, StreamingContext>();
// Derived value cache - invalidated as a unit on every mutation.
// Adding a new cached value? Add it here and it will auto-invalidate.
private displayedMessageCache = new Map<
string,
{ version: number; agentSkillSnapshotCacheKey?: string; messages: DisplayedMessage[] }
>();
private messageVersions = new Map<string, number>();
private cache: {
allMessages?: MuxMessage[];
displayedMessages?: DisplayedMessage[];
latestStreamingBashToolCallId?: string | null; // null = computed, none found
} = {};
private recencyTimestamp: number | null = null;
private lastResponseCompletedAt: number | null = null;
/** Oldest historySequence from the server's last replay window.
* Used for reconnect cursors instead of the absolute minimum (which
* includes user-loaded older pages via loadOlderHistory). */
private establishedOldestHistorySequence: number | null = null;
// Delta history for token counting and TPS calculation
private deltaHistory = new Map<string, DeltaRecordStorage>();
// Active stream usage tracking (updated on each usage-delta event)
// Consolidates step-level (context window) and cumulative (cost) usage by messageId
private activeStreamUsage = new Map<
string,
{
// Step-level: this step only (for context window display)
step: { usage: LanguageModelV2Usage; providerMetadata?: Record<string, unknown> };
// Cumulative: sum across all steps (for live cost display)
cumulative: { usage: LanguageModelV2Usage; providerMetadata?: Record<string, unknown> };
}
>();
// Current TODO list (updated when todo_write succeeds)
// Incomplete lists persist across streams and reloads; fully completed lists clear
// once the final stream finishes so stale plans do not linger in the UI.
private currentTodos: TodoItem[] = [];
// Current agent status (updated when status_set is called)
// Unlike todos, this persists after stream completion to show last activity
private agentStatus: AgentStatus | undefined = undefined;
// Loaded skills (updated when agent_skill_read succeeds)
// Persists after stream completion (like agentStatus) to show which skills were loaded
// Keyed by skill name to avoid duplicates
private loadedSkills = new Map<string, LoadedSkill>();
// Cached array for getLoadedSkills() to preserve reference identity for memoization
private loadedSkillsCache: LoadedSkill[] = [];
// Runtime skill load errors (updated when agent_skill_read fails)
// Keyed by skill name; cleared when the skill is later loaded successfully
private skillLoadErrors = new Map<string, SkillLoadError>();
private skillLoadErrorsCache: SkillLoadError[] = [];
// Last URL set via status_set - kept in memory to reuse when later calls omit url
private lastStatusUrl: string | undefined = undefined;
// Whether to disable DOM message capping for this workspace.
// Controlled via the HistoryHiddenMessage “Load all” button.
private showAllMessages = false;
// Workspace ID (used for status persistence)
private readonly workspaceId: string | undefined;
// Workspace init hook state (ephemeral, not persisted to history)
private initState: {
status: "running" | "success" | "error";
hookPath: string;
lines: Array<{ line: string; isError: boolean }>;
exitCode: number | null;
startTime: number;
endTime: number | null;
truncatedLines?: number; // Lines dropped from middle when output exceeded limit
} | null = null;
// Throttle init-output cache invalidation to avoid re-render per line during fast streaming
private initOutputThrottleTimer: ReturnType<typeof setTimeout> | null = null;
private static readonly INIT_OUTPUT_THROTTLE_MS = 100;
// Track when we're waiting for stream-start after user message
// Prevents retry barrier flash during normal send flow
// Stores timestamp of when user message was sent (null = no pending stream)
// IMPORTANT: We intentionally keep this timestamp until a stream actually starts
// (or the user retries) so retry UI/backoff logic doesn't misfire on send failures.
private pendingStreamStartTime: number | null = null;
// Canonical backend-owned stream lifecycle. This distinguishes slow startup from a
// genuinely interrupted/failed turn, including reconnects while PREPARING is still in flight.
private streamLifecycle: StreamLifecycleSnapshot | null = null;
// Last observed stream-abort reason (used to gate auto-retry).
private lastAbortReason: StreamAbortReasonSnapshot | null = null;
// Current pre-stream startup status.
// This begins with runtime readiness for Coder, but also carries generic
// startup breadcrumbs like "Loading tools..." while the request is preparing.
private runtimeStatus: RuntimeStatusEvent | null = null;
// Pending compaction request metadata for the next stream (set when user message arrives).
// Used to infer compaction state before stream-start arrives.
private pendingCompactionRequest: PendingCompactionRequest | null = null;
// Model used for the pending send (set on user message) so the "starting" UI
// reflects one-shot/compaction overrides instead of stale localStorage values.
private pendingStreamModel: string | null = null;
// Last completed stream timing stats (preserved after stream ends for display)
// Unlike activeStreams, this persists until the next stream starts
private lastCompletedStreamStats: {
startTime: number;
endTime: number;
firstTokenTime: number | null;
toolExecutionMs: number;
model: string;
outputTokens: number;
reasoningTokens: number;
streamingMs: number; // Time from first token to end (for accurate tok/s)
mode?: string; // Mode in which this response occurred
} | null = null;
// Optimistic "interrupting" state: set before calling interruptStream
// Shows "interrupting..." in StreamingBarrier until real stream-abort arrives
private interruptingMessageId: string | null = null;
// Session-level timing stats: model -> stats (totals computed on-the-fly)
private sessionTimingStats: Record<
string,
{
totalDurationMs: number;
totalToolExecutionMs: number;
totalTtftMs: number;
ttftCount: number;
responseCount: number;
totalOutputTokens: number;
totalReasoningTokens: number;
totalStreamingMs: number; // Cumulative streaming time (for accurate tok/s)
}
> = {};
// Workspace creation timestamp (used for recency calculation)
// REQUIRED: Backend guarantees every workspace has createdAt via config.ts
private readonly createdAt: string;
// Workspace unarchived timestamp (used for recency calculation to bump restored workspaces)
private unarchivedAt?: string;
// Optional callback for navigating to a workspace (set by parent component)
// Used for notification click handling in browser mode
onNavigateToWorkspace?: (workspaceId: string) => void;
// Optional callback when an assistant response completes (used for "notify on response" feature).
// completedAt is non-null for all final streams and drives read-marking in App.tsx.
// Only non-compaction completions also bump lastResponseCompletedAt (recency).
onResponseComplete?: ResponseCompleteHandler;
constructor(createdAt: string, workspaceId?: string, unarchivedAt?: string) {
this.createdAt = createdAt;
this.workspaceId = workspaceId;
this.unarchivedAt = unarchivedAt;
// Load persisted agent status from localStorage
if (workspaceId) {
const persistedStatus = this.loadPersistedAgentStatus();
if (persistedStatus) {
this.agentStatus = persistedStatus;
this.lastStatusUrl = persistedStatus.url;
}
}
this.updateRecency();
}
/** Update unarchivedAt timestamp (called when workspace is restored from archive) */
setUnarchivedAt(unarchivedAt: string | undefined): void {
this.unarchivedAt = unarchivedAt;
this.updateRecency();
}
/**
* Disable the displayed message cap for this workspace.
* Intended for user-triggered “Load all” UI.
*/
setShowAllMessages(showAllMessages: boolean): void {
assert(typeof showAllMessages === "boolean", "setShowAllMessages requires boolean");
if (this.showAllMessages === showAllMessages) {
return;
}
this.showAllMessages = showAllMessages;
this.invalidateCache();
}
/** Load persisted agent status from localStorage */
private loadPersistedAgentStatus(): AgentStatus | undefined {
if (!this.workspaceId) return undefined;
try {
const stored = localStorage.getItem(getStatusStateKey(this.workspaceId));
if (!stored) return undefined;
const parsed = AgentStatusSchema.safeParse(JSON.parse(stored));
return parsed.success ? parsed.data : undefined;
} catch {
// Ignore localStorage errors or JSON parse failures
}
return undefined;
}
/** Persist agent status to localStorage */
private savePersistedAgentStatus(status: AgentStatus): void {
if (!this.workspaceId) return;
const parsed = AgentStatusSchema.safeParse(status);
if (!parsed.success) return;
try {
localStorage.setItem(getStatusStateKey(this.workspaceId), JSON.stringify(parsed.data));
} catch {
// Ignore localStorage errors
}
}
/** Remove persisted agent status from localStorage */
private clearPersistedAgentStatus(): void {
if (!this.workspaceId) return;
try {
localStorage.removeItem(getStatusStateKey(this.workspaceId));
} catch {
// Ignore localStorage errors
}
}
/** Clear all session timing stats (in-memory only). */
clearSessionTimingStats(): void {
this.sessionTimingStats = {};
this.lastCompletedStreamStats = null;
}
private updateStreamClock(context: StreamingContext, serverTimestamp: number): void {
assert(context, "updateStreamClock requires context");
assert(typeof serverTimestamp === "number", "updateStreamClock requires serverTimestamp");
// Only update if this timestamp is >= the most recent one we've seen.
// During stream replay, older historical parts may be re-emitted out of order.
//
// NOTE: This is a display-oriented clock translation (not true synchronization).
// We refresh the offset whenever we see a newer backend timestamp. If the renderer clock
// drifts significantly during a very long stream, the translated times may be off by a
// small amount, which is acceptable for UI stats.
if (serverTimestamp < context.lastServerTimestamp) {
return;
}
context.lastServerTimestamp = serverTimestamp;
context.clockOffsetMs = Date.now() - serverTimestamp;
}
/**
* Detect the replay→live transition for reconnect streams.
*
* During reconnect, `replayStream()` emits all catch-up events with `replay: true`.
* Once the catch-up phase is over, fresh live deltas arrive without the flag.
* This helper flips `isReplay` to false on the first non-replay event so that
* `streamPresentation.source` correctly transitions to "live" and smoothing
* resumes instead of staying bypassed.
*
* IMPORTANT: Only call from content handlers (handleStreamDelta, handleReasoningDelta).
* Tool events are not buffered by the reconnect relay and can arrive before replay
* text finishes flushing — calling this from tool handlers would prematurely end
* replay phase and reclassify catch-up content as live.
*/
private syncReplayPhase(messageId: string, replay?: boolean): void {
const context = this.activeStreams.get(messageId);
if (context && context.isReplay && replay !== true) {
context.isReplay = false;
}
}
private translateServerTime(context: StreamingContext, serverTimestamp: number): number {
assert(context, "translateServerTime requires context");
assert(typeof serverTimestamp === "number", "translateServerTime requires serverTimestamp");
return serverTimestamp + context.clockOffsetMs;
}
private bumpMessageVersion(messageId: string): void {
const current = this.messageVersions.get(messageId) ?? 0;
this.messageVersions.set(messageId, current + 1);
}
private markMessageDirty(messageId: string): void {
this.bumpMessageVersion(messageId);
this.invalidateCache();
}
private deleteMessage(messageId: string): boolean {
const didDelete = this.messages.delete(messageId);
if (didDelete) {
this.displayedMessageCache.delete(messageId);
this.messageVersions.delete(messageId);
// Clean up token tracking state to prevent memory leaks
this.deltaHistory.delete(messageId);
this.activeStreamUsage.delete(messageId);
}
return didDelete;
}
private invalidateCache(): void {
this.cache = {};
this.updateRecency();
}
/**
* Recompute and cache recency from current messages.
* Called automatically when messages change.
*/
private updateRecency(): void {
const messages = this.getAllMessages();
const messageRecency = computeRecencyTimestamp(messages, this.createdAt, this.unarchivedAt);
const candidates = [messageRecency, this.lastResponseCompletedAt].filter(
(t): t is number => t !== null
);
this.recencyTimestamp = candidates.length > 0 ? Math.max(...candidates) : null;
}
/**
* Get the current recency timestamp (O(1) accessor).
* Used for workspace sorting by last user interaction.
*/
getRecencyTimestamp(): number | null {
return this.recencyTimestamp;
}
/**
* Check if two TODO lists are equal (deep comparison).
* Prevents unnecessary re-renders when todo_write is called with identical content.
*/
private todosEqual(a: TodoItem[], b: TodoItem[]): boolean {
if (a.length !== b.length) return false;
return a.every((todoA, i) => {
const todoB = b[i];
return todoA.content === todoB.content && todoA.status === todoB.status;
});
}
/**
* Get the current TODO list.
* Updated whenever todo_write succeeds.
*/
getCurrentTodos(): TodoItem[] {
return this.currentTodos;
}
/**
* Get the current agent status.
* Updated whenever status_set is called.
* Persists after stream completion (unlike todos).
*/
getAgentStatus(): AgentStatus | undefined {
return this.agentStatus;
}
/**
* Get the list of loaded skills for this workspace.
* Updated whenever agent_skill_read succeeds.
* Persists after stream completion (like agentStatus).
* Returns a stable array reference for memoization (only changes when skills change).
*/
getLoadedSkills(): LoadedSkill[] {
return this.loadedSkillsCache;
}
/**
* Get runtime skill load errors (agent_skill_read failures).
* Errors are cleared for a skill when it later loads successfully.
* Returns a stable array reference for memoization.
*/
getSkillLoadErrors(): SkillLoadError[] {
return this.skillLoadErrorsCache;
}
/**
* Check if there's an executing ask_user_question tool awaiting user input.
* Used to show "Awaiting your input" instead of "streaming..." in the UI.
*/
hasAwaitingUserQuestion(): boolean {
// Only treat the workspace as "awaiting input" when the *latest* displayed
// message is an executing ask_user_question tool.
//
// This avoids false positives from stale historical partials if the user
// continued the chat after skipping/canceling the questions.
const displayed = this.getDisplayedMessages();
const last = displayed[displayed.length - 1];
if (last?.type !== "tool") {
return false;
}
return last.toolName === "ask_user_question" && last.status === "executing";
}
/**
* Extract compaction summary text from a completed assistant message.
* Used when a compaction stream completes to get the summary for history replacement.
* @param messageId The ID of the assistant message to extract text from
* @returns The concatenated text from all text parts, or undefined if message not found
*/
getCompactionSummary(messageId: string): string | undefined {
const message = this.messages.get(messageId);
if (!message) return undefined;
// Concatenate all text parts (ignore tool calls and reasoning)
return getTextPartContent(message.parts);
}
/**
* Clean up stream-scoped state when stream ends (normally or abnormally).
* Called by handleStreamEnd, handleStreamAbort, and handleStreamError.
*
* Clears:
* - Active stream tracking (this.activeStreams)
* - Transient agentStatus (from displayStatus) - restored to persisted value
*
* Preserves:
* - currentTodos (incomplete lists stay visible; handleStreamEnd may clear fully completed lists)
* - lastCompletedStreamStats - timing stats from this stream for display after completion
*/
private cleanupStreamState(messageId: string): void {
// Clear optimistic interrupt flag if this stream was being interrupted.
// This handles cases where streams end normally or with errors (not just abort).
if (this.interruptingMessageId === messageId) {
this.interruptingMessageId = null;
}
// Capture timing stats before removing the stream context
const context = this.activeStreams.get(messageId);
if (context) {
const endTime = Date.now();
const message = this.messages.get(messageId);
// Prefer backend-provided duration (computed in the same clock domain as tool/delta timestamps).
// Fall back to renderer-based timing translated into the renderer clock.
const durationMsFromMetadata = message?.metadata?.duration;
const fallbackStartTime = this.translateServerTime(context, context.serverStartTime);
const fallbackDurationMs = Math.max(0, endTime - fallbackStartTime);
const durationMs =
typeof durationMsFromMetadata === "number" && Number.isFinite(durationMsFromMetadata)
? durationMsFromMetadata
: fallbackDurationMs;
const ttftMs =
context.serverFirstTokenTime !== null
? Math.max(0, context.serverFirstTokenTime - context.serverStartTime)
: null;
// Get output tokens from cumulative usage (if available).
// Fall back to message metadata for abort/error cases where clearTokenState was
// called before cleanupStreamState (e.g., stream abort event handler ordering).
const cumulativeUsage = this.activeStreamUsage.get(messageId)?.cumulative.usage;
const metadataUsage = message?.metadata?.usage;
const outputTokens = cumulativeUsage?.outputTokens ?? metadataUsage?.outputTokens ?? 0;
const reasoningTokens =
cumulativeUsage?.reasoningTokens ?? metadataUsage?.reasoningTokens ?? 0;
// Account for in-progress tool calls (can happen on abort/error)
let totalToolExecutionMs = context.toolExecutionMs;
if (context.pendingToolStarts.size > 0) {
const serverEndTime = context.serverStartTime + durationMs;
for (const toolStartTime of context.pendingToolStarts.values()) {
const toolMs = serverEndTime - toolStartTime;
if (toolMs > 0) {
totalToolExecutionMs += toolMs;
}
}
}
// Streaming duration excludes TTFT and tool execution - used for avg tok/s
const streamingMs = Math.max(0, durationMs - (ttftMs ?? 0) - totalToolExecutionMs);
const mode = message?.metadata?.mode ?? context.mode;
// Store last completed stream stats (include durations anchored in the renderer clock)
const startTime = endTime - durationMs;
const firstTokenTime = ttftMs !== null ? startTime + ttftMs : null;
this.lastCompletedStreamStats = {
startTime,
endTime,
firstTokenTime,
toolExecutionMs: totalToolExecutionMs,
model: context.model,
outputTokens,
reasoningTokens,
streamingMs,
mode,
};
// Use composite key model:mode for per-model+mode stats
// Old data (no mode) will just use model as key, maintaining backward compat
const statsKey = mode ? `${context.model}:${mode}` : context.model;
// Accumulate into per-model stats (totals computed on-the-fly in getSessionTimingStats)
const modelStats = this.sessionTimingStats[statsKey] ?? {
totalDurationMs: 0,
totalToolExecutionMs: 0,
totalTtftMs: 0,
ttftCount: 0,
responseCount: 0,
totalOutputTokens: 0,
totalReasoningTokens: 0,
totalStreamingMs: 0,
};
modelStats.totalDurationMs += durationMs;
modelStats.totalToolExecutionMs += totalToolExecutionMs;
modelStats.responseCount += 1;
modelStats.totalOutputTokens += outputTokens;
modelStats.totalReasoningTokens += reasoningTokens;
modelStats.totalStreamingMs += streamingMs;
if (ttftMs !== null) {
modelStats.totalTtftMs += ttftMs;
modelStats.ttftCount += 1;
}
this.sessionTimingStats[statsKey] = modelStats;
}
this.activeStreams.delete(messageId);
// Restore persisted status - clears transient displayStatus, preserves status_set values
this.agentStatus = this.loadPersistedAgentStatus();
}
/**
* Compact a message's parts array by merging adjacent text/reasoning parts.
* Called when streaming ends to convert thousands of delta parts into single strings.
* This reduces memory from O(deltas) small objects to O(content_types) merged objects.
*/
/**
* Extract the final response text from a message (text after the last tool call).
* Used for notification body content.
*/
private extractFinalResponseText(message: MuxMessage | undefined): string {
if (!message) return "";
const parts = message.parts;
const lastToolIndex = parts.findLastIndex((part) => part.type === "dynamic-tool");
const textPartsAfterTools = lastToolIndex >= 0 ? parts.slice(lastToolIndex + 1) : parts;
return getTextPartContent(textPartsAfterTools).trim();
}
private compactMessageParts(message: MuxMessage): void {
message.parts = mergeAdjacentParts(message.parts);
}
addMessage(message: MuxMessage): void {
const normalizedMessage = normalizeMessageRouteProvider(message);
const existing = this.messages.get(normalizedMessage.id);
if (existing) {
const existingParts = Array.isArray(existing.parts) ? existing.parts.length : 0;
const incomingParts = Array.isArray(normalizedMessage.parts)
? normalizedMessage.parts.length
: 0;
// Prefer richer content when duplicates arrive (e.g., placeholder vs completed message)
if (incomingParts < existingParts) {
return;
}
}
// Just store the message - backend assigns historySequence
this.messages.set(normalizedMessage.id, normalizedMessage);
this.markMessageDirty(normalizedMessage.id);
}
/**
* Remove a message from the aggregator.
* Used for dismissing ephemeral messages like /plan output.
* Rebuilds detected links to remove any that only existed in the removed message.
*/
removeMessage(messageId: string): void {
if (this.deleteMessage(messageId)) {
this.invalidateCache();
}
}
/**
* Load historical messages in batch, preserving their historySequence numbers.
* This is more efficient than calling addMessage() repeatedly.
*
* @param messages - Historical messages to load
* @param hasActiveStream - Whether there's an active stream in buffered events (for reconnection scenario)
* @param opts.mode - "replace" clears existing state first, "append" merges into existing state
* @param opts.skipDerivedState - Skip replaying messages into derived state when appending older history
*/
loadHistoricalMessages(
messages: MuxMessage[],
hasActiveStream = false,
opts?: { mode?: "replace" | "append"; skipDerivedState?: boolean }
): void {
const mode = opts?.mode ?? "replace";
if (mode === "replace") {
// Clear existing state to prevent stale messages from persisting.
this.messages.clear();
this.displayedMessageCache.clear();
this.messageVersions.clear();
this.deltaHistory.clear();
this.activeStreamUsage.clear();
this.loadedSkills.clear();
this.loadedSkillsCache = [];
this.skillLoadErrors.clear();
this.skillLoadErrorsCache = [];
this.lastResponseCompletedAt = null;
// Track the replay window's oldest sequence for reconnect cursors.
let minSeq: number | null = null;
for (const msg of messages) {
const seq = msg.metadata?.historySequence;
if (typeof seq === "number" && (minSeq === null || seq < minSeq)) {
minSeq = seq;
}
}
this.establishedOldestHistorySequence = minSeq;
}
const overwrittenMessageIds: string[] = [];
const appliedMessages: MuxMessage[] = [];
// Add/overwrite messages in the map
for (const message of messages) {
const normalizedMessage = normalizeMessageRouteProvider(message);
const existing = mode === "append" ? this.messages.get(normalizedMessage.id) : undefined;
if (existing) {
const existingParts = Array.isArray(existing.parts) ? existing.parts.length : 0;
const incomingParts = Array.isArray(normalizedMessage.parts)
? normalizedMessage.parts.length
: 0;
// Since-replay can include a stale boundary row for an active stream message while
// richer in-memory parts already exist. Keep the richer message to avoid dropping
// in-flight tool/text parts that filtered replay deltas may not resend.
if (incomingParts < existingParts) {
continue;
}
overwrittenMessageIds.push(normalizedMessage.id);
}
this.messages.set(normalizedMessage.id, normalizedMessage);
appliedMessages.push(normalizedMessage);
}
if (mode === "append") {
for (const messageId of overwrittenMessageIds) {
// Append replay can overwrite an existing message ID (e.g., partial -> finalized).
// Bump per-message version so displayed row caches are invalidated and rebuilt.
this.bumpMessageVersion(messageId);
this.displayedMessageCache.delete(messageId);