Skip to content

Commit 97d280f

Browse files
committed
Merge remote-tracking branch 'upstream/wolo/workflows' into edges_validator
2 parents aaa487a + 01c5d96 commit 97d280f

26 files changed

Lines changed: 2371 additions & 220 deletions

agent/agent.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,11 @@ func (c *invocationContext) WithContext(ctx context.Context) InvocationContext {
504504
return &newCtx
505505
}
506506

507+
// TriggeredBy returns "" for non-workflow invocation contexts. The
508+
// workflow engine wraps this context in a private per-node context
509+
// that overrides the method when running inside a workflow.Node.
510+
func (c *invocationContext) TriggeredBy() string { return "" }
511+
507512
func pluginManagerFromContext(ctx context.Context) pluginManager {
508513
a := ctx.Value(plugincontext.PluginManagerCtxKey)
509514
m, ok := a.(pluginManager)

agent/context.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,14 @@ type InvocationContext interface {
9696
// Ended returns whether the invocation has ended.
9797
Ended() bool
9898

99+
// TriggeredBy returns the name of the upstream node whose output
100+
// scheduled the current node activation, when this context is
101+
// running inside a workflow.Node. Returns "" outside a workflow
102+
// (e.g. in plain agent runs) and for the initial workflow START
103+
// activation. The workflow engine populates this via an internal
104+
// per-node context wrapper.
105+
TriggeredBy() string
106+
99107
// WithContext returns a new instance of the context with overridden embedded context.
100108
// NOTE: This is a temporary solution and will be removed later. The proper solution
101109
// we plan is to stop embedding go context in adk context types and split it.

agent/workflowagent/workflow_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ func (m *MockInvocationContext) Branch() string { return "" }
9494
func (m *MockInvocationContext) RunConfig() *agent.RunConfig { return nil }
9595
func (m *MockInvocationContext) Ended() bool { return false }
9696
func (m *MockInvocationContext) EndInvocation() {}
97+
func (m *MockInvocationContext) TriggeredBy() string { return "" }
9798

9899
func TestWorkflowAgent(t *testing.T) {
99100
upperFn := func(ctx agent.InvocationContext, input any) (string, error) {

internal/configurable/conformance/replayplugin/replay_plugin_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -400,6 +400,7 @@ func (m *MockInvocationContext) RunConfig() *agent.RunConfig
400400
func (m *MockInvocationContext) EndInvocation() {}
401401
func (m *MockInvocationContext) Ended() bool { return false }
402402
func (m *MockInvocationContext) WithContext(ctx context.Context) agent.InvocationContext { return m }
403+
func (m *MockInvocationContext) TriggeredBy() string { return "" }
403404
func (m *MockInvocationContext) Value(key any) any { return nil }
404405
func (m *MockInvocationContext) Deadline() (deadline time.Time, ok bool) { return time.Time{}, false }
405406
func (m *MockInvocationContext) Done() <-chan struct{} { return nil }

internal/context/invocation_context.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,9 @@ func (c *InvocationContext) WithContext(ctx context.Context) agent.InvocationCon
100100
return &newCtx
101101
}
102102

103+
// TriggeredBy returns "" for non-workflow invocation contexts. The
104+
// workflow engine wraps this context in a private per-node context
105+
// that overrides the method when running inside a workflow.Node.
106+
func (c *InvocationContext) TriggeredBy() string { return "" }
107+
103108
var _ agent.InvocationContext = (*InvocationContext)(nil)

internal/llminternal/functions_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ func (m *mockInvocationContext) Branch() string {
5555
return m.branch
5656
}
5757

58+
func (m *mockInvocationContext) TriggeredBy() string { return "" }
59+
5860
func TestGenerateRequestConfirmationEvent(t *testing.T) {
5961
confirmingFunctionCall := &genai.FunctionCall{
6062
ID: "call_1",

workflow/base_node.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package workflow
16+
17+
// BaseNode provides identity and a default Config implementation for
18+
// types that satisfy the Node interface. Custom node types embed
19+
// BaseNode by value and supply only Run.
20+
type BaseNode struct {
21+
name string
22+
desc string
23+
config NodeConfig
24+
}
25+
26+
// NewBaseNode returns a BaseNode with the given identity and
27+
// configuration. Embedders typically call it from their own
28+
// constructor:
29+
//
30+
// type CustomNode struct {
31+
// BaseNode
32+
// // ...
33+
// }
34+
//
35+
// func NewCustomNode(name string, cfg NodeConfig) *CustomNode {
36+
// return &CustomNode{BaseNode: NewBaseNode(name, "", cfg)}
37+
// }
38+
func NewBaseNode(name, description string, cfg NodeConfig) BaseNode {
39+
return BaseNode{name: name, desc: description, config: cfg}
40+
}
41+
42+
// Name returns the node's name.
43+
func (b BaseNode) Name() string { return b.name }
44+
45+
// Description returns the node's human-readable description.
46+
func (b BaseNode) Description() string { return b.desc }
47+
48+
// Config returns the node's configuration.
49+
func (b BaseNode) Config() NodeConfig { return b.config }

workflow/base_node_test.go

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,108 @@
1+
// Copyright 2026 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package workflow
16+
17+
import (
18+
"testing"
19+
"time"
20+
21+
"github.com/google/go-cmp/cmp"
22+
)
23+
24+
// Compile-time assertions: every built-in workflow node must satisfy
25+
// the Node interface.
26+
var (
27+
_ Node = (*startNode)(nil)
28+
_ Node = (*FunctionNode)(nil)
29+
)
30+
31+
func TestNewBaseNode_RoundTrip(t *testing.T) {
32+
tTrue := true
33+
tFalse := false
34+
tests := []struct {
35+
name string
36+
nameArg string
37+
descArg string
38+
cfg NodeConfig
39+
wantConfig NodeConfig
40+
}{
41+
{
42+
name: "zero config",
43+
nameArg: "n",
44+
descArg: "desc",
45+
},
46+
{
47+
name: "WaitForOutput=true (JoinNode shape)",
48+
nameArg: "join",
49+
descArg: "fan-in",
50+
cfg: NodeConfig{WaitForOutput: &tTrue},
51+
wantConfig: NodeConfig{WaitForOutput: &tTrue},
52+
},
53+
{
54+
name: "ParallelWorker=true",
55+
nameArg: "mapper",
56+
descArg: "data parallel",
57+
cfg: NodeConfig{ParallelWorker: true},
58+
wantConfig: NodeConfig{ParallelWorker: true},
59+
},
60+
{
61+
name: "empty name and description",
62+
cfg: NodeConfig{},
63+
wantConfig: NodeConfig{},
64+
},
65+
{
66+
name: "fully populated configuration",
67+
nameArg: "full_node",
68+
descArg: "Node with all config fields set",
69+
cfg: NodeConfig{
70+
ParallelWorker: true,
71+
RerunOnResume: &tFalse,
72+
WaitForOutput: &tTrue,
73+
RetryConfig: &RetryConfig{
74+
MaxAttempts: 3,
75+
},
76+
Timeout: 10 * time.Second,
77+
},
78+
wantConfig: NodeConfig{
79+
ParallelWorker: true,
80+
RerunOnResume: &tFalse,
81+
WaitForOutput: &tTrue,
82+
RetryConfig: &RetryConfig{
83+
MaxAttempts: 3,
84+
},
85+
Timeout: 10 * time.Second,
86+
},
87+
},
88+
}
89+
90+
for _, tt := range tests {
91+
t.Run(tt.name, func(t *testing.T) {
92+
b := NewBaseNode(tt.nameArg, tt.descArg, tt.cfg)
93+
if got := b.Name(); got != tt.nameArg {
94+
t.Errorf("Name() = %q, want %q", got, tt.nameArg)
95+
}
96+
if got := b.Description(); got != tt.descArg {
97+
t.Errorf("Description() = %q, want %q", got, tt.descArg)
98+
}
99+
want := tt.wantConfig
100+
if (tt.cfg.WaitForOutput == nil) && (tt.cfg.ParallelWorker == false) {
101+
want = NodeConfig{}
102+
}
103+
if diff := cmp.Diff(want, b.Config()); diff != "" {
104+
t.Errorf("Config() mismatch (-want +got):\n%s", diff)
105+
}
106+
})
107+
}
108+
}

workflow/config.go

Lines changed: 56 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -28,26 +28,61 @@ var defaultRetryConfig = RetryConfig{
2828
},
2929
}
3030

31+
// DefaultRetryConfig returns a copy of the default retry policy
32+
// (5 attempts, 1s initial delay, 60s cap, 2x backoff, full jitter,
33+
// retry every error). Override fields on the returned value to
34+
// customise:
35+
//
36+
// rc := workflow.DefaultRetryConfig()
37+
// rc.MaxAttempts = 10
38+
// cfg := workflow.NodeConfig{RetryConfig: rc}
3139
func DefaultRetryConfig() *RetryConfig {
3240
cfg := defaultRetryConfig
3341
return &cfg
3442
}
3543

3644
// NodeConfig defines the configuration for a node.
45+
//
46+
// All fields are optional. The pointer-typed fields (RerunOnResume,
47+
// WaitForOutput) are tri-state: nil means "use the engine default for
48+
// this node kind", which mirrors Python's per-node-type defaults
49+
// (e.g. AgentNode in task mode defaults WaitForOutput to true while
50+
// other nodes default to false). Use the *Or accessor helpers to
51+
// read these values with an explicit per-call-site default.
3752
type NodeConfig struct {
38-
// Enables data parallelism (runs node concurrently for each item in input collection)
53+
// ParallelWorker, when true, runs the node concurrently for each
54+
// item of a list-typed input. The engine collects per-item
55+
// outputs and emits a single aggregate output event.
3956
ParallelWorker bool
40-
// Re-runs node on resume. Defaults to true for AgentNode
57+
58+
// RerunOnResume controls human-in-the-loop resume behaviour. When
59+
// true, an interrupted node re-runs from scratch on resume; when
60+
// false, the resume payload is treated as the node's output.
61+
// nil means "use the engine default", which is true for AgentNode
62+
// and false elsewhere.
4163
RerunOnResume *bool
42-
// Wait for output before triggering edges. Defaults to true for Task agents
64+
65+
// WaitForOutput, when true, keeps the node in NodeWaiting
66+
// (re-triggerable) until it actually yields an event carrying an
67+
// "output" key in Actions.StateDelta, instead of moving it to
68+
// NodeCompleted on first return. JoinNode and any custom fan-in
69+
// node sets this. nil means "use the engine default" — false for
70+
// most node kinds.
4371
WaitForOutput *bool
44-
// Retry configuration on failure
72+
73+
// RetryConfig, when non-nil, makes the scheduler retry this node
74+
// on failure per the policy. nil means "no retries".
4575
RetryConfig *RetryConfig
46-
// Max duration for node to complete. Optional for global defaults
47-
Timeout *time.Duration
76+
77+
// Timeout, when > 0, bounds a single activation of the node via
78+
// context.WithTimeout on the per-node context. Zero (the default)
79+
// means the node is bounded only by the parent invocation
80+
// context's deadline, if any.
81+
Timeout time.Duration
4882
}
4983

5084
// RetryConfig defines the parameters for retrying a failed node.
85+
//
5186
// Recommended construction is via DefaultRetryConfig and override
5287
// the fields you want to customize:
5388
//
@@ -59,17 +94,30 @@ type NodeConfig struct {
5994
// but discouraged: any unset field defaults to its zero value, not
6095
// to DefaultRetryConfig's value. The zero RetryConfig is a valid
6196
// "no retry, no backoff, no jitter" policy.
97+
//
98+
// The struct shape is deliberately a flat set of plain values: no
99+
// dependency on cenkalti/backoff/v5.
62100
type RetryConfig struct {
63101
// Maximum number of attempts, including the original request. If 0 or 1, it means no retries. If not specified, default to 5.
64102
MaxAttempts int
103+
65104
// Initial delay before the first retry, in fractions of a second. If not specified, default to 1 second.
66105
InitialDelay time.Duration
106+
67107
// Maximum delay between retries, in fractions of a second. If not specified, default to 60 seconds.
68108
MaxDelay time.Duration
69-
// Multiplier by which the delay increases after each attempt. If not specified, default to 2.0.
109+
110+
// BackoffFactor is the per-attempt multiplier applied to the
111+
// delay. A factor of 1.0 means a constant InitialDelay between
112+
// retries; 2.0 means classic exponential backoff. Values < 1.0
113+
// shrink the delay each attempt (rare but permitted).
70114
BackoffFactor float64
71-
// Randomness factor for the delay. Use 0.0 to remove randomness. If not specified, default to 1.0.
115+
116+
// Jitter is a randomness factor in [0.0, 1.0]. The actual delay
117+
// is sampled from delay * (1 ± Jitter). Zero means deterministic
118+
// delays.
72119
Jitter float64
120+
73121
// Predicate that defines when to retry (true means retry). If not specified, default to true.
74122
ShouldRetry func(error) bool
75123
}

workflow/config_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,9 @@ import (
2222
"github.com/google/go-cmp/cmp/cmpopts"
2323
)
2424

25+
// TestDefaultRetryConfig verifies the values returned by
26+
// DefaultRetryConfig (5 attempts, 1s initial delay, 60s cap, 2x
27+
// backoff, full jitter, retry-every-error predicate).
2528
func TestDefaultRetryConfig(t *testing.T) {
2629
want := &RetryConfig{
2730
MaxAttempts: 5,

0 commit comments

Comments
 (0)