Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions internal/state/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,3 +19,40 @@ type AppliedProject struct {
Version string
AppliedAt time.Time
}

// Run is one workflow execution row in runs (design doc §14.2).
type Run struct {
RunID string
WorkflowName string
Env string
Status string
StartedAt time.Time
FinishedAt *time.Time
InputJSON string
OutputJSON string
ErrorText string
TotalCostUSD float64
}

// RunStep is one row in run_steps (design doc §14.2).
type RunStep struct {
RunID string
StepID string
Status string
StartedAt *time.Time
FinishedAt *time.Time
InputJSON string
OutputJSON string
ErrorText string
CostUSD float64
}

// TraceEvent is one append-only row in trace_events (design doc §14.2).
type TraceEvent struct {
RunID string
Seq int64
Timestamp time.Time
Type string
StepID string
DataJSON string
}
7 changes: 6 additions & 1 deletion internal/state/sqlite/doc.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
// Package sqlite implements deployment state storage in SQLite (design doc §14.1).
// Package sqlite implements deployment and runtime/trace state in SQLite (design doc §§14.1–14.2).
// Use [Open] with a file DSN; [Migrate] runs versioned SQL from /migrations/sqlite.
//
// Consistency: [Open] runs PRAGMA foreign_keys=ON after the first connection is established
// (with MaxOpenConns(1), that matches the single pooled connection). SQLite then enforces FOREIGN
// KEY from run_steps and trace_events to runs (including ON DELETE CASCADE). Callers may still
// validate run_id in the application for clearer errors; the pragma is the DB-level guarantee.
package sqlite
186 changes: 186 additions & 0 deletions internal/state/sqlite/runtime.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
package sqlite

import (
"context"
"database/sql"
"fmt"
"time"

"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
)

// StartRun inserts a new row in runs (design doc §14.2).
func (s *Store) StartRun(ctx context.Context, r state.Run) error {
in := r.InputJSON
if in == "" {
in = "{}"
}
at := r.StartedAt.UTC().Format(time.RFC3339Nano)
_, err := s.db.ExecContext(ctx, `
INSERT INTO runs (run_id, workflow_name, env, status, started_at, input_json, total_cost_usd)
VALUES (?, ?, ?, ?, ?, ?, ?)
`, r.RunID, r.WorkflowName, r.Env, r.Status, at, in, r.TotalCostUSD)
return err
}

// FinishRun updates status, finished_at, output_json, error_text, and total_cost_usd.
func (s *Store) FinishRun(ctx context.Context, runID, status string, finishedAt time.Time, outputJSON, errorText string, totalCostUSD float64) error {
fin := finishedAt.UTC().Format(time.RFC3339Nano)
var out, et any
if outputJSON != "" {
out = outputJSON
}
if errorText != "" {
et = errorText
}
res, err := s.db.ExecContext(ctx, `
UPDATE runs SET status = ?, finished_at = ?, output_json = ?, error_text = ?, total_cost_usd = ?
WHERE run_id = ?
`, status, fin, out, et, totalCostUSD, runID)
if err != nil {
return err
}
n, err := res.RowsAffected()
if err != nil {
return err
}
if n == 0 {
return sql.ErrNoRows
}
return nil
}

// UpsertRunStep inserts or updates a step row for (run_id, step_id).
func (s *Store) UpsertRunStep(ctx context.Context, st state.RunStep) error {
var started, finished any
if st.StartedAt != nil {
started = st.StartedAt.UTC().Format(time.RFC3339Nano)
}
if st.FinishedAt != nil {
finished = st.FinishedAt.UTC().Format(time.RFC3339Nano)
}
var inJ, outJ, errT any
if st.InputJSON != "" {
inJ = st.InputJSON
}
if st.OutputJSON != "" {
outJ = st.OutputJSON
}
if st.ErrorText != "" {
errT = st.ErrorText
}
_, err := s.db.ExecContext(ctx, `
INSERT INTO run_steps (run_id, step_id, status, started_at, finished_at, input_json, output_json, error_text, cost_usd)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(run_id, step_id) DO UPDATE SET
status = excluded.status,
started_at = excluded.started_at,
finished_at = excluded.finished_at,
input_json = excluded.input_json,
output_json = excluded.output_json,
error_text = excluded.error_text,
cost_usd = excluded.cost_usd
`, st.RunID, st.StepID, st.Status, started, finished, inJ, outJ, errT, st.CostUSD)
return err
}

