Skip to content

Commit eeb9b57

Browse files
leo-aa88cursoragent
andcommitted
feat(state): add tenant/thread/actor attribution to runs and traces
Introduce migration 005 with attribution columns on runs and trace_events, defaults/backfill for existing databases, ListRunsFiltered, and propagation of scope fields onto trace rows without joins. Closes #111 (persistence layer). Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent 83c4217 commit eeb9b57

11 files changed

Lines changed: 562 additions & 31 deletions

File tree

internal/state/attribution.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package state
2+
3+
import "strings"
4+
5+
// Local development defaults for run attribution (issue #111).
6+
// Do not rely on these in CI or production; pass explicit tenant, thread, and actor identifiers.
7+
const (
8+
DefaultTenantID = "tenant-1"
9+
DefaultThreadID = "thread-1"
10+
DefaultActorID = "user-1"
11+
DefaultSource = "cli"
12+
)
13+
14+
// RunAttribution scopes a run to a tenant and thread and records who triggered it.
15+
type RunAttribution struct {
16+
TenantID string
17+
ThreadID string
18+
ActorID string
19+
ParentRunID string
20+
RequestID string
21+
IdempotencyKey string
22+
Source string
23+
}
24+
25+
// NormalizeAttribution fills empty attribution fields with [DefaultTenantID], [DefaultThreadID],
26+
// [DefaultActorID], and [DefaultSource]. Optional fields (parent run, idempotency key) stay empty
27+
// when unset.
28+
func NormalizeAttribution(a *RunAttribution) {
29+
if a == nil {
30+
return
31+
}
32+
if strings.TrimSpace(a.TenantID) == "" {
33+
a.TenantID = DefaultTenantID
34+
}
35+
if strings.TrimSpace(a.ThreadID) == "" {
36+
a.ThreadID = DefaultThreadID
37+
}
38+
if strings.TrimSpace(a.ActorID) == "" {
39+
a.ActorID = DefaultActorID
40+
}
41+
if strings.TrimSpace(a.Source) == "" {
42+
a.Source = DefaultSource
43+
}
44+
a.TenantID = strings.TrimSpace(a.TenantID)
45+
a.ThreadID = strings.TrimSpace(a.ThreadID)
46+
a.ActorID = strings.TrimSpace(a.ActorID)
47+
a.ParentRunID = strings.TrimSpace(a.ParentRunID)
48+
a.RequestID = strings.TrimSpace(a.RequestID)
49+
a.IdempotencyKey = strings.TrimSpace(a.IdempotencyKey)
50+
a.Source = strings.TrimSpace(a.Source)
51+
}
52+
53+
// ApplyAttribution copies normalized attribution onto a [Run].
54+
func ApplyAttribution(r *Run, a RunAttribution) {
55+
if r == nil {
56+
return
57+
}
58+
NormalizeAttribution(&a)
59+
r.TenantID = a.TenantID
60+
r.ThreadID = a.ThreadID
61+
r.ActorID = a.ActorID
62+
r.ParentRunID = a.ParentRunID
63+
r.RequestID = a.RequestID
64+
r.IdempotencyKey = a.IdempotencyKey
65+
r.Source = a.Source
66+
}

