Skip to content

Commit b8ba2b5

Browse files
committed
[DONTime] Rollout flag for pruning fix
1 parent 9f76f0c commit b8ba2b5

4 files changed

Lines changed: 120 additions & 51 deletions

File tree

pkg/workflows/dontime/pb/dontime.pb.go

Lines changed: 18 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/workflows/dontime/pb/dontime.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ option go_package = "github.com/smartcontractkit/chainlink-common/pkg/workflows/
55
message Observation {
66
int64 timestamp = 1;
77
map<string, int64> requests = 2;
8+
// Flag to roll out execution pruning fix.
9+
// TODO(CRE-2497): Remove after rollout.
10+
bool prune_executions = 3;
811
}
912

1013
message Observations {

pkg/workflows/dontime/plugin.go

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -97,8 +97,9 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext,
9797
}
9898

9999
observation := &pb.Observation{
100-
Timestamp: time.Now().UTC().UnixMilli(),
101-
Requests: requests,
100+
Timestamp: time.Now().UTC().UnixMilli(),
101+
Requests: requests,
102+
PruneExecutions: true,
102103
}
103104

104105
return proto.MarshalOptions{Deterministic: true}.Marshal(observation)
@@ -129,14 +130,33 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
129130
prevOutcome.ObservedDonTimes = make(map[string]*pb.ObservedDonTimes)
130131
}
131132

133+
// Unmarshal all observations once and compute pruneExecutions.
134+
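// Only prune when every parsed observation has PruneExecutions set (i.e. all reporting nodes are updated). If some nodes roll back, their observations clear the flag and the legacy path is used.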
135+
parsedAOs := make([]*pb.Observation, len(aos))
136+
pruneExecutions := true
132137
for idx, ao := range aos {
133138
observation := &pb.Observation{}
134139
if err := proto.Unmarshal(ao.Observation, observation); err != nil {
135140
p.lggr.Errorf("failed to unmarshal observation in Outcome phase")
136141
continue
137142
}
143+
parsedAOs[idx] = observation
144+
if !observation.PruneExecutions {
145+
pruneExecutions = false // need all nodes to agree
146+
}
147+
}
148+
149+
for idx, observation := range parsedAOs {
150+
if observation == nil {
151+
continue
152+
}
138153

139154
for id, requestSeqNum := range observation.Requests {
155+
if !pruneExecutions { // TODO(CRE-2497): legacy behavior, remove after rollout
156+
if _, ok := prevOutcome.ObservedDonTimes[id]; !ok {
157+
prevOutcome.ObservedDonTimes[id] = &pb.ObservedDonTimes{}
158+
}
159+
}
140160
var currSeqNum int64
141161
if times, ok := prevOutcome.ObservedDonTimes[id]; ok {
142162
currSeqNum = int64(len(times.Timestamps))
@@ -196,14 +216,23 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
196216

197217
// Remove expired workflow executions; empty ones are removed as well, but only when pruneExecutions is set
198218
for id, observedTimes := range outcome.ObservedDonTimes {
199-
if observedTimes == nil || len(observedTimes.Timestamps) == 0 {
200-
delete(outcome.ObservedDonTimes, id)
201-
p.store.deleteExecutionID(id)
202-
continue
203-
}
204-
if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() {
205-
delete(outcome.ObservedDonTimes, id)
206-
p.store.deleteExecutionID(id)
219+
if !pruneExecutions { // TODO(CRE-2497): legacy behavior, remove after rollout
220+
if observedTimes != nil && len(observedTimes.Timestamps) > 0 {
221+
if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() {
222+
delete(outcome.ObservedDonTimes, id)
223+
p.store.deleteExecutionID(id)
224+
}
225+
}
226+
} else {
227+
if observedTimes == nil || len(observedTimes.Timestamps) == 0 {
228+
delete(outcome.ObservedDonTimes, id)
229+
p.store.deleteExecutionID(id)
230+
continue
231+
}
232+
if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() {
233+
delete(outcome.ObservedDonTimes, id)
234+
p.store.deleteExecutionID(id)
235+
}
207236
}
208237
}
209238

pkg/workflows/dontime/plugin_test.go

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -159,28 +159,24 @@ func TestPlugin_Outcome(t *testing.T) {
159159
timestamp := time.Now().UnixMilli()
160160
observations := []*pb.Observation{
161161
{
162-
Timestamp: timestamp,
163-
Requests: map[string]int64{
164-
executionID: 0,
165-
},
162+
Timestamp: timestamp,
163+
Requests: map[string]int64{executionID: 0},
164+
PruneExecutions: true,
166165
},
167166
{
168-
Timestamp: timestamp - int64(time.Second),
169-
Requests: map[string]int64{
170-
executionID: 0,
171-
},
167+
Timestamp: timestamp - int64(time.Second),
168+
Requests: map[string]int64{executionID: 0},
169+
PruneExecutions: true,
172170
},
173171
{
174-
Timestamp: timestamp + int64(time.Second),
175-
Requests: map[string]int64{
176-
executionID: 0,
177-
},
172+
Timestamp: timestamp + int64(time.Second),
173+
Requests: map[string]int64{executionID: 0},
174+
PruneExecutions: true,
178175
},
179176
{
180-
Timestamp: timestamp,
181-
Requests: map[string]int64{
182-
executionID: 0,
183-
},
177+
Timestamp: timestamp,
178+
Requests: map[string]int64{executionID: 0},
179+
PruneExecutions: true,
184180
},
185181
}
186182

@@ -224,8 +220,9 @@ func TestPlugin_Outcome_SequenceNumberHandling(t *testing.T) {
224220
aos := make([]types.AttributedObservation, numNodes)
225221
for i := 0; i < numNodes; i++ {
226222
obs := &pb.Observation{
227-
Timestamp: timestamp + int64(i),
228-
Requests: requests,
223+
Timestamp: timestamp + int64(i),
224+
Requests: requests,
225+
PruneExecutions: true,
229226
}
230227
rawObs, err := proto.Marshal(obs)
231228
require.NoError(t, err)
@@ -434,22 +431,10 @@ func TestPlugin_FinishedExecutions(t *testing.T) {
434431
t.Run("Outcome: remove expired workflow executions", func(t *testing.T) {
435432
timestamp := time.Now().UnixMilli()
436433
observations := []*pb.Observation{
437-
{
438-
Timestamp: timestamp,
439-
Requests: map[string]int64{},
440-
},
441-
{
442-
Timestamp: timestamp - int64(time.Second),
443-
Requests: map[string]int64{},
444-
},
445-
{
446-
Timestamp: timestamp + int64(time.Second),
447-
Requests: map[string]int64{},
448-
},
449-
{
450-
Timestamp: timestamp,
451-
Requests: map[string]int64{},
452-
},
434+
{Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true},
435+
{Timestamp: timestamp - int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true},
436+
{Timestamp: timestamp + int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true},
437+
{Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true},
453438
}
454439

455440
aos := make([]types.AttributedObservation, 4)
@@ -484,6 +469,47 @@ func TestPlugin_FinishedExecutions(t *testing.T) {
484469
require.NotContains(t, outcomeProto.ObservedDonTimes, "workflow-123")
485470
})
486471

472+
t.Run("Outcome: legacy path when only half nodes have PruneExecutions set", func(t *testing.T) {
473+
timestamp := time.Now().UnixMilli()
474+
emptyID := "empty-workflow"
475+
476+
// Only 2 of 4 nodes have PruneExecutions=true → pruneExecutions stays false → legacy path.
477+
observations := []*pb.Observation{
478+
{Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true},
479+
{Timestamp: timestamp - int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true},
480+
{Timestamp: timestamp + int64(time.Second), Requests: map[string]int64{}, PruneExecutions: false},
481+
{Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: false},
482+
}
483+
484+
aos := make([]types.AttributedObservation, len(observations))
485+
for i, obs := range observations {
486+
rawObs, err := proto.Marshal(obs)
487+
require.NoError(t, err)
488+
aos[i] = types.AttributedObservation{Observation: rawObs, Observer: commontypes.OracleID(i)}
489+
}
490+
491+
// prevOutcome contains an entry for emptyID with no timestamps.
492+
prevOutcome := &pb.Outcome{
493+
Timestamp: timestamp - 1000,
494+
ObservedDonTimes: map[string]*pb.ObservedDonTimes{
495+
emptyID: {Timestamps: []int64{}},
496+
},
497+
}
498+
prevOutcomeBytes, err := proto.Marshal(prevOutcome)
499+
require.NoError(t, err)
500+
501+
outcome, err := plugin.Outcome(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes}, query, aos)
502+
require.NoError(t, err)
503+
504+
legacyOutcomeProto := &pb.Outcome{}
505+
err = proto.Unmarshal(outcome, legacyOutcomeProto)
506+
require.NoError(t, err)
507+
508+
// Legacy behavior: empty-timestamps entry is NOT pruned.
509+
require.Contains(t, legacyOutcomeProto.ObservedDonTimes, emptyID)
510+
require.Empty(t, legacyOutcomeProto.ObservedDonTimes[emptyID].Timestamps)
511+
})
512+
487513
t.Run("Transmit: delete removed executionIDs", func(t *testing.T) {
488514
store.setDonTimes("workflow-123", []int64{time.Now().UnixMilli()})
489515

0 commit comments

Comments
 (0)