// AppendTraceEvent appends one trace row with the next monotonic seq for run_id.
func (s *Store) AppendTraceEvent(ctx context.Context, runID string, ts time.Time, eventType string, stepID string, dataJSON string) (seq int64, err error) {
dj := dataJSON
if dj == "" {
dj = "{}"
}
var sid any
if stepID != "" {
sid = stepID
}
tss := ts.UTC().Format(time.RFC3339Nano)

tx, err := s.db.BeginTx(ctx, nil)
if err != nil {
return 0, err
}
defer func() { _ = tx.Rollback() }()

if err := tx.QueryRowContext(ctx, `SELECT IFNULL(MAX(seq), 0) + 1 FROM trace_events WHERE run_id = ?`, runID).Scan(&seq); err != nil {
return 0, err
}
if _, err := tx.ExecContext(ctx, `
INSERT INTO trace_events (run_id, seq, timestamp, type, step_id, data_json)
VALUES (?, ?, ?, ?, ?, ?)
`, runID, seq, tss, eventType, sid, dj); err != nil {
return 0, err
}
if err := tx.Commit(); err != nil {
return 0, err
}
return seq, nil
}

// GetRun returns the run row or sql.ErrNoRows.
func (s *Store) GetRun(ctx context.Context, runID string) (*state.Run, error) {
row := s.db.QueryRowContext(ctx, `
SELECT run_id, workflow_name, env, status, started_at, finished_at, input_json, output_json, error_text, total_cost_usd
FROM runs
WHERE run_id = ?
`, runID)
var r state.Run
var started, finished sql.NullString
var outJ, errT sql.NullString
if err := row.Scan(&r.RunID, &r.WorkflowName, &r.Env, &r.Status, &started, &finished, &r.InputJSON, &outJ, &errT, &r.TotalCostUSD); err != nil {
return nil, err
}
st, err := parseSQLiteTime(started.String)
if err != nil {
return nil, fmt.Errorf("started_at: %w", err)
}
r.StartedAt = st
if finished.Valid && finished.String != "" {
ft, err := parseSQLiteTime(finished.String)
if err != nil {
return nil, fmt.Errorf("finished_at: %w", err)
}
r.FinishedAt = &ft
}
if outJ.Valid {
r.OutputJSON = outJ.String
}
if errT.Valid {
r.ErrorText = errT.String
}
return &r, nil
}

// ListTraceEventsByRunID returns trace rows for run_id ordered by seq ascending.
func (s *Store) ListTraceEventsByRunID(ctx context.Context, runID string) ([]state.TraceEvent, error) {
rows, err := s.db.QueryContext(ctx, `
SELECT run_id, seq, timestamp, type, step_id, data_json
FROM trace_events
WHERE run_id = ?
ORDER BY seq ASC
`, runID)
if err != nil {
return nil, err
}
defer rows.Close()

var out []state.TraceEvent
for rows.Next() {
var e state.TraceEvent
var ts string
var step sql.NullString
if err := rows.Scan(&e.RunID, &e.Seq, &ts, &e.Type, &step, &e.DataJSON); err != nil {
return nil, err
}
t, err := parseSQLiteTime(ts)
if err != nil {
return nil, fmt.Errorf("timestamp: %w", err)
}
e.Timestamp = t
if step.Valid {
e.StepID = step.String
}
out = append(out, e)
}
return out, rows.Err()
}
8 changes: 7 additions & 1 deletion internal/state/sqlite/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
_ "modernc.org/sqlite" // register "sqlite" driver
)

