Skip to content

Commit b33cb6c

Browse files
authored
Merge branch 'main' into chore/coderabbit-triage-v0.2.10
2 parents 8458358 + 8e3bac3 commit b33cb6c

2 files changed

Lines changed: 317 additions & 17 deletions

File tree

components/backend/websocket/agui_store.go

Lines changed: 103 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -222,8 +222,11 @@ const (
222222
)
223223

224224
// loadEvents reads AG-UI events for a session from the JSONL log.
225-
// For files larger than replayMaxTailBytes, only the tail is read to
226-
// keep reconnect latency bounded (129ms at 1M events vs 9.7s full scan).
225+
// For files larger than replayMaxTailBytes, a head+tail strategy is used:
226+
// the head is scanned for snapshot/lifecycle events (MESSAGES_SNAPSHOT,
227+
// STATE_SNAPSHOT, RUN_STARTED, RUN_FINISHED, RUN_ERROR) while the tail
228+
// provides recent streaming events. This keeps reconnect latency bounded
229+
// while ensuring the frontend always has complete conversation history.
227230
// Automatically triggers legacy migration if the log doesn't exist but
228231
// a pre-AG-UI messages.jsonl file does.
229232
func loadEvents(sessionID string) []map[string]interface{} {
@@ -265,30 +268,26 @@ func loadEvents(sessionID string) []map[string]interface{} {
265268
return scanJSONL(f)
266269
}
267270

268-
// Large file — seek to tail to bound reconnect latency.
269-
log.Printf("AGUI Store: large event log for %s (%.1f MB), reading tail only", sessionID, float64(fileSize)/(1024*1024))
270-
offset := fileSize - replayMaxTailBytes
271-
if _, err := f.Seek(offset, 0); err != nil {
271+
log.Printf("AGUI Store: large event log for %s (%.1f MB), using head+tail read", sessionID, float64(fileSize)/(1024*1024))
272+
273+
headEvents := scanHeadSnapshotEvents(f)
274+
275+
tailOffset := fileSize - replayMaxTailBytes
276+
if _, err := f.Seek(tailOffset, 0); err != nil {
272277
log.Printf("AGUI Store: seek failed for %s: %v, falling back to full read", sessionID, err)
273278
events, _ := readJSONLFile(path)
274279
return events
275280
}
276281

277-
// Read a single byte at the seek position to check if we landed on a
278-
// record boundary ('\n' or start-of-file). If so, the next scanner
279-
// line is a complete record and should not be skipped.
280282
var boundary [1]byte
281283
onBoundary := false
282-
if offset == 0 {
284+
if tailOffset == 0 {
283285
onBoundary = true
284286
} else if n, err := f.Read(boundary[:]); err == nil && n == 1 && boundary[0] == '\n' {
285287
onBoundary = true
286288
}
287-
// If we read one byte that wasn't '\n', we're mid-record — the
288-
// scanner will pick up from this position and the first line will
289-
// be partial (skip it below).
290289

291-
var events []map[string]interface{}
290+
var tailEvents []map[string]interface{}
292291
scanner := bufio.NewScanner(f)
293292
scanner.Buffer(make([]byte, 0, scannerInitialBufferSize), scannerMaxLineSize)
294293
skipFirst := !onBoundary
@@ -297,7 +296,6 @@ func loadEvents(sessionID string) []map[string]interface{} {
297296
if len(line) == 0 {
298297
continue
299298
}
300-
// Skip the first line only if the seek landed mid-record
301299
if skipFirst {
302300
skipFirst = false
303301
continue
@@ -307,12 +305,38 @@ func loadEvents(sessionID string) []map[string]interface{} {
307305
log.Printf("AGUI Store: skipping malformed JSON line in tail scan: %v", err)
308306
continue
309307
}
310-
events = append(events, evt)
308+
tailEvents = append(tailEvents, evt)
311309
}
312310
if err := scanner.Err(); err != nil {
313311
log.Printf("AGUI Store: tail scan error for %s: %v", sessionID, err)
314312
}
315-
return events
313+
314+
if len(headEvents) == 0 {
315+
return tailEvents
316+
}
317+
318+
headTimestamps := make(map[string]bool, len(headEvents))
319+
for _, evt := range headEvents {
320+
if ts, ok := evt["timestamp"].(string); ok {
321+
if evtType, _ := evt["type"].(string); evtType != "" {
322+
headTimestamps[evtType+"|"+ts] = true
323+
}
324+
}
325+
}
326+
327+
merged := make([]map[string]interface{}, 0, len(headEvents)+len(tailEvents))
328+
merged = append(merged, headEvents...)
329+
for _, evt := range tailEvents {
330+
ts, _ := evt["timestamp"].(string)
331+
evtType, _ := evt["type"].(string)
332+
if ts != "" && evtType != "" && headTimestamps[evtType+"|"+ts] {
333+
continue
334+
}
335+
merged = append(merged, evt)
336+
}
337+
338+
log.Printf("AGUI Store: head+tail merge for %s: %d head + %d tail = %d total events", sessionID, len(headEvents), len(tailEvents), len(merged))
339+
return merged
316340
}
317341

318342
// scanJSONL reads all JSONL events from an already-open file handle.
@@ -338,6 +362,68 @@ func scanJSONL(f *os.File) []map[string]interface{} {
338362
return events
339363
}
340364

365+
var headScanEventTypes = map[string]bool{
366+
types.EventTypeMessagesSnapshot: true,
367+
types.EventTypeStateSnapshot: true,
368+
types.EventTypeRunStarted: true,
369+
types.EventTypeRunFinished: true,
370+
types.EventTypeRunError: true,
371+
}
372+
373+
func scanHeadSnapshotEvents(f *os.File) []map[string]interface{} {
374+
if _, err := f.Seek(0, 0); err != nil {
375+
return nil
376+
}
377+
378+
reader := bufio.NewReaderSize(f, scannerInitialBufferSize)
379+
var result []map[string]interface{}
380+
var bytesRead int64
381+
382+
for bytesRead < replayMaxTailBytes {
383+
line, err := reader.ReadBytes('\n')
384+
bytesRead += int64(len(line))
385+
386+
line = bytes.TrimSpace(line)
387+
if len(line) == 0 {
388+
if err != nil {
389+
break
390+
}
391+
continue
392+
}
393+
394+
evtType := fastExtractType(line)
395+
if headScanEventTypes[evtType] {
396+
var evt map[string]interface{}
397+
if jsonErr := json.Unmarshal(line, &evt); jsonErr == nil {
398+
result = append(result, evt)
399+
}
400+
}
401+
402+
if err != nil {
403+
break
404+
}
405+
}
406+
return result
407+
}
408+
409+
// fastExtractType returns the string value of the top-level "type" key of a
// single-line JSON object without decoding the whole document.
//
// The line is scanned once while tracking nesting depth and string
// boundaries, so a "type" key inside a nested object or array (for example
// a tool call inside a MESSAGES_SNAPSHOT payload), or the text "type"
// appearing as a string value, is never mistaken for the event type.
//
// It returns "" when the line has no top-level "type" key, when that key's
// value is not a JSON string, or when a string literal is unterminated.
func fastExtractType(line []byte) string {
	depth := 0
	for i := 0; i < len(line); i++ {
		switch line[i] {
		case '{', '[':
			depth++
		case '}', ']':
			depth--
		case '"':
			end := jsonStringEnd(line, i)
			if end < 0 {
				return ""
			}
			// A string at depth 1 that is followed by ':' is a key of the
			// top-level object; anything else is a value or a nested key.
			if depth == 1 && string(line[i+1:end]) == "type" {
				j := skipJSONSpace(line, end+1)
				if j < len(line) && line[j] == ':' {
					return jsonStringAt(line, skipJSONSpace(line, j+1))
				}
			}
			// Jump past the string so its contents are never interpreted
			// as structure.
			i = end
		}
	}
	return ""
}

// jsonStringEnd returns the index of the quote that closes the JSON string
// opened at b[open], honouring backslash escapes, or -1 if it is unterminated.
func jsonStringEnd(b []byte, open int) int {
	for i := open + 1; i < len(b); i++ {
		switch b[i] {
		case '\\':
			i++ // skip the escaped byte
		case '"':
			return i
		}
	}
	return -1
}

// skipJSONSpace returns the index of the first byte at or after pos that is
// not JSON insignificant whitespace.
func skipJSONSpace(b []byte, pos int) int {
	for pos < len(b) {
		switch b[pos] {
		case ' ', '\t', '\r', '\n':
			pos++
		default:
			return pos
		}
	}
	return pos
}

// jsonStringAt decodes the JSON string literal starting at b[pos]. It
// returns "" if b[pos] does not start a well-formed string literal.
func jsonStringAt(b []byte, pos int) string {
	if pos >= len(b) || b[pos] != '"' {
		return ""
	}
	end := jsonStringEnd(b, pos)
	if end < 0 {
		return ""
	}
	raw := b[pos+1 : end]
	if bytes.IndexByte(raw, '\\') < 0 {
		return string(raw) // common case: no escapes to resolve
	}
	var s string
	if err := json.Unmarshal(b[pos:end+1], &s); err != nil {
		return ""
	}
	return s
}
426+
341427
// DeriveAgentStatus reads a session's event log and returns the agent
342428
// status derived from the last significant events.
343429
//

components/backend/websocket/agui_store_test.go

Lines changed: 214 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package websocket
33
import (
44
"ambient-code-backend/types"
55
"encoding/json"
6+
"fmt"
67
"os"
78
"path/filepath"
9+
"strings"
810
"testing"
911
)
1012

@@ -449,3 +451,215 @@ func TestLoadEventsForReplay(t *testing.T) {
449451
}
450452
})
451453
}
454+
455+
func TestFastExtractType(t *testing.T) {
456+
tests := []struct {
457+
name string
458+
input string
459+
expected string
460+
}{
461+
{"standard event", `{"type":"RUN_STARTED","runId":"r1"}`, "RUN_STARTED"},
462+
{"type not first field", `{"runId":"r1","type":"RUN_FINISHED","ts":123}`, "RUN_FINISHED"},
463+
{"messages snapshot", `{"type":"MESSAGES_SNAPSHOT","messages":[]}`, "MESSAGES_SNAPSHOT"},
464+
{"no type field", `{"runId":"r1","data":"hello"}`, ""},
465+
{"empty object", `{}`, ""},
466+
{"empty string", ``, ""},
467+
}
468+
469+
for _, tt := range tests {
470+
t.Run(tt.name, func(t *testing.T) {
471+
result := fastExtractType([]byte(tt.input))
472+
if result != tt.expected {
473+
t.Errorf("fastExtractType(%q) = %q, want %q", tt.input, result, tt.expected)
474+
}
475+
})
476+
}
477+
}
478+
479+
func writeLargeEventFile(t *testing.T, path string, headEvents []map[string]interface{}, paddingCount int, tailEvents []map[string]interface{}) {
480+
t.Helper()
481+
f, err := os.Create(path)
482+
if err != nil {
483+
t.Fatalf("Failed to create events file: %v", err)
484+
}
485+
defer f.Close()
486+
487+
for _, evt := range headEvents {
488+
data, err := json.Marshal(evt)
489+
if err != nil {
490+
t.Fatalf("Failed to marshal head event: %v", err)
491+
}
492+
if _, err := f.Write(append(data, '\n')); err != nil {
493+
t.Fatalf("Failed to write head event: %v", err)
494+
}
495+
}
496+
497+
paddingContent := strings.Repeat("x", 200)
498+
for i := 0; i < paddingCount; i++ {
499+
evt := map[string]interface{}{
500+
"type": types.EventTypeTextMessageContent,
501+
"messageId": fmt.Sprintf("msg-pad-%d", i),
502+
"delta": paddingContent,
503+
"timestamp": fmt.Sprintf("2025-01-01T00:01:%02dZ", i%60),
504+
}
505+
data, err := json.Marshal(evt)
506+
if err != nil {
507+
t.Fatalf("Failed to marshal padding event: %v", err)
508+
}
509+
if _, err := f.Write(append(data, '\n')); err != nil {
510+
t.Fatalf("Failed to write padding event: %v", err)
511+
}
512+
}
513+
514+
for _, evt := range tailEvents {
515+
data, err := json.Marshal(evt)
516+
if err != nil {
517+
t.Fatalf("Failed to marshal tail event: %v", err)
518+
}
519+
if _, err := f.Write(append(data, '\n')); err != nil {
520+
t.Fatalf("Failed to write tail event: %v", err)
521+
}
522+
}
523+
}
524+
525+
func TestLoadEventsHeadTailMerge(t *testing.T) {
526+
tmpDir, err := os.MkdirTemp("", "agui-headtail-test-*")
527+
if err != nil {
528+
t.Fatalf("Failed to create temp dir: %v", err)
529+
}
530+
defer os.RemoveAll(tmpDir)
531+
532+
origStateBaseDir := StateBaseDir
533+
StateBaseDir = tmpDir
534+
defer func() { StateBaseDir = origStateBaseDir }()
535+
536+
t.Run("large file preserves head snapshot events", func(t *testing.T) {
537+
sessionID := "test-large-headtail"
538+
sessionsDir := filepath.Join(tmpDir, "sessions", sessionID)
539+
if err := os.MkdirAll(sessionsDir, 0755); err != nil {
540+
t.Fatalf("Failed to create sessions dir: %v", err)
541+
}
542+
543+
eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl")
544+
writeLargeEventFile(t, eventsFile,
545+
[]map[string]interface{}{
546+
{"type": types.EventTypeRunStarted, "runId": "r1", "timestamp": "2025-01-01T00:00:00Z"},
547+
{"type": types.EventTypeMessagesSnapshot, "messages": []interface{}{
548+
map[string]interface{}{"id": "msg1", "role": "user", "content": "Hello"},
549+
}, "timestamp": "2025-01-01T00:00:01Z"},
550+
},
551+
15000,
552+
[]map[string]interface{}{
553+
{"type": types.EventTypeTextMessageContent, "messageId": "msg-tail", "delta": "tail event", "timestamp": "2025-01-01T00:02:00Z"},
554+
},
555+
)
556+
557+
stat, err := os.Stat(eventsFile)
558+
if err != nil {
559+
t.Fatalf("Failed to stat events file: %v", err)
560+
}
561+
if stat.Size() <= replayMaxTailBytes {
562+
t.Fatalf("Test file too small (%d bytes), need > %d to trigger head+tail path", stat.Size(), replayMaxTailBytes)
563+
}
564+
565+
result := loadEvents(sessionID)
566+
if len(result) == 0 {
567+
t.Fatal("Expected events from loadEvents, got none")
568+
}
569+
570+
hasRunStarted := false
571+
hasMessagesSnapshot := false
572+
for _, evt := range result {
573+
evtType, _ := evt["type"].(string)
574+
if evtType == types.EventTypeRunStarted {
575+
hasRunStarted = true
576+
}
577+
if evtType == types.EventTypeMessagesSnapshot {
578+
hasMessagesSnapshot = true
579+
}
580+
}
581+
582+
if !hasRunStarted {
583+
t.Error("Expected RUN_STARTED from head scan to be present in merged result")
584+
}
585+
if !hasMessagesSnapshot {
586+
t.Error("Expected MESSAGES_SNAPSHOT from head scan to be present in merged result")
587+
}
588+
589+
if result[0]["type"] != types.EventTypeRunStarted {
590+
t.Errorf("Expected first event to be RUN_STARTED, got %v", result[0]["type"])
591+
}
592+
if result[1]["type"] != types.EventTypeMessagesSnapshot {
593+
t.Errorf("Expected second event to be MESSAGES_SNAPSHOT, got %v", result[1]["type"])
594+
}
595+
})
596+
597+
t.Run("large file deduplicates overlapping events", func(t *testing.T) {
598+
sessionID := "test-large-dedup"
599+
sessionsDir := filepath.Join(tmpDir, "sessions", sessionID)
600+
if err := os.MkdirAll(sessionsDir, 0755); err != nil {
601+
t.Fatalf("Failed to create sessions dir: %v", err)
602+
}
603+
604+
eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl")
605+
writeLargeEventFile(t, eventsFile,
606+
[]map[string]interface{}{
607+
{"type": types.EventTypeRunStarted, "runId": "r1", "timestamp": "2025-01-01T00:00:00Z"},
608+
},
609+
15000,
610+
[]map[string]interface{}{
611+
{"type": types.EventTypeRunFinished, "runId": "r1", "timestamp": "2025-01-01T00:03:00Z"},
612+
},
613+
)
614+
615+
stat, err := os.Stat(eventsFile)
616+
if err != nil {
617+
t.Fatalf("Failed to stat events file: %v", err)
618+
}
619+
if stat.Size() <= replayMaxTailBytes {
620+
t.Fatalf("Test file too small (%d bytes), need > %d", stat.Size(), replayMaxTailBytes)
621+
}
622+
623+
result := loadEvents(sessionID)
624+
625+
runStartedCount := 0
626+
for _, evt := range result {
627+
if evt["type"] == types.EventTypeRunStarted {
628+
runStartedCount++
629+
}
630+
}
631+
if runStartedCount != 1 {
632+
t.Errorf("Expected exactly 1 RUN_STARTED (no duplicates), got %d", runStartedCount)
633+
}
634+
})
635+
636+
t.Run("large file with no head snapshots returns tail only", func(t *testing.T) {
637+
sessionID := "test-large-no-head-snapshot"
638+
sessionsDir := filepath.Join(tmpDir, "sessions", sessionID)
639+
if err := os.MkdirAll(sessionsDir, 0755); err != nil {
640+
t.Fatalf("Failed to create sessions dir: %v", err)
641+
}
642+
643+
eventsFile := filepath.Join(sessionsDir, "agui-events.jsonl")
644+
writeLargeEventFile(t, eventsFile, nil, 15000, nil)
645+
646+
stat, err := os.Stat(eventsFile)
647+
if err != nil {
648+
t.Fatalf("Failed to stat events file: %v", err)
649+
}
650+
if stat.Size() <= replayMaxTailBytes {
651+
t.Fatalf("Test file too small (%d bytes), need > %d", stat.Size(), replayMaxTailBytes)
652+
}
653+
654+
result := loadEvents(sessionID)
655+
if len(result) == 0 {
656+
t.Fatal("Expected tail events, got none")
657+
}
658+
659+
for _, evt := range result {
660+
if evt["type"] != types.EventTypeTextMessageContent {
661+
t.Errorf("Expected only TEXT_MESSAGE_CONTENT events, got %v", evt["type"])
662+
}
663+
}
664+
})
665+
}

0 commit comments

Comments
 (0)