Skip to content

Commit 390fbc9

Browse files
committed
[DONTime] Rollout flag for pruning fix
1 parent 9f76f0c commit 390fbc9

4 files changed

Lines changed: 174 additions & 52 deletions

File tree

pkg/workflows/dontime/pb/dontime.pb.go

Lines changed: 18 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/workflows/dontime/pb/dontime.proto

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ option go_package = "github.com/smartcontractkit/chainlink-common/pkg/workflows/
55
message Observation {
66
int64 timestamp = 1;
77
map<string, int64> requests = 2;
8+
// Flag to roll out execution pruning fix.
9+
// TODO(CRE-2497): Remove after rollout.
10+
bool prune_executions = 3;
811
}
912

1013
message Observations {

pkg/workflows/dontime/plugin.go

Lines changed: 93 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"slices"
88
"time"
99

10+
"go.opentelemetry.io/otel/metric"
1011
"google.golang.org/protobuf/proto"
1112
"google.golang.org/protobuf/types/known/structpb"
1213

@@ -15,10 +16,51 @@ import (
1516
"github.com/smartcontractkit/libocr/offchainreporting2plus/types"
1617
"github.com/smartcontractkit/libocr/quorumhelper"
1718

19+
"github.com/smartcontractkit/chainlink-common/pkg/beholder"
1820
"github.com/smartcontractkit/chainlink-common/pkg/logger"
1921
"github.com/smartcontractkit/chainlink-common/pkg/workflows/dontime/pb"
2022
)
2123

24+
type pluginMetrics struct {
25+
donTime metric.Int64Gauge
26+
donTimeEntries metric.Int64Gauge
27+
outcomeSize metric.Int64Gauge
28+
}
29+
30+
func newPluginMetrics() (pluginMetrics, error) {
31+
meter := beholder.GetMeter()
32+
33+
donTime, err := meter.Int64Gauge("dontime_outcome_don_time_ms",
34+
metric.WithDescription("DON consensus timestamp included in the latest outcome, in milliseconds"),
35+
metric.WithUnit("ms"),
36+
)
37+
if err != nil {
38+
return pluginMetrics{}, fmt.Errorf("failed to create don_time gauge: %w", err)
39+
}
40+
41+
donTimeEntries, err := meter.Int64Gauge("dontime_outcome_entries",
42+
metric.WithDescription("Number of workflow execution entries tracked in the latest outcome"),
43+
metric.WithUnit("{entry}"),
44+
)
45+
if err != nil {
46+
return pluginMetrics{}, fmt.Errorf("failed to create don_time_entries gauge: %w", err)
47+
}
48+
49+
outcomeSize, err := meter.Int64Gauge("dontime_outcome_size_bytes",
50+
metric.WithDescription("Serialised size of the latest outcome in bytes"),
51+
metric.WithUnit("By"),
52+
)
53+
if err != nil {
54+
return pluginMetrics{}, fmt.Errorf("failed to create outcome_size gauge: %w", err)
55+
}
56+
57+
return pluginMetrics{
58+
donTime: donTime,
59+
donTimeEntries: donTimeEntries,
60+
outcomeSize: outcomeSize,
61+
}, nil
62+
}
63+
2264
type Plugin struct {
2365
store *Store
2466
config ocr3types.ReportingPluginConfig
@@ -27,6 +69,8 @@ type Plugin struct {
2769

2870
batchSize int
2971
minTimeIncrease int64
72+
73+
metrics pluginMetrics
3074
}
3175

3276
var _ ocr3types.ReportingPlugin[[]byte] = (*Plugin)(nil)
@@ -42,13 +86,19 @@ func NewPlugin(store *Store, config ocr3types.ReportingPluginConfig, offchainCfg
4286
return nil, errors.New("execution removal time must be positive")
4387
}
4488

89+
metrics, err := newPluginMetrics()
90+
if err != nil {
91+
return nil, err
92+
}
93+
4594
return &Plugin{
4695
store: store,
4796
config: config,
4897
offChainConfig: offchainCfg,
4998
lggr: logger.Named(lggr, "DONTimePlugin"),
5099
batchSize: int(offchainCfg.MaxBatchSize),
51100
minTimeIncrease: offchainCfg.MinTimeIncrease / int64(time.Millisecond),
101+
metrics: metrics,
52102
}, nil
53103
}
54104

@@ -97,8 +147,9 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext,
97147
}
98148

99149
observation := &pb.Observation{
100-
Timestamp: time.Now().UTC().UnixMilli(),
101-
Requests: requests,
150+
Timestamp: time.Now().UTC().UnixMilli(),
151+
Requests: requests,
152+
PruneExecutions: true,
102153
}
103154