// Store persists deployment state (design doc §14.1) in SQLite.
// Store persists deployment state (§14.1) and runtime/trace state (§14.2) in SQLite.
type Store struct {
db *sql.DB
}
Expand All @@ -29,6 +29,12 @@ func Open(ctx context.Context, dsn string) (*Store, error) {
_ = db.Close()
return nil, fmt.Errorf("ping sqlite: %w", err)
}
// SQLite disables FK checks by default; enforce per connection. With MaxOpenConns(1) this
// covers the pooled connection used for all statements on this Store.
if _, err := db.ExecContext(ctx, `PRAGMA foreign_keys=ON`); err != nil {
_ = db.Close()
return nil, fmt.Errorf("sqlite foreign_keys: %w", err)
}
if err := Migrate(ctx, db); err != nil {
_ = db.Close()
return nil, err
Expand Down
92 changes: 92 additions & 0 deletions internal/state/sqlite/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,98 @@ func TestOpen_twiceSameFile(t *testing.T) {
t.Cleanup(func() { _ = s2.Close() })
}

func TestRuntime_insertRunEventsQueryByRunID(t *testing.T) {
ctx := context.Background()
st, err := Open(ctx, filepath.Join(t.TempDir(), "runtime.db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = st.Close() })

started := time.Date(2026, 4, 11, 10, 0, 0, 0, time.UTC)
run := state.Run{
RunID: "run-1",
WorkflowName: "wf-a",
Env: "dev",
Status: "running",
StartedAt: started,
InputJSON: `{"k":1}`,
TotalCostUSD: 0,
}
if err := st.StartRun(ctx, run); err != nil {
t.Fatal(err)
}

stepStarted := started.Add(time.Minute)
if err := st.UpsertRunStep(ctx, state.RunStep{
RunID: run.RunID,
StepID: "s1",
Status: "ok",
StartedAt: &stepStarted,
InputJSON: `{}`,
CostUSD: 0.01,
}); err != nil {
t.Fatal(err)
}

ts1 := started.Add(2 * time.Minute)
seq1, err := st.AppendTraceEvent(ctx, run.RunID, ts1, "log", "", `{"m":"a"}`)
if err != nil {
t.Fatal(err)
}
ts2 := started.Add(3 * time.Minute)
seq2, err := st.AppendTraceEvent(ctx, run.RunID, ts2, "metric", "s1", `{"cpu":1}`)
if err != nil {
t.Fatal(err)
}
if seq1 != 1 || seq2 != 2 {
t.Fatalf("seq = %d, %d want 1, 2", seq1, seq2)
}

events, err := st.ListTraceEventsByRunID(ctx, run.RunID)
if err != nil {
t.Fatal(err)
}
if len(events) != 2 {
t.Fatalf("len(events) = %d", len(events))
}
if events[0].Seq != 1 || events[0].Type != "log" || events[0].DataJSON != `{"m":"a"}` {
t.Fatalf("event[0] = %+v", events[0])
}
if events[1].Seq != 2 || events[1].StepID != "s1" {
t.Fatalf("event[1] = %+v", events[1])
}

fin := started.Add(4 * time.Minute)
if err := st.FinishRun(ctx, run.RunID, "succeeded", fin, `{"out":true}`, "", 0.02); err != nil {
t.Fatal(err)
}
got, err := st.GetRun(ctx, run.RunID)
if err != nil {
t.Fatal(err)
}
if got.Status != "succeeded" || got.OutputJSON != `{"out":true}` || got.TotalCostUSD != 0.02 {
t.Fatalf("GetRun = %+v", got)
}
if got.FinishedAt == nil || !got.FinishedAt.Equal(fin) {
t.Fatalf("FinishedAt = %v want %v", got.FinishedAt, fin)
}
}

func TestAppendTraceEvent_foreignKeyRequiresRun(t *testing.T) {
ctx := context.Background()
st, err := Open(ctx, filepath.Join(t.TempDir(), "fk.db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = st.Close() })

_, err = st.AppendTraceEvent(ctx, "no-such-run", time.Now().UTC(), "log", "", `{}`)
if err == nil {
t.Fatal("expected error for missing run_id")
}
}

func TestGetAppliedResource_notFound(t *testing.T) {
ctx := context.Background()
st, err := Open(ctx, filepath.Join(t.TempDir(), "nf.db"))
Expand Down
Loading
Loading