Skip to content

Commit 70f6917

Browse files
authored
Merge pull request #48 from LAA-Software-Engineering/issue/14-trace-recorder
feat(trace): event types, recorder, and reader for runtime store
2 parents 3500c3b + 77d2a9e commit 70f6917

5 files changed

Lines changed: 201 additions & 0 deletions

File tree

internal/trace/doc.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,5 @@
11
// Package trace records structured execution events for logs and debugging.
2+
//
3+
// [Recorder] checks that a run row exists before appending (clear failure when StartRun was
4+
// skipped). Event type strings are defined as Event* constants in events.go (design doc §12.2 I).
25
package trace

internal/trace/events.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package trace
2+
3+
import "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
4+
5+
// Event is one persisted trace row (design doc §14.2); same shape as [state.TraceEvent].
6+
type Event = state.TraceEvent
7+
8+
// Event type names from design doc §12.2 I (Trace recorder).
9+
const (
10+
EventRunStarted = "run.started"
11+
EventRunFinished = "run.finished"
12+
EventStepStarted = "step.started"
13+
EventStepFinished = "step.finished"
14+
EventStepFailed = "step.failed"
15+
EventToolCalled = "tool.called"
16+
EventToolCompleted = "tool.completed"
17+
EventModelCalled = "model.called"
18+
EventModelCompleted = "model.completed"
19+
EventPolicyDenied = "policy.denied"
20+
)

internal/trace/reader.go

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
package trace
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
8+
)
9+
10+
// Reader loads trace events from [state.RuntimeStore] (read side for logs / inspect).
11+
type Reader struct {
12+
RT state.RuntimeStore
13+
}
14+
15+
// NewReader returns a reader backed by rt.
16+
func NewReader(rt state.RuntimeStore) *Reader {
17+
return &Reader{RT: rt}
18+
}
19+
20+
// ListByRunID returns events for runID ordered by seq ascending.
21+
func (r *Reader) ListByRunID(ctx context.Context, runID string) ([]Event, error) {
22+
if r == nil || r.RT == nil {
23+
return nil, errors.New("trace: nil reader or runtime store")
24+
}
25+
return r.RT.ListTraceEventsByRunID(ctx, runID)
26+
}

internal/trace/recorder.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package trace
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"encoding/json"
7+
"errors"
8+
"fmt"
9+
"strings"
10+
"time"
11+
12+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
13+
)
14+
15+
// ErrRunNotFound is returned when appending events for a run_id that has no row in runs.
16+
var ErrRunNotFound = errors.New("trace: run not found")
17+
18+
// Recorder appends trace_events rows via [state.RuntimeStore] (design doc §12.2 I, §14.2).
19+
type Recorder struct {
20+
RT state.RuntimeStore
21+
Clock func() time.Time
22+
}
23+
24+
// NewRecorder returns a recorder backed by rt. rt must not be nil when Append is called.
25+
func NewRecorder(rt state.RuntimeStore) *Recorder {
26+
return &Recorder{RT: rt}
27+
}
28+
29+
func (r *Recorder) now() time.Time {
30+
if r != nil && r.Clock != nil {
31+
return r.Clock()
32+
}
33+
return time.Now().UTC()
34+
}
35+
36+
// Append verifies the run exists, serializes data to JSON for data_json, then appends one event.
37+
// stepID may be empty for run-level events.
38+
func (r *Recorder) Append(ctx context.Context, runID, stepID, typ string, data map[string]any) (seq int64, err error) {
39+
if r == nil || r.RT == nil {
40+
return 0, errors.New("trace: nil recorder or runtime store")
41+
}
42+
runID = strings.TrimSpace(runID)
43+
if runID == "" {
44+
return 0, errors.New("trace: empty run_id")
45+
}
46+
typ = strings.TrimSpace(typ)
47+
if typ == "" {
48+
return 0, errors.New("trace: empty event type")
49+
}
50+
51+
if _, err := r.RT.GetRun(ctx, runID); err != nil {
52+
if errors.Is(err, sql.ErrNoRows) {
53+
return 0, fmt.Errorf("trace: cannot append event for run %q: %w", runID, ErrRunNotFound)
54+
}
55+
return 0, fmt.Errorf("trace: get run %q: %w", runID, err)
56+
}
57+
58+
dataJSON := "{}"
59+
if len(data) > 0 {
60+
b, err := json.Marshal(data)
61+
if err != nil {
62+
return 0, fmt.Errorf("trace: marshal event data: %w", err)
63+
}
64+
dataJSON = string(b)
65+
}
66+
67+
return r.RT.AppendTraceEvent(ctx, runID, r.now(), typ, strings.TrimSpace(stepID), dataJSON)
68+
}

