Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/LAA-Software-Engineering/agentic-control-plane
go 1.22

require (
github.com/google/uuid v1.6.0
github.com/santhosh-tekuri/jsonschema/v6 v6.0.2
github.com/spf13/cobra v1.8.1
gopkg.in/yaml.v3 v3.0.1
Expand All @@ -11,7 +12,6 @@ require (

require (
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
Expand Down
5 changes: 5 additions & 0 deletions internal/engine/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ func lookupWorkflow(g *spec.ProjectGraph, name string) (*spec.WorkflowResource,
return wf, nil
}

// ValidateWorkflowInput validates input against the workflow's input.schema when configured.
func ValidateWorkflowInput(projectRoot string, wf *spec.WorkflowResource, input map[string]any) error {
return validateWorkflowInput(projectRoot, wf, input)
}

func validateWorkflowInput(projectRoot string, wf *spec.WorkflowResource, input map[string]any) error {
if wf == nil || wf.Spec.Input == nil {
return nil
Expand Down
5 changes: 4 additions & 1 deletion internal/runtime/doc.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
// Package runtime abstracts local and remote execution targets.
// Package runtime defines execution interfaces; the MVP local implementation lives in
// [github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime/local].
//
// [Runner] persists run starts for callers that orchestrate execution themselves.
package runtime
4 changes: 4 additions & 0 deletions internal/runtime/local/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Package local implements the MVP disk-backed workflow runtime (issue #23, design doc section 16).
//
// Use [NewRuntime] with a project root directory and [state.RuntimeStore], then [Runtime.ExecuteWorkflow].
package local
125 changes: 125 additions & 0 deletions internal/runtime/local/runner.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
package local

import (
"context"
"encoding/json"
"fmt"
"strings"

"github.com/LAA-Software-Engineering/agentic-control-plane/internal/engine"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/models"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/project"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/tools"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/util"
)

// Compile-time check that [Runtime] implements [runtime.WorkflowRunner].
var _ runtime.WorkflowRunner = (*Runtime)(nil)

// ExecuteWorkflow loads the project from [Runtime.ProjectRoot], applies optional environment overrides,
// validates input JSON and workflow input schema before persisting the run, then invokes [engine.Executor].
func (r *Runtime) ExecuteWorkflow(ctx context.Context, opts runtime.WorkflowRunOptions) (string, error) {
if r == nil || r.Store == nil {
return "", fmt.Errorf("local: nil runtime or store")
}
root := strings.TrimSpace(r.ProjectRoot)
if root == "" {
return "", fmt.Errorf("local: empty project root")
}

graph, err := project.LoadProject(root)
if err != nil {
return "", fmt.Errorf("local: load project: %w", err)
}
graph, err = ApplyEnvironment(graph, opts.EnvironmentName)
if err != nil {
return "", err
}

wfName := strings.TrimSpace(opts.WorkflowName)
if wfName == "" {
return "", fmt.Errorf("local: empty workflow name")
}
wf, ok := graph.Workflows[wfName]
if !ok || wf == nil {
return "", fmt.Errorf("local: unknown workflow %q", wfName)
}

var input map[string]any
if len(opts.InputJSON) == 0 {
input = map[string]any{}
} else {
if err := json.Unmarshal(opts.InputJSON, &input); err != nil {
return "", fmt.Errorf("local: invalid input JSON: %w", err)
}
}

if err := engine.ValidateWorkflowInput(root, wf, input); err != nil {
return "", err
}

runID := strings.TrimSpace(opts.RunID)
if runID == "" {
runID = util.NewRunID()
}

inputBytes, err := json.Marshal(input)
if err != nil {
return "", fmt.Errorf("local: marshal input: %w", err)
}

envLabel := strings.TrimSpace(opts.Env)
if envLabel == "" {
envLabel = "local"
}

started := r.now()
if err := r.Store.StartRun(ctx, state.Run{
RunID: runID,
WorkflowName: wfName,
Env: envLabel,
Status: "running",
StartedAt: started,
InputJSON: string(inputBytes),
TotalCostUSD: 0,
}); err != nil {
return runID, fmt.Errorf("local: start run: %w", err)
}

rec := trace.NewRecorder(r.Store)
if _, err := rec.Append(ctx, runID, "", trace.EventRunStarted, map[string]any{
"workflow": wfName, "environment": opts.EnvironmentName,
}); err != nil {
return runID, fmt.Errorf("local: trace run.started: %w", err)
}

ex := &engine.Executor{
Graph: graph,
ProjectRoot: root,
Tools: tools.NewRegistry(graph),
Models: models.NewRegistry(graph),
Store: r.Store,
Trace: rec,
Now: r.Now,
}
runErr := ex.Run(ctx, engine.RunInput{
RunID: runID,
WorkflowName: wfName,
Env: envLabel,
StartedAt: started,
Input: input,
ApprovedActions: opts.ApprovedActions,
})

finData := map[string]any{}
if runErr != nil {
finData["error"] = runErr.Error()
}
if _, terr := rec.Append(ctx, runID, "", trace.EventRunFinished, finData); terr != nil && runErr == nil {
return runID, fmt.Errorf("local: trace run.finished: %w", terr)
}
return runID, runErr
}
149 changes: 149 additions & 0 deletions internal/runtime/local/runner_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
package local

import (
"context"
"path/filepath"
"testing"

"github.com/LAA-Software-Engineering/agentic-control-plane/internal/project"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/runtime"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite"
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace"
)

func testRunProjRoot(t *testing.T) string {
t.Helper()
return filepath.Join("testdata", "runproj")
}

func TestExecuteWorkflow_persistsRunAndTraceInSQLite(t *testing.T) {
ctx := context.Background()
st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "localrun.db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = st.Close() })

rt := NewRuntime(testRunProjRoot(t), st)
runID := "run-integration-1"
_, err = rt.ExecuteWorkflow(ctx, runtime.WorkflowRunOptions{
RunID: runID,
WorkflowName: "demo",
EnvironmentName: "staging",
Env: "dev",
InputJSON: []byte(`{"topic":"from-local-runtime"}`),
})
if err != nil {
t.Fatal(err)
}

got, err := st.GetRun(ctx, runID)
if err != nil {
t.Fatal(err)
}
if got.Status != "succeeded" || got.ErrorText != "" {
t.Fatalf("run %+v", got)
}

events, err := trace.NewReader(st).ListByRunID(ctx, runID)
if err != nil {
t.Fatal(err)
}
if len(events) < 3 {
t.Fatalf("want trace events, got %d", len(events))
}
if events[0].Type != trace.EventRunStarted {
t.Fatalf("first event %q", events[0].Type)
}
if events[len(events)-1].Type != trace.EventRunFinished {
t.Fatalf("last event %q", events[len(events)-1].Type)
}
}

func TestExecuteWorkflow_invalidInputJSON_noRunRow(t *testing.T) {
ctx := context.Background()
st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "norun.db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = st.Close() })

rt := NewRuntime(testRunProjRoot(t), st)
_, err = rt.ExecuteWorkflow(ctx, runtime.WorkflowRunOptions{
RunID: "should-not-exist",
WorkflowName: "demo",
InputJSON: []byte(`{"topic":`),
})
if err == nil {
t.Fatal("expected error")
}

_, err = st.GetRun(ctx, "should-not-exist")
if err == nil {
t.Fatal("expected no run row")
}
}

func TestExecuteWorkflow_invalidInputSchema_noRunRow(t *testing.T) {
ctx := context.Background()
st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "norun2.db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = st.Close() })

rt := NewRuntime(testRunProjRoot(t), st)
_, err = rt.ExecuteWorkflow(ctx, runtime.WorkflowRunOptions{
RunID: "schema-fail",
WorkflowName: "demo",
InputJSON: []byte(`{"wrong":true}`),
})
if err == nil {
t.Fatal("expected schema validation error")
}

_, err = st.GetRun(ctx, "schema-fail")
if err == nil {
t.Fatal("expected no run row")
}
}

func TestApplyEnvironment_mergesAgentConstraints(t *testing.T) {
g, err := project.LoadProject(testRunProjRoot(t))
if err != nil {
t.Fatal(err)
}
out, err := ApplyEnvironment(g, "staging")
if err != nil {
t.Fatal(err)
}
a := out.Agents["reviewer"]
if a == nil || a.Spec.Constraints == nil || a.Spec.Constraints.TimeoutSeconds != 99 {
t.Fatalf("constraints %+v", a)
}
}

func TestNewRunID_generatedWhenEmpty(t *testing.T) {
ctx := context.Background()
st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "genid.db"))
if err != nil {
t.Fatal(err)
}
t.Cleanup(func() { _ = st.Close() })

rt := NewRuntime(testRunProjRoot(t), st)
id, err := rt.ExecuteWorkflow(ctx, runtime.WorkflowRunOptions{
WorkflowName: "demo",
InputJSON: []byte(`{"topic":"x"}`),
})
if err != nil {
t.Fatal(err)
}
if id == "" {
t.Fatal("empty run id")
}
_, err = st.GetRun(ctx, id)
if err != nil {
t.Fatal(err)
}
}
Loading
Loading