Skip to content

Commit 3954a34

Browse files
authored
Merge pull request #128 from LAA-Software-Engineering/feat/hitl-approvals-106
feat(engine,policy): human-in-the-loop approvals (issue #106)
2 parents 0b870ae + ebbe84b commit 3954a34

26 files changed

Lines changed: 2232 additions & 93 deletions

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ The full product vision, YAML spec v0, and architecture are documented in [**`do
4949
- **`agentctl validate`** — load project, apply **project defaults** (`spec.defaults`), then **environment overlays** (`-e` / `--env`, `Environment` resources §7.6), then validate graph, schemas, and references
5050
- **`agentctl plan`** — diff desired graph vs SQLite **deployment** state; risk hints; JSON/YAML output includes a **`deploymentBaseline`** digest for the store snapshot
5151
- **`agentctl apply`** — persist plan (TTY confirm or `--auto-approve` / `AGENTCTL_AUTO_APPROVE`); **optimistic concurrency** — if the deployment store changed after the plan snapshot (e.g. another process applied the same `--state` file while this run waited at the prompt), apply fails with **exit code 3**; re-run **plan** then **apply**
52-
- **`agentctl run`** — execute a workflow locally; JSON Schema for inputs where configured; policy gates
52+
- **`agentctl run`** — execute a workflow locally; JSON Schema for inputs where configured; policy gates pause for **human-in-the-loop (HITL)** approval when a tool call requires it
5353
- **`agentctl logs`** — read **trace events** from SQLite (`--run`, `--workflow`, or recent runs)
5454
- **Tools** — **`native`**, **`http`**, **`mock`**, and **`mcp`** — MCP supports **stdio** (subprocess) or **streamable HTTP** (`spec.mcp.transport: http`, `url`, optional `headers` with `env:` tokens)
5555
- **Project defaults** — besides **`model`** and **`policy`**, optional **`runtime`** flows to **`spec.runtime`** on agents/workflows when omitted (MVP: **`local`** or unset; see spec validation)
@@ -142,6 +142,8 @@ Notes:
142142

143143
- **`init`** creates `my-agent-system/` with `apiVersion: agentic.dev/v0` resources and a **`hello`** workflow (native `echo` tool only — **no network**).
144144
- **`apply`** in non-interactive environments needs **`--auto-approve`** or **`AGENTCTL_AUTO_APPROVE=1`**.
145+
- **`run`** HITL: gated tool calls exit with **`Status: interrupted`** (exit **0**). Resume with **`--resume <run-id> --decision approve|reject|edit|switch`** (use **`--decision-edit-json`** / **`--decision-switch-target`** when needed), or skip prompts with **`--auto-approve`** / **`AGENTCTL_AUTO_APPROVE=1`**. Pre-approve a specific call with repeated **`--approve <uses>`**. Set **`AGENTCTL_HITL_ACTOR`** to attribute decisions in trace logs.
146+
- **`Policy.spec.hitl.interruptOn`** keys are **Tool metadata.name** values; they configure review options (edit rules, switch targets) for calls already gated by **`approvals.requiredFor`** or safety metadata — they do not gate tools on their own.
145147
- **`run`** stores traces in the **same** SQLite file used for plan/apply (default **`.agentic/state.db`** under `--project`).
146148
- If **`spec.traces.retentionDays`** is a positive integer, runs older than that many **UTC calendar days** (by `runs.started_at`) are deleted lazily on **`run`** and **`logs`** (child trace rows cascade). Unset or non-positive means no pruning.
147149
- Use **`logs --run <id>`** after a run if you want a single run’s trace (IDs are printed by **`run`**).

internal/cli/hitl.go

Lines changed: 192 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,192 @@
1+
package cli
2+
3+
import (
4+
"bufio"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"os"
10+
"strings"
11+
12+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/engine"
13+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/policy"
14+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime"
15+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec"
16+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
17+
"github.com/mattn/go-isatty"
18+
)
19+
20+
// EnvHitlActor overrides the actor recorded on approval trace events.
21+
const EnvHitlActor = "AGENTCTL_HITL_ACTOR"
22+
23+
// maxDecisionEditJSONBytes caps --decision-edit-json size (well below checkpoint limits).
24+
const maxDecisionEditJSONBytes = 1 << 20
25+
26+
func hitlActorFromEnv() string {
27+
if v := strings.TrimSpace(os.Getenv(EnvHitlActor)); v != "" {
28+
return v
29+
}
30+
if u := strings.TrimSpace(os.Getenv("USER")); u != "" {
31+
return u
32+
}
33+
return policy.DefaultHitlActor
34+
}
35+
36+
func applyHitlRunOptions(opts *runtime.WorkflowRunOptions, resuming bool, autoApprove bool, decision string, editJSON string, switchTarget string) error {
37+
opts.AutoApprove = autoApprove || envAutoApproveEnabled()
38+
opts.HitlActor = hitlActorFromEnv()
39+
decision = strings.TrimSpace(decision)
40+
editJSON = strings.TrimSpace(editJSON)
41+
switchTarget = strings.TrimSpace(switchTarget)
42+
43+
if decision == "" {
44+
if editJSON != "" || switchTarget != "" {
45+
return fmt.Errorf("run: --decision-edit-json and --decision-switch-target require --decision")
46+
}
47+
return nil
48+
}
49+
if !resuming {
50+
return fmt.Errorf("run: --decision requires --resume <run-id>")
51+
}
52+
kind, err := spec.ParseHitlDecisionKind(decision)
53+
if err != nil {
54+
return err
55+
}
56+
hd := &runtime.HitlDecisionOptions{Kind: kind}
57+
switch kind {
58+
case spec.HitlDecisionEdit:
59+
if editJSON == "" {
60+
return fmt.Errorf("run: --decision edit requires --decision-edit-json")
61+
}
62+
if len(editJSON) > maxDecisionEditJSONBytes {
63+
return fmt.Errorf("run: --decision-edit-json exceeds %d bytes", maxDecisionEditJSONBytes)
64+
}
65+
var m map[string]any
66+
if err := json.Unmarshal([]byte(editJSON), &m); err != nil {
67+
return fmt.Errorf("run: --decision-edit-json: %w", err)
68+
}
69+
if m == nil {
70+
return fmt.Errorf("run: --decision-edit-json must be a JSON object")
71+
}
72+
hd.EditedWith = m
73+
case spec.HitlDecisionSwitch:
74+
hd.SwitchTarget = switchTarget
75+
if hd.SwitchTarget == "" {
76+
return fmt.Errorf("run: --decision switch requires --decision-switch-target")
77+
}
78+
}
79+
opts.HitlDecision = hd
80+
return nil
81+
}
82+
83+
func maybePromptHitlDecision(in io.Reader, out io.Writer, gate policy.HitlGate) (*policy.HitlDecisionInput, error) {
84+
if !isatty.IsTerminal(os.Stdin.Fd()) {
85+
return nil, nil
86+
}
87+
actor := hitlActorFromEnv()
88+
display := policy.RedactHitlArgs(gate.With, gate.Review.RedactKeys)
89+
fmt.Fprintf(out, "\n%s\n", gate.Review.Description)
90+
fmt.Fprintf(out, "Tool: %s\nArguments: %v\n", gate.Uses, display)
91+
fmt.Fprintf(out, "Allowed decisions: %v\n", gate.Review.AllowedDecisions)
92+
if len(gate.Review.SwitchTargets) > 0 {
93+
fmt.Fprintf(out, "Switch targets: %v\n", gate.Review.SwitchTargets)
94+
}
95+
for {
96+
fmt.Fprintf(out, "Decision [approve/reject/edit/switch]: ")
97+
line, err := readLine(in)
98+
if err != nil {
99+
return nil, err
100+
}
101+
kind, err := spec.ParseHitlDecisionKind(line)
102+
if err != nil {
103+
fmt.Fprintf(out, "Unknown decision %q\n", line)
104+
continue
105+
}
106+
if !policy.IsDecisionAllowed(kind, gate.Review.AllowedDecisions) {
107+
fmt.Fprintf(out, "Decision %q is not allowed for this call\n", kind)
108+
continue
109+
}
110+
dec := &policy.HitlDecisionInput{Kind: kind, Actor: actor}
111+
switch kind {
112+
case spec.HitlDecisionEdit:
113+
fmt.Fprintf(out, "Edited args JSON: ")
114+
editLine, err := readLine(in)
115+
if err != nil {
116+
return nil, err
117+
}
118+
if len(editLine) > maxDecisionEditJSONBytes {
119+
fmt.Fprintf(out, "Edited args exceed %d bytes\n", maxDecisionEditJSONBytes)
120+
continue
121+
}
122+
var m map[string]any
123+
if err := json.Unmarshal([]byte(editLine), &m); err != nil {
124+
fmt.Fprintf(out, "Invalid JSON: %v\n", err)
125+
continue
126+
}
127+
if err := policy.ValidateHitlEdit(gate.With, m, gate.Review); err != nil {
128+
fmt.Fprintf(out, "%v\n", err)
129+
continue
130+
}
131+
dec.EditedWith = m
132+
case spec.HitlDecisionSwitch:
133+
fmt.Fprintf(out, "Switch target operation: ")
134+
target, err := readLine(in)
135+
if err != nil {
136+
return nil, err
137+
}
138+
dec.SwitchTarget = target
139+
}
140+
return dec, nil
141+
}
142+
}
143+
144+
// readLine reads a single line from r and returns it with surrounding
// whitespace (including the line terminator) trimmed.
//
// It never consumes bytes past the terminating newline, so it can be called
// repeatedly on the same reader to read consecutive answers. A caller that
// already owns a *bufio.Reader may pass it and its buffer is reused across
// calls; any other reader is read one byte at a time. Line length is not
// capped here; callers enforce their own limits on the returned text.
//
// A final line without a trailing newline is returned as-is. EOF before any
// byte was read yields an "unexpected EOF" error; other read errors are
// returned unchanged.
func readLine(r io.Reader) (string, error) {
	var (
		raw string
		err error
	)
	if br, ok := r.(*bufio.Reader); ok {
		raw, err = br.ReadString('\n')
	} else {
		raw, err = readRawLineUnbuffered(r)
	}
	if err != nil {
		if err != io.EOF {
			return "", err
		}
		if raw == "" {
			return "", fmt.Errorf("run: unexpected EOF reading hitl decision")
		}
	}
	return strings.TrimSpace(raw), nil
}

// readRawLineUnbuffered reads from r up to and including the next '\n' using
// single-byte reads, so nothing beyond the line is pulled out of r.
func readRawLineUnbuffered(r io.Reader) (string, error) {
	const maxEmptyReads = 100
	var (
		line  []byte
		one   [1]byte
		empty int
	)
	for {
		n, err := r.Read(one[:])
		if n == 1 {
			empty = 0
			line = append(line, one[0])
			if one[0] == '\n' {
				return string(line), nil
			}
		}
		if err != nil {
			return string(line), err
		}
		if n == 0 {
			// Guard against readers that keep returning (0, nil).
			if empty++; empty >= maxEmptyReads {
				return string(line), io.ErrNoProgress
			}
		}
	}
}
154+
155+
func hitlGateFromCheckpoint(contextJSON string) (*policy.HitlGate, error) {
156+
var payload struct {
157+
PendingHitl *engine.PendingHitlState `json:"pendingHitl,omitempty"`
158+
}
159+
if err := json.Unmarshal([]byte(contextJSON), &payload); err != nil {
160+
return nil, fmt.Errorf("unmarshal checkpoint: %w", err)
161+
}
162+
if payload.PendingHitl == nil {
163+
return nil, nil
164+
}
165+
p := payload.PendingHitl
166+
return &policy.HitlGate{
167+
Uses: p.Uses,
168+
With: p.With,
169+
Review: p.Review,
170+
}, nil
171+
}
172+
173+
// loadPendingHitlGate reads the latest checkpoint for a run awaiting HITL input.
174+
func loadPendingHitlGate(ctx context.Context, st state.RuntimeStore, runID string) (*policy.HitlGate, error) {
175+
cp, err := st.GetLatestCheckpoint(ctx, runID)
176+
if err != nil {
177+
return nil, err
178+
}
179+
return hitlGateFromCheckpoint(cp.ContextJSON)
180+
}
181+
182+
// requirePendingHitlGate returns the pending gate or an error when interrupted without one.
183+
func requirePendingHitlGate(ctx context.Context, st state.RuntimeStore, runID string) (*policy.HitlGate, error) {
184+
gate, err := loadPendingHitlGate(ctx, st, runID)
185+
if err != nil {
186+
return nil, err
187+
}
188+
if gate == nil {
189+
return nil, fmt.Errorf("run: run %q is interrupted but checkpoint has no pending approval gate", runID)
190+
}
191+
return gate, nil
192+
}

internal/cli/run.go

Lines changed: 78 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,21 @@ import (
1414
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/render"
1515
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime"
1616
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime/local"
17+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec"
1718
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
1819
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite"
20+
"github.com/mattn/go-isatty"
1921
"github.com/spf13/cobra"
2022
)
2123

2224
func newRunCmd() *cobra.Command {
2325
var inputFile string
2426
var inputPairs []string
2527
var approves []string
28+
var autoApprove bool
29+
var decision string
30+
var decisionEditJSON string
31+
var decisionSwitchTarget string
2632
var resumeRunID string
2733

2834
cmd := &cobra.Command{
@@ -37,6 +43,8 @@ Workflow input is built from optional --input-file (JSON object) plus repeated -
3743
--approve using the full uses string (e.g. tool.helper.echo).
3844
3945
Resume an interrupted or incomplete run with --resume <run-id> (no workflow argument).
46+
When a run pauses for human approval, resume with --decision and related flags, or use
47+
--auto-approve / AGENTCTL_AUTO_APPROVE=1 for non-interactive approval.
4048
4149
Examples:
4250
agentctl run workflow/demo --input topic=hello
@@ -71,12 +79,16 @@ Exit codes (section 11.2):
7179
return NewExitError(ExitValidationError, err)
7280
}
7381
}
74-
return runRun(cmd, wfName, resumeRunID, inputFile, inputPairs, approves)
82+
return runRun(cmd, wfName, resumeRunID, inputFile, inputPairs, approves, autoApprove, decision, decisionEditJSON, decisionSwitchTarget)
7583
},
7684
}
7785
cmd.Flags().StringVar(&inputFile, "input-file", "", "path to JSON file with workflow input object")
7886
cmd.Flags().StringArrayVar(&inputPairs, "input", nil, "workflow input as key=value (repeatable; values are strings)")
7987
cmd.Flags().StringArrayVar(&approves, "approve", nil, "approve a policy-gated tool uses string (repeatable)")
88+
cmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "auto-approve human-in-the-loop gates (or set AGENTCTL_AUTO_APPROVE=1)")
89+
cmd.Flags().StringVar(&decision, "decision", "", "HITL decision when resuming: approve, reject, edit, or switch")
90+
cmd.Flags().StringVar(&decisionEditJSON, "decision-edit-json", "", "JSON object of edited tool args when --decision edit")
91+
cmd.Flags().StringVar(&decisionSwitchTarget, "decision-switch-target", "", "target operation when --decision switch")
8092
cmd.Flags().StringVar(&resumeRunID, "resume", "", "resume an interrupted or incomplete run by id")
8193
return cmd
8294
}
@@ -165,7 +177,7 @@ func classifyRunError(err error) int {
165177
}
166178
}
167179

168-
func runRun(cmd *cobra.Command, wfName, resumeRunID, inputFile string, inputPairs, approves []string) error {
180+
func runRun(cmd *cobra.Command, wfName, resumeRunID, inputFile string, inputPairs, approves []string, autoApprove bool, decision, decisionEditJSON, decisionSwitchTarget string) error {
169181
ctx := context.Background()
170182
g := Globals()
171183

@@ -203,33 +215,71 @@ func runRun(cmd *cobra.Command, wfName, resumeRunID, inputFile string, inputPair
203215
defer func() { _ = st.Close() }()
204216

205217
rt := local.NewRuntime(root, st)
206-
opts := runtime.WorkflowRunOptions{
207-
EnvironmentName: strings.TrimSpace(g.Env),
208-
Env: env,
209-
InputJSON: inputJSON,
210-
ApprovedActions: approves,
211-
Resume: resumeID != "",
212-
RunID: resumeID,
213-
}
214-
if !opts.Resume {
215-
opts.WorkflowName = wfName
216-
}
217-
runID, runErr := rt.ExecuteWorkflow(ctx, opts)
218218

219-
outWfName := wfName
220-
if opts.Resume && runID != "" {
221-
if r, gerr := st.GetRun(ctx, runID); gerr == nil && r != nil {
222-
outWfName = r.WorkflowName
219+
for {
220+
opts := runtime.WorkflowRunOptions{
221+
EnvironmentName: strings.TrimSpace(g.Env),
222+
Env: env,
223+
InputJSON: inputJSON,
224+
ApprovedActions: approves,
225+
Resume: resumeID != "",
226+
RunID: resumeID,
223227
}
224-
}
228+
if err := applyHitlRunOptions(&opts, resumeID != "", autoApprove, decision, decisionEditJSON, decisionSwitchTarget); err != nil {
229+
return NewExitError(ExitValidationError, err)
230+
}
231+
if !opts.Resume {
232+
opts.WorkflowName = wfName
233+
}
234+
runID, runErr := rt.ExecuteWorkflow(ctx, opts)
225235

226-
if werr := writeRunOutput(cmd, ctx, st, env, dsn, outWfName, runID, runErr, g); werr != nil {
227-
return werr
228-
}
229-
if runErr != nil {
230-
return NewExitError(classifyRunError(runErr), fmt.Errorf("run: %w", runErr))
236+
outWfName := wfName
237+
if opts.Resume && runID != "" {
238+
if r, gerr := st.GetRun(ctx, runID); gerr == nil && r != nil {
239+
outWfName = r.WorkflowName
240+
}
241+
}
242+
243+
if runErr == nil && runID != "" {
244+
if r, gerr := st.GetRun(ctx, runID); gerr == nil && r != nil && r.Status == state.RunStatusInterrupted {
245+
if opts.AutoApprove || strings.TrimSpace(decision) != "" {
246+
if _, gerr := requirePendingHitlGate(ctx, st, runID); gerr != nil {
247+
return gerr
248+
}
249+
resumeID = runID
250+
continue
251+
}
252+
gate, gerr := requirePendingHitlGate(ctx, st, runID)
253+
if gerr != nil {
254+
return gerr
255+
}
256+
if isatty.IsTerminal(os.Stdin.Fd()) {
257+
dec, perr := maybePromptHitlDecision(cmd.InOrStdin(), cmd.OutOrStdout(), *gate)
258+
if perr != nil {
259+
return perr
260+
}
261+
if dec != nil {
262+
resumeID = runID
263+
decision = string(dec.Kind)
264+
if dec.Kind == spec.HitlDecisionEdit {
265+
b, _ := json.Marshal(dec.EditedWith)
266+
decisionEditJSON = string(b)
267+
}
268+
decisionSwitchTarget = dec.SwitchTarget
269+
continue
270+
}
271+
}
272+
}
273+
}
274+
275+
if werr := writeRunOutput(cmd, ctx, st, env, dsn, outWfName, runID, runErr, g); werr != nil {
276+
return werr
277+
}
278+
if runErr != nil {
279+
return NewExitError(classifyRunError(runErr), fmt.Errorf("run: %w", runErr))
280+
}
281+
return nil
231282
}
232-
return nil
233283
}
234284

235285
func writeRunOutput(cmd *cobra.Command, ctx context.Context, st *sqlite.Store, env, dsn, wfName, runID string, runErr error, g *Global) error {
@@ -286,6 +336,9 @@ func writeRunOutput(cmd *cobra.Command, ctx context.Context, st *sqlite.Store, e
286336
fmt.Fprintf(&b, "\nRun ID: %s\n", runID)
287337
if got != nil {
288338
fmt.Fprintf(&b, "Status: %s\n", got.Status)
339+
if got.Status == state.RunStatusInterrupted {
340+
fmt.Fprintf(&b, "Resume with: agentctl run --resume %s --decision approve|reject|edit|switch ...\n", runID)
341+
}
289342
if got.ErrorText != "" {
290343
fmt.Fprintf(&b, "Error: %s\n", got.ErrorText)
291344
}

0 commit comments

Comments
 (0)