Skip to content

Commit 782a4ec

Browse files
feat(operations): introduce WithIdempotencyKey (#1016)
Operation API cache/report reuse is based on the definition + input of the operation, sometimes we have use cases where the input is the same but the dependency is not the same and user would like the operation for it to be executed instead of being reused with the existing cache/reports. Eg when the chain client passed into dep can be for different chains but implements the same interface. Because of this, users have made [workarounds](https://github.com/smartcontractkit/payments/blob/5462c054fd879a0da77fffd68fa7b0c069d36d2d/changeset/shared/operations/access_control.go#L180-L185) by passing chain-selectors or custom uuid into the input purely for cache breaking reasons. This change introduce an extra hash key that user can provide as a differentiator. The API will also consider this new key as part of the hash key for reusing previous successful results. By providing this key, user avoid having to introduce extra placeholder param in the input struct. ```go result, err = operations.ExecuteOperation(bundle, op, deps, input, operations.WithIdempotencyKey[InputType, DepsType](extra-key)) ``` This change originated from discussing with Rayene and Rens. JIRA: https://smartcontract-it.atlassian.net/browse/CLD-2579
1 parent e4eb841 commit 782a4ec

7 files changed

Lines changed: 109 additions & 29 deletions

File tree

.changeset/curly-hoops-listen.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"chainlink-deployments-framework": minor
3+
---
4+
5+
feat(operations): introduce WithIdempotencyKey

operations/doc.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,9 @@ Reporter:
5555
// Force execution and ignore previous successful reports.
5656
result, err = operations.ExecuteOperation(bundle, op, deps, input, operations.WithForceExecute[InputType, DepsType]())
5757
58+
// A different idempotency key runs again; the same key reuses the prior successful report.
59+
result, err = operations.ExecuteOperation(bundle, op, deps, input, operations.WithIdempotencyKey[InputType, DepsType]("extra-key"))
60+
5861
// Execute a sequence.
5962
_, err = operations.ExecuteSequence(bundle, sequence, deps, input)
6063
*/

operations/execute.go

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ type ExecuteConfig[IN, DEP any] struct {
1515
retryConfig RetryConfig[IN, DEP]
1616
// forceExecute controls whether execution should skip execution when previous successful report is found (set by WithForceExecute).
1717
forceExecute bool
18+
// idempotencyKey scopes report reuse beyond operation definition and input (set by WithIdempotencyKey).
19+
idempotencyKey string
1820
}
1921

2022
type ExecuteOption[IN, DEP any] func(*ExecuteConfig[IN, DEP])
@@ -94,6 +96,15 @@ func WithForceExecute[IN, DEP any]() ExecuteOption[IN, DEP] {
9496
}
9597
}
9698

99+
// WithIdempotencyKey is an ExecuteOption that adds an extra component to the idempotency hash.
100+
// The hash will then be built from the operation definition, input, and this key.
101+
// Use it when the same operation input can legitimately produce different results, so a later run should not reuse an earlier result.
102+
func WithIdempotencyKey[IN, DEP any](idempotencyKey string) ExecuteOption[IN, DEP] {
103+
return func(c *ExecuteConfig[IN, DEP]) {
104+
c.idempotencyKey = idempotencyKey
105+
}
106+
}
107+
97108
// WithOperationNRetry is an ExecuteOperationNOption that enables the default retry for each run in ExecuteOperationN.
98109
func WithOperationNRetry[IN, DEP any]() ExecuteOperationNOption[IN, DEP] {
99110
return func(c *ExecuteOperationNConfig[IN, DEP]) {
@@ -151,7 +162,7 @@ func ExecuteOperation[IN, OUT, DEP any](
151162
opt(executeConfig)
152163
}
153164
if !executeConfig.forceExecute {
154-
if previousReport, ok := loadPreviousSuccessfulReport[IN, OUT](b, operation.def, input); ok {
165+
if previousReport, ok := loadPreviousSuccessfulReport[IN, OUT](b, operation.def, input, executeConfig.idempotencyKey); ok {
155166
b.Logger.Infow("Operation already executed. Returning previous result", "id", operation.def.ID,
156167
"version", operation.def.Version, "description", operation.def.Description)
157168

@@ -173,6 +184,7 @@ func ExecuteOperation[IN, OUT, DEP any](
173184
}
174185

175186
report := NewReport(operation.def, input, output, err)
187+
report.IdempotencyKey = executeConfig.idempotencyKey
176188
if err = b.reporter.AddReport(genericReport(report)); err != nil {
177189
return Report[IN, OUT]{}, err
178190
}
@@ -314,7 +326,7 @@ func ExecuteSequence[IN, OUT, DEP any](
314326
return SequenceReport[IN, OUT]{}, fmt.Errorf("sequence %s input: %w", sequence.def.ID, ErrNotSerializable)
315327
}
316328

317-
if previousReport, ok := loadPreviousSuccessfulReport[IN, OUT](b, sequence.def, input); ok {
329+
if previousReport, ok := loadPreviousSuccessfulReport[IN, OUT](b, sequence.def, input, ""); ok {
318330
executionReports, err := b.reporter.GetExecutionReports(previousReport.ID)
319331
if err != nil {
320332
return SequenceReport[IN, OUT]{}, err
@@ -382,14 +394,14 @@ func NewUnrecoverableError(err error) error {
382394
}
383395

384396
func loadPreviousSuccessfulReport[IN, OUT any](
385-
b Bundle, def Definition, input IN,
397+
b Bundle, def Definition, input IN, idempotencyKey string,
386398
) (Report[IN, OUT], bool) {
387399
prevReports, err := b.reporter.GetReports()
388400
if err != nil {
389401
b.Logger.Errorw("Failed to get reports", "error", err)
390402
return Report[IN, OUT]{}, false
391403
}
392-
currentHash, err := constructUniqueHashFrom(b.reportHashCache, def, input)
404+
currentHash, err := constructUniqueHashFrom(b.reportHashCache, def, input, idempotencyKey)
393405
if err != nil {
394406
b.Logger.Errorw("Failed to construct unique hash", "error", err)
395407
return Report[IN, OUT]{}, false
@@ -400,7 +412,7 @@ func loadPreviousSuccessfulReport[IN, OUT any](
400412
for i := len(prevReports) - 1; i >= 0; i-- {
401413
report := prevReports[i]
402414
// Check if operation/sequence was run previously and return the report if successful
403-
reportHash, err := constructUniqueHashFrom(b.reportHashCache, report.Def, report.Input)
415+
reportHash, err := constructUniqueHashFrom(b.reportHashCache, report.Def, report.Input, report.IdempotencyKey)
404416
if err != nil {
405417
b.Logger.Errorw("Failed to construct unique hash for previous report", "error", err)
406418
continue
@@ -430,7 +442,7 @@ func loadSuccessfulExecutionSeriesReports[IN, OUT any](
430442
b.Logger.Errorw("Failed to get reports", "error", err)
431443
return []Report[IN, OUT]{}, false
432444
}
433-
currentHash, err := constructUniqueHashFrom(b.reportHashCache, def, input)
445+
currentHash, err := constructUniqueHashFrom(b.reportHashCache, def, input, "")
434446
if err != nil {
435447
b.Logger.Errorw("Failed to construct unique hash", "error", err)
436448
return []Report[IN, OUT]{}, false
@@ -442,7 +454,7 @@ func loadSuccessfulExecutionSeriesReports[IN, OUT any](
442454
if report.ExecutionSeries == nil || report.ExecutionSeries.ID != seriesID {
443455
continue
444456
}
445-
reportHash, err := constructUniqueHashFrom(b.reportHashCache, report.Def, report.Input)
457+
reportHash, err := constructUniqueHashFrom(b.reportHashCache, report.Def, report.Input, "")
446458
if err != nil {
447459
b.Logger.Errorw("Failed to construct unique hash for previous report", "error", err)
448460
continue

operations/execute_test.go

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,13 +212,29 @@ func Test_ExecuteOperation_WithPreviousRun(t *testing.T) {
212212
assert.Equal(t, 4, res.Output)
213213
assert.Equal(t, 3, handlerCalledTimes)
214214

215+
// same input with a different hash key should execute again
216+
res, err = ExecuteOperation(bundle, op, nil, 1, WithIdempotencyKey[int, any]("other-key"))
217+
require.NoError(t, err)
218+
require.Nil(t, res.Err)
219+
assert.Equal(t, 2, res.Output)
220+
assert.Equal(t, 4, handlerCalledTimes)
221+
idempotencyKeyRunID := res.ID
222+
assert.NotEqual(t, forcedRunID, idempotencyKeyRunID)
223+
224+
// same input and idempotency key should reuse that report
225+
res, err = ExecuteOperation(bundle, op, nil, 1, WithIdempotencyKey[int, any]("other-key"))
226+
require.NoError(t, err)
227+
require.Nil(t, res.Err)
228+
assert.Equal(t, idempotencyKeyRunID, res.ID)
229+
assert.Equal(t, 4, handlerCalledTimes)
230+
215231
// new run with different op, should perform execution
216232
op = NewOperation("plus1-v2", semver.MustParse("2.0.0"), "test operation", handler)
217233
res, err = ExecuteOperation(bundle, op, nil, 1)
218234
require.NoError(t, err)
219235
require.Nil(t, res.Err)
220236
assert.Equal(t, 2, res.Output)
221-
assert.Equal(t, 4, handlerCalledTimes)
237+
assert.Equal(t, 5, handlerCalledTimes)
222238

223239
// new run with op that returns error
224240
res, err = ExecuteOperation(bundle, opWithError, nil, 1)
@@ -718,7 +734,7 @@ func Test_loadPreviousSuccessfulReport(t *testing.T) {
718734
bundle.reporter = tt.setupReporter()
719735
}
720736

721-
report, found := loadPreviousSuccessfulReport[float64, int](bundle, definition, tt.input)
737+
report, found := loadPreviousSuccessfulReport[float64, int](bundle, definition, tt.input, "")
722738
assert.Equal(t, tt.wantFound, found)
723739

724740
if tt.wantFound {

operations/hashing.go

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,24 +9,37 @@ import (
99
"sync"
1010
)
1111

12-
func constructUniqueHashFrom(hashCache *sync.Map, def Definition, input any) (string, error) {
13-
// convert def and input into string to be used as cachekey
14-
// as input can be any type, some types cannot be used as cache argument for sync.map
15-
// converting to string ensure argument is always valid
16-
12+
func buildUniqueHashCacheKey(def Definition, input any, idempotencyKey string) ([]byte, error) {
13+
// convert def and input into bytes used as the sync.Map cache key
14+
// as input can be any type, some types cannot be used as cache argument for sync.Map
1715
key, err := json.Marshal(def)
1816
if err != nil {
19-
return "", err
17+
return nil, err
2018
}
2119

2220
// convert to a canonical representation that's stable regardless of field order
23-
// If the keys are not sorted, the hash generated can be different in maps.
2421
inputBytes, err := canonicalizeJSON(input)
2522
if err != nil {
26-
return "", err
23+
return nil, err
2724
}
2825

2926
key = append(key, inputBytes...)
27+
if idempotencyKey != "" {
28+
// NUL separates canonical input from the idempotency key so concatenations cannot collide
29+
// (for example, input "1" + key "23" vs input "12" + key "3").
30+
key = append(key, 0)
31+
key = append(key, []byte(idempotencyKey)...)
32+
}
33+
34+
return key, nil
35+
}
36+
37+
func constructUniqueHashFrom(hashCache *sync.Map, def Definition, input any, idempotencyKey string) (string, error) {
38+
key, err := buildUniqueHashCacheKey(def, input, idempotencyKey)
39+
if err != nil {
40+
return "", err
41+
}
42+
3043
if cached, ok := hashCache.Load(string(key)); ok {
3144
return cached.(string), nil
3245
}

operations/hashing_test.go

Lines changed: 38 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package operations
22

33
import (
4-
"encoding/json"
54
"math"
65
"sync"
76
"testing"
@@ -12,8 +11,9 @@ import (
1211
)
1312

1413
type argument struct {
15-
def Definition
16-
input any
14+
def Definition
15+
input any
16+
idempotencyKey string
1717
}
1818

1919
func Test_constructUniqueHashFrom(t *testing.T) {
@@ -153,15 +153,45 @@ func Test_constructUniqueHashFrom(t *testing.T) {
153153
},
154154
wantSame: true,
155155
},
156+
{
157+
name: "Same def and input with different idempotency keys should have different hash",
158+
left: argument{
159+
def: definition,
160+
input: Input{A: 1, B: 2},
161+
idempotencyKey: "a",
162+
},
163+
right: argument{
164+
def: definition,
165+
input: Input{A: 1, B: 2},
166+
idempotencyKey: "b",
167+
},
168+
wantSame: false,
169+
skipCacheCheck: true,
170+
},
171+
{
172+
name: "Input and idempotency key concatenation must not collide",
173+
left: argument{
174+
def: definition,
175+
input: 1,
176+
idempotencyKey: "23",
177+
},
178+
right: argument{
179+
def: definition,
180+
input: 12,
181+
idempotencyKey: "3",
182+
},
183+
wantSame: false,
184+
skipCacheCheck: true,
185+
},
156186
}
157187

158188
for _, tt := range tests {
159189
t.Run(tt.name, func(t *testing.T) {
160190
t.Parallel()
161191

162192
cache := &sync.Map{}
163-
hash1, err1 := constructUniqueHashFrom(cache, tt.left.def, tt.left.input)
164-
hash2, err2 := constructUniqueHashFrom(&sync.Map{}, tt.right.def, tt.right.input)
193+
hash1, err1 := constructUniqueHashFrom(cache, tt.left.def, tt.left.input, tt.left.idempotencyKey)
194+
hash2, err2 := constructUniqueHashFrom(&sync.Map{}, tt.right.def, tt.right.input, tt.right.idempotencyKey)
165195

166196
if tt.wantErr != "" {
167197
require.Error(t, err1)
@@ -182,19 +212,15 @@ func Test_constructUniqueHashFrom(t *testing.T) {
182212
}
183213

184214
if !tt.skipCacheCheck {
185-
// Verify cache behavior
186-
defBytes1, err := json.Marshal(tt.left.def)
187-
require.NoError(t, err)
188-
inputBytes1, err := json.Marshal(tt.left.input)
215+
cacheKey1, err := buildUniqueHashCacheKey(tt.left.def, tt.left.input, tt.left.idempotencyKey)
189216
require.NoError(t, err)
190-
cacheKey1 := string(append(defBytes1, inputBytes1...))
191217

192-
cached1, ok := cache.Load(cacheKey1)
218+
cached1, ok := cache.Load(string(cacheKey1))
193219
require.True(t, ok)
194220
assert.Equal(t, hash1, cached1)
195221

196222
// Second call should use cache
197-
cachedHash1, err := constructUniqueHashFrom(cache, tt.left.def, tt.left.input)
223+
cachedHash1, err := constructUniqueHashFrom(cache, tt.left.def, tt.left.input, tt.left.idempotencyKey)
198224
require.NoError(t, err)
199225
assert.Equal(t, hash1, cachedHash1)
200226
}

operations/report.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ type Report[IN, OUT any] struct {
2424
ChildOperationReports []string `json:"childOperationReports"`
2525
// ExecutionSeries is used to track the execution of an operation that was executed multiple times
2626
ExecutionSeries *ExecutionSeries `json:"executionSeries,omitempty"`
27+
// IdempotencyKey is an additional component of the idempotency hash. This is used when the same operation input
28+
// can legitimately produce different results by providing different idempotency keys. Set via WithIdempotencyKey on ExecuteOperation.
29+
IdempotencyKey string `json:"idempotencyKey,omitempty"`
2730
}
2831

2932
// ExecutionSeries is used to track the execution of an operation that was executed multiple times.
@@ -272,6 +275,7 @@ func genericReport[IN, OUT any](r Report[IN, OUT]) Report[any, any] {
272275
Err: r.Err,
273276
ChildOperationReports: r.ChildOperationReports,
274277
ExecutionSeries: r.ExecutionSeries,
278+
IdempotencyKey: r.IdempotencyKey,
275279
}
276280
}
277281

@@ -310,6 +314,7 @@ func typeReport[IN, OUT any](r Report[any, any]) (Report[IN, OUT], bool) {
310314
Err: r.Err,
311315
ChildOperationReports: r.ChildOperationReports,
312316
ExecutionSeries: r.ExecutionSeries,
317+
IdempotencyKey: r.IdempotencyKey,
313318
}, true
314319
}
315320

0 commit comments

Comments
 (0)