Skip to content

Commit dfd75d4

Browse files
authored
Merge pull request #42 from LAA-Software-Engineering/issue/10-runtime-trace-storage
feat(state/sqlite): runtime/trace tables and Store API (#10)
2 parents 38971ab + 8ee759e commit dfd75d4

6 files changed

Lines changed: 372 additions & 2 deletions

File tree

internal/state/models.go

Lines changed: 37 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -19,3 +19,40 @@ type AppliedProject struct {
1919
Version string
2020
AppliedAt time.Time
2121
}
22+
23+
// Run is the in-memory form of one row of the runs table: a single workflow
// execution (design doc §14.2).
type Run struct {
	// Identity and lifecycle state of the execution.
	RunID        string
	WorkflowName string
	Env          string
	Status       string

	// StartedAt is when the run began. FinishedAt is nil when no finish time
	// has been recorded for the run.
	StartedAt  time.Time
	FinishedAt *time.Time

	// InputJSON and OutputJSON carry the run's payloads as JSON text.
	// ErrorText carries a failure description. An empty string means
	// "not set" for each of them.
	InputJSON  string
	OutputJSON string
	ErrorText  string

	// TotalCostUSD maps to the total_cost_usd column.
	TotalCostUSD float64
}
36+
37+
// RunStep is the in-memory form of one row of the run_steps table
// (design doc §14.2). A step is identified by the pair (RunID, StepID).
type RunStep struct {
	// RunID and StepID together identify the step; Status is its current state.
	RunID  string
	StepID string
	Status string

	// StartedAt and FinishedAt are optional; nil means the time is not recorded.
	StartedAt  *time.Time
	FinishedAt *time.Time

	// InputJSON, OutputJSON and ErrorText are optional text payloads; an empty
	// string means "not set".
	InputJSON  string
	OutputJSON string
	ErrorText  string

	// CostUSD maps to the cost_usd column.
	CostUSD float64
}
49+
50+
// TraceEvent is the in-memory form of one row of the append-only trace_events
// table (design doc §14.2).
type TraceEvent struct {
	// RunID names the owning run; Seq is the event's sequence number within it.
	RunID string
	Seq   int64

	// Timestamp and Type describe when the event happened and what kind it is.
	Timestamp time.Time
	Type      string

	// StepID is optional; an empty string means the event has no step.
	StepID string

	// DataJSON is the event payload as JSON text.
	DataJSON string
}

internal/state/sqlite/doc.go

Lines changed: 6 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,3 +1,8 @@
1-
// Package sqlite implements deployment state storage in SQLite (design doc §14.1).
1+
// Package sqlite implements deployment and runtime/trace state in SQLite (design doc §§14.1–14.2).
22
// Use [Open] with a file DSN; [Migrate] runs versioned SQL from /migrations/sqlite.
3+
//
4+
// Consistency: [Open] runs PRAGMA foreign_keys=ON after the first connection is established
5+
// (with MaxOpenConns(1), that matches the single pooled connection). SQLite then enforces FOREIGN
6+
// KEY from run_steps and trace_events to runs (including ON DELETE CASCADE). Callers may still
7+
// validate run_id in the application for clearer errors; the pragma is the DB-level guarantee.
38
package sqlite

internal/state/sqlite/runtime.go

Lines changed: 186 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -0,0 +1,186 @@
1+
package sqlite
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"fmt"
7+
"time"
8+
9+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
10+
)
11+
12+
// StartRun inserts a new row in runs (design doc §14.2).
13+
func (s *Store) StartRun(ctx context.Context, r state.Run) error {
14+
in := r.InputJSON
15+
if in == "" {
16+
in = "{}"
17+
}
18+
at := r.StartedAt.UTC().Format(time.RFC3339Nano)
19+
_, err := s.db.ExecContext(ctx, `
20+
INSERT INTO runs (run_id, workflow_name, env, status, started_at, input_json, total_cost_usd)
21+
VALUES (?, ?, ?, ?, ?, ?, ?)
22+
`, r.RunID, r.WorkflowName, r.Env, r.Status, at, in, r.TotalCostUSD)
23+
return err
24+
}
25+
26+
// FinishRun updates status, finished_at, output_json, error_text, and total_cost_usd.
27+
func (s *Store) FinishRun(ctx context.Context, runID, status string, finishedAt time.Time, outputJSON, errorText string, totalCostUSD float64) error {
28+
fin := finishedAt.UTC().Format(time.RFC3339Nano)
29+
var out, et any
30+
if outputJSON != "" {
31+
out = outputJSON
32+
}
33+
if errorText != "" {
34+
et = errorText
35+
}
36+
res, err := s.db.ExecContext(ctx, `
37+
UPDATE runs SET status = ?, finished_at = ?, output_json = ?, error_text = ?, total_cost_usd = ?
38+
WHERE run_id = ?
39+
`, status, fin, out, et, totalCostUSD, runID)
40+
if err != nil {
41+
return err
42+
}
43+
n, err := res.RowsAffected()
44+
if err != nil {
45+
return err
46+
}
47+
if n == 0 {
48+
return sql.ErrNoRows
49+
}
50+
return nil
51+
}
52+
53+
// UpsertRunStep inserts or updates a step row for (run_id, step_id).
54+
func (s *Store) UpsertRunStep(ctx context.Context, st state.RunStep) error {
55+
var started, finished any
56+
if st.StartedAt != nil {
57+
started = st.StartedAt.UTC().Format(time.RFC3339Nano)
58+
}
59+
if st.FinishedAt != nil {
60+
finished = st.FinishedAt.UTC().Format(time.RFC3339Nano)
61+
}
62+
var inJ, outJ, errT any
63+
if st.InputJSON != "" {
64+
inJ = st.InputJSON
65+
}
66+
if st.OutputJSON != "" {
67+
outJ = st.OutputJSON
68+
}
69+
if st.ErrorText != "" {
70+
errT = st.ErrorText
71+
}
72+
_, err := s.db.ExecContext(ctx, `
73+
INSERT INTO run_steps (run_id, step_id, status, started_at, finished_at, input_json, output_json, error_text, cost_usd)
74+
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
75+
ON CONFLICT(run_id, step_id) DO UPDATE SET
76+
status = excluded.status,
77+
started_at = excluded.started_at,
78+
finished_at = excluded.finished_at,
79+
input_json = excluded.input_json,
80+
output_json = excluded.output_json,
81+
error_text = excluded.error_text,
82+
cost_usd = excluded.cost_usd
83+
`, st.RunID, st.StepID, st.Status, started, finished, inJ, outJ, errT, st.CostUSD)
84+
return err
85+
}
86+
87+
// AppendTraceEvent appends one trace row with the next monotonic seq for run_id.
88+
func (s *Store) AppendTraceEvent(ctx context.Context, runID string, ts time.Time, eventType string, stepID string, dataJSON string) (seq int64, err error) {
89+
dj := dataJSON
90+
if dj == "" {
91+
dj = "{}"
92+
}
93+
var sid any
94+
if stepID != "" {
95+
sid = stepID
96+
}
97+
tss := ts.UTC().Format(time.RFC3339Nano)
98+
99+
tx, err := s.db.BeginTx(ctx, nil)
100+
if err != nil {
101+
return 0, err
102+
}
103+
defer func() { _ = tx.Rollback() }()
104+
105+
if err := tx.QueryRowContext(ctx, `SELECT IFNULL(MAX(seq), 0) + 1 FROM trace_events WHERE run_id = ?`, runID).Scan(&seq); err != nil {
106+
return 0, err
107+
}
108+
if _, err := tx.ExecContext(ctx, `
109+
INSERT INTO trace_events (run_id, seq, timestamp, type, step_id, data_json)
110+
VALUES (?, ?, ?, ?, ?, ?)
111+
`, runID, seq, tss, eventType, sid, dj); err != nil {
112+
return 0, err
113+
}
114+
if err := tx.Commit(); err != nil {
115+
return 0, err
116+
}
117+
return seq, nil
118+
}
119+
120+
// GetRun returns the run row or sql.ErrNoRows.
121+
func (s *Store) GetRun(ctx context.Context, runID string) (*state.Run, error) {
122+
row := s.db.QueryRowContext(ctx, `
123+
SELECT run_id, workflow_name, env, status, started_at, finished_at, input_json, output_json, error_text, total_cost_usd
124+
FROM runs
125+
WHERE run_id = ?
126+
`, runID)
127+
var r state.Run
128+
var started, finished sql.NullString
129+
var outJ, errT sql.NullString
130+
if err := row.Scan(&r.RunID, &r.WorkflowName, &r.Env, &r.Status, &started, &finished, &r.InputJSON, &outJ, &errT, &r.TotalCostUSD); err != nil {
131+
return nil, err
132+
}
133+
st, err := parseSQLiteTime(started.String)
134+
if err != nil {
135+
return nil, fmt.Errorf("started_at: %w", err)
136+
}
137+
r.StartedAt = st
138+
if finished.Valid && finished.String != "" {
139+
ft, err := parseSQLiteTime(finished.String)
140+
if err != nil {
141+
return nil, fmt.Errorf("finished_at: %w", err)
142+
}
143+
r.FinishedAt = &ft
144+
}
145+
if outJ.Valid {
146+
r.OutputJSON = outJ.String
147+
}
148+
if errT.Valid {
149+
r.ErrorText = errT.String
150+
}
151+
return &r, nil
152+
}
153+
154+
// ListTraceEventsByRunID returns trace rows for run_id ordered by seq ascending.
155+
func (s *Store) ListTraceEventsByRunID(ctx context.Context, runID string) ([]state.TraceEvent, error) {
156+
rows, err := s.db.QueryContext(ctx, `
157+
SELECT run_id, seq, timestamp, type, step_id, data_json
158+
FROM trace_events
159+
WHERE run_id = ?
160+
ORDER BY seq ASC
161+
`, runID)
162+
if err != nil {
163+
return nil, err
164+
}
165+
defer rows.Close()
166+
167+
var out []state.TraceEvent
168+
for rows.Next() {
169+
var e state.TraceEvent
170+
var ts string
171+
var step sql.NullString
172+
if err := rows.Scan(&e.RunID, &e.Seq, &ts, &e.Type, &step, &e.DataJSON); err != nil {
173+
return nil, err
174+
}
175+
t, err := parseSQLiteTime(ts)
176+
if err != nil {
177+
return nil, fmt.Errorf("timestamp: %w", err)
178+
}
179+
e.Timestamp = t
180+
if step.Valid {
181+
e.StepID = step.String
182+
}
183+
out = append(out, e)
184+
}
185+
return out, rows.Err()
186+
}