104155
return proto.MarshalOptions{Deterministic: true}.Marshal(observation)
@@ -112,7 +163,7 @@ func (p *Plugin) ObservationQuorum(_ context.Context, _ ocr3types.OutcomeContext
112163
return quorumhelper.ObservationCountReachesObservationQuorum(quorumhelper.QuorumTwoFPlusOne, p.config.N, p.config.F, aos), nil
113164
}
114165

115-
func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
166+
func (p *Plugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, _ types.Query, aos []types.AttributedObservation) (ocr3types.Outcome, error) {
116167
observationCounts := map[string]int64{} // counts how many nodes reported where a new DON timestamp might be needed
117168
type timestampNodePair struct {
118169
Timestamp int64
@@ -129,14 +180,33 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
129180
prevOutcome.ObservedDonTimes = make(map[string]*pb.ObservedDonTimes)
130181
}
131182

183+
// Unmarshal all observations once and compute pruneExecutions.
184+
// Only prune when all nodes are updated. Even if this rolls back, the logic is still correct.
185+
parsedAOs := make([]*pb.Observation, len(aos))
186+
pruneExecutions := true
132187
for idx, ao := range aos {
133188
observation := &pb.Observation{}
134189
if err := proto.Unmarshal(ao.Observation, observation); err != nil {
135190
p.lggr.Errorf("failed to unmarshal observation in Outcome phase")
136191
continue
137192
}
193+
parsedAOs[idx] = observation
194+
if !observation.PruneExecutions {
195+
pruneExecutions = false // need all nodes to agree
196+
}
197+
}
198+
199+
for idx, observation := range parsedAOs {
200+
if observation == nil {
201+
continue
202+
}
138203

139204
for id, requestSeqNum := range observation.Requests {
205+
if !pruneExecutions { // TODO(CRE-2497): legacy behavior, remove after rollout
206+
if _, ok := prevOutcome.ObservedDonTimes[id]; !ok {
207+
prevOutcome.ObservedDonTimes[id] = &pb.ObservedDonTimes{}
208+
}
209+
}
140210
var currSeqNum int64
141211
if times, ok := prevOutcome.ObservedDonTimes[id]; ok {
142212
currSeqNum = int64(len(times.Timestamps))
@@ -196,14 +266,23 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
196266

197267
// Remove expired and empty workflow executions
198268
for id, observedTimes := range outcome.ObservedDonTimes {
199-
if observedTimes == nil || len(observedTimes.Timestamps) == 0 {
200-
delete(outcome.ObservedDonTimes, id)
201-
p.store.deleteExecutionID(id)
202-
continue
203-
}
204-
if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() {
205-
delete(outcome.ObservedDonTimes, id)
206-
p.store.deleteExecutionID(id)
269+
if !pruneExecutions { // TODO(CRE-2497): legacy behavior, remove after rollout
270+
if observedTimes != nil && len(observedTimes.Timestamps) > 0 {
271+
if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() {
272+
delete(outcome.ObservedDonTimes, id)
273+
p.store.deleteExecutionID(id)
274+
}
275+
}
276+
} else {
277+
if observedTimes == nil || len(observedTimes.Timestamps) == 0 {
278+
delete(outcome.ObservedDonTimes, id)
279+
p.store.deleteExecutionID(id)
280+
continue
281+
}
282+
if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() {
283+
delete(outcome.ObservedDonTimes, id)
284+
p.store.deleteExecutionID(id)
285+
}
207286
}
208287
}
209288

@@ -212,6 +291,9 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
212291
"observedDonTimesEntries", len(outcome.ObservedDonTimes),
213292
"outcomeSizeBytes", len(outcomeBytes),
214293
)
294+
p.metrics.donTime.Record(ctx, outcome.Timestamp)
295+
p.metrics.donTimeEntries.Record(ctx, int64(len(outcome.ObservedDonTimes)))
296+
p.metrics.outcomeSize.Record(ctx, int64(len(outcomeBytes)))
215297
return outcomeBytes, err
216298
}
217299

pkg/workflows/dontime/plugin_test.go

Lines changed: 60 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -159,28 +159,24 @@ func TestPlugin_Outcome(t *testing.T) {
159159
timestamp := time.Now().UnixMilli()
160160
observations := []*pb.Observation{
161161
{
162-
Timestamp: timestamp,
163-
Requests: map[string]int64{
164-
executionID: 0,
165-
},
162+
Timestamp: timestamp,
163+
Requests: map[string]int64{executionID: 0},
164+
PruneExecutions: true,
166165
},
167166
{
168-
Timestamp: timestamp - int64(time.Second),
169-
Requests: map[string]int64{
170-
executionID: 0,
171-
},
167+
Timestamp: timestamp - int64(time.Second),
168+
Requests: map[string]int64{executionID: 0},
169+
PruneExecutions: true,
172170
},
173171
{
174-
Timestamp: timestamp + int64(time.Second),
175-
Requests: map[string]int64{
176-
executionID: 0,
177-
},
172+
Timestamp: timestamp + int64(time.Second),
173+
Requests: map[string]int64{executionID: 0},
174+
PruneExecutions: true,
178175
},
179176
{
180-
Timestamp: timestamp,
181-
Requests: map[string]int64{
182-
executionID: 0,
183-
},
177+
Timestamp: timestamp,
178+
Requests: map[string]int64{executionID: 0},
179+
PruneExecutions: true,
184180
},
185181
}
186182

@@ -224,8 +220,9 @@ func TestPlugin_Outcome_SequenceNumberHandling(t *testing.T) {
224220
aos := make([]types.AttributedObservation, numNodes)
225221
for i := 0; i < numNodes; i++ {
226222
obs := &pb.Observation{
227-
Timestamp: timestamp + int64(i),
228-
Requests: requests,
223+
Timestamp: timestamp + int64(i),
224+
Requests: requests,
225+
PruneExecutions: true,
229226
}
230227
rawObs, err := proto.Marshal(obs)
231228
require.NoError(t, err)
@@ -434,22 +431,10 @@ func TestPlugin_FinishedExecutions(t *testing.T) {
434431
t.Run("Outcome: remove expired workflow executions", func(t *testing.T) {
435432
timestamp := time.Now().UnixMilli()
436433
observations := []*pb.Observation{
437-
{
438-
Timestamp: timestamp,
439-
Requests: map[string]int64{},
440-
},
441-
{
442-
Timestamp: timestamp - int64(time.Second),
443-
Requests: map[string]int64{},
444-
},
445-
{
446-
Timestamp: timestamp + int64(time.Second),
447-
Requests: map[string]int64{},
448-
},
449-
{
450-
Timestamp: timestamp,
451-
Requests: map[string]int64{},
452-
},
434+
{Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true},
435+
{Timestamp: timestamp - int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true},
436+
{Timestamp: timestamp + int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true},
437+
{Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true},
453438
}
454439

455440
aos := make([]types.AttributedObservation, 4)
@@ -484,6 +469,47 @@ func TestPlugin_FinishedExecutions(t *testing.T) {
484469
require.NotContains(t, outcomeProto.ObservedDonTimes, "workflow-123")
485470
})
486471

472+
t.Run("Outcome: legacy path when only half nodes have PruneExecutions set", func(t *testing.T) {
473+
timestamp := time.Now().UnixMilli()
474+
emptyID := "empty-workflow"
475+
476+
// Only 2 of 4 nodes have PruneExecutions=true → pruneExecutions stays false → legacy path.
477+
observations := []*pb.Observation{
478+
{Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: true},
479+
{Timestamp: timestamp - int64(time.Second), Requests: map[string]int64{}, PruneExecutions: true},
480+
{Timestamp: timestamp + int64(time.Second), Requests: map[string]int64{}, PruneExecutions: false},
481+
{Timestamp: timestamp, Requests: map[string]int64{}, PruneExecutions: false},
482+
}
483+
484+
aos := make([]types.AttributedObservation, len(observations))
485+
for i, obs := range observations {
486+
rawObs, err := proto.Marshal(obs)
487+
require.NoError(t, err)
488+
aos[i] = types.AttributedObservation{Observation: rawObs, Observer: commontypes.OracleID(i)}
489+
}
490+
491+
// prevOutcome contains an entry for emptyID with no timestamps.
492+
prevOutcome := &pb.Outcome{
493+
Timestamp: timestamp - 1000,
494+
ObservedDonTimes: map[string]*pb.ObservedDonTimes{
495+
emptyID: {Timestamps: []int64{}},
496+
},
497+
}
498+
prevOutcomeBytes, err := proto.Marshal(prevOutcome)
499+
require.NoError(t, err)
500+
501+
outcome, err := plugin.Outcome(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes}, query, aos)
502+
require.NoError(t, err)
503+
504+
legacyOutcomeProto := &pb.Outcome{}
505+
err = proto.Unmarshal(outcome, legacyOutcomeProto)
506+
require.NoError(t, err)
507+
508+
// Legacy behavior: empty-timestamps entry is NOT pruned.
509+
require.Contains(t, legacyOutcomeProto.ObservedDonTimes, emptyID)
510+
require.Empty(t, legacyOutcomeProto.ObservedDonTimes[emptyID].Timestamps)
511+
})
512+
487513
t.Run("Transmit: delete removed executionIDs", func(t *testing.T) {
488514
store.setDonTimes("workflow-123", []int64{time.Now().UnixMilli()})
489515

0 commit comments

Comments
 (0)