Skip to content

Commit fa1b00d

Browse files
fix(test/runtime): pass input down instead of using env var
1 parent ce6f4fd commit fa1b00d

File tree

6 files changed

+193
-85
lines changed

6 files changed

+193
-85
lines changed

engine/cld/changeset/common.go

Lines changed: 80 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,34 @@ type TypedJSON struct {
7777
ChainOverrides []uint64 `json:"chainOverrides"` // Optional field for chain overrides
7878
}
7979

80+
func parseTypedInput(inputStr string) (TypedJSON, error) {
81+
if inputStr == "" {
82+
return TypedJSON{}, errors.New("input is empty")
83+
}
84+
85+
var inputObject TypedJSON
86+
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
87+
return TypedJSON{}, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
88+
}
89+
if len(inputObject.Payload) == 0 {
90+
return TypedJSON{}, errors.New("'payload' field is required")
91+
}
92+
93+
return inputObject, nil
94+
}
95+
96+
func decodePayload[C any](payload json.RawMessage) (C, error) {
97+
var config C
98+
99+
payloadDecoder := json.NewDecoder(strings.NewReader(string(payload)))
100+
payloadDecoder.DisallowUnknownFields()
101+
if err := payloadDecoder.Decode(&config); err != nil {
102+
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
103+
}
104+
105+
return config, nil
106+
}
107+
80108
// WithJSON returns a fully configured changeset, which pairs a [fdeployment.ChangeSet] with its configuration based
81109
// a JSON input. It also allows extensions, such as a PostProcessing function.
82110
// InputStr must be a JSON object with a "payload" field that contains the actual input data for a Durable Pipeline.
@@ -92,30 +120,13 @@ type TypedJSON struct {
92120
// Note: Prefer WithEnvInput for durable_pipelines.go
93121
func (f WrappedChangeSet[C]) WithJSON(_ C, inputStr string) ConfiguredChangeSet {
94122
return ChangeSetImpl[C]{changeset: f, configProvider: func() (C, error) {
95-
var config C
96-
97-
if inputStr == "" {
98-
return config, errors.New("input is empty")
99-
}
100-
101-
var inputObject TypedJSON
102-
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
103-
return config, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
104-
}
105-
106-
// If payload is null, decode it as null (which will give zero value)
107-
// If payload is missing, return an error
108-
if len(inputObject.Payload) == 0 {
109-
return config, errors.New("'payload' field is required")
110-
}
111-
112-
payloadDecoder := json.NewDecoder(strings.NewReader(string(inputObject.Payload)))
113-
payloadDecoder.DisallowUnknownFields()
114-
if err := payloadDecoder.Decode(&config); err != nil {
115-
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
123+
inputObject, err := parseTypedInput(inputStr)
124+
if err != nil {
125+
var zero C
126+
return zero, err
116127
}
117128

118-
return config, nil
129+
return decodePayload[C](inputObject.Payload)
119130
},
120131
inputChainOverrides: func() ([]uint64, error) {
121132
return loadInputChainOverrides(inputStr)
@@ -151,41 +162,36 @@ func (f WrappedChangeSet[C]) WithEnvInput(opts ...EnvInputOption[C]) ConfiguredC
151162

152163
inputStr := os.Getenv("DURABLE_PIPELINE_INPUT")
153164

154-
return ChangeSetImpl[C]{changeset: f, configProvider: func() (C, error) {
155-
var config C
156-
157-
if inputStr == "" {
158-
return config, errors.New("input is empty")
159-
}
160-
161-
var inputObject TypedJSON
162-
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
163-
return config, fmt.Errorf("JSON must be in JSON format with 'payload' fields: %w", err)
164-
}
165+
providerFromInput := func(rawInput string) (C, error) {
166+
var zero C
165167

166-
// If payload is null, decode it as null (which will give zero value)
167-
// If payload is missing, return an error
168-
if len(inputObject.Payload) == 0 {
169-
return config, errors.New("'payload' field is required")
168+
inputObject, err := parseTypedInput(rawInput)
169+
if err != nil {
170+
return zero, err
170171
}
171172

172-
payloadDecoder := json.NewDecoder(strings.NewReader(string(inputObject.Payload)))
173-
payloadDecoder.DisallowUnknownFields()
174-
if err := payloadDecoder.Decode(&config); err != nil {
175-
return config, fmt.Errorf("failed to unmarshal payload: %w", err)
173+
config, err := decodePayload[C](inputObject.Payload)
174+
if err != nil {
175+
return zero, err
176176
}
177177

178178
if options.inputModifier != nil {
179-
conf, err := options.inputModifier(config)
180-
if err != nil {
181-
return conf, fmt.Errorf("failed to apply input modifier: %w", err)
179+
conf, modifierErr := options.inputModifier(config)
180+
if modifierErr != nil {
181+
return conf, fmt.Errorf("failed to apply input modifier: %w", modifierErr)
182182
}
183183

184184
return conf, nil
185185
}
186186

187187
return config, nil
188-
},
188+
}
189+
190+
return ChangeSetImpl[C]{changeset: f,
191+
configProvider: func() (C, error) {
192+
return providerFromInput(inputStr)
193+
},
194+
configProviderWithInput: providerFromInput,
189195
inputChainOverrides: func() ([]uint64, error) {
190196
return loadInputChainOverrides(inputStr)
191197
},
@@ -221,21 +227,17 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
221227
// Read input from environment variable
222228
inputStr := os.Getenv("DURABLE_PIPELINE_INPUT")
223229

224-
configProvider := func() (C, error) {
230+
configProviderFromInput := func(rawInput string) (C, error) {
225231
var zero C
226232

227-
if inputStr == "" {
233+
if rawInput == "" {
228234
return zero, errors.New("input is empty")
229235
}
230236

231-
// Parse JSON input
232237
var inputObject TypedJSON
233-
if err := json.Unmarshal([]byte(inputStr), &inputObject); err != nil {
238+
if err := json.Unmarshal([]byte(rawInput), &inputObject); err != nil {
234239
return zero, fmt.Errorf("failed to parse resolver input as JSON: %w", err)
235240
}
236-
237-
// If payload is null, pass it to the resolver (which will receive null)
238-
// If payload field is missing, return an error
239241
if len(inputObject.Payload) == 0 {
240242
return zero, errors.New("'payload' field is required")
241243
}
@@ -249,8 +251,12 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
249251
return typedConfig, nil
250252
}
251253

252-
return ChangeSetImpl[C]{changeset: f, configProvider: configProvider,
253-
ConfigResolver: resolver,
254+
return ChangeSetImpl[C]{changeset: f,
255+
configProvider: func() (C, error) {
256+
return configProviderFromInput(inputStr)
257+
},
258+
configProviderWithInput: configProviderFromInput,
259+
ConfigResolver: resolver,
254260
inputChainOverrides: func() ([]uint64, error) {
255261
return loadInputChainOverrides(inputStr)
256262
},
@@ -260,9 +266,10 @@ func (f WrappedChangeSet[C]) WithConfigResolver(resolver fresolvers.ConfigResolv
260266
var _ ConfiguredChangeSet = ChangeSetImpl[any]{}
261267

262268
type ChangeSetImpl[C any] struct {
263-
changeset WrappedChangeSet[C]
264-
configProvider func() (C, error)
265-
inputChainOverrides func() ([]uint64, error)
269+
changeset WrappedChangeSet[C]
270+
configProvider func() (C, error)
271+
configProviderWithInput func(inputStr string) (C, error)
272+
inputChainOverrides func() ([]uint64, error)
266273

267274
// Present only when the changeset was wired with
268275
// Configure(...).WithConfigResolver(...)
@@ -287,6 +294,22 @@ func (ccs ChangeSetImpl[C]) Apply(env fdeployment.Environment) (fdeployment.Chan
287294
return ccs.changeset.operation.Apply(env, c)
288295
}
289296

297+
func (ccs ChangeSetImpl[C]) applyWithInput(env fdeployment.Environment, inputStr string) (fdeployment.ChangesetOutput, error) {
298+
if ccs.configProviderWithInput == nil {
299+
return ccs.Apply(env)
300+
}
301+
302+
c, err := ccs.configProviderWithInput(inputStr)
303+
if err != nil {
304+
return fdeployment.ChangesetOutput{}, err
305+
}
306+
if err := ccs.changeset.operation.VerifyPreconditions(env, c); err != nil {
307+
return fdeployment.ChangesetOutput{}, err
308+
}
309+
310+
return ccs.changeset.operation.Apply(env, c)
311+
}
312+
290313
func (ccs ChangeSetImpl[C]) Configurations() (Configurations, error) {
291314
var chainOverrides []uint64
292315
var err error

engine/cld/changeset/postprocess.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,18 @@ func (ccs PostProcessingChangeSetImpl[C]) Apply(env fdeployment.Environment) (fd
3535
return ccs.postProcessor(env, output)
3636
}
3737

38+
func (ccs PostProcessingChangeSetImpl[C]) applyWithInput(
39+
env fdeployment.Environment, inputStr string,
40+
) (fdeployment.ChangesetOutput, error) {
41+
env.Logger.Debugf("Post-processing ChangesetOutput from %T", ccs.changeset.changeset.operation)
42+
output, err := ccs.changeset.applyWithInput(env, inputStr)
43+
if err != nil {
44+
return output, err
45+
}
46+
47+
return ccs.postProcessor(env, output)
48+
}
49+
3850
func (ccs PostProcessingChangeSetImpl[C]) Configurations() (Configurations, error) {
3951
return ccs.changeset.Configurations()
4052
}

engine/cld/changeset/registry.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,11 @@ type hookCarrier interface {
8585
getPostHooks() []PostHook
8686
}
8787

88+
// inputAwareChangeSet supports explicit per-apply input injection.
89+
type inputAwareChangeSet interface {
90+
applyWithInput(env fdeployment.Environment, inputStr string) (fdeployment.ChangesetOutput, error)
91+
}
92+
8893
// newRegistryEntry creates a new registry entry for a changeset.
8994
func newRegistryEntry(c ChangeSet, opts ChangesetConfig) registryEntry {
9095
entry := registryEntry{changeset: c, options: opts}
@@ -172,6 +177,21 @@ func (r *ChangesetsRegistry) AddGlobalPostHooks(hooks ...PostHook) {
172177
// a failed Apply are logged but never mask the Apply error.
173178
func (r *ChangesetsRegistry) Apply(
174179
key string, e fdeployment.Environment,
180+
) (fdeployment.ChangesetOutput, error) {
181+
return r.applyWithInput(key, e, "")
182+
}
183+
184+
// ApplyWithInput applies a changeset with explicit input JSON for this apply invocation.
185+
// Changesets configured to read durable pipeline input from environment variables
186+
// consume this value directly when they support input-aware execution.
187+
func (r *ChangesetsRegistry) ApplyWithInput(
188+
key string, e fdeployment.Environment, inputStr string,
189+
) (fdeployment.ChangesetOutput, error) {
190+
return r.applyWithInput(key, e, inputStr)
191+
}
192+
193+
func (r *ChangesetsRegistry) applyWithInput(
194+
key string, e fdeployment.Environment, inputStr string,
175195
) (fdeployment.ChangesetOutput, error) {
176196
entry, globalPre, globalPost, err := r.getApplySnapshot(key)
177197
if err != nil {
@@ -204,7 +224,13 @@ func (r *ChangesetsRegistry) Apply(
204224
}
205225
}
206226

207-
output, applyErr := entry.changeset.Apply(e)
227+
var output fdeployment.ChangesetOutput
228+
var applyErr error
229+
if inputAware, ok := entry.changeset.(inputAwareChangeSet); ok {
230+
output, applyErr = inputAware.applyWithInput(e, inputStr)
231+
} else {
232+
output, applyErr = entry.changeset.Apply(e)
233+
}
208234

209235
postParams := PostHookParams{
210236
Env: hookEnv,

engine/cld/changeset/registry_test.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,63 @@ func Test_Changesets_Apply(t *testing.T) {
148148
}
149149
}
150150

151+
//nolint:paralleltest // Uses process environment for fallback behavior assertions.
152+
func Test_Changesets_ApplyWithInput_WithEnvConfiguredChangeset(t *testing.T) {
153+
type inputConfig struct {
154+
Value int `json:"value"`
155+
}
156+
157+
t.Setenv("DURABLE_PIPELINE_INPUT", `{"payload":{"value":999}}`)
158+
159+
var received int
160+
cs := fdeployment.CreateChangeSet(
161+
func(_ fdeployment.Environment, cfg inputConfig) (fdeployment.ChangesetOutput, error) {
162+
received = cfg.Value
163+
return fdeployment.ChangesetOutput{}, nil
164+
},
165+
func(_ fdeployment.Environment, _ inputConfig) error { return nil },
166+
)
167+
168+
r := NewChangesetsRegistry()
169+
r.Add("0001_test", Configure(cs).WithEnvInput())
170+
171+
_, err := r.ApplyWithInput("0001_test", fdeployment.Environment{}, `{"payload":{"value":1}}`)
172+
require.NoError(t, err)
173+
require.Equal(t, 1, received)
174+
}
175+
176+
//nolint:paralleltest // Uses process environment for fallback behavior assertions.
177+
func Test_Changesets_ApplyWithInput_WithResolverConfiguredChangeset(t *testing.T) {
178+
type resolverInput struct {
179+
Base int `json:"base"`
180+
}
181+
type resolverOutput struct {
182+
Value int `json:"value"`
183+
}
184+
185+
t.Setenv("DURABLE_PIPELINE_INPUT", `{"payload":{"base":100}}`)
186+
187+
resolver := func(input resolverInput) (resolverOutput, error) {
188+
return resolverOutput{Value: input.Base + 10}, nil
189+
}
190+
191+
var received int
192+
cs := fdeployment.CreateChangeSet(
193+
func(_ fdeployment.Environment, cfg resolverOutput) (fdeployment.ChangesetOutput, error) {
194+
received = cfg.Value
195+
return fdeployment.ChangesetOutput{}, nil
196+
},
197+
func(_ fdeployment.Environment, _ resolverOutput) error { return nil },
198+
)
199+
200+
r := NewChangesetsRegistry()
201+
r.Add("0001_test", Configure(cs).WithConfigResolver(resolver))
202+
203+
_, err := r.ApplyWithInput("0001_test", fdeployment.Environment{}, `{"payload":{"base":7}}`)
204+
require.NoError(t, err)
205+
require.Equal(t, 17, received)
206+
}
207+
151208
func Test_Changesets_Add(t *testing.T) {
152209
t.Parallel()
153210

engine/test/runtime/runtime_registered_changesets.go

Lines changed: 3 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package runtime
33
import (
44
"errors"
55
"fmt"
6-
"os"
76

87
"github.com/segmentio/ksuid"
98

@@ -14,14 +13,10 @@ import (
1413
// ExecRegisteredChangesetsFromYAML executes registered changesets from durable-pipeline YAML input.
1514
//
1615
// For each changeset entry in YAML order:
17-
// 1. Set DURABLE_PIPELINE_INPUT from that entry's payload/chainOverrides.
16+
// 1. Build per-entry durable-pipeline JSON input from payload/chainOverrides.
1817
// 2. Create and initialize a fresh registry provider.
19-
// 3. Apply the named changeset against the current runtime environment.
18+
// 3. Apply the named changeset against the current runtime environment with explicit input.
2019
// 4. Merge output into runtime state and regenerate environment for the next step.
21-
//
22-
// Do not run this in parallel. This is not thread-safe. It temporarily mutates the process-wide
23-
// DURABLE_PIPELINE_INPUT environment variable while applying each changeset.
24-
// Once we move the reliance on the environment variable in the implementation, we can remove this restriction.
2520
func (r *Runtime) ExecRegisteredChangesetsFromYAML(
2621
providerFactory func() changeset.RegistryProvider,
2722
inputYAML []byte,
@@ -46,23 +41,11 @@ func (r *Runtime) ExecRegisteredChangesetsFromYAML(
4641
return fmt.Errorf("input file %s has empty 'changesets' array", "runtime-input.yaml")
4742
}
4843

49-
oldInput, hadInput := os.LookupEnv("DURABLE_PIPELINE_INPUT")
50-
defer func() {
51-
if hadInput {
52-
_ = os.Setenv("DURABLE_PIPELINE_INPUT", oldInput)
53-
} else {
54-
_ = os.Unsetenv("DURABLE_PIPELINE_INPUT")
55-
}
56-
}()
57-
5844
for _, cs := range ordered {
5945
inputJSON, err := durablepipeline.BuildChangesetInputJSON(cs.Name, cs.Data)
6046
if err != nil {
6147
return fmt.Errorf("failed to build input for changeset %q in input file %s: %w", cs.Name, "runtime-input.yaml", err)
6248
}
63-
if setEnvErr := os.Setenv("DURABLE_PIPELINE_INPUT", inputJSON); setEnvErr != nil {
64-
return fmt.Errorf("failed to set DURABLE_PIPELINE_INPUT environment variable: %w", setEnvErr)
65-
}
6649

6750
provider := providerFactory()
6851
if provider == nil {
@@ -72,7 +55,7 @@ func (r *Runtime) ExecRegisteredChangesetsFromYAML(
7255
return fmt.Errorf("failed to init registry provider: %w", initErr)
7356
}
7457

75-
out, err := provider.Registry().Apply(cs.Name, r.currentEnv)
58+
out, err := provider.Registry().ApplyWithInput(cs.Name, r.currentEnv, inputJSON)
7659
if err != nil {
7760
return err
7861
}

0 commit comments

Comments
 (0)