Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ agk trace list
agk trace show <trace-id>
```

**Live Tail**
Follow a run's spans in real time (great alongside `agk run` in another terminal).
```bash
agk trace watch # follow the live run, or wait for the next one
```

**Visual Flowchart (Mermaid)**
Generate a diagram of the agent's execution path.
```bash
Expand All @@ -203,6 +209,7 @@ agk trace mermaid > trace_flow.md
| `trace list` | List all captured trace runs. |
| `trace show` | Display summary of a specific run. |
| `trace view` | Open the interactive TUI trace explorer. |
| `trace watch` | Live-tail a run's spans as it executes. |
| `trace mermaid` | Generate Mermaid flowchart of trace execution. |

---
Expand Down
247 changes: 247 additions & 0 deletions cmd/trace_watch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,247 @@
package cmd

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"time"

"github.com/fatih/color"
"github.com/spf13/cobra"
)

// watchCmd live-tails a trace run as it executes, printing each span as it lands.
var watchCmd = &cobra.Command{
Use: "watch [run-id]",
Short: "Live-tail a trace run as it executes",
Long: `Live-tail a trace run, printing each span as it is appended to trace.jsonl.

With no run ID, watch follows the currently-running run if one is live, otherwise
it waits for the next run to start (e.g. from 'agk run' in another terminal). The
watch ends when the run completes (its manifest.json is written) or on Ctrl+C.

Examples:
agk trace watch # follow the live run, or wait for the next one
agk trace watch <run-id> # follow a specific run`,
Args: cobra.MaximumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
runID := ""
if len(args) > 0 {
runID = args[0]
}
return watchTrace(runID)
},
}

func init() {
traceCmd.AddCommand(watchCmd)
}

func watchTrace(runID string) error {
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer stop()

resolved, err := resolveWatchRun(ctx, runsDirName, runID)
if err != nil {
return err
}
if resolved == "" {
return nil // cancelled while waiting
}

return tailRun(ctx, filepath.Join(runsDirName, resolved), resolved)
}

// resolveWatchRun picks the run to follow: an explicit ID, the live latest run, or
// (when the latest run has already completed) the next run to appear.
func resolveWatchRun(ctx context.Context, runsDir, runID string) (string, error) {
if runID != "" {
if _, err := os.Stat(filepath.Join(runsDir, runID)); err != nil {
return "", fmt.Errorf("trace not found: %s", runID)
}
return runID, nil
}

// A latest run with no manifest yet is still live β€” follow it.
if latest := getLatestRunID(); latest != "" {
if _, err := os.Stat(filepath.Join(runsDir, latest, "manifest.json")); err != nil {
return latest, nil
}
}

fmt.Println("⏳ Waiting for a new run to start (e.g. `agk run`)... Ctrl+C to stop")
before := currentRunSet(runsDir)

ticker := time.NewTicker(300 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return "", nil
case <-ticker.C:
for name := range currentRunSet(runsDir) {
if !before[name] {
return name, nil
}
}
}
}
}

func currentRunSet(runsDir string) map[string]bool {
set := map[string]bool{}
entries, err := os.ReadDir(runsDir)
if err != nil {
return set
}
for _, e := range entries {
if e.IsDir() {
set[e.Name()] = true
}
}
return set
}

func tailRun(ctx context.Context, runPath, runID string) error {
tracePath := filepath.Join(runPath, "trace.jsonl")
manifestPath := filepath.Join(runPath, "manifest.json")

color.Cyan("πŸ‘€ Watching %s (Ctrl+C to stop)", runID)
fmt.Println(strings.Repeat("─", 64))

tailer := &traceTailer{path: tracePath}
printNew := func() {
for _, line := range tailer.poll() {
var span map[string]interface{}
if err := json.Unmarshal([]byte(line), &span); err == nil {
fmt.Println(formatWatchSpan(span))
}
}
}

ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
printNew()
fmt.Println("\nπŸ‘‹ Stopped")
return nil
case <-ticker.C:
printNew()
if _, err := os.Stat(manifestPath); err == nil {
printNew() // final drain
printWatchSummary(runPath, runID)
return nil
}
}
}
}

func printWatchSummary(runPath, runID string) {
fmt.Println(strings.Repeat("─", 64))
if m, err := readManifest(runPath); err == nil {
color.Green("βœ… Run complete β€” %d spans, %d LLM calls, %d tokens, $%.4f, %.2fs",
m.SpanCount, m.LLMCalls, m.TotalTokens, m.EstimatedCost, m.Duration)
}
fmt.Printf("β†’ Inspect: %s\n", color.CyanString("agk trace view %s", runID))
}