internal/state/sqlite/store.go

Lines changed: 7 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -11,7 +11,7 @@ import (
1111
_ "modernc.org/sqlite" // register "sqlite" driver
1212
)
1313

14-
// Store persists deployment state (design doc §14.1) in SQLite.
14+
// Store persists deployment state (§14.1) and runtime/trace state (§14.2) in SQLite.
1515
type Store struct {
1616
// db is the database/sql handle through which Store methods run their statements.
db *sql.DB
1717
}
@@ -29,6 +29,12 @@ func Open(ctx context.Context, dsn string) (*Store, error) {
2929
_ = db.Close()
3030
return nil, fmt.Errorf("ping sqlite: %w", err)
3131
}
32+
// SQLite disables FK checks by default; enforce per connection. With MaxOpenConns(1) this
33+
// covers the pooled connection used for all statements on this Store.
34+
if _, err := db.ExecContext(ctx, `PRAGMA foreign_keys=ON`); err != nil {
35+
_ = db.Close()
36+
return nil, fmt.Errorf("sqlite foreign_keys: %w", err)
37+
}
3238
if err := Migrate(ctx, db); err != nil {
3339
_ = db.Close()
3440
return nil, err

internal/state/sqlite/store_test.go

Lines changed: 92 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -103,6 +103,98 @@ func TestOpen_twiceSameFile(t *testing.T) {
103103
t.Cleanup(func() { _ = s2.Close() })
104104
}
105105

106+
func TestRuntime_insertRunEventsQueryByRunID(t *testing.T) {
107+
ctx := context.Background()
108+
st, err := Open(ctx, filepath.Join(t.TempDir(), "runtime.db"))
109+
if err != nil {
110+
t.Fatal(err)
111+
}
112+
t.Cleanup(func() { _ = st.Close() })
113+
114+
started := time.Date(2026, 4, 11, 10, 0, 0, 0, time.UTC)
115+
run := state.Run{
116+
RunID: "run-1",
117+
WorkflowName: "wf-a",
118+
Env: "dev",
119+
Status: "running",
120+
StartedAt: started,
121+
InputJSON: `{"k":1}`,
122+
TotalCostUSD: 0,
123+
}
124+
if err := st.StartRun(ctx, run); err != nil {
125+
t.Fatal(err)
126+
}
127+
128+
stepStarted := started.Add(time.Minute)
129+
if err := st.UpsertRunStep(ctx, state.RunStep{
130+
RunID: run.RunID,
131+
StepID: "s1",
132+
Status: "ok",
133+
StartedAt: &stepStarted,
134+
InputJSON: `{}`,
135+
CostUSD: 0.01,
136+
}); err != nil {
137+
t.Fatal(err)
138+
}
139+
140+
ts1 := started.Add(2 * time.Minute)
141+
seq1, err := st.AppendTraceEvent(ctx, run.RunID, ts1, "log", "", `{"m":"a"}`)
142+
if err != nil {
143+
t.Fatal(err)
144+
}
145+
ts2 := started.Add(3 * time.Minute)
146+
seq2, err := st.AppendTraceEvent(ctx, run.RunID, ts2, "metric", "s1", `{"cpu":1}`)
147+
if err != nil {
148+
t.Fatal(err)
149+
}
150+
if seq1 != 1 || seq2 != 2 {
151+
t.Fatalf("seq = %d, %d want 1, 2", seq1, seq2)
152+
}
153+
154+
events, err := st.ListTraceEventsByRunID(ctx, run.RunID)
155+
if err != nil {
156+
t.Fatal(err)
157+
}
158+
if len(events) != 2 {
159+
t.Fatalf("len(events) = %d", len(events))
160+
}
161+
if events[0].Seq != 1 || events[0].Type != "log" || events[0].DataJSON != `{"m":"a"}` {
162+
t.Fatalf("event[0] = %+v", events[0])
163+
}
164+
if events[1].Seq != 2 || events[1].StepID != "s1" {
165+
t.Fatalf("event[1] = %+v", events[1])
166+
}
167+
168+
fin := started.Add(4 * time.Minute)
169+
if err := st.FinishRun(ctx, run.RunID, "succeeded", fin, `{"out":true}`, "", 0.02); err != nil {
170+
t.Fatal(err)
171+
}
172+
got, err := st.GetRun(ctx, run.RunID)
173+
if err != nil {
174+
t.Fatal(err)
175+
}
176+
if got.Status != "succeeded" || got.OutputJSON != `{"out":true}` || got.TotalCostUSD != 0.02 {
177+
t.Fatalf("GetRun = %+v", got)
178+
}
179+
if got.FinishedAt == nil || !got.FinishedAt.Equal(fin) {
180+
t.Fatalf("FinishedAt = %v want %v", got.FinishedAt, fin)
181+
}
182+
}
183+
184+
func TestAppendTraceEvent_foreignKeyRequiresRun(t *testing.T) {
185+
ctx := context.Background()
186+
st, err := Open(ctx, filepath.Join(t.TempDir(), "fk.db"))
187+
if err != nil {
188+
t.Fatal(err)
189+
}
190+
t.Cleanup(func() { _ = st.Close() })
191+
192+
_, err = st.AppendTraceEvent(ctx, "no-such-run", time.Now().UTC(), "log", "", `{}`)
193+
if err == nil {
194+
t.Fatal("expected error for missing run_id")
195+
}
196+
}
197+
106198
func TestGetAppliedResource_notFound(t *testing.T) {
107199
ctx := context.Background()
108200
st, err := Open(ctx, filepath.Join(t.TempDir(), "nf.db"))

0 commit comments

Comments
 (0)