Skip to content

Commit e0a2536

Browse files
authored
Merge pull request #56 from LAA-Software-Engineering/issue/22-workflow-engine
feat(engine): sequential workflow executor core (closes #22)
2 parents 9d35011 + 6d6287a commit e0a2536

8 files changed

Lines changed: 719 additions & 1 deletion

File tree

internal/engine/doc.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
11
// Package engine orchestrates workflow execution, steps, and interpolation.
22
//
3-
// [InterpolateString] and [InterpolateWalk] implement ${input.*} and ${steps.*} dot paths only (§13.1 MVP).
3+
// [InterpolateString] and [InterpolateWalk] implement ${input.*} and ${steps.*} dot paths only (design doc section 13.1 MVP).
4+
//
5+
// [Executor.Run] executes sequential workflows: interpolated step inputs, policy checks from the
6+
// workflow's Policy resource, tool and agent steps, optional JSON Schema validation for agent output,
7+
// persisted run_steps rows, and trace events (design doc sections 12.2 E, 13.3, 13.4, 14.2).
48
package engine

internal/engine/execution.go

Lines changed: 220 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,220 @@
1+
package engine
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"fmt"
7+
"strings"
8+
"time"
9+
10+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/models"
11+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/policy"
12+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/spec"
13+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/state"
14+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/tools"
15+
"github.com/LAA-Software-Engineering/agentic-control-plane/internal/trace"
16+
)
17+
18+
// Executor runs sequential workflow steps (design doc section 12.2 E, section 13).
19+
type Executor struct {
20+
Graph *spec.ProjectGraph
21+
ProjectRoot string
22+
Tools tools.ToolExecutor
23+
Models *models.Registry
24+
// ModelResolve, if set, is used instead of Models.ClientFor (tests inject mocks).
25+
ModelResolve func(modelRef string) (models.ModelClient, string, error)
26+
Store state.RuntimeStore
27+
Trace *trace.Recorder
28+
Now func() time.Time
29+
}
30+
31+
// RunInput identifies the workflow run and parsed input map (already JSON-valid).
32+
type RunInput struct {
33+
RunID string
34+
WorkflowName string
35+
Env string
36+
StartedAt time.Time
37+
Input map[string]any
38+
ApprovedActions []string
39+
}
40+
41+
func (e *Executor) now() time.Time {
42+
if e != nil && e.Now != nil {
43+
return e.Now()
44+
}
45+
return time.Now().UTC()
46+
}
47+
48+
func (e *Executor) modelClient(modelRef string) (models.ModelClient, string, error) {
49+
if e.ModelResolve != nil {
50+
return e.ModelResolve(modelRef)
51+
}
52+
if e.Models == nil {
53+
return nil, "", fmt.Errorf("engine: Models registry is nil")
54+
}
55+
return e.Models.ClientFor(modelRef)
56+
}
57+
58+
// Run executes a workflow sequentially: interpolate step inputs, policy checks, tool/agent calls,
59+
// optional JSON Schema validation for agent output, persisted run_steps and trace events.
60+
// The run row must already exist in [state.RuntimeStore] (e.g. via [state.RuntimeStore.StartRun]).
61+
func (e *Executor) Run(ctx context.Context, in RunInput) error {
62+
if e == nil || e.Store == nil {
63+
return fmt.Errorf("engine: nil executor or store")
64+
}
65+
if e.Graph == nil {
66+
return fmt.Errorf("engine: nil project graph")
67+
}
68+
wf, err := lookupWorkflow(e.Graph, in.WorkflowName)
69+
if err != nil {
70+
return err
71+
}
72+
if err := validateWorkflowInput(e.ProjectRoot, wf, in.Input); err != nil {
73+
return e.failRun(ctx, in, err, 0)
74+
}
75+
76+
polEng := policy.NewEngine(e.Graph)
77+
wfPol := polEng.Evaluator(strings.TrimSpace(wf.Spec.Policy))
78+
79+
ictx := Context{Input: in.Input, Steps: make(map[string]StepResult)}
80+
var totalCost float64
81+
finishAt := e.now()
82+
83+
for _, step := range wf.Spec.Steps {
84+
step := step
85+
if strings.TrimSpace(step.ID) == "" {
86+
return e.failRun(ctx, in, fmt.Errorf("engine: workflow step missing id"), totalCost)
87+
}
88+
uses := strings.TrimSpace(step.Uses)
89+
agentName := strings.TrimSpace(step.Agent)
90+
if (uses == "") == (agentName == "") {
91+
return e.failRun(ctx, in, fmt.Errorf("engine: step %q must set exactly one of uses or agent", step.ID), totalCost)
92+
}
93+
94+
withAny, err := InterpolateWalk(step.With, ictx)
95+
if err != nil {
96+
return e.failRun(ctx, in, fmt.Errorf("engine: step %q with: %w", step.ID, err), totalCost)
97+
}
98+
with, ok := withAny.(map[string]any)
99+
if !ok {
100+
with = map[string]any{}
101+
}
102+
103+
elapsed := e.now().Sub(in.StartedAt)
104+
pctx := policy.RunContext{
105+
StartedAt: in.StartedAt,
106+
Elapsed: elapsed,
107+
AccumulatedCostUSD: totalCost,
108+
ApprovedActions: in.ApprovedActions,
109+
}
110+
if err := wfPol.CheckRun(ctx, pctx); err != nil {
111+
return e.failRunStep(ctx, in, step.ID, with, err, totalCost)
112+
}
113+
114+
inJSON, _ := json.Marshal(with)
115+
started := e.now()
116+
if err := e.Store.UpsertRunStep(ctx, state.RunStep{
117+
RunID: in.RunID,
118+
StepID: step.ID,
119+
Status: "running",
120+
StartedAt: &started,
121+
InputJSON: string(inJSON),
122+
}); err != nil {
123+
return e.failRun(ctx, in, fmt.Errorf("engine: upsert step %q: %w", step.ID, err), totalCost)
124+
}
125+
if e.Trace != nil {
126+
_, _ = e.Trace.Append(ctx, in.RunID, step.ID, trace.EventStepStarted, map[string]any{"uses": uses, "agent": agentName})
127+
}
128+
129+
var out map[string]any
130+
var stepCost float64
131+
if uses != "" {
132+
var meta tools.ToolCallMeta
133+
out, meta, err = e.runToolStep(ctx, wfPol, in.RunID, step, with, pctx)
134+
stepCost = meta.CostUSD
135+
} else {
136+
ar, ok := e.Graph.Agents[agentName]
137+
if !ok || ar == nil {
138+
err = fmt.Errorf("engine: unknown agent %q", agentName)
139+
} else {
140+
var gmeta models.GenerateMeta
141+
out, gmeta, err = e.runAgentStep(ctx, wfPol, in.RunID, step, with, pctx, ar)
142+
stepCost = gmeta.CostUSD
143+
}
144+
}
145+
146+
finished := e.now()
147+
totalCost += stepCost
148+
if err != nil {
149+
_ = e.Store.UpsertRunStep(ctx, state.RunStep{
150+
RunID: in.RunID,
151+
StepID: step.ID,
152+
Status: "failed",
153+
StartedAt: &started,
154+
FinishedAt: &finished,
155+
InputJSON: string(inJSON),
156+
ErrorText: err.Error(),
157+
CostUSD: stepCost,
158+
})
159+
if e.Trace != nil {
160+
_, _ = e.Trace.Append(ctx, in.RunID, step.ID, trace.EventStepFailed, map[string]any{"error": err.Error()})
161+
}
162+
return e.failRun(ctx, in, fmt.Errorf("engine: step %q: %w", step.ID, err), totalCost)
163+
}
164+
165+
outJSON, _ := json.Marshal(out)
166+
if err := e.Store.UpsertRunStep(ctx, state.RunStep{
167+
RunID: in.RunID,
168+
StepID: step.ID,
169+
Status: "succeeded",
170+
StartedAt: &started,
171+
FinishedAt: &finished,
172+
InputJSON: string(inJSON),
173+
OutputJSON: string(outJSON),
174+
CostUSD: stepCost,
175+
}); err != nil {
176+
return e.failRun(ctx, in, fmt.Errorf("engine: upsert step %q: %w", step.ID, err), totalCost)
177+
}
178+
if e.Trace != nil {
179+
_, _ = e.Trace.Append(ctx, in.RunID, step.ID, trace.EventStepFinished, map[string]any{"costUsd": stepCost})
180+
}
181+
182+
meta := map[string]any{"costUsd": stepCost, "durationMs": finished.Sub(started).Milliseconds()}
183+
ictx.Steps[step.ID] = StepResult{Output: out, Meta: meta}
184+
}
185+
186+
finalOut, err := buildWorkflowOutput(wf, ictx)
187+
if err != nil {
188+
return e.failRun(ctx, in, err, totalCost)
189+
}
190+
outBytes, err := json.Marshal(finalOut)
191+
if err != nil {
192+
return e.failRun(ctx, in, err, totalCost)
193+
}
194+
finishAt = e.now()
195+
return e.Store.FinishRun(ctx, in.RunID, "succeeded", finishAt, string(outBytes), "", totalCost)
196+
}
197+
198+
func (e *Executor) failRun(ctx context.Context, in RunInput, runErr error, totalCost float64) error {
199+
finishAt := e.now()
200+
_ = e.Store.FinishRun(ctx, in.RunID, "failed", finishAt, "", runErr.Error(), totalCost)
201+
return runErr
202+
}
203+
204+
func (e *Executor) failRunStep(ctx context.Context, in RunInput, stepID string, with map[string]any, runErr error, totalCost float64) error {
205+
inJSON, _ := json.Marshal(with)
206+
now := e.now()
207+
_ = e.Store.UpsertRunStep(ctx, state.RunStep{
208+
RunID: in.RunID,
209+
StepID: stepID,
210+
Status: "failed",
211+
StartedAt: &now,
212+
FinishedAt: &now,
213+
InputJSON: string(inJSON),
214+
ErrorText: runErr.Error(),
215+
})
216+
if e.Trace != nil {
217+
_, _ = e.Trace.Append(ctx, in.RunID, stepID, trace.EventStepFailed, map[string]any{"error": runErr.Error()})
218+
}
219+
return e.failRun(ctx, in, runErr, totalCost)
220+
}

0 commit comments

Comments
 (0)