internal/trace/recorder_test.go

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
package trace
2+
3+
import (
4+
"context"
5+
"errors"
6+
"path/filepath"
7+
"strings"
8+
"testing"
9+
"time"
10+
11+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
12+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite"
13+
)
14+
15+
func TestRecorder_Append_increasingSeqPerRunID(t *testing.T) {
16+
ctx := context.Background()
17+
st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "trace.db"))
18+
if err != nil {
19+
t.Fatal(err)
20+
}
21+
t.Cleanup(func() { _ = st.Close() })
22+
23+
started := time.Date(2026, 4, 11, 9, 0, 0, 0, time.UTC)
24+
if err := st.StartRun(ctx, state.Run{
25+
RunID: "run-a",
26+
WorkflowName: "wf",
27+
Env: "dev",
28+
Status: "running",
29+
StartedAt: started,
30+
InputJSON: `{}`,
31+
TotalCostUSD: 0,
32+
}); err != nil {
33+
t.Fatal(err)
34+
}
35+
36+
fixed := started.Add(time.Minute)
37+
rec := NewRecorder(st)
38+
rec.Clock = func() time.Time { return fixed }
39+
40+
seq1, err := rec.Append(ctx, "run-a", "s1", EventStepStarted, map[string]any{"x": 1})
41+
if err != nil {
42+
t.Fatal(err)
43+
}
44+
seq2, err := rec.Append(ctx, "run-a", "s1", EventStepFinished, map[string]any{"ok": true})
45+
if err != nil {
46+
t.Fatal(err)
47+
}
48+
if seq1 != 1 || seq2 != 2 {
49+
t.Fatalf("seq = %d, %d want 1, 2", seq1, seq2)
50+
}
51+
52+
rd := NewReader(st)
53+
events, err := rd.ListByRunID(ctx, "run-a")
54+
if err != nil {
55+
t.Fatal(err)
56+
}
57+
if len(events) != 2 || events[0].Seq != 1 || events[1].Seq != 2 {
58+
t.Fatalf("events = %+v", events)
59+
}
60+
if events[0].DataJSON != `{"x":1}` || events[1].DataJSON != `{"ok":true}` {
61+
t.Fatalf("data json = %q, %q", events[0].DataJSON, events[1].DataJSON)
62+
}
63+
}
64+
65+
func TestRecorder_Append_missingRunFailsWithErrRunNotFound(t *testing.T) {
66+
ctx := context.Background()
67+
st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "trace2.db"))
68+
if err != nil {
69+
t.Fatal(err)
70+
}
71+
t.Cleanup(func() { _ = st.Close() })
72+
73+
rec := NewRecorder(st)
74+
_, err = rec.Append(ctx, "missing-run", "", EventRunStarted, nil)
75+
if err == nil {
76+
t.Fatal("expected error")
77+
}
78+
if !errors.Is(err, ErrRunNotFound) {
79+
t.Fatalf("want ErrRunNotFound in chain, got %v", err)
80+
}
81+
if !strings.Contains(err.Error(), "missing-run") {
82+
t.Fatalf("expected clear error mentioning run id, got: %v", err)
83+
}
84+
}

0 commit comments

Comments
 (0)