internal/state/attribution_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package state
2+
3+
import "testing"
4+
5+
func TestNormalizeAttribution_defaults(t *testing.T) {
6+
a := RunAttribution{}
7+
NormalizeAttribution(&a)
8+
if a.TenantID != DefaultTenantID || a.ThreadID != DefaultThreadID || a.ActorID != DefaultActorID || a.Source != DefaultSource {
9+
t.Fatalf("defaults: %+v", a)
10+
}
11+
}
12+
13+
func TestNormalizeAttribution_preservesExplicit(t *testing.T) {
14+
a := RunAttribution{
15+
TenantID: " acme ",
16+
ThreadID: "prod-thread",
17+
ActorID: "ci-bot",
18+
ParentRunID: " parent-1 ",
19+
RequestID: "req-1",
20+
IdempotencyKey: "idem-1",
21+
Source: "actions",
22+
}
23+
NormalizeAttribution(&a)
24+
if a.TenantID != "acme" || a.ThreadID != "prod-thread" || a.ActorID != "ci-bot" {
25+
t.Fatalf("trimmed: %+v", a)
26+
}
27+
if a.ParentRunID != "parent-1" || a.RequestID != "req-1" || a.IdempotencyKey != "idem-1" || a.Source != "actions" {
28+
t.Fatalf("optional: %+v", a)
29+
}
30+
}
31+
32+
func TestNormalizeAttribution_nilSafe(t *testing.T) {
33+
NormalizeAttribution(nil)
34+
}
35+
36+
func TestApplyAttribution(t *testing.T) {
37+
var r Run
38+
ApplyAttribution(&r, RunAttribution{TenantID: "t", ThreadID: "th", ActorID: "a", RequestID: "r1", Source: "api"})
39+
if r.TenantID != "t" || r.ThreadID != "th" || r.ActorID != "a" || r.RequestID != "r1" || r.Source != "api" {
40+
t.Fatalf("run: %+v", r)
41+
}
42+
}
43+
44+
func TestApplyAttribution_nilRun(t *testing.T) {
45+
ApplyAttribution(nil, RunAttribution{})
46+
}

internal/state/models.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,13 @@ type Run struct {
4242
TotalCostUSD float64
4343
WorkflowSpecHash string
4444
EnvironmentName string
45+
TenantID string
46+
ThreadID string
47+
ActorID string
48+
ParentRunID string
49+
RequestID string
50+
IdempotencyKey string
51+
Source string
4552
}
4653

4754
// RunStep is one row in run_steps (design doc §14.2).
@@ -65,6 +72,19 @@ type TraceEvent struct {
6572
Type string
6673
StepID string
6774
DataJSON string
75+
TenantID string
76+
ThreadID string
77+
ActorID string
78+
}
79+
80+
// RunListFilter selects runs for logs and inspector queries (issue #111).
81+
// Empty filter fields are ignored. Limit is clamped via [ClampRunListLimit].
82+
type RunListFilter struct {
83+
TenantID string
84+
ThreadID string
85+
ActorID string
86+
WorkflowName string
87+
Limit int
6888
}
6989

