From 89c0178ec2b054e7d3f7b316a7d05320ba0132a1 Mon Sep 17 00:00:00 2001 From: Leonardo Araujo Date: Sat, 11 Apr 2026 21:36:38 -0300 Subject: [PATCH] feat(runtime): local workflow orchestration (issue #23) - Add runtime.WorkflowRunner + WorkflowRunOptions; local.Runtime loads project, applies Environment overrides (agents/policies), validates JSON + input schema before StartRun, runs engine, appends run.started/run.finished trace events - util.NewRunID via google/uuid - Export engine.ValidateWorkflowInput for preflight validation - Tests: SQLite run+trace, invalid JSON/schema skip persistence, env merge, auto run id Closes #23 Made-with: Cursor --- go.mod | 2 +- internal/engine/workflow.go | 5 + internal/runtime/doc.go | 5 +- internal/runtime/local/doc.go | 4 + internal/runtime/local/runner.go | 125 +++++++++++++++ internal/runtime/local/runner_test.go | 149 ++++++++++++++++++ internal/runtime/local/runtime.go | 133 ++++++++++++++++ .../local/testdata/runproj/agents.yaml | 9 ++ .../local/testdata/runproj/environments.yaml | 10 ++ .../local/testdata/runproj/project.yaml | 14 ++ .../runproj/schemas/agent-out.schema.json | 9 ++ .../local/testdata/runproj/schemas/in.json | 9 ++ .../runtime/local/testdata/runproj/tools.yaml | 6 + .../local/testdata/runproj/workflows.yaml | 21 +++ internal/runtime/runtime.go | 24 +++ internal/util/doc.go | 2 +- internal/util/ids.go | 8 + 17 files changed, 532 insertions(+), 3 deletions(-) create mode 100644 internal/runtime/local/doc.go create mode 100644 internal/runtime/local/runner.go create mode 100644 internal/runtime/local/runner_test.go create mode 100644 internal/runtime/local/runtime.go create mode 100644 internal/runtime/local/testdata/runproj/agents.yaml create mode 100644 internal/runtime/local/testdata/runproj/environments.yaml create mode 100644 internal/runtime/local/testdata/runproj/project.yaml create mode 100644 internal/runtime/local/testdata/runproj/schemas/agent-out.schema.json create mode 100644 internal/runtime/local/testdata/runproj/schemas/in.json create mode 100644 internal/runtime/local/testdata/runproj/tools.yaml create mode 100644 internal/runtime/local/testdata/runproj/workflows.yaml create mode 100644 internal/runtime/runtime.go create mode 100644 internal/util/ids.go diff --git a/go.mod b/go.mod index c538ad8..7d0b540 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/LAA-Software-Engineering/agentic-control-plane go 1.22 require ( + github.com/google/uuid v1.6.0 github.com/santhosh-tekuri/jsonschema/v6 v6.0.2 github.com/spf13/cobra v1.8.1 gopkg.in/yaml.v3 v3.0.1 @@ -11,7 +12,6 @@ require ( require ( github.com/dustin/go-humanize v1.0.1 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/mattn/go-isatty v0.0.20 // indirect diff --git a/internal/engine/workflow.go b/internal/engine/workflow.go index 4f95ffc..f037648 100644 --- a/internal/engine/workflow.go +++ b/internal/engine/workflow.go @@ -19,6 +19,11 @@ func lookupWorkflow(g *spec.ProjectGraph, name string) (*spec.WorkflowResource, return wf, nil } +// ValidateWorkflowInput validates input against the workflow's input.schema when configured. +func ValidateWorkflowInput(projectRoot string, wf *spec.WorkflowResource, input map[string]any) error { + return validateWorkflowInput(projectRoot, wf, input) +} + func validateWorkflowInput(projectRoot string, wf *spec.WorkflowResource, input map[string]any) error { if wf == nil || wf.Spec.Input == nil { return nil diff --git a/internal/runtime/doc.go b/internal/runtime/doc.go index ecc70b3..37a8eef 100644 --- a/internal/runtime/doc.go +++ b/internal/runtime/doc.go @@ -1,2 +1,5 @@ -// Package runtime abstracts local and remote execution targets. +// Package runtime defines execution interfaces; the MVP local implementation lives in +// [github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime/local]. +// +// [Runner] persists run starts for callers that orchestrate execution themselves. package runtime diff --git a/internal/runtime/local/doc.go b/internal/runtime/local/doc.go new file mode 100644 index 0000000..671b3ba --- /dev/null +++ b/internal/runtime/local/doc.go @@ -0,0 +1,4 @@ +// Package local implements the MVP disk-backed workflow runtime (issue #23, design doc section 16). +// +// Use [NewRuntime] with a project root directory and [state.RuntimeStore], then [Runtime.ExecuteWorkflow]. +package local diff --git a/internal/runtime/local/runner.go b/internal/runtime/local/runner.go new file mode 100644 index 0000000..d932c2e --- /dev/null +++ b/internal/runtime/local/runner.go @@ -0,0 +1,125 @@ +package local + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/engine" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/models" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/project" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/tools" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/util" +) + +// Compile-time check that [Runtime] implements [runtime.WorkflowRunner]. +var _ runtime.WorkflowRunner = (*Runtime)(nil) + +// ExecuteWorkflow loads the project from [Runtime.ProjectRoot], applies optional environment overrides, +// validates input JSON and workflow input schema before persisting the run, then invokes [engine.Executor]. +func (r *Runtime) ExecuteWorkflow(ctx context.Context, opts runtime.WorkflowRunOptions) (string, error) { + if r == nil || r.Store == nil { + return "", fmt.Errorf("local: nil runtime or store") + } + root := strings.TrimSpace(r.ProjectRoot) + if root == "" { + return "", fmt.Errorf("local: empty project root") + } + + graph, err := project.LoadProject(root) + if err != nil { + return "", fmt.Errorf("local: load project: %w", err) + } + graph, err = ApplyEnvironment(graph, opts.EnvironmentName) + if err != nil { + return "", err + } + + wfName := strings.TrimSpace(opts.WorkflowName) + if wfName == "" { + return "", fmt.Errorf("local: empty workflow name") + } + wf, ok := graph.Workflows[wfName] + if !ok || wf == nil { + return "", fmt.Errorf("local: unknown workflow %q", wfName) + } + + var input map[string]any + if len(opts.InputJSON) == 0 { + input = map[string]any{} + } else { + if err := json.Unmarshal(opts.InputJSON, &input); err != nil { + return "", fmt.Errorf("local: invalid input JSON: %w", err) + } + } + + if err := engine.ValidateWorkflowInput(root, wf, input); err != nil { + return "", err + } + + runID := strings.TrimSpace(opts.RunID) + if runID == "" { + runID = util.NewRunID() + } + + inputBytes, err := json.Marshal(input) + if err != nil { + return "", fmt.Errorf("local: marshal input: %w", err) + } + + envLabel := strings.TrimSpace(opts.Env) + if envLabel == "" { + envLabel = "local" + } + + started := r.now() + if err := r.Store.StartRun(ctx, state.Run{ + RunID: runID, + WorkflowName: wfName, + Env: envLabel, + Status: "running", + StartedAt: started, + InputJSON: string(inputBytes), + TotalCostUSD: 0, + }); err != nil { + return runID, fmt.Errorf("local: start run: %w", err) + } + + rec := trace.NewRecorder(r.Store) + if _, err := rec.Append(ctx, runID, "", trace.EventRunStarted, map[string]any{ + "workflow": wfName, "environment": opts.EnvironmentName, + }); err != nil { + return runID, fmt.Errorf("local: trace run.started: %w", err) + } + + ex := &engine.Executor{ + Graph: graph, + ProjectRoot: root, + Tools: tools.NewRegistry(graph), + Models: models.NewRegistry(graph), + Store: r.Store, + Trace: rec, + Now: r.Now, + } + runErr := ex.Run(ctx, engine.RunInput{ + RunID: runID, + WorkflowName: wfName, + Env: envLabel, + StartedAt: started, + Input: input, + ApprovedActions: opts.ApprovedActions, + }) + + finData := map[string]any{} + if runErr != nil { + finData["error"] = runErr.Error() + } + if _, terr := rec.Append(ctx, runID, "", trace.EventRunFinished, finData); terr != nil && runErr == nil { + return runID, fmt.Errorf("local: trace run.finished: %w", terr) + } + return runID, runErr +} diff --git a/internal/runtime/local/runner_test.go b/internal/runtime/local/runner_test.go new file mode 100644 index 0000000..961deab --- /dev/null +++ b/internal/runtime/local/runner_test.go @@ -0,0 +1,149 @@ +package local + +import ( + "context" + "path/filepath" + "testing" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/project" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace" +) + +func testRunProjRoot(t *testing.T) string { + t.Helper() + return filepath.Join("testdata", "runproj") +} + +func TestExecuteWorkflow_persistsRunAndTraceInSQLite(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "localrun.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + rt := NewRuntime(testRunProjRoot(t), st) + runID := "run-integration-1" + _, err = rt.ExecuteWorkflow(ctx, runtime.WorkflowRunOptions{ + RunID: runID, + WorkflowName: "demo", + EnvironmentName: "staging", + Env: "dev", + InputJSON: []byte(`{"topic":"from-local-runtime"}`), + }) + if err != nil { + t.Fatal(err) + } + + got, err := st.GetRun(ctx, runID) + if err != nil { + t.Fatal(err) + } + if got.Status != "succeeded" || got.ErrorText != "" { + t.Fatalf("run %+v", got) + } + + events, err := trace.NewReader(st).ListByRunID(ctx, runID) + if err != nil { + t.Fatal(err) + } + if len(events) < 3 { + t.Fatalf("want trace events, got %d", len(events)) + } + if events[0].Type != trace.EventRunStarted { + t.Fatalf("first event %q", events[0].Type) + } + if events[len(events)-1].Type != trace.EventRunFinished { + t.Fatalf("last event %q", events[len(events)-1].Type) + } +} + +func TestExecuteWorkflow_invalidInputJSON_noRunRow(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "norun.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + rt := NewRuntime(testRunProjRoot(t), st) + _, err = rt.ExecuteWorkflow(ctx, runtime.WorkflowRunOptions{ + RunID: "should-not-exist", + WorkflowName: "demo", + InputJSON: []byte(`{"topic":`), + }) + if err == nil { + t.Fatal("expected error") + } + + _, err = st.GetRun(ctx, "should-not-exist") + if err == nil { + t.Fatal("expected no run row") + } +} + +func TestExecuteWorkflow_invalidInputSchema_noRunRow(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "norun2.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + rt := NewRuntime(testRunProjRoot(t), st) + _, err = rt.ExecuteWorkflow(ctx, runtime.WorkflowRunOptions{ + RunID: "schema-fail", + WorkflowName: "demo", + InputJSON: []byte(`{"wrong":true}`), + }) + if err == nil { + t.Fatal("expected schema validation error") + } + + _, err = st.GetRun(ctx, "schema-fail") + if err == nil { + t.Fatal("expected no run row") + } +} + +func TestApplyEnvironment_mergesAgentConstraints(t *testing.T) { + g, err := project.LoadProject(testRunProjRoot(t)) + if err != nil { + t.Fatal(err) + } + out, err := ApplyEnvironment(g, "staging") + if err != nil { + t.Fatal(err) + } + a := out.Agents["reviewer"] + if a == nil || a.Spec.Constraints == nil || a.Spec.Constraints.TimeoutSeconds != 99 { + t.Fatalf("constraints %+v", a) + } +} + +func TestNewRunID_generatedWhenEmpty(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "genid.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + rt := NewRuntime(testRunProjRoot(t), st) + id, err := rt.ExecuteWorkflow(ctx, runtime.WorkflowRunOptions{ + WorkflowName: "demo", + InputJSON: []byte(`{"topic":"x"}`), + }) + if err != nil { + t.Fatal(err) + } + if id == "" { + t.Fatal("empty run id") + } + _, err = st.GetRun(ctx, id) + if err != nil { + t.Fatal(err) + } +} diff --git a/internal/runtime/local/runtime.go b/internal/runtime/local/runtime.go new file mode 100644 index 0000000..3cbeb86 --- /dev/null +++ b/internal/runtime/local/runtime.go @@ -0,0 +1,133 @@ +package local + +import ( + "fmt" + "maps" + "strings" + "time" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" +) + +// Runtime is the MVP local workflow runner backend (issue #23): project root on disk + SQLite (or any [state.RuntimeStore]). +type Runtime struct { + ProjectRoot string + Store state.RuntimeStore + Now func() time.Time +} + +// NewRuntime returns a local runtime. projectRoot is the directory containing project.yaml. +func NewRuntime(projectRoot string, store state.RuntimeStore) *Runtime { + return &Runtime{ProjectRoot: projectRoot, Store: store} +} + +func (r *Runtime) now() time.Time { + if r != nil && r.Now != nil { + return r.Now() + } + return time.Now().UTC() +} + +// ApplyEnvironment returns a shallow copy of g with Environment overrides applied (design doc section 7.6 MVP). +func ApplyEnvironment(g *spec.ProjectGraph, envName string) (*spec.ProjectGraph, error) { + envName = strings.TrimSpace(envName) + if envName == "" || g == nil { + return g, nil + } + env, ok := g.Environments[envName] + if !ok || env == nil { + return nil, fmt.Errorf("local: unknown environment %q", envName) + } + out := shallowCloneGraph(g) + if env.Spec.Overrides == nil { + return out, nil + } + ov := env.Spec.Overrides + + for agentName, ovr := range ov.Agents { + ar, ok := g.Agents[agentName] + if !ok || ar == nil { + return nil, fmt.Errorf("local: environment %q overrides unknown agent %q", envName, agentName) + } + cl := *ar + cl.Spec = ar.Spec + mergeAgentOverride(&cl.Spec, ovr) + out.Agents[agentName] = &cl + } + + for policyName, ovr := range ov.Policies { + pr, ok := g.Policies[policyName] + if !ok || pr == nil { + return nil, fmt.Errorf("local: environment %q overrides unknown policy %q", envName, policyName) + } + cl := *pr + cl.Spec = pr.Spec + mergePolicyOverride(&cl.Spec, ovr) + out.Policies[policyName] = &cl + } + + return out, nil +} + +func shallowCloneGraph(g *spec.ProjectGraph) *spec.ProjectGraph { + if g == nil { + return nil + } + out := *g + out.Agents = maps.Clone(g.Agents) + out.Tools = maps.Clone(g.Tools) + out.Workflows = maps.Clone(g.Workflows) + out.Policies = maps.Clone(g.Policies) + out.Environments = maps.Clone(g.Environments) + return &out +} + +func mergeAgentOverride(agentSpec *spec.AgentSpec, ovr spec.AgentOverride) { + if ovr.Model != "" { + agentSpec.Model = ovr.Model + } + if ovr.Constraints != nil { + base := agentSpec.Constraints + if base == nil { + base = &spec.AgentConstraints{} + } + merged := *base + co := ovr.Constraints + if co.MaxIterations != 0 { + merged.MaxIterations = co.MaxIterations + } + if co.TimeoutSeconds != 0 { + merged.TimeoutSeconds = co.TimeoutSeconds + } + if co.Temperature != 0 { + merged.Temperature = co.Temperature + } + if co.RequireStructuredOutput { + merged.RequireStructuredOutput = true + } + agentSpec.Constraints = &merged + } +} + +func mergePolicyOverride(pol *spec.PolicySpec, ovr spec.PolicyOverride) { + if ovr.Execution == nil { + return + } + base := pol.Execution + if base == nil { + base = &spec.PolicyExecution{} + } + merged := *base + pe := ovr.Execution + if pe.MaxWallClockSeconds != 0 { + merged.MaxWallClockSeconds = pe.MaxWallClockSeconds + } + if pe.MaxTotalCostUsd > 0 { + merged.MaxTotalCostUsd = pe.MaxTotalCostUsd + } + if pe.RequireStructuredOutput { + merged.RequireStructuredOutput = true + } + pol.Execution = &merged +} diff --git a/internal/runtime/local/testdata/runproj/agents.yaml b/internal/runtime/local/testdata/runproj/agents.yaml new file mode 100644 index 0000000..3b6c463 --- /dev/null +++ b/internal/runtime/local/testdata/runproj/agents.yaml @@ -0,0 +1,9 @@ +apiVersion: agentic.dev/v0 +kind: Agent +metadata: + name: reviewer +spec: + model: mock/gpt-4 + instructions: Summarize the tool payload as JSON. + output: + schema: ./schemas/agent-out.schema.json diff --git a/internal/runtime/local/testdata/runproj/environments.yaml b/internal/runtime/local/testdata/runproj/environments.yaml new file mode 100644 index 0000000..61c21d0 --- /dev/null +++ b/internal/runtime/local/testdata/runproj/environments.yaml @@ -0,0 +1,10 @@ +apiVersion: agentic.dev/v0 +kind: Environment +metadata: + name: staging +spec: + overrides: + agents: + reviewer: + constraints: + timeoutSeconds: 99 diff --git a/internal/runtime/local/testdata/runproj/project.yaml b/internal/runtime/local/testdata/runproj/project.yaml new file mode 100644 index 0000000..4cdc6ca --- /dev/null +++ b/internal/runtime/local/testdata/runproj/project.yaml @@ -0,0 +1,14 @@ +apiVersion: agentic.dev/v0 +kind: Project +metadata: + name: runproj +spec: + imports: + - ./tools.yaml + - ./agents.yaml + - ./workflows.yaml + - ./environments.yaml + providers: + models: + mock: + type: mock diff --git a/internal/runtime/local/testdata/runproj/schemas/agent-out.schema.json b/internal/runtime/local/testdata/runproj/schemas/agent-out.schema.json new file mode 100644 index 0000000..70af713 --- /dev/null +++ b/internal/runtime/local/testdata/runproj/schemas/agent-out.schema.json @@ -0,0 +1,9 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "required": ["summary"], + "properties": { + "summary": { "type": "string" } + }, + "additionalProperties": true +} diff --git a/internal/runtime/local/testdata/runproj/schemas/in.json b/internal/runtime/local/testdata/runproj/schemas/in.json new file mode 100644 index 0000000..8da6a82 --- /dev/null +++ b/internal/runtime/local/testdata/runproj/schemas/in.json @@ -0,0 +1,9 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "required": ["topic"], + "properties": { + "topic": { "type": "string" } + }, + "additionalProperties": true +} diff --git a/internal/runtime/local/testdata/runproj/tools.yaml b/internal/runtime/local/testdata/runproj/tools.yaml new file mode 100644 index 0000000..d4b6df0 --- /dev/null +++ b/internal/runtime/local/testdata/runproj/tools.yaml @@ -0,0 +1,6 @@ +apiVersion: agentic.dev/v0 +kind: Tool +metadata: + name: helper +spec: + type: native diff --git a/internal/runtime/local/testdata/runproj/workflows.yaml b/internal/runtime/local/testdata/runproj/workflows.yaml new file mode 100644 index 0000000..aee80f2 --- /dev/null +++ b/internal/runtime/local/testdata/runproj/workflows.yaml @@ -0,0 +1,21 @@ +apiVersion: agentic.dev/v0 +kind: Workflow +metadata: + name: demo +spec: + input: + schema: ./schemas/in.json + steps: + - id: fetch + uses: tool.helper.echo + with: + topic: "${input.topic}" + extra: "x" + - id: summarize + agent: reviewer + with: + echo: "${steps.fetch.output.echo}" + output: + value: + topic: "${input.topic}" + summary: "${steps.summarize.output.summary}" diff --git a/internal/runtime/runtime.go b/internal/runtime/runtime.go new file mode 100644 index 0000000..c461338 --- /dev/null +++ b/internal/runtime/runtime.go @@ -0,0 +1,24 @@ +package runtime + +import "context" + +// WorkflowRunOptions configures a single workflow execution for [WorkflowRunner]. +type WorkflowRunOptions struct { + // RunID optional; when empty the runner generates one via [util.NewRunID]. + RunID string + // WorkflowName is the metadata.name of a Workflow resource. + WorkflowName string + // EnvironmentName selects an Environment resource for overrides (agents/policies). Empty skips overrides. + EnvironmentName string + // Env is stored on the run row (e.g. deployment target label). + Env string + // InputJSON is JSON object bytes for workflow input. Empty means {}. + InputJSON []byte + // ApprovedActions are full tool uses strings approved for policy gates. + ApprovedActions []string +} + +// WorkflowRunner loads declarative state and executes a workflow locally (design doc section 16 MVP). +type WorkflowRunner interface { + ExecuteWorkflow(ctx context.Context, opts WorkflowRunOptions) (runID string, err error) +} diff --git a/internal/util/doc.go b/internal/util/doc.go index 42728a3..308554f 100644 --- a/internal/util/doc.go +++ b/internal/util/doc.go @@ -1,2 +1,2 @@ -// Package util holds small shared helpers (fs, ids, errors, etc.). +// Package util holds small shared helpers (identifiers, fs, errors, etc.). package util diff --git a/internal/util/ids.go b/internal/util/ids.go new file mode 100644 index 0000000..6ce4ada --- /dev/null +++ b/internal/util/ids.go @@ -0,0 +1,8 @@ +package util + +import "github.com/google/uuid" + +// NewRunID returns a new unique run identifier (issue #23, design doc section 14.2). +func NewRunID() string { + return uuid.NewString() +}