diff --git a/CHANGELOG.md b/CHANGELOG.md index c860bb4..723ef03 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/). ### Added +- **Run checkpointing and resume** (issue #105): SQLite `run_checkpoints` table stores per-run execution snapshots after each completed step. `agentctl run --resume <run-id>` rehydrates interpolation context and continues from the next step without replaying earlier steps. Interrupted runs exit cleanly (status `interrupted`, exit code 0) and cascade with trace retention pruning. Checkpoints are written before step rows are marked succeeded to avoid replay on crash; runs pin `workflow_spec_hash` and `environment_name` for safe resume. - **Built-in policy presets** (issue #104): `strict`, `permissive`, and `shell_safe`. Select via `Project.spec.defaults.policy`, by referencing a preset name on agents/workflows, or with `Policy.spec.preset` (local rules layer on top). Presets expand during [NormalizeProjectGraph]; `strict`/`permissive` materialize approval flags, while `shell_safe` sets `ResolvedPreset` and relies on runtime token classification plus tool safety metadata for plan risk. - **`shell_safe` token classification** for native `command.run` / `run` / `exec` / `shell` operations: read-only first tokens (`ls`, `cat`, …) run unattended when the command contains no shell metacharacters (`;|&$`, newlines, `` ` ``, `$(…)`); risky tokens, unknown tokens, and side-effecting non-shell tools require `--approve`. **Heuristic only — not a sandbox.** - **`spec.safety` on Tool resources** (issue #103): optional `trusted`, `sideEffects`, and `requiresApproval` fields. [NormalizeProjectGraph] materializes fail-closed defaults on load. 
diff --git a/internal/cli/run.go b/internal/cli/run.go index 395881b..31c5535 100644 --- a/internal/cli/run.go +++ b/internal/cli/run.go @@ -3,11 +3,13 @@ package cli import ( "context" "encoding/json" + "errors" "fmt" "os" "path/filepath" "strings" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/engine" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/policy" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/render" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime" @@ -21,6 +23,7 @@ func newRunCmd() *cobra.Command { var inputFile string var inputPairs []string var approves []string + var resumeRunID string cmd := &cobra.Command{ Use: "run workflow/", @@ -33,24 +36,48 @@ Workflow input is built from optional --input-file (JSON object) plus repeated - (string values only for key=value pairs). Policy-gated tool uses can be allowed with repeated --approve using the full uses string (e.g. tool.helper.echo). +Resume an interrupted or incomplete run with --resume (no workflow argument). + Examples: agentctl run workflow/demo --input topic=hello agentctl run workflow/demo --input-file input.json + agentctl run --resume run-abc123 Exit codes (section 11.2): - 0 — success + 0 — success (including interrupted runs awaiting resume) 1 — generic failure (e.g. 
cannot open SQLite, start run, trace) 2 — validation failure (project, workflow ref, input, input-file) 4 — execution failure (step/engine error after the run row exists) 5 — policy denial`, - Args: cobra.ExactArgs(1), + Args: func(cmd *cobra.Command, args []string) error { + resume, _ := cmd.Flags().GetString("resume") + if strings.TrimSpace(resume) != "" { + if len(args) != 0 { + return NewExitError(ExitValidationError, fmt.Errorf("run: --resume does not take a workflow argument")) + } + return nil + } + if len(args) != 1 { + return NewExitError(ExitValidationError, fmt.Errorf("run: requires workflow/ or --resume ")) + } + return nil + }, RunE: func(cmd *cobra.Command, args []string) error { - return runRun(cmd, args[0], inputFile, inputPairs, approves) + var wfName string + if len(args) == 1 { + var err error + wfName, err = parseWorkflowTarget(args[0]) + if err != nil { + return NewExitError(ExitValidationError, err) + } + } + return runRun(cmd, wfName, resumeRunID, inputFile, inputPairs, approves) }, } cmd.Flags().StringVar(&inputFile, "input-file", "", "path to JSON file with workflow input object") cmd.Flags().StringArrayVar(&inputPairs, "input", nil, "workflow input as key=value (repeatable; values are strings)") cmd.Flags().StringArrayVar(&approves, "approve", nil, "approve a policy-gated tool uses string (repeatable)") + cmd.Flags().StringVar(&resumeRunID, "resume", "", "resume an interrupted or incomplete run by id") return cmd } @@ -107,6 +134,9 @@ func classifyRunError(err error) int { if err == nil { return ExitSuccess } + if errors.Is(err, engine.ErrInterrupted) { + return ExitSuccess + } if _, ok := policy.AsDenied(err); ok { return ExitPolicyDenied } @@ -118,25 +148,30 @@ func classifyRunError(err error) int { strings.Contains(msg, "invalid input JSON"), strings.Contains(msg, "workflow input"), strings.Contains(msg, "marshal workflow input"), - strings.Contains(msg, "unknown environment"): + strings.Contains(msg, "unknown environment"), + 
strings.Contains(msg, "workflow spec changed"), + strings.Contains(msg, "does not match run"): return ExitValidationError case strings.Contains(msg, "open sqlite"), strings.Contains(msg, "ping sqlite"), strings.Contains(msg, "start run:"), - strings.Contains(msg, "trace run."): + strings.Contains(msg, "trace run."), + strings.Contains(msg, "not found"), + strings.Contains(msg, "has no checkpoint"), + strings.Contains(msg, "is not resumable"): return ExitGenericFailure default: return ExitExecutionError } } -func runRun(cmd *cobra.Command, target, inputFile string, inputPairs, approves []string) error { +func runRun(cmd *cobra.Command, wfName, resumeRunID, inputFile string, inputPairs, approves []string) error { ctx := context.Background() g := Globals() - wfName, err := parseWorkflowTarget(target) - if err != nil { - return NewExitError(ExitValidationError, err) + resumeID := strings.TrimSpace(resumeRunID) + if resumeID == "" && wfName == "" { + return NewExitError(ExitValidationError, fmt.Errorf("run: requires workflow/ or --resume ")) } graph, root, err := prepareProjectGraph(g.ProjectRoot, g) @@ -144,9 +179,12 @@ func runRun(cmd *cobra.Command, target, inputFile string, inputPairs, approves [ return NewExitError(ExitValidationError, err) } - inputJSON, err := buildRunInputJSON(inputFile, inputPairs) - if err != nil { - return NewExitError(ExitValidationError, err) + var inputJSON []byte + if resumeID == "" { + inputJSON, err = buildRunInputJSON(inputFile, inputPairs) + if err != nil { + return NewExitError(ExitValidationError, err) + } } env := planEnvironment(g) @@ -165,15 +203,27 @@ func runRun(cmd *cobra.Command, target, inputFile string, inputPairs, approves [ defer func() { _ = st.Close() }() rt := local.NewRuntime(root, st) - runID, runErr := rt.ExecuteWorkflow(ctx, runtime.WorkflowRunOptions{ - WorkflowName: wfName, + opts := runtime.WorkflowRunOptions{ EnvironmentName: strings.TrimSpace(g.Env), Env: env, InputJSON: inputJSON, ApprovedActions: approves, - 
}) + Resume: resumeID != "", + RunID: resumeID, + } + if !opts.Resume { + opts.WorkflowName = wfName + } + runID, runErr := rt.ExecuteWorkflow(ctx, opts) + + outWfName := wfName + if opts.Resume && runID != "" { + if r, gerr := st.GetRun(ctx, runID); gerr == nil && r != nil { + outWfName = r.WorkflowName + } + } - if werr := writeRunOutput(cmd, ctx, st, env, dsn, wfName, runID, runErr, g); werr != nil { + if werr := writeRunOutput(cmd, ctx, st, env, dsn, outWfName, runID, runErr, g); werr != nil { return werr } if runErr != nil { diff --git a/internal/cli/run_test.go b/internal/cli/run_test.go index 0c38e33..5553ac2 100644 --- a/internal/cli/run_test.go +++ b/internal/cli/run_test.go @@ -2,11 +2,26 @@ package cli import ( "bytes" + "context" + "encoding/json" + "errors" "io" "os" "path/filepath" "strings" "testing" + "time" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/engine" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/models" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/plan" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/project" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime/local" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/tools" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace" ) func runProjRoot(t *testing.T) string { @@ -220,3 +235,126 @@ func TestRun_inputFile_succeeds(t *testing.T) { t.Fatal(out.String()) } } + +func TestRun_resume_missingRun_exit1(t *testing.T) { + db := filepath.Join(t.TempDir(), "resume-missing.db") + root := runProjRoot(t) + + ResetGlobalsForTest() + var out bytes.Buffer + cmd := NewRootCmd() + cmd.SetOut(&out) + 
cmd.SetErr(&out) + cmd.SetArgs([]string{ + "run", "--resume", "does-not-exist", + "--project", root, + "--state", db, + }) + err := cmd.Execute() + if err == nil { + t.Fatal("expected error") + } + if ExitCodeOf(err) != ExitGenericFailure { + t.Fatalf("exit=%d err=%v out=%s", ExitCodeOf(err), err, out.String()) + } +} + +func TestRun_resume_withWorkflowArg_exit2(t *testing.T) { + db := filepath.Join(t.TempDir(), "resume-bad-args.db") + root := runProjRoot(t) + + ResetGlobalsForTest() + cmd := NewRootCmd() + cmd.SetArgs([]string{ + "run", "workflow/demo", "--resume", "some-id", + "--project", root, + "--state", db, + }) + err := cmd.Execute() + if err == nil { + t.Fatal("expected error") + } + if ExitCodeOf(err) != ExitValidationError { + t.Fatalf("exit=%d err=%v", ExitCodeOf(err), err) + } +} + +func TestRun_resume_happyPath(t *testing.T) { + ctx := context.Background() + db := filepath.Join(t.TempDir(), "resume-happy.db") + root := runProjRoot(t) + + st, err := sqlite.Open(ctx, db) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + graph, err := project.LoadProject(root) + if err != nil { + t.Fatal(err) + } + spec.NormalizeProjectGraph(graph) + graph, err = local.ApplyEnvironment(graph, "staging") + if err != nil { + t.Fatal(err) + } + wf := graph.Workflows["demo"] + wfHash, err := plan.WorkflowSpecHash(wf) + if err != nil { + t.Fatal(err) + } + + runID := "cli-resume-1" + started := time.Date(2026, 6, 1, 12, 0, 0, 0, time.UTC) + if err := st.StartRun(ctx, state.Run{ + RunID: runID, WorkflowName: "demo", Env: "dev", Status: state.RunStatusRunning, + StartedAt: started, InputJSON: `{"topic":"cli-resume"}`, TotalCostUSD: 0, + WorkflowSpecHash: wfHash, EnvironmentName: "staging", + }); err != nil { + t.Fatal(err) + } + + var input map[string]any + if err := json.Unmarshal([]byte(`{"topic":"cli-resume"}`), &input); err != nil { + t.Fatal(err) + } + idx := 0 + ex := &engine.Executor{ + Graph: graph, ProjectRoot: root, + Tools: 
tools.NewRegistry(graph), Models: models.NewRegistry(graph), + Store: st, Trace: trace.NewRecorder(st), + Now: func() time.Time { return started }, + } + if err := ex.Run(ctx, engine.RunInput{ + RunID: runID, WorkflowName: "demo", Env: "dev", StartedAt: started, Input: input, + InterruptAfterStepIndex: &idx, + }); !errors.Is(err, engine.ErrInterrupted) { + t.Fatalf("interrupt: %v", err) + } + + ResetGlobalsForTest() + var out bytes.Buffer + cmd := NewRootCmd() + cmd.SetOut(&out) + cmd.SetErr(&out) + cmd.SetArgs([]string{ + "run", "--resume", runID, + "--project", root, + "-e", "staging", + "--state", db, + }) + if err := cmd.Execute(); err != nil { + t.Fatalf("resume: %v\n%s", err, out.String()) + } + if !strings.Contains(out.String(), "succeeded") { + t.Fatalf("output:\n%s", out.String()) + } + got, err := st.GetRun(ctx, runID) + if err != nil { + t.Fatal(err) + } + if got.Status != state.RunStatusSucceeded { + t.Fatalf("status %q", got.Status) + } +} diff --git a/internal/engine/checkpoint.go b/internal/engine/checkpoint.go new file mode 100644 index 0000000..537c839 --- /dev/null +++ b/internal/engine/checkpoint.go @@ -0,0 +1,168 @@ +package engine + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "strings" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/render" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace" +) + +// ErrInterrupted is returned when a run pauses at an approval gate or stub interrupt (issue #105). +// Callers should treat this as a clean exit, not a failure. 
+var ErrInterrupted = errors.New("engine: run interrupted") + +const ( + checkpointPayloadVersion = 1 + maxCheckpointContextBytes = 4 << 20 // 4 MiB + maxCheckpointSteps = 256 +) + +// checkpointPayload is the engine-owned snapshot stored in run_checkpoints.context_json. +type checkpointPayload struct { + Version int `json:"version"` + Input map[string]any `json:"input"` + Steps map[string]StepResult `json:"steps"` + TotalCostUSD float64 `json:"totalCostUsd"` +} + +func marshalCheckpointPayload(ictx Context, totalCost float64) (string, error) { + payload := checkpointPayload{ + Version: checkpointPayloadVersion, + Input: ictx.Input, + Steps: ictx.Steps, + TotalCostUSD: totalCost, + } + if payload.Input == nil { + payload.Input = map[string]any{} + } + if payload.Steps == nil { + payload.Steps = map[string]StepResult{} + } + b, err := render.MarshalStableJSON(payload) + if err != nil { + return "", fmt.Errorf("engine: marshal checkpoint: %w", err) + } + if len(b) > maxCheckpointContextBytes { + return "", fmt.Errorf("engine: checkpoint context exceeds %d bytes", maxCheckpointContextBytes) + } + return string(b), nil +} + +func unmarshalCheckpointPayload(contextJSON string, wf *spec.WorkflowResource, completedStepIndex int) (Context, float64, error) { + if len(contextJSON) > maxCheckpointContextBytes { + return Context{}, 0, fmt.Errorf("engine: checkpoint context exceeds %d bytes", maxCheckpointContextBytes) + } + var payload checkpointPayload + if err := json.Unmarshal([]byte(contextJSON), &payload); err != nil { + return Context{}, 0, fmt.Errorf("engine: unmarshal checkpoint: %w", err) + } + if payload.Version != checkpointPayloadVersion { + return Context{}, 0, fmt.Errorf("engine: unsupported checkpoint version %d", payload.Version) + } + if payload.Input == nil { + payload.Input = map[string]any{} + } + if payload.Steps == nil { + payload.Steps = map[string]StepResult{} + } + if len(payload.Steps) > maxCheckpointSteps { + return Context{}, 0, fmt.Errorf("engine: 
checkpoint has too many steps (%d)", len(payload.Steps)) + } + if err := validateCheckpointSteps(payload.Steps, wf, completedStepIndex); err != nil { + return Context{}, 0, err + } + if payload.TotalCostUSD < 0 { + return Context{}, 0, fmt.Errorf("engine: negative totalCostUsd in checkpoint") + } + return Context{Input: payload.Input, Steps: payload.Steps}, payload.TotalCostUSD, nil +} + +func validateCheckpointSteps(steps map[string]StepResult, wf *spec.WorkflowResource, completedStepIndex int) error { + if wf == nil { + return fmt.Errorf("engine: nil workflow for checkpoint validation") + } + allowed := make(map[string]struct{}, completedStepIndex+1) + for i := 0; i <= completedStepIndex && i < len(wf.Spec.Steps); i++ { + id := strings.TrimSpace(wf.Spec.Steps[i].ID) + if id != "" { + allowed[id] = struct{}{} + } + } + for stepID := range steps { + if _, ok := allowed[stepID]; !ok { + return fmt.Errorf("engine: checkpoint references unknown or future step %q", stepID) + } + } + return nil +} + +func (e *Executor) saveCheckpoint(ctx context.Context, runID string, stepIndex int, stepID string, ictx Context, totalCost float64, status string) error { + ctxJSON, err := marshalCheckpointPayload(ictx, totalCost) + if err != nil { + return err + } + return e.Store.SaveCheckpoint(ctx, state.RunCheckpoint{ + RunID: runID, + StepIndex: stepIndex, + StepID: stepID, + ContextJSON: ctxJSON, + Status: status, + CreatedAt: e.now(), + }) +} + +func (e *Executor) loadResumeState(ctx context.Context, in RunInput) (Context, float64, int, error) { + cp, err := e.Store.GetLatestCheckpoint(ctx, in.RunID) + if err != nil { + return Context{}, 0, 0, fmt.Errorf("engine: load checkpoint: %w", err) + } + switch cp.Status { + case state.CheckpointStatusRunning, state.CheckpointStatusInterrupted: + default: + return Context{}, 0, 0, fmt.Errorf("engine: checkpoint status %q is not resumable", cp.Status) + } + wf, err := lookupWorkflow(e.Graph, in.WorkflowName) + if err != nil { + return 
Context{}, 0, 0, err + } + ictx, totalCost, err := unmarshalCheckpointPayload(cp.ContextJSON, wf, cp.StepIndex) + if err != nil { + return Context{}, 0, 0, err + } + return ictx, totalCost, cp.StepIndex + 1, nil +} + +func (e *Executor) interruptRun(ctx context.Context, in RunInput, stepIndex int, stepID string, ictx Context, totalCost float64) error { + if err := e.saveCheckpoint(ctx, in.RunID, stepIndex, stepID, ictx, totalCost, state.CheckpointStatusInterrupted); err != nil { + return fmt.Errorf("engine: save interrupted checkpoint: %w", err) + } + if err := e.Store.UpdateRunStatus(ctx, in.RunID, state.RunStatusInterrupted); err != nil { + return fmt.Errorf("engine: mark run interrupted: %w", err) + } + if e.Trace != nil { + _, _ = e.Trace.Append(ctx, in.RunID, stepID, trace.EventRunInterrupted, map[string]any{ + "stepIndex": stepIndex, "stepId": stepID, + }) + } + return ErrInterrupted +} + +// resumeRunStartedAt returns StartedAt for resumed runs, using the original run row when available. 
+func resumeRunStartedAt(ctx context.Context, store state.RuntimeStore, in RunInput) time.Time { + if !in.Resume || store == nil { + return in.StartedAt + } + run, err := store.GetRun(ctx, in.RunID) + if err != nil || run == nil { + return in.StartedAt + } + return run.StartedAt +} diff --git a/internal/engine/checkpoint_order_test.go b/internal/engine/checkpoint_order_test.go new file mode 100644 index 0000000..77fad19 --- /dev/null +++ b/internal/engine/checkpoint_order_test.go @@ -0,0 +1,90 @@ +package engine + +import ( + "context" + "encoding/json" + "errors" + "path/filepath" + "testing" + "time" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/models" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/tools" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace" +) + +// TestRun_checkpointSurvivesMissingStepRow verifies resume works when the checkpoint +// was persisted before the run_steps succeeded row (crash window in PR #127 review). 
+func TestRun_checkpointSurvivesMissingStepRow(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "order.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + root := testProjectRoot(t) + graph := demoWorkflowGraph(t) + runID := "order-run" + started := time.Date(2026, 4, 11, 12, 0, 0, 0, time.UTC) + inJSON := `{"topic":"order"}` + if err := st.StartRun(ctx, state.Run{ + RunID: runID, WorkflowName: "demo", Env: "dev", Status: state.RunStatusRunning, + StartedAt: started, InputJSON: inJSON, TotalCostUSD: 0, + }); err != nil { + t.Fatal(err) + } + var input map[string]any + if err := json.Unmarshal([]byte(inJSON), &input); err != nil { + t.Fatal(err) + } + + ex := &Executor{ + Graph: graph, ProjectRoot: root, + Tools: tools.NewRegistry(graph), Models: models.NewRegistry(graph), + Store: st, Trace: trace.NewRecorder(st), + Now: func() time.Time { return started }, + } + idx := 0 + if err := ex.Run(ctx, RunInput{ + RunID: runID, WorkflowName: "demo", Env: "dev", StartedAt: started, Input: input, + InterruptAfterStepIndex: &idx, + }); !errors.Is(err, ErrInterrupted) { + t.Fatalf("interrupt: %v", err) + } + + cp, err := st.GetLatestCheckpoint(ctx, runID) + if err != nil { + t.Fatal(err) + } + if cp.StepID != "fetch" { + t.Fatalf("checkpoint step %q", cp.StepID) + } + + // Simulate crash after checkpoint but before/during step row commit: remove succeeded row. 
+ if err := st.UpsertRunStep(ctx, state.RunStep{ + RunID: runID, StepID: "fetch", Status: "running", + StartedAt: &started, InputJSON: `{}`, + }); err != nil { + t.Fatal(err) + } + + if err := st.UpdateRunStatus(ctx, runID, state.RunStatusRunning); err != nil { + t.Fatal(err) + } + if err := ex.Run(ctx, RunInput{ + RunID: runID, WorkflowName: "demo", Env: "dev", StartedAt: started, Input: input, + Resume: true, + }); err != nil { + t.Fatal(err) + } + got, err := st.GetRun(ctx, runID) + if err != nil { + t.Fatal(err) + } + if got.Status != state.RunStatusSucceeded { + t.Fatalf("status %q", got.Status) + } +} diff --git a/internal/engine/checkpoint_test.go b/internal/engine/checkpoint_test.go new file mode 100644 index 0000000..b6d3e12 --- /dev/null +++ b/internal/engine/checkpoint_test.go @@ -0,0 +1,53 @@ +package engine + +import ( + "encoding/json" + "testing" +) + +func TestMarshalCheckpointPayload_stableKeyOrder(t *testing.T) { + ictx := Context{ + Input: map[string]any{"b": 2, "a": 1}, + Steps: map[string]StepResult{ + "fetch": {Output: map[string]any{"x": 1}, Meta: map[string]any{"costUsd": 0.1}}, + }, + } + s1, err := marshalCheckpointPayload(ictx, 0.5) + if err != nil { + t.Fatal(err) + } + s2, err := marshalCheckpointPayload(ictx, 0.5) + if err != nil { + t.Fatal(err) + } + if s1 != s2 { + t.Fatalf("non-deterministic: %q vs %q", s1, s2) + } + if !json.Valid([]byte(s1)) { + t.Fatalf("invalid json: %s", s1) + } + + wf := demoWorkflowGraph(t).Workflows["demo"] + gotCtx, cost, err := unmarshalCheckpointPayload(s1, wf, 0) + if err != nil { + t.Fatal(err) + } + if cost != 0.5 { + t.Fatalf("cost = %v", cost) + } + b, _ := json.Marshal(gotCtx.Input) + if string(b) != `{"a":1,"b":2}` && string(b) != `{"b":2,"a":1}` { + t.Fatalf("input round-trip %s", b) + } + if len(gotCtx.Steps) != 1 { + t.Fatalf("steps = %d", len(gotCtx.Steps)) + } +} + +func TestUnmarshalCheckpointPayload_malformed(t *testing.T) { + wf := demoWorkflowGraph(t).Workflows["demo"] + _, _, err := 
unmarshalCheckpointPayload(`not-json`, wf, 0) + if err == nil { + t.Fatal("expected error") + } +} diff --git a/internal/engine/execution.go b/internal/engine/execution.go index fbade01..8165851 100644 --- a/internal/engine/execution.go +++ b/internal/engine/execution.go @@ -36,6 +36,11 @@ type RunInput struct { StartedAt time.Time Input map[string]any ApprovedActions []string + // Resume loads the latest checkpoint and continues from the next step (issue #105). + Resume bool + // InterruptAfterStepIndex, when non-nil, checkpoints and returns [ErrInterrupted] after + // completing the step at this index. Used to simulate approval gates until HITL lands. + InterruptAfterStepIndex *int } func (e *Executor) now() time.Time { @@ -78,9 +83,21 @@ func (e *Executor) Run(ctx context.Context, in RunInput) error { ictx := Context{Input: in.Input, Steps: make(map[string]StepResult)} var totalCost float64 + stepStartIdx := 0 + if in.Resume { + var err error + ictx, totalCost, stepStartIdx, err = e.loadResumeState(ctx, in) + if err != nil { + return err + } + } + runStartedAt := resumeRunStartedAt(ctx, e.Store, in) finishAt := e.now() - for _, step := range wf.Spec.Steps { + for i, step := range wf.Spec.Steps { + if i < stepStartIdx { + continue + } step := step if strings.TrimSpace(step.ID) == "" { return e.failRun(ctx, in, fmt.Errorf("engine: workflow step missing id"), totalCost) @@ -100,9 +117,9 @@ func (e *Executor) Run(ctx context.Context, in RunInput) error { with = map[string]any{} } - elapsed := e.now().Sub(in.StartedAt) + elapsed := e.now().Sub(runStartedAt) pctx := policy.RunContext{ - StartedAt: in.StartedAt, + StartedAt: runStartedAt, Elapsed: elapsed, AccumulatedCostUSD: totalCost, ApprovedActions: in.ApprovedActions, @@ -162,6 +179,15 @@ func (e *Executor) Run(ctx context.Context, in RunInput) error { return e.failRun(ctx, in, fmt.Errorf("engine: step %q: %w", step.ID, err), totalCost) } + meta := map[string]any{"costUsd": stepCost, "durationMs": 
finished.Sub(started).Milliseconds()} + ictx.Steps[step.ID] = StepResult{Output: out, Meta: meta} + + // Checkpoint before marking the step succeeded so resume never replays a completed step + // if the process dies after persistence (issue #105 / PR #127). + if err := e.saveCheckpoint(ctx, in.RunID, i, step.ID, ictx, totalCost, state.CheckpointStatusRunning); err != nil { + return e.failRun(ctx, in, fmt.Errorf("engine: checkpoint step %q: %w", step.ID, err), totalCost) + } + outJSON, _ := json.Marshal(out) if err := e.Store.UpsertRunStep(ctx, state.RunStep{ RunID: in.RunID, @@ -178,9 +204,9 @@ func (e *Executor) Run(ctx context.Context, in RunInput) error { if e.Trace != nil { _, _ = e.Trace.Append(ctx, in.RunID, step.ID, trace.EventStepFinished, map[string]any{"costUsd": stepCost}) } - - meta := map[string]any{"costUsd": stepCost, "durationMs": finished.Sub(started).Milliseconds()} - ictx.Steps[step.ID] = StepResult{Output: out, Meta: meta} + if in.InterruptAfterStepIndex != nil && i == *in.InterruptAfterStepIndex { + return e.interruptRun(ctx, in, i, step.ID, ictx, totalCost) + } } finalOut, err := buildWorkflowOutput(wf, ictx) @@ -192,12 +218,17 @@ func (e *Executor) Run(ctx context.Context, in RunInput) error { return e.failRun(ctx, in, err, totalCost) } finishAt = e.now() - return e.Store.FinishRun(ctx, in.RunID, "succeeded", finishAt, string(outBytes), "", totalCost) + if err := e.saveCheckpoint(ctx, in.RunID, len(wf.Spec.Steps)-1, "", ictx, totalCost, state.CheckpointStatusCompleted); err != nil { + return e.failRun(ctx, in, fmt.Errorf("engine: final checkpoint: %w", err), totalCost) + } + return e.Store.FinishRun(ctx, in.RunID, state.RunStatusSucceeded, finishAt, string(outBytes), "", totalCost) } func (e *Executor) failRun(ctx context.Context, in RunInput, runErr error, totalCost float64) error { + ictx := Context{Input: in.Input, Steps: map[string]StepResult{}} + _ = e.saveCheckpoint(ctx, in.RunID, -1, "", ictx, totalCost, state.CheckpointStatusFailed) 
finishAt := e.now() - _ = e.Store.FinishRun(ctx, in.RunID, "failed", finishAt, "", runErr.Error(), totalCost) + _ = e.Store.FinishRun(ctx, in.RunID, state.RunStatusFailed, finishAt, "", runErr.Error(), totalCost) return runErr } diff --git a/internal/engine/resume_test.go b/internal/engine/resume_test.go new file mode 100644 index 0000000..95062c1 --- /dev/null +++ b/internal/engine/resume_test.go @@ -0,0 +1,267 @@ +package engine + +import ( + "context" + "encoding/json" + "errors" + "path/filepath" + "testing" + "time" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/models" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/tools" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace" +) + +func demoWorkflowGraph(t *testing.T) *spec.ProjectGraph { + t.Helper() + return &spec.ProjectGraph{ + Spec: spec.ProjectSpec{ + Providers: &spec.ProjectProviders{ + Models: map[string]spec.ModelProviderConfig{ + "mock": {Type: "mock"}, + }, + }, + }, + Tools: map[string]*spec.ToolResource{ + "helper": { + APIVersion: spec.APIVersionV0, + Kind: spec.KindTool, + Metadata: spec.Metadata{Name: "helper"}, + Spec: spec.ToolSpec{ + Type: "native", + Safety: &spec.ToolSafety{SideEffects: spec.BoolPtr(false)}, + }, + }, + }, + Agents: map[string]*spec.AgentResource{ + "reviewer": { + APIVersion: spec.APIVersionV0, + Kind: spec.KindAgent, + Metadata: spec.Metadata{Name: "reviewer"}, + Spec: spec.AgentSpec{ + Model: "mock/gpt-4", + Instructions: "Summarize the tool payload as JSON.", + Output: &spec.AgentIO{Schema: "./schemas/agent-out.schema.json"}, + }, + }, + }, + Workflows: map[string]*spec.WorkflowResource{ + "demo": { + APIVersion: spec.APIVersionV0, + Kind: spec.KindWorkflow, + 
Metadata: spec.Metadata{Name: "demo"}, + Spec: spec.WorkflowSpec{ + Steps: []spec.WorkflowStep{ + { + ID: "fetch", + Uses: "tool.helper.echo", + With: map[string]any{"topic": "${input.topic}"}, + }, + { + ID: "summarize", + Agent: "reviewer", + With: map[string]any{"echo": "${steps.fetch.output.echo}"}, + }, + }, + Output: &spec.WorkflowOutput{ + Value: map[string]any{ + "topic": "${input.topic}", + "summary": "${steps.summarize.output.summary}", + }, + }, + }, + }, + }, + } +} + +func TestRun_interruptAndResume_completesWithoutReplay(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "resume.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + root := testProjectRoot(t) + graph := demoWorkflowGraph(t) + runID := "run-resume" + started := time.Date(2026, 4, 11, 12, 0, 0, 0, time.UTC) + inJSON := `{"topic":"agents"}` + if err := st.StartRun(ctx, state.Run{ + RunID: runID, WorkflowName: "demo", Env: "dev", Status: "running", + StartedAt: started, InputJSON: inJSON, TotalCostUSD: 0, + }); err != nil { + t.Fatal(err) + } + var input map[string]any + if err := json.Unmarshal([]byte(inJSON), &input); err != nil { + t.Fatal(err) + } + + ex := &Executor{ + Graph: graph, ProjectRoot: root, + Tools: tools.NewRegistry(graph), Models: models.NewRegistry(graph), + Store: st, Trace: trace.NewRecorder(st), + Now: func() time.Time { return started }, + } + err = ex.Run(ctx, RunInput{ + RunID: runID, WorkflowName: "demo", Env: "dev", StartedAt: started, Input: input, + InterruptAfterStepIndex: intPtr(0), + }) + if !errors.Is(err, ErrInterrupted) { + t.Fatalf("err = %v want ErrInterrupted", err) + } + + run, err := st.GetRun(ctx, runID) + if err != nil { + t.Fatal(err) + } + if run.Status != state.RunStatusInterrupted { + t.Fatalf("status = %q", run.Status) + } + cp, err := st.GetLatestCheckpoint(ctx, runID) + if err != nil { + t.Fatal(err) + } + if cp.Status != state.CheckpointStatusInterrupted || 
cp.StepIndex != 0 || cp.StepID != "fetch" { + t.Fatalf("checkpoint = %+v", cp) + } + + if err := st.UpdateRunStatus(ctx, runID, "running"); err != nil { + t.Fatal(err) + } + resumeAt := started.Add(time.Hour) + ex.Now = func() time.Time { return resumeAt } + if err := ex.Run(ctx, RunInput{ + RunID: runID, WorkflowName: "demo", Env: "dev", StartedAt: started, Input: input, + Resume: true, + }); err != nil { + t.Fatal(err) + } + + got, err := st.GetRun(ctx, runID) + if err != nil { + t.Fatal(err) + } + if got.Status != "succeeded" { + t.Fatalf("status %q err=%q", got.Status, got.ErrorText) + } + + rows, err := st.ListTraceEventsByRunID(ctx, runID) + if err != nil { + t.Fatal(err) + } + var fetchStarts int + for _, ev := range rows { + if ev.StepID == "fetch" && ev.Type == "step.started" { + fetchStarts++ + } + } + if fetchStarts != 1 { + t.Fatalf("fetch step.started count = %d want 1", fetchStarts) + } +} + +func TestRun_resumeFromRunningCheckpoint(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "crash.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + root := testProjectRoot(t) + graph := demoWorkflowGraph(t) + runID := "run-crash" + started := time.Date(2026, 4, 11, 12, 0, 0, 0, time.UTC) + inJSON := `{"topic":"crash-test"}` + if err := st.StartRun(ctx, state.Run{ + RunID: runID, WorkflowName: "demo", Env: "dev", Status: "running", + StartedAt: started, InputJSON: inJSON, TotalCostUSD: 0, + }); err != nil { + t.Fatal(err) + } + var input map[string]any + if err := json.Unmarshal([]byte(inJSON), &input); err != nil { + t.Fatal(err) + } + + ex := &Executor{ + Graph: graph, ProjectRoot: root, + Tools: tools.NewRegistry(graph), Models: models.NewRegistry(graph), + Store: st, Trace: trace.NewRecorder(st), + Now: func() time.Time { return started }, + } + if err := ex.Run(ctx, RunInput{ + RunID: runID, WorkflowName: "demo", Env: "dev", StartedAt: started, Input: input, + 
InterruptAfterStepIndex: intPtr(0), + }); !errors.Is(err, ErrInterrupted) { + t.Fatalf("interrupt: %v", err) + } + if err := st.UpdateRunStatus(ctx, runID, "running"); err != nil { + t.Fatal(err) + } + + cp, err := st.GetLatestCheckpoint(ctx, runID) + if err != nil { + t.Fatal(err) + } + if err := st.SaveCheckpoint(ctx, state.RunCheckpoint{ + RunID: runID, StepIndex: cp.StepIndex, StepID: cp.StepID, + ContextJSON: cp.ContextJSON, Status: state.CheckpointStatusRunning, + CreatedAt: started.Add(time.Minute), + }); err != nil { + t.Fatal(err) + } + + if err := ex.Run(ctx, RunInput{ + RunID: runID, WorkflowName: "demo", Env: "dev", StartedAt: started, Input: input, + Resume: true, + }); err != nil { + t.Fatal(err) + } + got, err := st.GetRun(ctx, runID) + if err != nil { + t.Fatal(err) + } + if got.Status != "succeeded" { + t.Fatalf("status %q", got.Status) + } +} + +func TestRun_resume_rejectsCompletedCheckpoint(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "done.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + started := time.Date(2026, 4, 11, 12, 0, 0, 0, time.UTC) + if err := st.StartRun(ctx, state.Run{ + RunID: "done", WorkflowName: "demo", Env: "dev", Status: "succeeded", + StartedAt: started, InputJSON: `{}`, TotalCostUSD: 0, + }); err != nil { + t.Fatal(err) + } + if err := st.SaveCheckpoint(ctx, state.RunCheckpoint{ + RunID: "done", StepIndex: 1, StepID: "last", + ContextJSON: `{"version":1,"input":{},"steps":{},"totalCostUsd":0}`, + Status: state.CheckpointStatusCompleted, CreatedAt: started, + }); err != nil { + t.Fatal(err) + } + + ex := &Executor{Graph: demoWorkflowGraph(t), Store: st} + err = ex.Run(ctx, RunInput{RunID: "done", WorkflowName: "demo", Resume: true, Input: map[string]any{}}) + if err == nil { + t.Fatal("expected error") + } +} + +func intPtr(n int) *int { return &n } diff --git a/internal/plan/workflow_hash.go b/internal/plan/workflow_hash.go new 
file mode 100644 index 0000000..40a9ce3 --- /dev/null +++ b/internal/plan/workflow_hash.go @@ -0,0 +1,19 @@ +package plan + +import ( + "fmt" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" +) + +// WorkflowSpecHash returns the deployment spec_hash for a normalized workflow resource envelope. +func WorkflowSpecHash(wf *spec.WorkflowResource) (string, error) { + if wf == nil { + return "", fmt.Errorf("plan: nil workflow") + } + raw, err := canonicalResourceJSON(wf) + if err != nil { + return "", fmt.Errorf("plan: canonical json for workflow: %w", err) + } + return SpecHashHex(raw), nil +} diff --git a/internal/plan/workflow_hash_test.go b/internal/plan/workflow_hash_test.go new file mode 100644 index 0000000..68d5240 --- /dev/null +++ b/internal/plan/workflow_hash_test.go @@ -0,0 +1,35 @@ +package plan + +import ( + "testing" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" +) + +func TestWorkflowSpecHash_stable(t *testing.T) { + wf := &spec.WorkflowResource{ + APIVersion: spec.APIVersionV0, + Kind: spec.KindWorkflow, + Metadata: spec.Metadata{Name: "demo"}, + Spec: spec.WorkflowSpec{ + Steps: []spec.WorkflowStep{{ID: "a", Uses: "tool.x.y"}}, + }, + } + h1, err := WorkflowSpecHash(wf) + if err != nil { + t.Fatal(err) + } + h2, err := WorkflowSpecHash(wf) + if err != nil { + t.Fatal(err) + } + if h1 == "" || h1 != h2 { + t.Fatalf("hash %q %q", h1, h2) + } +} + +func TestWorkflowSpecHash_nil(t *testing.T) { + if _, err := WorkflowSpecHash(nil); err == nil { + t.Fatal("expected error") + } +} diff --git a/internal/render/json.go b/internal/render/json.go index 9dc0394..47b2afd 100644 --- a/internal/render/json.go +++ b/internal/render/json.go @@ -17,6 +17,12 @@ func WriteJSON(w io.Writer, v any) error { return enc.Encode(nv) } +// MarshalStableJSON encodes v as compact JSON with lexicographically sorted object keys. 
+func MarshalStableJSON(v any) ([]byte, error) { + nv := normalizeForStableJSON(v) + return json.Marshal(nv) +} + func normalizeForStableJSON(v any) any { switch t := v.(type) { case map[string]any: diff --git a/internal/runtime/local/prepare.go b/internal/runtime/local/prepare.go new file mode 100644 index 0000000..6b029c6 --- /dev/null +++ b/internal/runtime/local/prepare.go @@ -0,0 +1,43 @@ +package local + +import ( + "context" + "fmt" + "strings" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/project" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" +) + +// preparedProject is a loaded, normalized, environment-overlaid, validated project graph. +type preparedProject struct { + root string + graph *spec.ProjectGraph +} + +// prepareProject loads the project, applies environment overrides, validates, and prunes old runs. +func (r *Runtime) prepareProject(ctx context.Context, environmentName string) (*preparedProject, error) { + root := strings.TrimSpace(r.ProjectRoot) + if root == "" { + return nil, fmt.Errorf("local: empty project root") + } + graph, err := project.LoadProject(root) + if err != nil { + return nil, fmt.Errorf("local: load project: %w", err) + } + spec.NormalizeProjectGraph(graph) + graph, err = ApplyEnvironment(graph, environmentName) + if err != nil { + return nil, err + } + if err := spec.ValidateProjectGraph(graph, root); err != nil { + return nil, fmt.Errorf("local: validate project: %w", err) + } + if n := spec.TraceRetentionDays(graph); n > 0 { + cutoff := r.now().UTC().AddDate(0, 0, -n) + if _, err := r.Store.DeleteRunsStartedBefore(ctx, cutoff); err != nil { + return nil, fmt.Errorf("local: prune trace runs: %w", err) + } + } + return &preparedProject{root: root, graph: graph}, nil +} diff --git a/internal/runtime/local/resume_validate.go b/internal/runtime/local/resume_validate.go new file mode 100644 index 0000000..524c40d --- /dev/null +++ b/internal/runtime/local/resume_validate.go 
@@ -0,0 +1,41 @@ +package local + +import ( + "fmt" + "strings" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/plan" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" +) + +// resumeEnvironmentName returns the environment overlay to apply when resuming a run. +// When the run row pins a non-empty name, the CLI must not pass a conflicting -e value. +func resumeEnvironmentName(run *state.Run, opts runtime.WorkflowRunOptions) (string, error) { + pinned := strings.TrimSpace(run.EnvironmentName) + cli := strings.TrimSpace(opts.EnvironmentName) + if pinned == "" { + return cli, nil + } + if cli != "" && cli != pinned { + return "", fmt.Errorf("local: environment %q does not match run %q", cli, pinned) + } + return pinned, nil +} + +// validateResumeWorkflowSpec ensures the workflow definition has not changed since the run started. 
+func validateResumeWorkflowSpec(run *state.Run, wf *spec.WorkflowResource) error { + stored := strings.TrimSpace(run.WorkflowSpecHash) + if stored == "" { + return nil + } + current, err := plan.WorkflowSpecHash(wf) + if err != nil { + return fmt.Errorf("local: hash workflow: %w", err) + } + if current != stored { + return fmt.Errorf("local: workflow spec changed since run started") + } + return nil +} diff --git a/internal/runtime/local/resume_validate_test.go b/internal/runtime/local/resume_validate_test.go new file mode 100644 index 0000000..a40a39f --- /dev/null +++ b/internal/runtime/local/resume_validate_test.go @@ -0,0 +1,59 @@ +package local + +import ( + "testing" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" +) + +func TestResumeEnvironmentName_pinnedAndMatchingCLI(t *testing.T) { + run := &state.Run{EnvironmentName: "staging"} + got, err := resumeEnvironmentName(run, runtime.WorkflowRunOptions{EnvironmentName: "staging"}) + if err != nil || got != "staging" { + t.Fatalf("got %q err=%v", got, err) + } +} + +func TestResumeEnvironmentName_pinnedIgnoresEmptyCLI(t *testing.T) { + run := &state.Run{EnvironmentName: "staging"} + got, err := resumeEnvironmentName(run, runtime.WorkflowRunOptions{}) + if err != nil || got != "staging" { + t.Fatalf("got %q err=%v", got, err) + } +} + +func TestResumeEnvironmentName_conflict(t *testing.T) { + run := &state.Run{EnvironmentName: "staging"} + _, err := resumeEnvironmentName(run, runtime.WorkflowRunOptions{EnvironmentName: "prod"}) + if err == nil { + t.Fatal("expected error") + } +} + +func TestValidateResumeWorkflowSpec_mismatch(t *testing.T) { + wf := &spec.WorkflowResource{ + APIVersion: spec.APIVersionV0, + Kind: spec.KindWorkflow, + Metadata: spec.Metadata{Name: "demo"}, + Spec: spec.WorkflowSpec{ + Steps: 
[]spec.WorkflowStep{{ID: "a", Uses: "tool.x.y"}}, + }, + } + run := &state.Run{WorkflowSpecHash: "deadbeef"} + if err := validateResumeWorkflowSpec(run, wf); err == nil { + t.Fatal("expected error") + } +} + +func TestValidateResumeWorkflowSpec_legacyEmptyHash(t *testing.T) { + wf := &spec.WorkflowResource{ + APIVersion: spec.APIVersionV0, + Kind: spec.KindWorkflow, + Metadata: spec.Metadata{Name: "demo"}, + } + if err := validateResumeWorkflowSpec(&state.Run{}, wf); err != nil { + t.Fatal(err) + } +} diff --git a/internal/runtime/local/runner.go b/internal/runtime/local/runner.go index 121f12f..2dfe381 100644 --- a/internal/runtime/local/runner.go +++ b/internal/runtime/local/runner.go @@ -2,15 +2,17 @@ package local import ( "context" + "database/sql" "encoding/json" + "errors" "fmt" "strings" + "time" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/engine" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/models" - "github.com/LAA-Software-Engineering/agentic-control-plane/internal/project" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/plan" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime" - "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/tools" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace" @@ -22,33 +24,28 @@ var _ runtime.WorkflowRunner = (*Runtime)(nil) // ExecuteWorkflow loads the project from [Runtime.ProjectRoot], applies optional environment overrides, // validates input JSON and workflow input schema before persisting the run, then invokes [engine.Executor]. +// When opts.Resume is true, the existing run row and latest checkpoint are rehydrated instead of StartRun. 
func (r *Runtime) ExecuteWorkflow(ctx context.Context, opts runtime.WorkflowRunOptions) (string, error) { if r == nil || r.Store == nil { return "", fmt.Errorf("local: nil runtime or store") } - root := strings.TrimSpace(r.ProjectRoot) - if root == "" { - return "", fmt.Errorf("local: empty project root") + if opts.Resume { + return r.resumeWorkflow(ctx, opts) } + return r.startWorkflow(ctx, opts) +} - graph, err := project.LoadProject(root) - if err != nil { - return "", fmt.Errorf("local: load project: %w", err) - } - spec.NormalizeProjectGraph(graph) - graph, err = ApplyEnvironment(graph, opts.EnvironmentName) +func (r *Runtime) startWorkflow(ctx context.Context, opts runtime.WorkflowRunOptions) (string, error) { + prep, err := r.prepareProject(ctx, opts.EnvironmentName) if err != nil { return "", err } - if err := spec.ValidateProjectGraph(graph, root); err != nil { - return "", fmt.Errorf("local: validate project: %w", err) - } wfName := strings.TrimSpace(opts.WorkflowName) if wfName == "" { return "", fmt.Errorf("local: empty workflow name") } - wf, ok := graph.Workflows[wfName] + wf, ok := prep.graph.Workflows[wfName] if !ok || wf == nil { return "", fmt.Errorf("local: unknown workflow %q", wfName) } @@ -61,16 +58,13 @@ func (r *Runtime) ExecuteWorkflow(ctx context.Context, opts runtime.WorkflowRunO return "", fmt.Errorf("local: invalid input JSON: %w", err) } } - - if err := engine.ValidateWorkflowInput(root, wf, input); err != nil { + if err := engine.ValidateWorkflowInput(prep.root, wf, input); err != nil { return "", err } - if n := spec.TraceRetentionDays(graph); n > 0 { - cutoff := r.now().UTC().AddDate(0, 0, -n) - if _, err := r.Store.DeleteRunsStartedBefore(ctx, cutoff); err != nil { - return "", fmt.Errorf("local: prune trace runs: %w", err) - } + wfHash, err := plan.WorkflowSpecHash(wf) + if err != nil { + return "", err } runID := strings.TrimSpace(opts.RunID) @@ -90,13 +84,15 @@ func (r *Runtime) ExecuteWorkflow(ctx context.Context, opts 
runtime.WorkflowRunO started := r.now() if err := r.Store.StartRun(ctx, state.Run{ - RunID: runID, - WorkflowName: wfName, - Env: envLabel, - Status: "running", - StartedAt: started, - InputJSON: string(inputBytes), - TotalCostUSD: 0, + RunID: runID, + WorkflowName: wfName, + Env: envLabel, + Status: state.RunStatusRunning, + StartedAt: started, + InputJSON: string(inputBytes), + TotalCostUSD: 0, + WorkflowSpecHash: wfHash, + EnvironmentName: strings.TrimSpace(opts.EnvironmentName), }); err != nil { return runID, fmt.Errorf("local: start run: %w", err) } @@ -108,11 +104,99 @@ func (r *Runtime) ExecuteWorkflow(ctx context.Context, opts runtime.WorkflowRunO return runID, fmt.Errorf("local: trace run.started: %w", err) } + return r.executeEngine(ctx, prep, runID, wfName, envLabel, started, input, opts.ApprovedActions, false, rec) +} + +func (r *Runtime) resumeWorkflow(ctx context.Context, opts runtime.WorkflowRunOptions) (string, error) { + runID := strings.TrimSpace(opts.RunID) + if runID == "" { + return "", fmt.Errorf("local: resume requires run id") + } + + run, err := r.Store.GetRun(ctx, runID) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return runID, fmt.Errorf("local: run %q not found", runID) + } + return runID, fmt.Errorf("local: get run: %w", err) + } + switch run.Status { + case state.RunStatusRunning, state.RunStatusInterrupted: + default: + return runID, fmt.Errorf("local: run %q status %q is not resumable", runID, run.Status) + } + + if _, err := r.Store.GetLatestCheckpoint(ctx, runID); err != nil { + if errors.Is(err, sql.ErrNoRows) { + return runID, fmt.Errorf("local: run %q has no checkpoint", runID) + } + return runID, fmt.Errorf("local: load checkpoint: %w", err) + } + + envName, err := resumeEnvironmentName(run, opts) + if err != nil { + return runID, err + } + + prep, err := r.prepareProject(ctx, envName) + if err != nil { + return runID, err + } + + wfName := strings.TrimSpace(run.WorkflowName) + wf, ok := 
prep.graph.Workflows[wfName] + if !ok || wf == nil { + return runID, fmt.Errorf("local: unknown workflow %q", wfName) + } + if err := validateResumeWorkflowSpec(run, wf); err != nil { + return runID, err + } + + var input map[string]any + if err := json.Unmarshal([]byte(run.InputJSON), &input); err != nil { + return runID, fmt.Errorf("local: invalid stored input JSON: %w", err) + } + if input == nil { + input = map[string]any{} + } + if err := engine.ValidateWorkflowInput(prep.root, wf, input); err != nil { + return runID, err + } + + if err := r.Store.UpdateRunStatus(ctx, runID, state.RunStatusRunning); err != nil { + return runID, fmt.Errorf("local: mark run running: %w", err) + } + + rec := trace.NewRecorder(r.Store) + if _, err := rec.Append(ctx, runID, "", trace.EventRunResumed, map[string]any{ + "workflow": wfName, + }); err != nil { + return runID, fmt.Errorf("local: trace run.resumed: %w", err) + } + + envLabel := strings.TrimSpace(run.Env) + if envLabel == "" { + envLabel = "local" + } + + return r.executeEngine(ctx, prep, runID, wfName, envLabel, run.StartedAt, input, opts.ApprovedActions, true, rec) +} + +func (r *Runtime) executeEngine( + ctx context.Context, + prep *preparedProject, + runID, wfName, envLabel string, + started time.Time, + input map[string]any, + approved []string, + resume bool, + rec *trace.Recorder, +) (string, error) { ex := &engine.Executor{ - Graph: graph, - ProjectRoot: root, - Tools: tools.NewRegistry(graph), - Models: models.NewRegistry(graph), + Graph: prep.graph, + ProjectRoot: prep.root, + Tools: tools.NewRegistry(prep.graph), + Models: models.NewRegistry(prep.graph), Store: r.Store, Trace: rec, Now: r.Now, @@ -123,11 +207,15 @@ func (r *Runtime) ExecuteWorkflow(ctx context.Context, opts runtime.WorkflowRunO Env: envLabel, StartedAt: started, Input: input, - ApprovedActions: opts.ApprovedActions, + ApprovedActions: approved, + Resume: resume, }) finData := map[string]any{} if runErr != nil { + if errors.Is(runErr, 
engine.ErrInterrupted) { + return runID, nil + } finData["error"] = runErr.Error() } if _, terr := rec.Append(ctx, runID, "", trace.EventRunFinished, finData); terr != nil && runErr == nil { diff --git a/internal/runtime/local/runner_test.go b/internal/runtime/local/runner_test.go index ca50e95..7a7f61c 100644 --- a/internal/runtime/local/runner_test.go +++ b/internal/runtime/local/runner_test.go @@ -3,15 +3,20 @@ package local import ( "context" "database/sql" + "encoding/json" "errors" "path/filepath" "testing" "time" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/engine" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/models" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/project" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/tools" "github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace" ) @@ -200,3 +205,87 @@ func TestExecuteWorkflow_prunesOldTraceRuns(t *testing.T) { t.Fatal(err) } } + +func TestExecuteWorkflow_resumeAfterInterrupt(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "resume-local.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + root := testRunProjRoot(t) + graph, err := project.LoadProject(root) + if err != nil { + t.Fatal(err) + } + spec.NormalizeProjectGraph(graph) + graph, err = ApplyEnvironment(graph, "staging") + if err != nil { + t.Fatal(err) + } + + runID := "resume-local-1" + started := time.Date(2026, 4, 11, 12, 0, 0, 0, time.UTC) + inputJSON := []byte(`{"topic":"resume-me"}`) + if err := st.StartRun(ctx, state.Run{ + RunID: runID, 
WorkflowName: "demo", Env: "dev", Status: "running", + StartedAt: started, InputJSON: string(inputJSON), TotalCostUSD: 0, + }); err != nil { + t.Fatal(err) + } + + var input map[string]any + if err := json.Unmarshal(inputJSON, &input); err != nil { + t.Fatal(err) + } + idx := 0 + ex := &engine.Executor{ + Graph: graph, ProjectRoot: root, + Tools: tools.NewRegistry(graph), Models: models.NewRegistry(graph), + Store: st, Trace: trace.NewRecorder(st), + Now: func() time.Time { return started }, + } + if err := ex.Run(ctx, engine.RunInput{ + RunID: runID, WorkflowName: "demo", Env: "dev", StartedAt: started, Input: input, + InterruptAfterStepIndex: &idx, + }); !errors.Is(err, engine.ErrInterrupted) { + t.Fatalf("interrupt: %v", err) + } + + rt := NewRuntime(root, st) + rt.Now = func() time.Time { return started.Add(time.Hour) } + if _, err := rt.ExecuteWorkflow(ctx, runtime.WorkflowRunOptions{ + RunID: runID, Resume: true, EnvironmentName: "staging", + }); err != nil { + t.Fatal(err) + } + + got, err := st.GetRun(ctx, runID) + if err != nil { + t.Fatal(err) + } + if got.Status != "succeeded" { + t.Fatalf("status %q err=%q", got.Status, got.ErrorText) + } + + events, err := trace.NewReader(st).ListByRunID(ctx, runID) + if err != nil { + t.Fatal(err) + } + var resumed, fetchStarts int + for _, ev := range events { + if ev.Type == trace.EventRunResumed { + resumed++ + } + if ev.StepID == "fetch" && ev.Type == trace.EventStepStarted { + fetchStarts++ + } + } + if resumed != 1 { + t.Fatalf("run.resumed count = %d", resumed) + } + if fetchStarts != 1 { + t.Fatalf("fetch step.started count = %d want 1", fetchStarts) + } +} diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go index c461338..7b45858 100644 --- a/internal/runtime/runtime.go +++ b/internal/runtime/runtime.go @@ -16,6 +16,9 @@ type WorkflowRunOptions struct { InputJSON []byte // ApprovedActions are full tool uses strings approved for policy gates. 
ApprovedActions []string + // Resume continues an existing run from its latest checkpoint (issue #105). + // RunID must be set; InputJSON and WorkflowName are loaded from the persisted run. + Resume bool } // WorkflowRunner loads declarative state and executes a workflow locally (design doc section 16 MVP). diff --git a/internal/state/models.go b/internal/state/models.go index 7344433..9abfeee 100644 --- a/internal/state/models.go +++ b/internal/state/models.go @@ -20,18 +20,28 @@ type AppliedProject struct { AppliedAt time.Time } +// Run status values stored on runs (design doc §14.2, issue #105). +const ( + RunStatusRunning = "running" + RunStatusInterrupted = "interrupted" + RunStatusSucceeded = "succeeded" + RunStatusFailed = "failed" +) + // Run is one workflow execution row in runs (design doc §14.2). type Run struct { - RunID string - WorkflowName string - Env string - Status string - StartedAt time.Time - FinishedAt *time.Time - InputJSON string - OutputJSON string - ErrorText string - TotalCostUSD float64 + RunID string + WorkflowName string + Env string + Status string + StartedAt time.Time + FinishedAt *time.Time + InputJSON string + OutputJSON string + ErrorText string + TotalCostUSD float64 + WorkflowSpecHash string + EnvironmentName string } // RunStep is one row in run_steps (design doc §14.2). @@ -56,3 +66,24 @@ type TraceEvent struct { StepID string DataJSON string } + +// Checkpoint status values stored in run_checkpoints (issue #105). +const ( + CheckpointStatusRunning = "running" + CheckpointStatusInterrupted = "interrupted" + CheckpointStatusCompleted = "completed" + CheckpointStatusFailed = "failed" +) + +// RunCheckpoint is one row in run_checkpoints (issue #105). +// ContextJSON holds the opaque engine-owned execution snapshot (interpolation context, +// accumulated step outputs, total cost) serialized as canonical JSON. 
+type RunCheckpoint struct { + RunID string + Seq int64 + StepIndex int + StepID string + ContextJSON string + Status string + CreatedAt time.Time +} diff --git a/internal/state/sqlite/checkpoint.go b/internal/state/sqlite/checkpoint.go new file mode 100644 index 0000000..cc9ff63 --- /dev/null +++ b/internal/state/sqlite/checkpoint.go @@ -0,0 +1,88 @@ +package sqlite + +import ( + "context" + "database/sql" + "fmt" + "time" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" +) + +// SaveCheckpoint appends one checkpoint row with the next monotonic seq for run_id. +func (s *Store) SaveCheckpoint(ctx context.Context, cp state.RunCheckpoint) error { + if s == nil || s.db == nil { + return fmt.Errorf("sqlite: nil store") + } + ctxJ := cp.ContextJSON + if ctxJ == "" { + ctxJ = "{}" + } + created := cp.CreatedAt.UTC().Format(time.RFC3339Nano) + + tx, err := s.db.BeginTx(ctx, nil) + if err != nil { + return err + } + defer func() { _ = tx.Rollback() }() + + var seq int64 + if err := tx.QueryRowContext(ctx, `SELECT IFNULL(MAX(seq), 0) + 1 FROM run_checkpoints WHERE run_id = ?`, cp.RunID).Scan(&seq); err != nil { + return err + } + if _, err := tx.ExecContext(ctx, ` +INSERT INTO run_checkpoints (run_id, seq, step_index, step_id, context_json, status, created_at) +VALUES (?, ?, ?, ?, ?, ?, ?) +`, cp.RunID, seq, cp.StepIndex, cp.StepID, ctxJ, cp.Status, created); err != nil { + return err + } + return tx.Commit() +} + +func scanCheckpointRow(sc rowScanner) (*state.RunCheckpoint, error) { + var cp state.RunCheckpoint + var created string + if err := sc.Scan(&cp.RunID, &cp.Seq, &cp.StepIndex, &cp.StepID, &cp.ContextJSON, &cp.Status, &created); err != nil { + return nil, err + } + t, err := parseSQLiteTime(created) + if err != nil { + return nil, fmt.Errorf("created_at: %w", err) + } + cp.CreatedAt = t + return &cp, nil +} + +// GetLatestCheckpoint returns the newest checkpoint for run_id or sql.ErrNoRows. 
+func (s *Store) GetLatestCheckpoint(ctx context.Context, runID string) (*state.RunCheckpoint, error) { + if s == nil || s.db == nil { + return nil, fmt.Errorf("sqlite: nil store") + } + row := s.db.QueryRowContext(ctx, ` +SELECT run_id, seq, step_index, step_id, context_json, status, created_at +FROM run_checkpoints +WHERE run_id = ? +ORDER BY seq DESC +LIMIT 1 +`, runID) + return scanCheckpointRow(row) +} + +// UpdateRunStatus sets runs.status without updating finished_at or output. +func (s *Store) UpdateRunStatus(ctx context.Context, runID, status string) error { + if s == nil || s.db == nil { + return fmt.Errorf("sqlite: nil store") + } + res, err := s.db.ExecContext(ctx, `UPDATE runs SET status = ? WHERE run_id = ?`, status, runID) + if err != nil { + return err + } + n, err := res.RowsAffected() + if err != nil { + return err + } + if n == 0 { + return sql.ErrNoRows + } + return nil +} diff --git a/internal/state/sqlite/runtime.go b/internal/state/sqlite/runtime.go index d9fc954..e0f41c1 100644 --- a/internal/state/sqlite/runtime.go +++ b/internal/state/sqlite/runtime.go @@ -17,9 +17,9 @@ func (s *Store) StartRun(ctx context.Context, r state.Run) error { } at := r.StartedAt.UTC().Format(time.RFC3339Nano) _, err := s.db.ExecContext(ctx, ` -INSERT INTO runs (run_id, workflow_name, env, status, started_at, input_json, total_cost_usd) -VALUES (?, ?, ?, ?, ?, ?, ?) -`, r.RunID, r.WorkflowName, r.Env, r.Status, at, in, r.TotalCostUSD) +INSERT INTO runs (run_id, workflow_name, env, status, started_at, input_json, total_cost_usd, workflow_spec_hash, environment_name) +VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
+`, r.RunID, r.WorkflowName, r.Env, r.Status, at, in, r.TotalCostUSD, r.WorkflowSpecHash, r.EnvironmentName) return err } @@ -125,7 +125,7 @@ func scanRunRow(sc rowScanner) (*state.Run, error) { var r state.Run var started, finished sql.NullString var outJ, errT sql.NullString - if err := sc.Scan(&r.RunID, &r.WorkflowName, &r.Env, &r.Status, &started, &finished, &r.InputJSON, &outJ, &errT, &r.TotalCostUSD); err != nil { + if err := sc.Scan(&r.RunID, &r.WorkflowName, &r.Env, &r.Status, &started, &finished, &r.InputJSON, &outJ, &errT, &r.TotalCostUSD, &r.WorkflowSpecHash, &r.EnvironmentName); err != nil { return nil, err } st, err := parseSQLiteTime(started.String) @@ -152,7 +152,7 @@ func scanRunRow(sc rowScanner) (*state.Run, error) { // GetRun returns the run row or sql.ErrNoRows. func (s *Store) GetRun(ctx context.Context, runID string) (*state.Run, error) { row := s.db.QueryRowContext(ctx, ` -SELECT run_id, workflow_name, env, status, started_at, finished_at, input_json, output_json, error_text, total_cost_usd +SELECT run_id, workflow_name, env, status, started_at, finished_at, input_json, output_json, error_text, total_cost_usd, workflow_spec_hash, environment_name FROM runs WHERE run_id = ? `, runID) @@ -178,7 +178,7 @@ func (s *Store) ListRecentRuns(ctx context.Context, limit int) ([]state.Run, err } limit = clampRunListLimit(limit) rows, err := s.db.QueryContext(ctx, ` -SELECT run_id, workflow_name, env, status, started_at, finished_at, input_json, output_json, error_text, total_cost_usd +SELECT run_id, workflow_name, env, status, started_at, finished_at, input_json, output_json, error_text, total_cost_usd, workflow_spec_hash, environment_name FROM runs ORDER BY started_at DESC LIMIT ? 
@@ -205,7 +205,7 @@ func (s *Store) ListRunsByWorkflow(ctx context.Context, workflowName string, lim } limit = clampRunListLimit(limit) rows, err := s.db.QueryContext(ctx, ` -SELECT run_id, workflow_name, env, status, started_at, finished_at, input_json, output_json, error_text, total_cost_usd +SELECT run_id, workflow_name, env, status, started_at, finished_at, input_json, output_json, error_text, total_cost_usd, workflow_spec_hash, environment_name FROM runs WHERE workflow_name = ? ORDER BY started_at DESC diff --git a/internal/state/sqlite/store_test.go b/internal/state/sqlite/store_test.go index 904848b..3f36bca 100644 --- a/internal/state/sqlite/store_test.go +++ b/internal/state/sqlite/store_test.go @@ -273,6 +273,13 @@ func TestDeleteRunsStartedBefore_cascadesChildRows(t *testing.T) { if _, err := st.AppendTraceEvent(ctx, "old-run", oldStart, "log", "", `{}`); err != nil { t.Fatal(err) } + if err := st.SaveCheckpoint(ctx, state.RunCheckpoint{ + RunID: "old-run", StepIndex: 0, StepID: "s1", + ContextJSON: `{"version":1,"input":{},"steps":{},"totalCostUsd":0}`, + Status: state.CheckpointStatusRunning, CreatedAt: oldStart, + }); err != nil { + t.Fatal(err) + } if err := st.StartRun(ctx, state.Run{ RunID: "new-run", WorkflowName: "wf", Env: "local", Status: "running", StartedAt: newStart, InputJSON: `{}`, TotalCostUSD: 0, @@ -299,6 +306,9 @@ func TestDeleteRunsStartedBefore_cascadesChildRows(t *testing.T) { if len(evs) != 0 { t.Fatalf("trace events for deleted run: %d", len(evs)) } + if _, err := st.GetLatestCheckpoint(ctx, "old-run"); !errors.Is(err, sql.ErrNoRows) { + t.Fatalf("checkpoint for deleted run: %v", err) + } got, err := st.GetRun(ctx, "new-run") if err != nil { t.Fatal(err) @@ -307,3 +317,122 @@ func TestDeleteRunsStartedBefore_cascadesChildRows(t *testing.T) { t.Fatalf("GetRun new: %+v", got) } } + +func TestSaveCheckpoint_roundTripAndLatest(t *testing.T) { + ctx := context.Background() + st, err := Open(ctx, filepath.Join(t.TempDir(), "cp.db")) + 
if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + start := time.Date(2026, 5, 1, 10, 0, 0, 0, time.UTC) + if err := st.StartRun(ctx, state.Run{ + RunID: "r1", WorkflowName: "wf", Env: "local", Status: "running", + StartedAt: start, InputJSON: `{"x":1}`, TotalCostUSD: 0, + }); err != nil { + t.Fatal(err) + } + + cp1 := state.RunCheckpoint{ + RunID: "r1", StepIndex: 0, StepID: "step-a", + ContextJSON: `{"version":1,"input":{"x":1},"steps":{"step-a":{"output":{"ok":true}}},"totalCostUsd":0.01}`, + Status: state.CheckpointStatusRunning, CreatedAt: start, + } + if err := st.SaveCheckpoint(ctx, cp1); err != nil { + t.Fatal(err) + } + later := start.Add(time.Minute) + cp2 := state.RunCheckpoint{ + RunID: "r1", StepIndex: 1, StepID: "step-b", + ContextJSON: `{"version":1,"input":{"x":1},"steps":{},"totalCostUsd":0.02}`, + Status: state.CheckpointStatusInterrupted, CreatedAt: later, + } + if err := st.SaveCheckpoint(ctx, cp2); err != nil { + t.Fatal(err) + } + + got, err := st.GetLatestCheckpoint(ctx, "r1") + if err != nil { + t.Fatal(err) + } + if got.Seq != 2 { + t.Fatalf("Seq = %d want 2", got.Seq) + } + if got.StepIndex != 1 || got.StepID != "step-b" { + t.Fatalf("step = index %d id %q", got.StepIndex, got.StepID) + } + if got.Status != state.CheckpointStatusInterrupted { + t.Fatalf("Status = %q", got.Status) + } + if got.ContextJSON != cp2.ContextJSON { + t.Fatalf("ContextJSON = %q", got.ContextJSON) + } + if !got.CreatedAt.Equal(later) { + t.Fatalf("CreatedAt = %v", got.CreatedAt) + } +} + +func TestSaveCheckpoint_foreignKeyRequiresRun(t *testing.T) { + ctx := context.Background() + st, err := Open(ctx, filepath.Join(t.TempDir(), "cp-fk.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + now := time.Date(2026, 5, 1, 10, 0, 0, 0, time.UTC) + err = st.SaveCheckpoint(ctx, state.RunCheckpoint{ + RunID: "missing", StepIndex: 0, StepID: "s", + ContextJSON: `{}`, Status: state.CheckpointStatusRunning, CreatedAt: 
now, + }) + if err == nil { + t.Fatal("expected FK error") + } +} + +func TestGetLatestCheckpoint_noRows(t *testing.T) { + ctx := context.Background() + st, err := Open(ctx, filepath.Join(t.TempDir(), "cp-none.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + if _, err := st.GetLatestCheckpoint(ctx, "nope"); !errors.Is(err, sql.ErrNoRows) { + t.Fatalf("err = %v", err) + } +} + +func TestUpdateRunStatus(t *testing.T) { + ctx := context.Background() + st, err := Open(ctx, filepath.Join(t.TempDir(), "status.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + start := time.Date(2026, 5, 1, 10, 0, 0, 0, time.UTC) + if err := st.StartRun(ctx, state.Run{ + RunID: "r1", WorkflowName: "wf", Env: "local", Status: "running", + StartedAt: start, InputJSON: `{}`, TotalCostUSD: 0, + }); err != nil { + t.Fatal(err) + } + if err := st.UpdateRunStatus(ctx, "r1", "interrupted"); err != nil { + t.Fatal(err) + } + got, err := st.GetRun(ctx, "r1") + if err != nil { + t.Fatal(err) + } + if got.Status != "interrupted" { + t.Fatalf("status = %q", got.Status) + } + if got.FinishedAt != nil { + t.Fatalf("FinishedAt = %v want nil", got.FinishedAt) + } + if err := st.UpdateRunStatus(ctx, "missing", "running"); !errors.Is(err, sql.ErrNoRows) { + t.Fatalf("missing run: %v", err) + } +} diff --git a/internal/state/store.go b/internal/state/store.go index c4967d6..d723179 100644 --- a/internal/state/store.go +++ b/internal/state/store.go @@ -43,4 +43,10 @@ type RuntimeStore interface { // DeleteRunsStartedBefore removes every run with started_at strictly before cutoff (UTC), and // associated run_steps / trace_events (SQLite: ON DELETE CASCADE). Used for trace retention (issue #75). DeleteRunsStartedBefore(ctx context.Context, cutoff time.Time) (deleted int64, err error) + // SaveCheckpoint appends a checkpoint row for run_id (monotonic seq per run). 
+ SaveCheckpoint(ctx context.Context, cp RunCheckpoint) error + // GetLatestCheckpoint returns the newest checkpoint for run_id or sql.ErrNoRows. + GetLatestCheckpoint(ctx context.Context, runID string) (*RunCheckpoint, error) + // UpdateRunStatus sets runs.status without finishing the run (issue #105 interrupted). + UpdateRunStatus(ctx context.Context, runID, status string) error } diff --git a/internal/trace/events.go b/internal/trace/events.go index e6e6d56..4bc31b8 100644 --- a/internal/trace/events.go +++ b/internal/trace/events.go @@ -9,6 +9,8 @@ type Event = state.TraceEvent const ( EventRunStarted = "run.started" EventRunFinished = "run.finished" + EventRunInterrupted = "run.interrupted" + EventRunResumed = "run.resumed" EventStepStarted = "step.started" EventStepFinished = "step.finished" EventStepFailed = "step.failed" diff --git a/migrations/sqlite/003_run_checkpoints.sql b/migrations/sqlite/003_run_checkpoints.sql new file mode 100644 index 0000000..a0cd846 --- /dev/null +++ b/migrations/sqlite/003_run_checkpoints.sql @@ -0,0 +1,16 @@ +-- Run checkpoints for pause/resume (issue #105). +-- Referential integrity: FOREIGN KEY to runs; requires PRAGMA foreign_keys=ON per connection. + +CREATE TABLE IF NOT EXISTS run_checkpoints ( + run_id TEXT NOT NULL, + seq INTEGER NOT NULL, + step_index INTEGER NOT NULL, + step_id TEXT NOT NULL, + context_json TEXT NOT NULL, + status TEXT NOT NULL, + created_at TEXT NOT NULL, + PRIMARY KEY (run_id, seq), + FOREIGN KEY (run_id) REFERENCES runs (run_id) ON DELETE CASCADE +); + +CREATE INDEX IF NOT EXISTS idx_run_checkpoints_run ON run_checkpoints (run_id, seq DESC); diff --git a/migrations/sqlite/004_run_resume_metadata.sql b/migrations/sqlite/004_run_resume_metadata.sql new file mode 100644 index 0000000..301ec43 --- /dev/null +++ b/migrations/sqlite/004_run_resume_metadata.sql @@ -0,0 +1,4 @@ +-- Resume pinning metadata (PR #127 review): workflow spec hash and environment overlay name. 
+ +ALTER TABLE runs ADD COLUMN workflow_spec_hash TEXT NOT NULL DEFAULT ''; +ALTER TABLE runs ADD COLUMN environment_name TEXT NOT NULL DEFAULT '';