7090
// Checkpoint status values stored in run_checkpoints (issue #105).
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
package sqlite
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"path/filepath"
7+
"testing"
8+
"time"
9+
10+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
11+
)
12+
13+
func TestStartRun_attributionDefaultsAndTracePropagation(t *testing.T) {
14+
ctx := context.Background()
15+
st, err := Open(ctx, filepath.Join(t.TempDir(), "attr.db"))
16+
if err != nil {
17+
t.Fatal(err)
18+
}
19+
t.Cleanup(func() { _ = st.Close() })
20+
21+
start := time.Date(2026, 6, 4, 12, 0, 0, 0, time.UTC)
22+
if err := st.StartRun(ctx, state.Run{
23+
RunID: "run-attr", WorkflowName: "wf", Env: "local", Status: state.RunStatusRunning,
24+
StartedAt: start, InputJSON: `{}`, TotalCostUSD: 0,
25+
}); err != nil {
26+
t.Fatal(err)
27+
}
28+
29+
got, err := st.GetRun(ctx, "run-attr")
30+
if err != nil {
31+
t.Fatal(err)
32+
}
33+
if got.TenantID != state.DefaultTenantID || got.ThreadID != state.DefaultThreadID || got.ActorID != state.DefaultActorID {
34+
t.Fatalf("defaults: %+v", got)
35+
}
36+
if got.Source != state.DefaultSource {
37+
t.Fatalf("source = %q", got.Source)
38+
}
39+
if got.RequestID != "run-attr" {
40+
t.Fatalf("request_id = %q want run-attr", got.RequestID)
41+
}
42+
43+
if _, err := st.AppendTraceEvent(ctx, "run-attr", start, "run.started", "", `{}`); err != nil {
44+
t.Fatal(err)
45+
}
46+
events, err := st.ListTraceEventsByRunID(ctx, "run-attr")
47+
if err != nil {
48+
t.Fatal(err)
49+
}
50+
if len(events) != 1 {
51+
t.Fatalf("events = %d", len(events))
52+
}
53+
if events[0].TenantID != state.DefaultTenantID || events[0].ThreadID != state.DefaultThreadID || events[0].ActorID != state.DefaultActorID {
54+
t.Fatalf("trace attribution: %+v", events[0])
55+
}
56+
}
57+
58+
func TestStartRun_explicitAttribution(t *testing.T) {
59+
ctx := context.Background()
60+
st, err := Open(ctx, filepath.Join(t.TempDir(), "attr-explicit.db"))
61+
if err != nil {
62+
t.Fatal(err)
63+
}
64+
t.Cleanup(func() { _ = st.Close() })
65+
66+
start := time.Now().UTC()
67+
state.ApplyAttribution(&state.Run{
68+
RunID: "r2", WorkflowName: "wf", Env: "staging", Status: state.RunStatusRunning,
69+
StartedAt: start, InputJSON: `{}`,
70+
}, state.RunAttribution{
71+
TenantID: "acme", ThreadID: "prod-thread", ActorID: "ci-bot",
72+
ParentRunID: "parent-1", RequestID: "req-99", IdempotencyKey: "idem-1", Source: "actions",
73+
})
74+
run := state.Run{
75+
RunID: "r2", WorkflowName: "wf", Env: "staging", Status: state.RunStatusRunning,
76+
StartedAt: start, InputJSON: `{}`,
77+
TenantID: "acme", ThreadID: "prod-thread", ActorID: "ci-bot",
78+
ParentRunID: "parent-1", RequestID: "req-99", IdempotencyKey: "idem-1", Source: "actions",
79+
}
80+
if err := st.StartRun(ctx, run); err != nil {
81+
t.Fatal(err)
82+
}
83+
got, err := st.GetRun(ctx, "r2")
84+
if err != nil {
85+
t.Fatal(err)
86+
}
87+
if got.TenantID != "acme" || got.ThreadID != "prod-thread" || got.ActorID != "ci-bot" {
88+
t.Fatalf("attribution: %+v", got)
89+
}
90+
if got.ParentRunID != "parent-1" || got.RequestID != "req-99" || got.IdempotencyKey != "idem-1" || got.Source != "actions" {
91+
t.Fatalf("metadata: %+v", got)
92+
}
93+
}
94+
95+
func TestListRunsFiltered(t *testing.T) {
96+
ctx := context.Background()
97+
st, err := Open(ctx, filepath.Join(t.TempDir(), "filter.db"))
98+
if err != nil {
99+
t.Fatal(err)
100+
}
101+
t.Cleanup(func() { _ = st.Close() })
102+
103+
t0 := time.Date(2026, 6, 1, 10, 0, 0, 0, time.UTC)
104+
runs := []state.Run{
105+
{RunID: "a", WorkflowName: "wf", Env: "local", Status: "running", StartedAt: t0,
106+
InputJSON: `{}`, TenantID: "t1", ThreadID: "th1", ActorID: "u1", RequestID: "ra", Source: "cli"},
107+
{RunID: "b", WorkflowName: "wf", Env: "local", Status: "running", StartedAt: t0.Add(time.Hour),
108+
InputJSON: `{}`, TenantID: "t1", ThreadID: "th2", ActorID: "u1", RequestID: "rb", Source: "cli"},
109+
{RunID: "c", WorkflowName: "other", Env: "local", Status: "running", StartedAt: t0.Add(2 * time.Hour),
110+
InputJSON: `{}`, TenantID: "t2", ThreadID: "th1", ActorID: "u2", RequestID: "rc", Source: "cli"},
111+
}
112+
for _, r := range runs {
113+
if err := st.StartRun(ctx, r); err != nil {
114+
t.Fatal(err)
115+
}
116+
}
117+
118+
filtered, err := st.ListRunsFiltered(ctx, state.RunListFilter{TenantID: "t1", ThreadID: "th1"})
119+
if err != nil {
120+
t.Fatal(err)
121+
}
122+
if len(filtered) != 1 || filtered[0].RunID != "a" {
123+
t.Fatalf("tenant+thread: %#v", filtered)
124+
}
125+
126+
byActor, err := st.ListRunsFiltered(ctx, state.RunListFilter{ActorID: "u2"})
127+
if err != nil {
128+
t.Fatal(err)
129+
}
130+
if len(byActor) != 1 || byActor[0].RunID != "c" {
131+
t.Fatalf("actor: %#v", byActor)
132+
}
133+
134+
byWF, err := st.ListRunsFiltered(ctx, state.RunListFilter{WorkflowName: "wf", TenantID: "t1"})
135+
if err != nil {
136+
t.Fatal(err)
137+
}
138+
if len(byWF) != 2 {
139+
t.Fatalf("workflow+tenant: %#v", byWF)
140+
}
141+
}
142+
143+
func TestMigrate_attributionBackfillFromPre004DB(t *testing.T) {
144+
ctx := context.Background()
145+
dsn := filepath.Join(t.TempDir(), "legacy.db")
146+
db, err := sql.Open("sqlite", dsn)
147+
if err != nil {
148+
t.Fatal(err)
149+
}
150+
t.Cleanup(func() { _ = db.Close() })
151+
if err := db.PingContext(ctx); err != nil {
152+
t.Fatal(err)
153+
}
154+
155+
// Apply migrations through 004 only (pre-attribution schema).
156+
for _, ver := range []int{1, 2, 3, 4} {
157+
if err := applySingleMigration(ctx, db, ver); err != nil {
158+
t.Fatalf("migration %d: %v", ver, err)
159+
}
160+
}
161+
162+
start := time.Date(2026, 5, 1, 9, 0, 0, 0, time.UTC)
163+
if _, err := db.ExecContext(ctx, `
164+
INSERT INTO runs (run_id, workflow_name, env, status, started_at, input_json, total_cost_usd, workflow_spec_hash, environment_name)
165+
VALUES ('legacy-run', 'wf', 'local', 'running', ?, '{}', 0, '', '')
166+
`, start.UTC().Format(time.RFC3339Nano)); err != nil {
167+
t.Fatal(err)
168+
}
169+
if _, err := db.ExecContext(ctx, `
170+
INSERT INTO trace_events (run_id, seq, timestamp, type, data_json)
171+
VALUES ('legacy-run', 1, ?, 'log', '{}')
172+
`, start.UTC().Format(time.RFC3339Nano)); err != nil {
173+
t.Fatal(err)
174+
}
175+
176+
if err := applySingleMigration(ctx, db, 5); err != nil {
177+
t.Fatal(err)
178+
}
179+
180+
var tenant, thread, actor, requestID string
181+
if err := db.QueryRowContext(ctx, `
182+
SELECT tenant_id, thread_id, actor_id, request_id FROM runs WHERE run_id = 'legacy-run'
183+
`).Scan(&tenant, &thread, &actor, &requestID); err != nil {
184+
t.Fatal(err)
185+
}
186+
if tenant != state.DefaultTenantID || thread != state.DefaultThreadID || actor != state.DefaultActorID {
187+
t.Fatalf("run backfill: %s %s %s", tenant, thread, actor)
188+
}
189+
if requestID != "legacy-run" {
190+
t.Fatalf("request_id = %q", requestID)
191+
}
192+
if err := db.QueryRowContext(ctx, `
193+
SELECT tenant_id, thread_id, actor_id FROM trace_events WHERE run_id = 'legacy-run'
194+
`).Scan(&tenant, &thread, &actor); err != nil {
195+
t.Fatal(err)
196+
}
197+
if tenant != state.DefaultTenantID || thread != state.DefaultThreadID || actor != state.DefaultActorID {
198+
t.Fatalf("trace backfill: %s %s %s", tenant, thread, actor)
199+
}
200+
}

0 commit comments

Comments
 (0)