Skip to content

Commit 625cb81

Browse files
fix(test/runtime): pass input down instead of using env var
1 parent ce6f4fd commit 625cb81

File tree

6 files changed

+200
-85
lines changed

6 files changed

+200
-85
lines changed

engine/cld/changeset/common.go

Lines changed: 84 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ type Configurations struct {
3232
type internalChangeSet interface {
3333
noop() // unexported function to prevent arbitrary structs from implementing ChangeSet.
3434
Apply(env fdeployment.Environment) (fdeployment.ChangesetOutput, error)
35+
applyWithInput(env fdeployment.Environment, inputStr string) (fdeployment.ChangesetOutput, error)
3536
Configurations() (Configurations, error)
3637
}
3738

@@ -77,6 +78,34 @@ type TypedJSON struct {
7778
ChainOverrides []uint64 `json:"chainOverrides"` // Optional field for chain overrides
7879
}
7980

81+
func parseTypedInput(inputStr string) (TypedJSON, error) {
82+
if inputStr == "" {
83+
return TypedJSON{}, errors.New("input is empty")
84+
}
85+
86+
var inputObject TypedJSON
87+
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
88+
return TypedJSON{}, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
89+
}
90+
if len(inputObject.Payload) == 0 {
91+
return TypedJSON{}, errors.New("'payload' field is required")
92+
}
93+
94+
return inputObject, nil
95+
}
96+
97+
func decodePayload[C any](payload json.RawMessage) (C, error) {
98+
var config C
99+
100+
payloadDecoder := json.NewDecoder(strings.NewReader(string(payload)))
101+
payloadDecoder.DisallowUnknownFields()
102+
if err := payloadDecoder.Decode(&config); err != nil {
103+
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
104+
}
105+
106+
return config, nil
107+
}
108+
80109
// WithJSON returns a fully configured changeset, which pairs a [fdeployment.ChangeSet] with its configuration based
81110
// a JSON input. It also allows extensions, such as a PostProcessing function.
82111
// InputStr must be a JSON object with a "payload" field that contains the actual input data for a Durable Pipeline.
@@ -92,30 +121,13 @@ type TypedJSON struct {
92121
// Note: Prefer WithEnvInput for durable_pipelines.go
93122
func (f WrappedChangeSet[C]) WithJSON(_ C, inputStr string) ConfiguredChangeSet {
94123
return ChangeSetImpl[C]{changeset: f, configProvider: func() (C, error) {
95-
var config C
96-
97-
if inputStr == "" {
98-
return config, errors.New("input is empty")
99-
}
100-
101-
var inputObject TypedJSON
102-
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
103-
return config, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
104-
}
105-
106-
// If payload is null, decode it as null (which will give zero value)
107-
// If payload is missing, return an error
108-
if len(inputObject.Payload) == 0 {
109-
return config, errors.New("'payload' field is required")
110-
}
111-
112-
payloadDecoder := json.NewDecoder(strings.NewReader(string(inputObject.Payload)))
113-
payloadDecoder.DisallowUnknownFields()
114-
if err := payloadDecoder.Decode(&config); err != nil {
115-
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
124+
inputObject, err := parseTypedInput(inputStr)
125+
if err != nil {
126+
var zero C
127+
return zero, err
116128
}
117129

118-
return config, nil
130+
return decodePayload[C](inputObject.Payload)
119131
},
120132
inputChainOverrides: func() ([]uint64, error) {
121133
return loadInputChainOverrides(inputStr)
@@ -151,41 +163,36 @@ func (f WrappedChangeSet[C]) WithEnvInput(opts ...EnvInputOption[C]) ConfiguredC
151163

152164
inputStr := os.Getenv("DURABLE_PIPELINE_INPUT")
153165

154-
return ChangeSetImpl[C]{changeset: f, configProvider: func() (C, error) {
155-
var config C
156-
157-
if inputStr == "" {
158-
return config, errors.New("input is empty")
159-
}
160-
161-
var inputObject TypedJSON
162-
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
163-
return config, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
164-
}
166+
providerFromInput := func(rawInput string) (C, error) {
167+
var zero C
165168

166-
// If payload is null, decode it as null (which will give zero value)
167-
// If payload is missing, return an error
168-
if len(inputObject.Payload) == 0 {
169-
return config, errors.New("'payload' field is required")
169+
inputObject, err := parseTypedInput(rawInput)
170+
if err != nil {
171+
return zero, err
170172
}
171173

172-
payloadDecoder := json.NewDecoder(strings.NewReader(string(inputObject.Payload)))
173-
payloadDecoder.DisallowUnknownFields()
174-
if err := payloadDecoder.Decode(&config); err != nil {
175-
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
174+
config, err := decodePayload[C](inputObject.Payload)
175+
if err != nil {
176+
return zero, err
176177
}
177178

178179
if options.inputModifier != nil {
179-
conf, err := options.inputModifier(config)
180-
if err != nil {
181-
return conf, fmt.Errorf("failed to apply input modifier: %w", err)
180+
conf, modifierErr := options.inputModifier(config)
181+
if modifierErr != nil {
182+
return conf, fmt.Errorf("failed to apply input modifier: %w", modifierErr)
182183
}
183184

184185
return conf, nil
185186
}
186187

187188
return config, nil
188-
},
189+
}
190+
191+
return ChangeSetImpl[C]{changeset: f,
192+
configProvider: func() (C, error) {
193+
return providerFromInput(inputStr)
194+
},
195+
configProviderWithInput: providerFromInput,
189196
inputChainOverrides: func() ([]uint64, error) {
190197
return loadInputChainOverrides(inputStr)
191198
},
@@ -221,21 +228,17 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
221228
// Read input from environment variable
222229
inputStr := os.Getenv("DURABLE_PIPELINE_INPUT")
223230

224-
configProvider := func() (C, error) {
231+
configProviderFromInput := func(rawInput string) (C, error) {
225232
var zero C
226233

227-
if inputStr == "" {
234+
if rawInput == "" {
228235
return zero, errors.New("input is empty")
229236
}
230237

231-
// Parse JSON input
232238
var inputObject TypedJSON
233-
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
239+
if err := json.Unmarshal([]byte(rawInput), &inputObject); err != nil {
234240
return zero, fmt.Errorf("failed to parse resolver input as JSON: %w", err)
235241
}
236-
237-
// If payload is null, pass it to the resolver (which will receive null)
238-
// If payload field is missing, return an error
239242
if len(inputObject.Payload) == 0 {
240243
return zero, errors.New("'payload' field is required")
241244
}
@@ -249,8 +252,12 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
249252
return typedConfig, nil
250253
}
251254

252-
return ChangeSetImpl[C]{changeset: f, configProvider: configProvider,
253-
ConfigResolver: resolver,
255+
return ChangeSetImpl[C]{changeset: f,
256+
configProvider: func() (C, error) {
257+
return configProviderFromInput(inputStr)
258+
},
259+
configProviderWithInput: configProviderFromInput,
260+
ConfigResolver: resolver,
254261
inputChainOverrides: func() ([]uint64, error) {
255262
return loadInputChainOverrides(inputStr)
256263
},
@@ -260,9 +267,10 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
260267
var _ ConfiguredChangeSet = ChangeSetImpl[any]{}
261268

262269
type ChangeSetImpl[C any] struct {
263-
changeset WrappedChangeSet[C]
264-
configProvider func() (C, error)
265-
inputChainOverrides func() ([]uint64, error)
270+
changeset WrappedChangeSet[C]
271+
configProvider func() (C, error)
272+
configProviderWithInput func(inputStr string) (C, error)
273+
inputChainOverrides func() ([]uint64, error)
266274

267275
// Present only when the changeset was wired with
268276
// Configure(...).WithConfigResolver(...)
@@ -287,6 +295,25 @@ func (ccs ChangeSetImpl[C]) Apply(env fdeployment.Environment) (fdeployment.Chan
287295
return ccs.changeset.operation.Apply(env, c)
288296
}
289297

298+
func (ccs ChangeSetImpl[C]) applyWithInput(env fdeployment.Environment, inputStr string) (fdeployment.ChangesetOutput, error) {
299+
if inputStr == "" {
300+
return ccs.Apply(env)
301+
}
302+
if ccs.configProviderWithInput == nil {
303+
return ccs.Apply(env)
304+
}
305+
306+
c, err := ccs.configProviderWithInput(inputStr)
307+
if err != nil {
308+
return fdeployment.ChangesetOutput{}, err
309+
}
310+
if err := ccs.changeset.operation.VerifyPreconditions(env, c); err != nil {
311+
return fdeployment.ChangesetOutput{}, err
312+
}
313+
314+
return ccs.changeset.operation.Apply(env, c)
315+
}
316+
290317
func (ccs ChangeSetImpl[C]) Configurations() (Configurations, error) {
291318
var chainOverrides []uint64
292319
var err error

engine/cld/changeset/postprocess.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ func (ccs PostProcessingChangeSetImpl[C]) Apply(env fdeployment.Environment) (fd
3535
return ccs.postProcessor(env, output)
3636
}
3737

38+
func (ccs PostProcessingChangeSetImpl[C]) applyWithInput(
39+
env fdeployment.Environment, inputStr string,
40+
) (fdeployment.ChangesetOutput, error) {
41+
env.Logger.Debugf("Post-processing ChangesetOutput from %T", ccs.changeset.changeset.operation)
42+
output, err := ccs.changeset.applyWithInput(env, inputStr)
43+
if err != nil {
44+
return output, err
45+
}
46+
47+
return ccs.postProcessor(env, output)
48+
}
49+
3850
func (ccs PostProcessingChangeSetImpl[C]) Configurations() (Configurations, error) {
3951
return ccs.changeset.Configurations()
4052
}

engine/cld/changeset/registry.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,21 @@ func (r *ChangesetsRegistry) AddGlobalPostHooks(hooks ...PostHook) {
172172
// a failed Apply are logged but never mask the Apply error.
173173
func (r *ChangesetsRegistry) Apply(
174174
key string, e fdeployment.Environment,
175+
) (fdeployment.ChangesetOutput, error) {
176+
return r.applyWithInput(key, e, "")
177+
}
178+
179+
// ApplyWithInput applies a changeset with explicit input JSON for this apply invocation.
180+
// Changesets configured to read durable pipeline input from environment variables
181+
// consume this value directly when they support input-aware execution.
182+
func (r *ChangesetsRegistry) ApplyWithInput(
183+
key string, e fdeployment.Environment, inputStr string,
184+
) (fdeployment.ChangesetOutput, error) {
185+
return r.applyWithInput(key, e, inputStr)
186+
}
187+
188+
func (r *ChangesetsRegistry) applyWithInput(
189+
key string, e fdeployment.Environment, inputStr string,
175190
) (fdeployment.ChangesetOutput, error) {
176191
entry, globalPre, globalPost, err := r.getApplySnapshot(key)
177192
if err != nil {
@@ -204,7 +219,9 @@ func (r *ChangesetsRegistry) Apply(
204219
}
205220
}
206221

207-
output, applyErr := entry.changeset.Apply(e)
222+
var output fdeployment.ChangesetOutput
223+
var applyErr error
224+
output, applyErr = entry.changeset.applyWithInput(e, inputStr)
208225

209226
postParams := PostHookParams{
210227
Env: hookEnv,

engine/cld/changeset/registry_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,10 @@ func (noopChangeset) Apply(e fdeployment.Environment) (fdeployment.ChangesetOutp
2525
return fdeployment.ChangesetOutput{}, nil
2626
}
2727

28+
func (n noopChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) {
29+
return n.Apply(e)
30+
}
31+
2832
func (n noopChangeset) Configurations() (Configurations, error) {
2933
return Configurations{
3034
InputChainOverrides: n.chainOverrides,
@@ -45,6 +49,10 @@ func (r *recordingChangeset) Apply(_ fdeployment.Environment) (fdeployment.Chang
4549
return r.output, r.err
4650
}
4751

52+
func (r *recordingChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) {
53+
return r.Apply(e)
54+
}
55+
4856
func (*recordingChangeset) Configurations() (Configurations, error) {
4957
return Configurations{}, nil
5058
}
@@ -61,6 +69,10 @@ func (o *orderRecordingChangeset) Apply(_ fdeployment.Environment) (fdeployment.
6169
return fdeployment.ChangesetOutput{}, nil
6270
}
6371

72+
func (o *orderRecordingChangeset) applyWithInput(e fdeployment.Environment, _ string) (fdeployment.ChangesetOutput, error) {
73+
return o.Apply(e)
74+
}
75+
6476
func (*orderRecordingChangeset) Configurations() (Configurations, error) {
6577
return Configurations{}, nil
6678
}
@@ -148,6 +160,63 @@ func Test_Changesets_Apply(t *testing.T) {
148160
}
149161
}
150162

163+
//nolint:paralleltest // Uses process environment for fallback behavior assertions.
164+
func Test_Changesets_ApplyWithInput_WithEnvConfiguredChangeset(t *testing.T) {
165+
type inputConfig struct {
166+
Value int `json:"value"`
167+
}
168+
169+
t.Setenv("DURABLE_PIPELINE_INPUT", `{"payload":{"value":999}}`)
170+
171+
var received int
172+
cs := fdeployment.CreateChangeSet(
173+
func(_ fdeployment.Environment, cfg inputConfig) (fdeployment.ChangesetOutput, error) {
174+
received = cfg.Value
175+
return fdeployment.ChangesetOutput{}, nil
176+
},
177+
func(_ fdeployment.Environment, _ inputConfig) error { return nil },
178+
)
179+
180+
r := NewChangesetsRegistry()
181+
r.Add("0001_test", Configure(cs).WithEnvInput())
182+
183+
_, err := r.ApplyWithInput("0001_test", fdeployment.Environment{}, `{"payload":{"value":1}}`)
184+
require.NoError(t, err)
185+
require.Equal(t, 1, received)
186+
}
187+
188+
//nolint:paralleltest // Uses process environment for fallback behavior assertions.
189+
func Test_Changesets_ApplyWithInput_WithResolverConfiguredChangeset(t *testing.T) {
190+
type resolverInput struct {
191+
Base int `json:"base"`
192+
}
193+
type resolverOutput struct {
194+
Value int `json:"value"`
195+
}
196+
197+
t.Setenv("DURABLE_PIPELINE_INPUT", `{"payload":{"base":100}}`)
198+
199+
resolver := func(input resolverInput) (resolverOutput, error) {
200+
return resolverOutput{Value: input.Base + 10}, nil
201+
}
202+
203+
var received int
204+
cs := fdeployment.CreateChangeSet(
205+
func(_ fdeployment.Environment, cfg resolverOutput) (fdeployment.ChangesetOutput, error) {
206+
received = cfg.Value
207+
return fdeployment.ChangesetOutput{}, nil
208+
},
209+
func(_ fdeployment.Environment, _ resolverOutput) error { return nil },
210+
)
211+
212+
r := NewChangesetsRegistry()
213+
r.Add("0001_test", Configure(cs).WithConfigResolver(resolver))
214+
215+
_, err := r.ApplyWithInput("0001_test", fdeployment.Environment{}, `{"payload":{"base":7}}`)
216+
require.NoError(t, err)
217+
require.Equal(t, 17, received)
218+
}
219+
151220
func Test_Changesets_Add(t *testing.T) {
152221
t.Parallel()
153222

0 commit comments

Comments
 (0)