Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ The full product vision, YAML spec v0, and architecture are documented in [**`do
- **`agentctl validate`** — load project, apply **project defaults** (`spec.defaults`), then **environment overlays** (`-e` / `--env`, `Environment` resources §7.6), then validate graph, schemas, and references
- **`agentctl plan`** — diff desired graph vs SQLite **deployment** state; risk hints; JSON/YAML output includes a **`deploymentBaseline`** digest for the store snapshot
- **`agentctl apply`** — persist plan (TTY confirm or `--auto-approve` / `AGENTCTL_AUTO_APPROVE`); **optimistic concurrency** — if the deployment store changed after the plan snapshot (e.g. another process applied the same `--state` file while this run waited at the prompt), apply fails with **exit code 3**; re-run **plan** then **apply**
- **`agentctl run`** — execute a workflow locally; JSON Schema for inputs where configured; policy gates
- **`agentctl run`** — execute a workflow locally; JSON Schema for inputs where configured; policy gates pause for **human-in-the-loop (HITL)** approval when a tool call requires it
- **`agentctl logs`** — read **trace events** from SQLite (`--run`, `--workflow`, or recent runs)
- **Tools** — **`native`**, **`http`**, **`mock`**, and **`mcp`** — MCP supports **stdio** (subprocess) or **streamable HTTP** (`spec.mcp.transport: http`, `url`, optional `headers` with `env:` tokens)
- **Project defaults** — besides **`model`** and **`policy`**, optional **`runtime`** flows to **`spec.runtime`** on agents/workflows when omitted (MVP: **`local`** or unset; see spec validation)
Expand Down Expand Up @@ -142,6 +142,8 @@ Notes:

- **`init`** creates `my-agent-system/` with `apiVersion: agentic.dev/v0` resources and a **`hello`** workflow (native `echo` tool only — **no network**).
- **`apply`** in non-interactive environments needs **`--auto-approve`** or **`AGENTCTL_AUTO_APPROVE=1`**.
- **`run`** HITL: gated tool calls exit with **`Status: interrupted`** (exit **0**). Resume with **`--resume <run-id> --decision approve|reject|edit|switch`** (use **`--decision-edit-json`** / **`--decision-switch-target`** when needed), or skip prompts with **`--auto-approve`** / **`AGENTCTL_AUTO_APPROVE=1`**. Pre-approve a specific call with repeated **`--approve <uses>`**. Set **`AGENTCTL_HITL_ACTOR`** to attribute decisions in trace logs.
- **`Policy.spec.hitl.interruptOn`** keys are **Tool metadata.name** values; they configure review options (edit rules, switch targets) for calls already gated by **`approvals.requiredFor`** or safety metadata — they do not gate tools on their own.
- **`run`** stores traces in the **same** SQLite file used for plan/apply (default **`.agentic/state.db`** under `--project`).
- If **`spec.traces.retentionDays`** is a positive integer, runs older than that many **UTC calendar days** (by `runs.started_at`) are deleted lazily on **`run`** and **`logs`** (child trace rows cascade). Unset or non-positive means no pruning.
- Use **`logs --run <id>`** after a run if you want a single run’s trace (IDs are printed by **`run`**).
Expand Down
192 changes: 192 additions & 0 deletions internal/cli/hitl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
package cli

import (
"bufio"
"context"
"encoding/json"
"fmt"
"io"
"os"
"strings"

"github.com/LAA-Software-Engineering/agentic-control-plane/internal/engine"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/policy"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
"github.com/mattn/go-isatty"
)

// EnvHitlActor overrides the actor recorded on approval trace events.
const EnvHitlActor = "AGENTCTL_HITL_ACTOR"

// maxDecisionEditJSONBytes caps --decision-edit-json size (well below checkpoint limits).
const maxDecisionEditJSONBytes = 1 << 20

func hitlActorFromEnv() string {
if v := strings.TrimSpace(os.Getenv(EnvHitlActor)); v != "" {
return v
}
if u := strings.TrimSpace(os.Getenv("USER")); u != "" {
return u
}
return policy.DefaultHitlActor
}

func applyHitlRunOptions(opts *runtime.WorkflowRunOptions, resuming bool, autoApprove bool, decision string, editJSON string, switchTarget string) error {
opts.AutoApprove = autoApprove || envAutoApproveEnabled()
opts.HitlActor = hitlActorFromEnv()
decision = strings.TrimSpace(decision)
editJSON = strings.TrimSpace(editJSON)
switchTarget = strings.TrimSpace(switchTarget)

if decision == "" {
if editJSON != "" || switchTarget != "" {
return fmt.Errorf("run: --decision-edit-json and --decision-switch-target require --decision")
}
return nil
}
if !resuming {
return fmt.Errorf("run: --decision requires --resume <run-id>")
}
kind, err := spec.ParseHitlDecisionKind(decision)
if err != nil {
return err
}
hd := &runtime.HitlDecisionOptions{Kind: kind}
switch kind {
case spec.HitlDecisionEdit:
if editJSON == "" {
return fmt.Errorf("run: --decision edit requires --decision-edit-json")
}
if len(editJSON) > maxDecisionEditJSONBytes {
return fmt.Errorf("run: --decision-edit-json exceeds %d bytes", maxDecisionEditJSONBytes)
}
var m map[string]any
if err := json.Unmarshal([]byte(editJSON), &m); err != nil {
return fmt.Errorf("run: --decision-edit-json: %w", err)
}
if m == nil {
return fmt.Errorf("run: --decision-edit-json must be a JSON object")
}
hd.EditedWith = m
case spec.HitlDecisionSwitch:
hd.SwitchTarget = switchTarget
if hd.SwitchTarget == "" {
return fmt.Errorf("run: --decision switch requires --decision-switch-target")
}
}
opts.HitlDecision = hd
return nil
}

func maybePromptHitlDecision(in io.Reader, out io.Writer, gate policy.HitlGate) (*policy.HitlDecisionInput, error) {
if !isatty.IsTerminal(os.Stdin.Fd()) {
return nil, nil
}
actor := hitlActorFromEnv()
display := policy.RedactHitlArgs(gate.With, gate.Review.RedactKeys)
fmt.Fprintf(out, "\n%s\n", gate.Review.Description)
fmt.Fprintf(out, "Tool: %s\nArguments: %v\n", gate.Uses, display)
fmt.Fprintf(out, "Allowed decisions: %v\n", gate.Review.AllowedDecisions)
if len(gate.Review.SwitchTargets) > 0 {
fmt.Fprintf(out, "Switch targets: %v\n", gate.Review.SwitchTargets)
}
for {
fmt.Fprintf(out, "Decision [approve/reject/edit/switch]: ")
line, err := readLine(in)
if err != nil {
return nil, err
}
kind, err := spec.ParseHitlDecisionKind(line)
if err != nil {
fmt.Fprintf(out, "Unknown decision %q\n", line)
continue
}
if !policy.IsDecisionAllowed(kind, gate.Review.AllowedDecisions) {
fmt.Fprintf(out, "Decision %q is not allowed for this call\n", kind)
continue
}
dec := &policy.HitlDecisionInput{Kind: kind, Actor: actor}
switch kind {
case spec.HitlDecisionEdit:
fmt.Fprintf(out, "Edited args JSON: ")
editLine, err := readLine(in)
if err != nil {
return nil, err
}
if len(editLine) > maxDecisionEditJSONBytes {
fmt.Fprintf(out, "Edited args exceed %d bytes\n", maxDecisionEditJSONBytes)
continue
}
var m map[string]any
if err := json.Unmarshal([]byte(editLine), &m); err != nil {
fmt.Fprintf(out, "Invalid JSON: %v\n", err)
continue
}
if err := policy.ValidateHitlEdit(gate.With, m, gate.Review); err != nil {
fmt.Fprintf(out, "%v\n", err)
continue
}
dec.EditedWith = m
case spec.HitlDecisionSwitch:
fmt.Fprintf(out, "Switch target operation: ")
target, err := readLine(in)
if err != nil {
return nil, err
}
dec.SwitchTarget = target
}
return dec, nil
}
}

func readLine(r io.Reader) (string, error) {
sc := bufio.NewScanner(r)
if !sc.Scan() {
if err := sc.Err(); err != nil {
return "", err
}
return "", fmt.Errorf("run: unexpected EOF reading hitl decision")
}
return strings.TrimSpace(sc.Text()), nil
}

func hitlGateFromCheckpoint(contextJSON string) (*policy.HitlGate, error) {
var payload struct {
PendingHitl *engine.PendingHitlState `json:"pendingHitl,omitempty"`
}
if err := json.Unmarshal([]byte(contextJSON), &payload); err != nil {
return nil, fmt.Errorf("unmarshal checkpoint: %w", err)
}
if payload.PendingHitl == nil {
return nil, nil
}
p := payload.PendingHitl
return &policy.HitlGate{
Uses: p.Uses,
With: p.With,
Review: p.Review,
}, nil
}

// loadPendingHitlGate reads the latest checkpoint for a run awaiting HITL input.
func loadPendingHitlGate(ctx context.Context, st state.RuntimeStore, runID string) (*policy.HitlGate, error) {
cp, err := st.GetLatestCheckpoint(ctx, runID)
if err != nil {
return nil, err
}
return hitlGateFromCheckpoint(cp.ContextJSON)
}

// requirePendingHitlGate returns the pending gate or an error when interrupted without one.
func requirePendingHitlGate(ctx context.Context, st state.RuntimeStore, runID string) (*policy.HitlGate, error) {
gate, err := loadPendingHitlGate(ctx, st, runID)
if err != nil {
return nil, err
}
if gate == nil {
return nil, fmt.Errorf("run: run %q is interrupted but checkpoint has no pending approval gate", runID)
}
return gate, nil
}
103 changes: 78 additions & 25 deletions internal/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,21 @@ import (
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/render"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime/local"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite"
"github.com/mattn/go-isatty"
"github.com/spf13/cobra"
)

func newRunCmd() *cobra.Command {
var inputFile string
var inputPairs []string
var approves []string
var autoApprove bool
var decision string
var decisionEditJSON string
var decisionSwitchTarget string
var resumeRunID string

cmd := &cobra.Command{
Expand All @@ -37,6 +43,8 @@ Workflow input is built from optional --input-file (JSON object) plus repeated -
--approve using the full uses string (e.g. tool.helper.echo).

Resume an interrupted or incomplete run with --resume <run-id> (no workflow argument).
When a run pauses for human approval, resume with --decision and related flags, or use
--auto-approve / AGENTCTL_AUTO_APPROVE=1 for non-interactive approval.

Examples:
agentctl run workflow/demo --input topic=hello
Expand Down Expand Up @@ -71,12 +79,16 @@ Exit codes (section 11.2):
return NewExitError(ExitValidationError, err)
}
}
return runRun(cmd, wfName, resumeRunID, inputFile, inputPairs, approves)
return runRun(cmd, wfName, resumeRunID, inputFile, inputPairs, approves, autoApprove, decision, decisionEditJSON, decisionSwitchTarget)
},
}
cmd.Flags().StringVar(&inputFile, "input-file", "", "path to JSON file with workflow input object")
cmd.Flags().StringArrayVar(&inputPairs, "input", nil, "workflow input as key=value (repeatable; values are strings)")
cmd.Flags().StringArrayVar(&approves, "approve", nil, "approve a policy-gated tool uses string (repeatable)")
cmd.Flags().BoolVar(&autoApprove, "auto-approve", false, "auto-approve human-in-the-loop gates (or set AGENTCTL_AUTO_APPROVE=1)")
cmd.Flags().StringVar(&decision, "decision", "", "HITL decision when resuming: approve, reject, edit, or switch")
cmd.Flags().StringVar(&decisionEditJSON, "decision-edit-json", "", "JSON object of edited tool args when --decision edit")
cmd.Flags().StringVar(&decisionSwitchTarget, "decision-switch-target", "", "target operation when --decision switch")
cmd.Flags().StringVar(&resumeRunID, "resume", "", "resume an interrupted or incomplete run by id")
return cmd
}
Expand Down Expand Up @@ -165,7 +177,7 @@ func classifyRunError(err error) int {
}
}

func runRun(cmd *cobra.Command, wfName, resumeRunID, inputFile string, inputPairs, approves []string) error {
func runRun(cmd *cobra.Command, wfName, resumeRunID, inputFile string, inputPairs, approves []string, autoApprove bool, decision, decisionEditJSON, decisionSwitchTarget string) error {
ctx := context.Background()
g := Globals()

Expand Down Expand Up @@ -203,33 +215,71 @@ func runRun(cmd *cobra.Command, wfName, resumeRunID, inputFile string, inputPair
defer func() { _ = st.Close() }()

rt := local.NewRuntime(root, st)
opts := runtime.WorkflowRunOptions{
EnvironmentName: strings.TrimSpace(g.Env),
Env: env,
InputJSON: inputJSON,
ApprovedActions: approves,
Resume: resumeID != "",
RunID: resumeID,
}
if !opts.Resume {
opts.WorkflowName = wfName
}
runID, runErr := rt.ExecuteWorkflow(ctx, opts)

outWfName := wfName
if opts.Resume && runID != "" {
if r, gerr := st.GetRun(ctx, runID); gerr == nil && r != nil {
outWfName = r.WorkflowName
for {
opts := runtime.WorkflowRunOptions{
EnvironmentName: strings.TrimSpace(g.Env),
Env: env,
InputJSON: inputJSON,
ApprovedActions: approves,
Resume: resumeID != "",
RunID: resumeID,
}
}
if err := applyHitlRunOptions(&opts, resumeID != "", autoApprove, decision, decisionEditJSON, decisionSwitchTarget); err != nil {
return NewExitError(ExitValidationError, err)
}
if !opts.Resume {
opts.WorkflowName = wfName
}
runID, runErr := rt.ExecuteWorkflow(ctx, opts)

if werr := writeRunOutput(cmd, ctx, st, env, dsn, outWfName, runID, runErr, g); werr != nil {
return werr
}
if runErr != nil {
return NewExitError(classifyRunError(runErr), fmt.Errorf("run: %w", runErr))
outWfName := wfName
if opts.Resume && runID != "" {
if r, gerr := st.GetRun(ctx, runID); gerr == nil && r != nil {
outWfName = r.WorkflowName
}
}

if runErr == nil && runID != "" {
if r, gerr := st.GetRun(ctx, runID); gerr == nil && r != nil && r.Status == state.RunStatusInterrupted {
if opts.AutoApprove || strings.TrimSpace(decision) != "" {
if _, gerr := requirePendingHitlGate(ctx, st, runID); gerr != nil {
return gerr
}
resumeID = runID
continue
}
gate, gerr := requirePendingHitlGate(ctx, st, runID)
if gerr != nil {
return gerr
}
if isatty.IsTerminal(os.Stdin.Fd()) {
dec, perr := maybePromptHitlDecision(cmd.InOrStdin(), cmd.OutOrStdout(), *gate)
if perr != nil {
return perr
}
if dec != nil {
resumeID = runID
decision = string(dec.Kind)
if dec.Kind == spec.HitlDecisionEdit {
b, _ := json.Marshal(dec.EditedWith)
decisionEditJSON = string(b)
}
decisionSwitchTarget = dec.SwitchTarget
continue
}
}
}
}

if werr := writeRunOutput(cmd, ctx, st, env, dsn, outWfName, runID, runErr, g); werr != nil {
return werr
}
if runErr != nil {
return NewExitError(classifyRunError(runErr), fmt.Errorf("run: %w", runErr))
}
return nil
}
return nil
}

func writeRunOutput(cmd *cobra.Command, ctx context.Context, st *sqlite.Store, env, dsn, wfName, runID string, runErr error, g *Global) error {
Expand Down Expand Up @@ -286,6 +336,9 @@ func writeRunOutput(cmd *cobra.Command, ctx context.Context, st *sqlite.Store, e
fmt.Fprintf(&b, "\nRun ID: %s\n", runID)
if got != nil {
fmt.Fprintf(&b, "Status: %s\n", got.Status)
if got.Status == state.RunStatusInterrupted {
fmt.Fprintf(&b, "Resume with: agentctl run --resume %s --decision approve|reject|edit|switch ...\n", runID)
}
if got.ErrorText != "" {
fmt.Fprintf(&b, "Error: %s\n", got.ErrorText)
}
Expand Down
Loading
Loading