diff --git a/internal/engine/doc.go b/internal/engine/doc.go index 0b99e5d..249a697 100644 --- a/internal/engine/doc.go +++ b/internal/engine/doc.go @@ -1,4 +1,8 @@ // Package engine orchestrates workflow execution, steps, and interpolation. // -// [InterpolateString] and [InterpolateWalk] implement ${input.*} and ${steps.*} dot paths only (ยง13.1 MVP). +// [InterpolateString] and [InterpolateWalk] implement ${input.*} and ${steps.*} dot paths only (design doc section 13.1 MVP). +// +// [Executor.Run] executes sequential workflows: interpolated step inputs, policy checks from the +// workflow's Policy resource, tool and agent steps, optional JSON Schema validation for agent output, +// persisted run_steps rows, and trace events (design doc sections 12.2 E, 13.3, 13.4, 14.2). package engine diff --git a/internal/engine/execution.go b/internal/engine/execution.go new file mode 100644 index 0000000..fbade01 --- /dev/null +++ b/internal/engine/execution.go @@ -0,0 +1,220 @@ +package engine + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/models" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/policy" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/tools" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace" +) + +// Executor runs sequential workflow steps (design doc section 12.2 E, section 13). +type Executor struct { + Graph *spec.ProjectGraph + ProjectRoot string + Tools tools.ToolExecutor + Models *models.Registry + // ModelResolve, if set, is used instead of Models.ClientFor (tests inject mocks). + ModelResolve func(modelRef string) (models.ModelClient, string, error) + Store state.RuntimeStore + Trace *trace.Recorder + Now func() time.Time +} + +// RunInput identifies the workflow run and parsed input map (already JSON-valid). +type RunInput struct { + RunID string + WorkflowName string + Env string + StartedAt time.Time + Input map[string]any + ApprovedActions []string +} + +func (e *Executor) now() time.Time { + if e != nil && e.Now != nil { + return e.Now() + } + return time.Now().UTC() +} + +func (e *Executor) modelClient(modelRef string) (models.ModelClient, string, error) { + if e.ModelResolve != nil { + return e.ModelResolve(modelRef) + } + if e.Models == nil { + return nil, "", fmt.Errorf("engine: Models registry is nil") + } + return e.Models.ClientFor(modelRef) +} + +// Run executes a workflow sequentially: interpolate step inputs, policy checks, tool/agent calls, +// optional JSON Schema validation for agent output, persisted run_steps and trace events. +// The run row must already exist in [state.RuntimeStore] (e.g. via [state.RuntimeStore.StartRun]). +func (e *Executor) Run(ctx context.Context, in RunInput) error { + if e == nil || e.Store == nil { + return fmt.Errorf("engine: nil executor or store") + } + if e.Graph == nil { + return fmt.Errorf("engine: nil project graph") + } + wf, err := lookupWorkflow(e.Graph, in.WorkflowName) + if err != nil { + return err + } + if err := validateWorkflowInput(e.ProjectRoot, wf, in.Input); err != nil { + return e.failRun(ctx, in, err, 0) + } + + polEng := policy.NewEngine(e.Graph) + wfPol := polEng.Evaluator(strings.TrimSpace(wf.Spec.Policy)) + + ictx := Context{Input: in.Input, Steps: make(map[string]StepResult)} + var totalCost float64 + finishAt := e.now() + + for _, step := range wf.Spec.Steps { + step := step + if strings.TrimSpace(step.ID) == "" { + return e.failRun(ctx, in, fmt.Errorf("engine: workflow step missing id"), totalCost) + } + uses := strings.TrimSpace(step.Uses) + agentName := strings.TrimSpace(step.Agent) + if (uses == "") == (agentName == "") { + return e.failRun(ctx, in, fmt.Errorf("engine: step %q must set exactly one of uses or agent", step.ID), totalCost) + } + + withAny, err := InterpolateWalk(step.With, ictx) + if err != nil { + return e.failRun(ctx, in, fmt.Errorf("engine: step %q with: %w", step.ID, err), totalCost) + } + with, ok := withAny.(map[string]any) + if !ok { + with = map[string]any{} + } + + elapsed := e.now().Sub(in.StartedAt) + pctx := policy.RunContext{ + StartedAt: in.StartedAt, + Elapsed: elapsed, + AccumulatedCostUSD: totalCost, + ApprovedActions: in.ApprovedActions, + } + if err := wfPol.CheckRun(ctx, pctx); err != nil { + return e.failRunStep(ctx, in, step.ID, with, err, totalCost) + } + + inJSON, _ := json.Marshal(with) + started := e.now() + if err := e.Store.UpsertRunStep(ctx, state.RunStep{ + RunID: in.RunID, + StepID: step.ID, + Status: "running", + StartedAt: &started, + InputJSON: string(inJSON), + }); err != nil { + return e.failRun(ctx, in, fmt.Errorf("engine: upsert step %q: %w", step.ID, err), totalCost) + } + if e.Trace != nil { + _, _ = e.Trace.Append(ctx, in.RunID, step.ID, trace.EventStepStarted, map[string]any{"uses": uses, "agent": agentName}) + } + + var out map[string]any + var stepCost float64 + if uses != "" { + var meta tools.ToolCallMeta + out, meta, err = e.runToolStep(ctx, wfPol, in.RunID, step, with, pctx) + stepCost = meta.CostUSD + } else { + ar, ok := e.Graph.Agents[agentName] + if !ok || ar == nil { + err = fmt.Errorf("engine: unknown agent %q", agentName) + } else { + var gmeta models.GenerateMeta + out, gmeta, err = e.runAgentStep(ctx, wfPol, in.RunID, step, with, pctx, ar) + stepCost = gmeta.CostUSD + } + } + + finished := e.now() + totalCost += stepCost + if err != nil { + _ = e.Store.UpsertRunStep(ctx, state.RunStep{ + RunID: in.RunID, + StepID: step.ID, + Status: "failed", + StartedAt: &started, + FinishedAt: &finished, + InputJSON: string(inJSON), + ErrorText: err.Error(), + CostUSD: stepCost, + }) + if e.Trace != nil { + _, _ = e.Trace.Append(ctx, in.RunID, step.ID, trace.EventStepFailed, map[string]any{"error": err.Error()}) + } + return e.failRun(ctx, in, fmt.Errorf("engine: step %q: %w", step.ID, err), totalCost) + } + + outJSON, _ := json.Marshal(out) + if err := e.Store.UpsertRunStep(ctx, state.RunStep{ + RunID: in.RunID, + StepID: step.ID, + Status: "succeeded", + StartedAt: &started, + FinishedAt: &finished, + InputJSON: string(inJSON), + OutputJSON: string(outJSON), + CostUSD: stepCost, + }); err != nil { + return e.failRun(ctx, in, fmt.Errorf("engine: upsert step %q: %w", step.ID, err), totalCost) + } + if e.Trace != nil { + _, _ = e.Trace.Append(ctx, in.RunID, step.ID, trace.EventStepFinished, map[string]any{"costUsd": stepCost}) + } + + meta := map[string]any{"costUsd": stepCost, "durationMs": finished.Sub(started).Milliseconds()} + ictx.Steps[step.ID] = StepResult{Output: out, Meta: meta} + } + + finalOut, err := buildWorkflowOutput(wf, ictx) + if err != nil { + return e.failRun(ctx, in, err, totalCost) + } + outBytes, err := json.Marshal(finalOut) + if err != nil { + return e.failRun(ctx, in, err, totalCost) + } + finishAt = e.now() + return e.Store.FinishRun(ctx, in.RunID, "succeeded", finishAt, string(outBytes), "", totalCost) +} + +func (e *Executor) failRun(ctx context.Context, in RunInput, runErr error, totalCost float64) error { + finishAt := e.now() + _ = e.Store.FinishRun(ctx, in.RunID, "failed", finishAt, "", runErr.Error(), totalCost) + return runErr +} + +func (e *Executor) failRunStep(ctx context.Context, in RunInput, stepID string, with map[string]any, runErr error, totalCost float64) error { + inJSON, _ := json.Marshal(with) + now := e.now() + _ = e.Store.UpsertRunStep(ctx, state.RunStep{ + RunID: in.RunID, + StepID: stepID, + Status: "failed", + StartedAt: &now, + FinishedAt: &now, + InputJSON: string(inJSON), + ErrorText: runErr.Error(), + }) + if e.Trace != nil { + _, _ = e.Trace.Append(ctx, in.RunID, stepID, trace.EventStepFailed, map[string]any{"error": runErr.Error()}) + } + return e.failRun(ctx, in, runErr, totalCost) +} diff --git a/internal/engine/execution_test.go b/internal/engine/execution_test.go new file mode 100644 index 0000000..1c974c7 --- /dev/null +++ b/internal/engine/execution_test.go @@ -0,0 +1,257 @@ +package engine + +import ( + "context" + "encoding/json" + "path/filepath" + "runtime" + "testing" + "time" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/models" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/state/sqlite" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/tools" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace" +) + +func testProjectRoot(t *testing.T) string { + t.Helper() + _, file, _, ok := runtime.Caller(0) + if !ok { + t.Fatal("runtime.Caller") + } + return filepath.Clean(filepath.Join(filepath.Dir(file), "testdata", "wfproj")) +} + +func TestRun_sequentialToolAndAgent_mockModel(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "run.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + root := testProjectRoot(t) + graph := &spec.ProjectGraph{ + Spec: spec.ProjectSpec{ + Providers: &spec.ProjectProviders{ + Models: map[string]spec.ModelProviderConfig{ + "mock": {Type: "mock"}, + }, + }, + }, + Tools: map[string]*spec.ToolResource{ + "helper": { + APIVersion: spec.APIVersionV0, + Kind: spec.KindTool, + Metadata: spec.Metadata{Name: "helper"}, + Spec: spec.ToolSpec{Type: "native"}, + }, + }, + Agents: map[string]*spec.AgentResource{ + "reviewer": { + APIVersion: spec.APIVersionV0, + Kind: spec.KindAgent, + Metadata: spec.Metadata{Name: "reviewer"}, + Spec: spec.AgentSpec{ + Model: "mock/gpt-4", + Instructions: "Summarize the tool payload as JSON.", + Output: &spec.AgentIO{Schema: "./schemas/agent-out.schema.json"}, + }, + }, + }, + Workflows: map[string]*spec.WorkflowResource{ + "demo": { + APIVersion: spec.APIVersionV0, + Kind: spec.KindWorkflow, + Metadata: spec.Metadata{Name: "demo"}, + Spec: spec.WorkflowSpec{ + Steps: []spec.WorkflowStep{ + { + ID: "fetch", + Uses: "tool.helper.echo", + With: map[string]any{ + "topic": "${input.topic}", + "extra": "x", + }, + }, + { + ID: "summarize", + Agent: "reviewer", + With: map[string]any{ + "echo": "${steps.fetch.output.echo}", + }, + }, + }, + Output: &spec.WorkflowOutput{ + Value: map[string]any{ + "topic": "${input.topic}", + "summary": "${steps.summarize.output.summary}", + }, + }, + }, + }, + }, + } + + runID := "run-1" + started := time.Date(2026, 4, 11, 12, 0, 0, 0, time.UTC) + inJSON := `{"topic":"agents"}` + if err := st.StartRun(ctx, state.Run{ + RunID: runID, + WorkflowName: "demo", + Env: "dev", + Status: "running", + StartedAt: started, + InputJSON: inJSON, + TotalCostUSD: 0, + }); err != nil { + t.Fatal(err) + } + + var input map[string]any + if err := json.Unmarshal([]byte(inJSON), &input); err != nil { + t.Fatal(err) + } + + ex := &Executor{ + Graph: graph, + ProjectRoot: root, + Tools: tools.NewRegistry(graph), + Models: models.NewRegistry(graph), + Store: st, + Trace: trace.NewRecorder(st), + } + if err := ex.Run(ctx, RunInput{ + RunID: runID, + WorkflowName: "demo", + Env: "dev", + StartedAt: started, + Input: input, + }); err != nil { + t.Fatal(err) + } + + got, err := st.GetRun(ctx, runID) + if err != nil { + t.Fatal(err) + } + if got.Status != "succeeded" { + t.Fatalf("status %q err=%q", got.Status, got.ErrorText) + } + var out map[string]any + if err := json.Unmarshal([]byte(got.OutputJSON), &out); err != nil { + t.Fatal(err) + } + if out["topic"] != "agents" { + t.Fatalf("topic %+v", out) + } + if out["summary"] != "mock" { + t.Fatalf("summary %+v", out) + } + + events, err := trace.NewReader(st).ListByRunID(ctx, runID) + if err != nil { + t.Fatal(err) + } + if len(events) < 4 { + t.Fatalf("expected trace events, got %d", len(events)) + } +} + +func TestRun_agentOutputSchemaInvalid_failsRun(t *testing.T) { + ctx := context.Background() + st, err := sqlite.Open(ctx, filepath.Join(t.TempDir(), "run2.db")) + if err != nil { + t.Fatal(err) + } + t.Cleanup(func() { _ = st.Close() }) + + root := testProjectRoot(t) + graph := &spec.ProjectGraph{ + Agents: map[string]*spec.AgentResource{ + "bad": { + APIVersion: spec.APIVersionV0, + Kind: spec.KindAgent, + Metadata: spec.Metadata{Name: "bad"}, + Spec: spec.AgentSpec{ + Model: "mock/x", + Instructions: "Return JSON.", + Output: &spec.AgentIO{Schema: "./schemas/agent-out.schema.json"}, + }, + }, + }, + Workflows: map[string]*spec.WorkflowResource{ + "one": { + APIVersion: spec.APIVersionV0, + Kind: spec.KindWorkflow, + Metadata: spec.Metadata{Name: "one"}, + Spec: spec.WorkflowSpec{ + Steps: []spec.WorkflowStep{ + {ID: "only", Agent: "bad", With: map[string]any{}}, + }, + }, + }, + }, + } + + runID := "run-bad" + started := time.Date(2026, 4, 11, 12, 0, 0, 0, time.UTC) + if err := st.StartRun(ctx, state.Run{ + RunID: runID, + WorkflowName: "one", + Env: "dev", + Status: "running", + StartedAt: started, + InputJSON: `{}`, + TotalCostUSD: 0, + }); err != nil { + t.Fatal(err) + } + + ex := &Executor{ + Graph: graph, + ProjectRoot: root, + Tools: tools.NewRegistry(graph), + ModelResolve: func(modelRef string) (models.ModelClient, string, error) { + _ = modelRef + return &models.MockClient{Content: `{"findings":[]}`}, "x", nil + }, + Store: st, + } + err = ex.Run(ctx, RunInput{ + RunID: runID, + WorkflowName: "one", + Env: "dev", + StartedAt: started, + Input: map[string]any{}, + }) + if err == nil { + t.Fatal("expected schema validation error") + } + + got, err := st.GetRun(ctx, runID) + if err != nil { + t.Fatal(err) + } + if got.Status != "failed" { + t.Fatalf("status %q", got.Status) + } +} + +func TestWithAgentRetry_transientTwiceSucceeds(t *testing.T) { + ctx := context.Background() + var n int + err := withAgentRetry(ctx, func() error { + n++ + if n == 1 { + return ErrTransientGeneration + } + return nil + }) + if err != nil || n != 2 { + t.Fatalf("n=%d err=%v", n, err) + } +} diff --git a/internal/engine/retries.go b/internal/engine/retries.go new file mode 100644 index 0000000..5e539ea --- /dev/null +++ b/internal/engine/retries.go @@ -0,0 +1,30 @@ +package engine + +import ( + "context" + "errors" + "time" +) + +// ErrTransientGeneration marks a model [models.ModelClient] error as eligible for one MVP retry +// (design doc section 13.4). Wrappers should use fmt.Errorf("...: %w", ErrTransientGeneration). +var ErrTransientGeneration = errors.New("engine: transient model generation failure") + +func isTransientGeneration(err error) bool { + return err != nil && errors.Is(err, ErrTransientGeneration) +} + +// withAgentRetry runs fn once, then once more if the first error is a transient generation failure +// and ctx is not done. +func withAgentRetry(ctx context.Context, fn func() error) error { + err := fn() + if err == nil || !isTransientGeneration(err) { + return err + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(50 * time.Millisecond): + } + return fn() +} diff --git a/internal/engine/steps.go b/internal/engine/steps.go new file mode 100644 index 0000000..47c6396 --- /dev/null +++ b/internal/engine/steps.go @@ -0,0 +1,127 @@ +package engine + +import ( + "context" + "encoding/json" + "fmt" + "strings" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/models" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/policy" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/schema" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/tools" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace" +) + +func validateAgentOutput(projectRoot string, agent *spec.AgentResource, content string) error { + if agent == nil || agent.Spec.Output == nil { + return nil + } + sref := strings.TrimSpace(agent.Spec.Output.Schema) + if sref == "" { + return nil + } + path, err := schema.ResolveSchemaPath(projectRoot, sref) + if err != nil { + return fmt.Errorf("engine: agent output schema: %w", err) + } + if err := schema.Validate(path, []byte(strings.TrimSpace(content))); err != nil { + return fmt.Errorf("engine: agent output: %w", err) + } + return nil +} + +func parseAgentJSONObject(content string) (map[string]any, error) { + content = strings.TrimSpace(content) + if content == "" { + return nil, fmt.Errorf("engine: empty agent response") + } + var m map[string]any + if err := json.Unmarshal([]byte(content), &m); err != nil { + return nil, fmt.Errorf("engine: agent response is not a JSON object: %w", err) + } + return m, nil +} + +func (e *Executor) runToolStep(ctx context.Context, pol policy.PolicyEvaluator, runID string, step spec.WorkflowStep, with map[string]any, pctx policy.RunContext) (map[string]any, tools.ToolCallMeta, error) { + uses := strings.TrimSpace(step.Uses) + if err := pol.CheckToolCall(ctx, policy.ToolCallContext{Run: pctx, StepID: step.ID, Uses: uses}); err != nil { + return nil, tools.ToolCallMeta{}, err + } + if e.Trace != nil { + _, _ = e.Trace.Append(ctx, runID, step.ID, trace.EventToolCalled, map[string]any{"uses": uses}) + } + if e.Tools == nil { + return nil, tools.ToolCallMeta{}, fmt.Errorf("engine: nil tool executor") + } + resp, err := e.Tools.Call(ctx, tools.ToolCallRequest{Uses: uses, With: with}) + if err != nil { + return nil, tools.ToolCallMeta{}, err + } + if e.Trace != nil { + _, _ = e.Trace.Append(ctx, runID, step.ID, trace.EventToolCompleted, map[string]any{"uses": uses, "costUsd": resp.Meta.CostUSD}) + } + if err := pol.CheckStep(ctx, policy.StepContext{StepID: step.ID, OutputIsStructured: true}); err != nil { + return nil, resp.Meta, err + } + return resp.Output, resp.Meta, nil +} + +func (e *Executor) runAgentStep(ctx context.Context, pol policy.PolicyEvaluator, runID string, step spec.WorkflowStep, with map[string]any, pctx policy.RunContext, agent *spec.AgentResource) (map[string]any, models.GenerateMeta, error) { + if agent == nil { + return nil, models.GenerateMeta{}, fmt.Errorf("engine: nil agent resource") + } + modelRef := strings.TrimSpace(agent.Spec.Model) + cli, modelID, err := e.modelClient(modelRef) + if err != nil { + return nil, models.GenerateMeta{}, err + } + sec := 0 + if agent.Spec.Constraints != nil { + sec = agent.Spec.Constraints.TimeoutSeconds + } + ctx2, cancel := withSecondsTimeout(ctx, sec) + defer cancel() + + payload, err := json.Marshal(with) + if err != nil { + return nil, models.GenerateMeta{}, err + } + instructions := strings.TrimSpace(agent.Spec.Instructions) + messages := []models.ChatMessage{ + {Role: "system", Content: instructions}, + {Role: "user", Content: string(payload)}, + } + + var resp models.GenerateResponse + err = withAgentRetry(ctx2, func() error { + if e.Trace != nil { + _, _ = e.Trace.Append(ctx, runID, step.ID, trace.EventModelCalled, map[string]any{"agent": step.Agent, "model": modelRef}) + } + r, genErr := cli.Generate(ctx2, models.GenerateRequest{Model: modelID, Messages: messages}) + if genErr != nil { + return genErr + } + resp = r + return nil + }) + if err != nil { + return nil, models.GenerateMeta{}, err + } + if e.Trace != nil { + _, _ = e.Trace.Append(ctx, runID, step.ID, trace.EventModelCompleted, map[string]any{"agent": step.Agent, "costUsd": resp.Meta.CostUSD}) + } + if err := validateAgentOutput(e.ProjectRoot, agent, resp.Content); err != nil { + return nil, resp.Meta, err + } + out, err := parseAgentJSONObject(resp.Content) + if err != nil { + return nil, resp.Meta, err + } + structured := true + if err := pol.CheckStep(ctx, policy.StepContext{StepID: step.ID, OutputIsStructured: structured}); err != nil { + return nil, resp.Meta, err + } + return out, resp.Meta, nil +} diff --git a/internal/engine/testdata/wfproj/schemas/agent-out.schema.json b/internal/engine/testdata/wfproj/schemas/agent-out.schema.json new file mode 100644 index 0000000..70af713 --- /dev/null +++ b/internal/engine/testdata/wfproj/schemas/agent-out.schema.json @@ -0,0 +1,9 @@ +{ + "$schema": "https://json-schema.org/draft/2020-12/schema", + "type": "object", + "required": ["summary"], + "properties": { + "summary": { "type": "string" } + }, + "additionalProperties": true +} diff --git a/internal/engine/timeout.go b/internal/engine/timeout.go new file mode 100644 index 0000000..8d29d0e --- /dev/null +++ b/internal/engine/timeout.go @@ -0,0 +1,14 @@ +package engine + +import ( + "context" + "time" +) + +// withSecondsTimeout returns a child context with timeout when seconds > 0; otherwise parent and a no-op cancel. +func withSecondsTimeout(parent context.Context, seconds int) (context.Context, context.CancelFunc) { + if seconds <= 0 { + return parent, func() {} + } + return context.WithTimeout(parent, time.Duration(seconds)*time.Second) +} diff --git a/internal/engine/workflow.go b/internal/engine/workflow.go new file mode 100644 index 0000000..4f95ffc --- /dev/null +++ b/internal/engine/workflow.go @@ -0,0 +1,57 @@ +package engine + +import ( + "encoding/json" + "fmt" + + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/schema" + "github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec" +) + +func lookupWorkflow(g *spec.ProjectGraph, name string) (*spec.WorkflowResource, error) { + if g == nil || g.Workflows == nil { + return nil, fmt.Errorf("engine: unknown workflow %q", name) + } + wf, ok := g.Workflows[name] + if !ok || wf == nil { + return nil, fmt.Errorf("engine: unknown workflow %q", name) + } + return wf, nil +} + +func validateWorkflowInput(projectRoot string, wf *spec.WorkflowResource, input map[string]any) error { + if wf == nil || wf.Spec.Input == nil { + return nil + } + sref := wf.Spec.Input.Schema + if sref == "" { + return nil + } + path, err := schema.ResolveSchemaPath(projectRoot, sref) + if err != nil { + return fmt.Errorf("engine: workflow input schema: %w", err) + } + raw, err := json.Marshal(input) + if err != nil { + return fmt.Errorf("engine: marshal workflow input: %w", err) + } + if err := schema.Validate(path, raw); err != nil { + return fmt.Errorf("engine: workflow input: %w", err) + } + return nil +} + +func buildWorkflowOutput(wf *spec.WorkflowResource, ictx Context) (map[string]any, error) { + if wf == nil || wf.Spec.Output == nil || wf.Spec.Output.Value == nil { + return map[string]any{}, nil + } + v, err := InterpolateWalk(wf.Spec.Output.Value, ictx) + if err != nil { + return nil, err + } + out, ok := v.(map[string]any) + if !ok { + return nil, fmt.Errorf("engine: workflow output value must interpolate to an object") + } + return out, nil +}