Skip to content

Commit 5c27d3b

Browse files
leo-aa88cursoragent
andcommitted
feat(cli,inspect,runtime): wire run attribution flags and filters
Add agentctl run attribution flags with local defaults and prod guardrails, logs/inspector filtering by tenant/thread/actor, OTel gen_ai scope attributes, and resume behavior that preserves persisted thread identity. Closes #111. Co-authored-by: Cursor <cursoragent@cursor.com>
1 parent eeb9b57 commit 5c27d3b

15 files changed

Lines changed: 541 additions & 22 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/).
88

99
### Added
1010

11+
- **Run attribution** (issue #111): `tenant_id`, `thread_id`, `actor_id`, `parent_run_id`, `request_id`, `idempotency_key`, and `source` on `runs`; trace events carry matching tenant/thread/actor for filterable logs and inspector queries. `agentctl run` accepts `--tenant-id`, `--thread-id`, `--actor-id` (local defaults `tenant-1` / `thread-1` / `user-1`); `agentctl logs` and `GET /api/runs` filter by the same dimensions. `--resume` reuses persisted `run_id` and `thread_id`. OTel spans emit `gen_ai.tenant.id`, `gen_ai.thread.id`, `gen_ai.actor.id`, and `gen_ai.request.id`. See [`docs/ATTRIBUTION.md`](docs/ATTRIBUTION.md).
1112
- **Trace payload redaction** (issue #110): trace events are sanitized, key-redacted, and size-capped before SQLite storage. Defaults mask common secret key names; override via `Project.spec.traces.redactKeys`, `maxPayloadBytes`, and `spec.traces.redaction` (`maxDepth`, `maxBytes` for binary previews, `maxStringChars`). HITL edit `argsDiff` is redacted before persistence. Local runs use [trace.NewRecorderForGraph] from project spec.
1213
- **Optional OpenTelemetry trace export** (issue #108): `Project.spec.telemetry` (`enabled`, `serviceName`, `endpoint` with `env:` tokens, `consoleExport`) emits WayFind-aligned `gen_ai.*` spans (`agent.run`, `model.chat`, `tool.exec`, `approval`) alongside SQLite traces. Disabled by default; init failures log a warning and never fail runs. See [`docs/OTEL.md`](docs/OTEL.md) for a Jaeger quick start.
1314
- **`agentctl inspect --web`** — read-only local inspector (default `http://127.0.0.1:8787`) over SQLite state: runs, trace timeline, run steps, applied deployment resources, and checkpoints ([#109](https://github.com/LAA-Software-Engineering/agentic-control-plane/issues/109)).

docs/ATTRIBUTION.md

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
# Run attribution (tenant, thread, actor)
2+
3+
Issue [#111](https://github.com/LAA-Software-Engineering/agentic-control-plane/issues/111) adds lightweight tenancy and attribution to `runs` and `trace_events`.
4+
5+
## Fields
6+
7+
| Field | Purpose |
8+
| --- | --- |
9+
| `tenant_id` | Outermost multi-tenant scope |
10+
| `thread_id` | Session continuity across runs and `--resume` |
11+
| `actor_id` | Who triggered the run (caller-asserted for now) |
12+
| `parent_run_id` | Lineage for sub-runs (not set on resume of the same run) |
13+
| `request_id` | Per-invocation correlation id (distinct from `run_id`) |
14+
| `idempotency_key` | Optional dedupe key for accidental re-triggers |
15+
| `source` | Origin label (`cli`, `actions`, `api`, …) |
16+
17+
Trace events duplicate `tenant_id`, `thread_id`, and `actor_id` from the parent run so `logs` and the inspector can filter without joins.
18+
19+
## CLI defaults (local only)
20+
21+
When flags are omitted, `agentctl run` stores:
22+
23+
- `tenant_id`: `tenant-1`
24+
- `thread_id`: `thread-1`
25+
- `actor_id`: `user-1`
26+
- `source`: `cli`
27+
28+
**Do not rely on these defaults in CI or production.** Pass real actor ids (for example the CI principal) and include tenant/environment context in `thread_id`.
29+
30+
```bash
31+
agentctl run workflow/demo \
32+
--tenant-id acme \
33+
--thread-id prod-review-42 \
34+
--actor-id github-actions@acme
35+
```
36+
37+
Filter history:
38+
39+
```bash
40+
agentctl logs --tenant-id acme --thread-id prod-review-42
41+
```
42+
43+
## Resume
44+
45+
`agentctl run --resume <run-id>` reuses the original `run_id` and `thread_id` from the persisted run row. Attribution flags on resume are ignored so thread timelines stay coherent. `--parent-run-id` is for genuine sub-runs, not resumes.
46+
47+
## Inspector API
48+
49+
`GET /api/runs` accepts optional query parameters:
50+
51+
- `tenant_id`
52+
- `thread_id`
53+
- `actor_id`
54+
- `workflow`
55+
- `limit`
56+
57+
## OpenTelemetry
58+
59+
When telemetry is enabled, spans emit `gen_ai.tenant.id`, `gen_ai.thread.id`, `gen_ai.actor.id`, `gen_ai.run.id`, and `gen_ai.request.id` alongside existing gen_ai attributes. See [OTEL.md](./OTEL.md).
60+
61+
## Production guidance
62+
63+
- SQLite attribution is advisory; DB-level tenant isolation belongs to a future remote/Postgres store.
64+
- `actor_id` is supplied by the caller and is not authenticated in this release.

internal/cli/logs.go

Lines changed: 52 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,9 @@ import (
2020
func newLogsCmd() *cobra.Command {
2121
var runID string
2222
var workflow string
23+
var tenantID string
24+
var threadID string
25+
var actorID string
2326

2427
cmd := &cobra.Command{
2528
Use: "logs",
@@ -28,36 +31,47 @@ func newLogsCmd() *cobra.Command {
2831
Long: `Inspect execution history stored in the SQLite state database.
2932
3033
Without filters, lists recent runs (newest first). Use --run to print trace events for one run
31-
(ordered by seq), or --workflow to print events for recent runs of a workflow name.
34+
(ordered by seq), --workflow to print events for recent runs of a workflow name, or
35+
--tenant-id / --thread-id / --actor-id to filter the run list (combinable).
3236
3337
Examples:
3438
agentctl logs
3539
agentctl logs --run <run-id>
3640
agentctl logs --workflow pr-review
41+
agentctl logs --tenant-id acme --thread-id prod-session-1
3742
3843
Exit codes (section 11.2):
3944
0 — success
4045
1 — generic failure (e.g. cannot open SQLite)
4146
2 — validation failure (unknown run id, invalid flags)`,
4247
RunE: func(cmd *cobra.Command, args []string) error {
4348
_ = args
44-
return runLogs(cmd, runID, workflow)
49+
return runLogs(cmd, runID, workflow, tenantID, threadID, actorID)
4550
},
4651
}
4752
cmd.Flags().StringVar(&runID, "run", "", "show trace events for this run id")
4853
cmd.Flags().StringVar(&workflow, "workflow", "", "show trace events for recent runs of this workflow")
54+
cmd.Flags().StringVar(&tenantID, "tenant-id", "", "filter runs by tenant id")
55+
cmd.Flags().StringVar(&threadID, "thread-id", "", "filter runs by thread id")
56+
cmd.Flags().StringVar(&actorID, "actor-id", "", "filter runs by actor id")
4957
return cmd
5058
}
5159

52-
func runLogs(cmd *cobra.Command, runID, workflow string) error {
60+
func runLogs(cmd *cobra.Command, runID, workflow, tenantID, threadID, actorID string) error {
5361
ctx := context.Background()
5462
g := Globals()
5563

5664
runID = strings.TrimSpace(runID)
5765
workflow = strings.TrimSpace(workflow)
66+
tenantID = strings.TrimSpace(tenantID)
67+
threadID = strings.TrimSpace(threadID)
68+
actorID = strings.TrimSpace(actorID)
5869
if runID != "" && workflow != "" {
5970
return NewExitErrorf(ExitValidationError, "logs: use only one of --run or --workflow")
6071
}
72+
if runID != "" && (tenantID != "" || threadID != "" || actorID != "") {
73+
return NewExitErrorf(ExitValidationError, "logs: --run cannot be combined with tenant/thread/actor filters")
74+
}
6175

6276
graph, root, err := prepareProjectGraph(g.ProjectRoot, g)
6377
if err != nil {
@@ -82,11 +96,18 @@ func runLogs(cmd *cobra.Command, runID, workflow string) error {
8296
}
8397
}
8498

99+
filter := state.RunListFilter{
100+
TenantID: tenantID,
101+
ThreadID: threadID,
102+
ActorID: actorID,
103+
WorkflowName: workflow,
104+
}
105+
85106
switch {
86107
case runID != "":
87108
return writeLogsForRun(cmd, ctx, st, dsn, runID, g)
88-
case workflow != "":
89-
return writeLogsForWorkflow(cmd, ctx, st, dsn, workflow, g)
109+
case workflow != "" || tenantID != "" || threadID != "" || actorID != "":
110+
return writeLogsFiltered(cmd, ctx, st, dsn, filter, g)
90111
default:
91112
return writeLogsRunList(cmd, ctx, st, dsn, g)
92113
}
@@ -106,11 +127,13 @@ func writeLogsForRun(cmd *cobra.Command, ctx context.Context, st *sqlite.Store,
106127
return writeLogsEventsOutput(cmd, dsn, runID, "", events, g)
107128
}
108129

109-
func writeLogsForWorkflow(cmd *cobra.Command, ctx context.Context, st *sqlite.Store, dsn, workflow string, g *Global) error {
110-
runs, err := st.ListRunsByWorkflow(ctx, workflow, state.DefaultRunListLimit)
130+
func writeLogsFiltered(cmd *cobra.Command, ctx context.Context, st *sqlite.Store, dsn string, filter state.RunListFilter, g *Global) error {
131+
filter.Limit = state.DefaultRunListLimit
132+
runs, err := st.ListRunsFiltered(ctx, filter)
111133
if err != nil {
112134
return fmt.Errorf("logs: list runs: %w", err)
113135
}
136+
workflow := strings.TrimSpace(filter.WorkflowName)
114137

115138
if g.Output != render.FormatTable {
116139
type runEntry struct {
@@ -145,7 +168,9 @@ func writeLogsForWorkflow(cmd *cobra.Command, ctx context.Context, st *sqlite.St
145168
}
146169

147170
var b strings.Builder
148-
fmt.Fprintf(&b, "State: %s\nWorkflow filter: %s\n\n", dsn, workflow)
171+
fmt.Fprintf(&b, "State: %s\n", dsn)
172+
writeLogsFilterHeader(&b, filter)
173+
b.WriteString("\n")
149174
if len(runs) == 0 {
150175
fmt.Fprintln(&b, "No runs found.")
151176
_, err := fmt.Fprint(cmd.OutOrStdout(), b.String())
@@ -166,6 +191,21 @@ func writeLogsForWorkflow(cmd *cobra.Command, ctx context.Context, st *sqlite.St
166191
return err
167192
}
168193

194+
func writeLogsFilterHeader(b *strings.Builder, filter state.RunListFilter) {
195+
if w := strings.TrimSpace(filter.WorkflowName); w != "" {
196+
fmt.Fprintf(b, "Workflow filter: %s\n", w)
197+
}
198+
if t := strings.TrimSpace(filter.TenantID); t != "" {
199+
fmt.Fprintf(b, "Tenant filter: %s\n", t)
200+
}
201+
if th := strings.TrimSpace(filter.ThreadID); th != "" {
202+
fmt.Fprintf(b, "Thread filter: %s\n", th)
203+
}
204+
if a := strings.TrimSpace(filter.ActorID); a != "" {
205+
fmt.Fprintf(b, "Actor filter: %s\n", a)
206+
}
207+
}
208+
169209
func writeLogsRunList(cmd *cobra.Command, ctx context.Context, st *sqlite.Store, dsn string, g *Global) error {
170210
runs, err := st.ListRecentRuns(ctx, state.DefaultRunListLimit)
171211
if err != nil {
@@ -186,10 +226,11 @@ func writeLogsRunList(cmd *cobra.Command, ctx context.Context, st *sqlite.Store,
186226
return err
187227
}
188228
w := tabwriter.NewWriter(&b, 0, 0, 2, ' ', 0)
189-
fmt.Fprintln(w, "RUN ID\tWORKFLOW\tENV\tSTATUS\tSTARTED")
229+
fmt.Fprintln(w, "RUN ID\tWORKFLOW\tENV\tSTATUS\tTENANT\tTHREAD\tACTOR\tSTARTED")
190230
for _, r := range runs {
191-
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\n",
192-
r.RunID, r.WorkflowName, r.Env, r.Status, r.StartedAt.UTC().Format(time.RFC3339),
231+
fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\t%s\t%s\n",
232+
r.RunID, r.WorkflowName, r.Env, r.Status, r.TenantID, r.ThreadID, r.ActorID,
233+
r.StartedAt.UTC().Format(time.RFC3339),
193234
)
194235
}
195236
if err := w.Flush(); err != nil {
Lines changed: 119 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,119 @@
1+
package cli
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"io"
8+
"path/filepath"
9+
"strings"
10+
"testing"
11+
"time"
12+
13+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
14+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite"
15+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/statejson"
16+
)
17+
18+
func TestLogs_filterByTenantAndThread(t *testing.T) {
19+
ctx := context.Background()
20+
db := filepath.Join(t.TempDir(), "logs-filter.db")
21+
st, err := sqlite.Open(ctx, db)
22+
if err != nil {
23+
t.Fatal(err)
24+
}
25+
t.Cleanup(func() { _ = st.Close() })
26+
27+
start := time.Date(2026, 6, 4, 10, 0, 0, 0, time.UTC)
28+
for _, r := range []state.Run{
29+
{RunID: "match", WorkflowName: "wf", Env: "local", Status: "succeeded", StartedAt: start,
30+
InputJSON: `{}`, TenantID: "acme", ThreadID: "session-1", ActorID: "u1", RequestID: "r1", Source: "cli"},
31+
{RunID: "other-thread", WorkflowName: "wf", Env: "local", Status: "succeeded", StartedAt: start.Add(time.Minute),
32+
InputJSON: `{}`, TenantID: "acme", ThreadID: "session-2", ActorID: "u1", RequestID: "r2", Source: "cli"},
33+
} {
34+
if err := st.StartRun(ctx, r); err != nil {
35+
t.Fatal(err)
36+
}
37+
if _, err := st.AppendTraceEvent(ctx, r.RunID, start, "run.started", "", `{}`); err != nil {
38+
t.Fatal(err)
39+
}
40+
}
41+
42+
ResetGlobalsForTest()
43+
var out bytes.Buffer
44+
cmd := NewRootCmd()
45+
cmd.SetOut(&out)
46+
cmd.SetErr(&out)
47+
root := runProjRoot(t)
48+
cmd.SetArgs([]string{"logs", "--tenant-id", "acme", "--thread-id", "session-1", "--project", root, "--state", db, "-o", "json"})
49+
if err := cmd.Execute(); err != nil {
50+
t.Fatal(err)
51+
}
52+
53+
var payload struct {
54+
Runs []struct {
55+
RunID string `json:"runId"`
56+
Events []statejson.TraceEventRecord `json:"events"`
57+
} `json:"runs"`
58+
}
59+
if err := json.Unmarshal(out.Bytes(), &payload); err != nil {
60+
t.Fatalf("json: %v\n%s", err, out.String())
61+
}
62+
if len(payload.Runs) != 1 || payload.Runs[0].RunID != "match" {
63+
t.Fatalf("runs: %+v", payload.Runs)
64+
}
65+
if len(payload.Runs[0].Events) != 1 || payload.Runs[0].Events[0].TenantID != "acme" || payload.Runs[0].Events[0].ThreadID != "session-1" {
66+
t.Fatalf("events: %+v", payload.Runs[0].Events)
67+
}
68+
}
69+
70+
func TestLogs_runListShowsAttributionColumns(t *testing.T) {
71+
ctx := context.Background()
72+
db := filepath.Join(t.TempDir(), "logs-table.db")
73+
st, err := sqlite.Open(ctx, db)
74+
if err != nil {
75+
t.Fatal(err)
76+
}
77+
t.Cleanup(func() { _ = st.Close() })
78+
79+
start := time.Now().UTC()
80+
if err := st.StartRun(ctx, state.Run{
81+
RunID: "r1", WorkflowName: "wf", Env: "local", Status: "running", StartedAt: start,
82+
InputJSON: `{}`, TenantID: "t", ThreadID: "th", ActorID: "a", RequestID: "req", Source: "cli",
83+
}); err != nil {
84+
t.Fatal(err)
85+
}
86+
87+
ResetGlobalsForTest()
88+
var out bytes.Buffer
89+
cmd := NewRootCmd()
90+
cmd.SetOut(&out)
91+
cmd.SetErr(io.Discard)
92+
root := runProjRoot(t)
93+
cmd.SetArgs([]string{"logs", "--project", root, "--state", db})
94+
if err := cmd.Execute(); err != nil {
95+
t.Fatal(err)
96+
}
97+
text := out.String()
98+
if !strings.Contains(text, "TENANT") || !strings.Contains(text, "THREAD") || !strings.Contains(text, "ACTOR") {
99+
t.Fatalf("table headers:\n%s", text)
100+
}
101+
if !strings.Contains(text, "r1") || !strings.Contains(text, " t ") || !strings.Contains(text, " th ") || !strings.Contains(text, " a ") {
102+
t.Fatalf("table values:\n%s", text)
103+
}
104+
}
105+
106+
func TestLogs_rejectsRunWithTenantFilter(t *testing.T) {
107+
ResetGlobalsForTest()
108+
cmd := NewRootCmd()
109+
cmd.SetOut(io.Discard)
110+
cmd.SetErr(io.Discard)
111+
cmd.SetArgs([]string{"logs", "--run", "r1", "--tenant-id", "acme"})
112+
err := cmd.Execute()
113+
if err == nil {
114+
t.Fatal("expected validation error")
115+
}
116+
if !strings.Contains(err.Error(), "cannot be combined") {
117+
t.Fatalf("err = %v", err)
118+
}
119+
}

0 commit comments

Comments
 (0)