// traceTailer incrementally reads new complete lines appended to a JSONL file,
// buffering any trailing partial line until it is finished.
type traceTailer struct {
path string
offset int64
buf []byte
}

func (t *traceTailer) poll() []string {
f, err := os.Open(t.path)
if err != nil {
return nil
}
defer func() { _ = f.Close() }()

if _, err := f.Seek(t.offset, io.SeekStart); err != nil {
return nil
}
data, err := io.ReadAll(f)
if err != nil {
return nil
}
t.offset += int64(len(data))
t.buf = append(t.buf, data...)

var lines []string
for {
i := bytes.IndexByte(t.buf, '\n')
if i < 0 {
break
}
if line := strings.TrimSpace(string(t.buf[:i])); line != "" {
lines = append(lines, line)
}
t.buf = t.buf[i+1:]
}
return lines
}

func formatWatchSpan(span map[string]interface{}) string {
name, _ := span["Name"].(string)
start := parseSpanTime(span["StartTime"])
end := parseSpanTime(span["EndTime"])

var durMs int64
if !start.IsZero() && !end.IsZero() {
durMs = end.Sub(start).Milliseconds()
}
ts := "--:--:--"
if !start.IsZero() {
ts = start.Format("15:04:05")
}

return fmt.Sprintf("%s %s %-44s %6dms", ts, watchIcon(strings.ToLower(name)), truncateName(name, 44), durMs)
}

func watchIcon(lowerName string) string {
switch {
case strings.Contains(lowerName, "llm"):
return "πŸ€–"
case strings.Contains(lowerName, "tool"):
return "πŸ”§"
case strings.Contains(lowerName, "workflow"):
return "πŸ”€"
case strings.Contains(lowerName, "agent"):
return "πŸ‘€"
default:
return "β€’"
}
}

func truncateName(s string, max int) string {
if len(s) <= max {
return s
}
if max <= 1 {
return s[:max]
}
return s[:max-1] + "…"
}

func parseSpanTime(v interface{}) time.Time {
s, ok := v.(string)
if !ok || s == "" {
return time.Time{}
}
if t, err := time.Parse(time.RFC3339, s); err == nil {
return t
}
return time.Time{}
}
85 changes: 85 additions & 0 deletions cmd/trace_watch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package cmd

import (
"os"
"path/filepath"
"strings"
"testing"
)

func TestTraceTailer(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "trace.jsonl")
if err := os.WriteFile(path, []byte("line1\nline2\n"), 0o644); err != nil {
t.Fatal(err)
}

tl := &traceTailer{path: path}

// First poll returns both complete lines.
if got := tl.poll(); len(got) != 2 || got[0] != "line1" || got[1] != "line2" {
t.Fatalf("first poll = %v, want [line1 line2]", got)
}

// Nothing new yet.
if got := tl.poll(); len(got) != 0 {
t.Fatalf("expected no new lines, got %v", got)
}

// A partial line (no newline) is buffered, not emitted.
appendString(t, path, "partial")
if got := tl.poll(); len(got) != 0 {
t.Fatalf("partial line should buffer, got %v", got)
}

// Completing the line emits it whole.
appendString(t, path, "-done\n")
if got := tl.poll(); len(got) != 1 || got[0] != "partial-done" {
t.Fatalf("poll = %v, want [partial-done]", got)
}
}

func TestFormatWatchSpan(t *testing.T) {
span := map[string]interface{}{
"Name": "agk.llm.generate",
"StartTime": "2026-06-22T10:00:00Z",
"EndTime": "2026-06-22T10:00:01Z",
}
line := formatWatchSpan(span)
if !strings.Contains(line, "agk.llm.generate") {
t.Errorf("missing span name: %q", line)
}
if !strings.Contains(line, "1000ms") {
t.Errorf("expected 1000ms duration: %q", line)
}
if !strings.Contains(line, "πŸ€–") {
t.Errorf("expected LLM icon: %q", line)
}
}

func TestWatchIconAndTruncate(t *testing.T) {
if watchIcon("agk.tool.call") != "πŸ”§" {
t.Error("tool icon mismatch")
}
if watchIcon("agk.workflow.run") != "πŸ”€" {
t.Error("workflow icon mismatch")
}
if got := truncateName("abcdefghij", 5); got != "abcd…" {
t.Errorf("truncateName = %q, want abcd…", got)
}
if got := truncateName("short", 10); got != "short" {
t.Errorf("truncateName should not change short strings, got %q", got)
}
}

func appendString(t *testing.T, path, s string) {
t.Helper()
f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY, 0o644)
if err != nil {
t.Fatal(err)
}
defer func() { _ = f.Close() }()
if _, err := f.WriteString(s); err != nil {
t.Fatal(err)
}
}
Loading