Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions internal/trace/doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
// Package trace records structured execution events for logs and debugging.
//
// [Recorder] checks that a run row exists before appending (clear failure when StartRun was
// skipped). Event type strings are defined as Event* constants in events.go (design doc §12.2 I).
package trace
20 changes: 20 additions & 0 deletions internal/trace/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package trace

import "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"

// Event is one persisted trace row (design doc §14.2); same shape as [state.TraceEvent].
type Event = state.TraceEvent

// Event type names from design doc §12.2 I (Trace recorder).
const (
EventRunStarted = "run.started"
EventRunFinished = "run.finished"
EventStepStarted = "step.started"
EventStepFinished = "step.finished"
EventStepFailed = "step.failed"
EventToolCalled = "tool.called"
EventToolCompleted = "tool.completed"
EventModelCalled = "model.called"
EventModelCompleted = "model.completed"
EventPolicyDenied = "policy.denied"
)
26 changes: 26 additions & 0 deletions internal/trace/reader.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package trace

import (
"context"
"errors"

"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
)

// Reader loads trace events from [state.RuntimeStore] (read side for logs / inspect).
type Reader struct {
RT state.RuntimeStore
}

// NewReader returns a reader backed by rt.
func NewReader(rt state.RuntimeStore) *Reader {
return &Reader{RT: rt}
}

// ListByRunID returns events for runID ordered by seq ascending.
func (r *Reader) ListByRunID(ctx context.Context, runID string) ([]Event, error) {
if r == nil || r.RT == nil {
return nil, errors.New("trace: nil reader or runtime store")
}
return r.RT.ListTraceEventsByRunID(ctx, runID)
}
68 changes: 68 additions & 0 deletions internal/trace/recorder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package trace

import (
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"strings"
"time"

"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
)

// ErrRunNotFound is returned when appending events for a run_id that has no row in runs.
var ErrRunNotFound = errors.New("trace: run not found")

// Recorder appends trace_events rows via [state.RuntimeStore] (design doc §12.2 I, §14.2).
type Recorder struct {
RT state.RuntimeStore
Clock func() time.Time
}

// NewRecorder returns a recorder backed by rt. rt must not be nil when Append is called.
func NewRecorder(rt state.RuntimeStore) *Recorder {
return &Recorder{RT: rt}
}

func (r *Recorder) now() time.Time {
if r != nil && r.Clock != nil {
return r.Clock()
}
return time.Now().UTC()
}

// Append verifies the run exists, serializes data to JSON for data_json, then appends one event.
// stepID may be empty for run-level events.
func (r *Recorder) Append(ctx context.Context, runID, stepID, typ string, data map[string]any) (seq int64, err error) {
if r == nil || r.RT == nil {
return 0, errors.New("trace: nil recorder or runtime store")
}
runID = strings.TrimSpace(runID)
if runID == "" {
return 0, errors.New("trace: empty run_id")
}
typ = strings.TrimSpace(typ)
if typ == "" {
return 0, errors.New("trace: empty event type")
}

if _, err := r.RT.GetRun(ctx, runID); err != nil {
if errors.Is(err, sql.ErrNoRows) {
return 0, fmt.Errorf("trace: cannot append event for run %q: %w", runID, ErrRunNotFound)
}
return 0, fmt.Errorf("trace: get run %q: %w", runID, err)
}

dataJSON := "{}"
if len(data) > 0 {
b, err := json.Marshal(data)
if err != nil {
return 0, fmt.Errorf("trace: marshal event data: %w", err)
}
dataJSON = string(b)
}

return r.RT.AppendTraceEvent(ctx, runID, r.now(), typ, strings.TrimSpace(stepID), dataJSON)
}
84 changes: 84 additions & 0 deletions internal/trace/recorder_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
package trace

import (
"context"
"errors"
"path/filepath"
"strings"
"testing"
"time"

"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite"
)

func TestRecorder_Append_increasingSeqPerRunID(t *testing.T) {
ctx := context.Background()
st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "trace.db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = st.Close() })

started := time.Date(2026, 4, 11, 9, 0, 0, 0, time.UTC)
if err := st.StartRun(ctx, state.Run{
RunID: "run-a",
WorkflowName: "wf",
Env: "dev",
Status: "running",
StartedAt: started,
InputJSON: `{}`,
TotalCostUSD: 0,
}); err != nil {
t.Fatal(err)
}

fixed := started.Add(time.Minute)
rec := NewRecorder(st)
rec.Clock = func() time.Time { return fixed }

seq1, err := rec.Append(ctx, "run-a", "s1", EventStepStarted, map[string]any{"x": 1})
if err != nil {
t.Fatal(err)
}
seq2, err := rec.Append(ctx, "run-a", "s1", EventStepFinished, map[string]any{"ok": true})
if err != nil {
t.Fatal(err)
}
if seq1 != 1 || seq2 != 2 {
t.Fatalf("seq = %d, %d want 1, 2", seq1, seq2)
}

rd := NewReader(st)
events, err := rd.ListByRunID(ctx, "run-a")
if err != nil {
t.Fatal(err)
}
if len(events) != 2 || events[0].Seq != 1 || events[1].Seq != 2 {
t.Fatalf("events = %+v", events)
}
if events[0].DataJSON != `{"x":1}` || events[1].DataJSON != `{"ok":true}` {
t.Fatalf("data json = %q, %q", events[0].DataJSON, events[1].DataJSON)
}
}

func TestRecorder_Append_missingRunFailsWithErrRunNotFound(t *testing.T) {
ctx := context.Background()
st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "trace2.db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = st.Close() })

rec := NewRecorder(st)
_, err = rec.Append(ctx, "missing-run", "", EventRunStarted, nil)
if err == nil {
t.Fatal("expected error")
}
if !errors.Is(err, ErrRunNotFound) {
t.Fatalf("want ErrRunNotFound in chain, got %v", err)
}
if !strings.Contains(err.Error(), "missing-run") {
t.Fatalf("expected clear error mentioning run id, got: %v", err)
}
}
Loading