Skip to content

Commit 90713c6

Browse files
DON Time: clean ObservedDonTimes (#1904)
* Prune dontime store properly * Update plugin.go * Log state size * Add tests --------- Co-authored-by: Prashant Yadav <prashant.yadav@smartcontract.com>
1 parent 08016f5 commit 90713c6

4 files changed

Lines changed: 246 additions & 11 deletions

File tree

pkg/workflows/dontime/plugin.go

Lines changed: 18 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -140,11 +140,11 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
140140
}
141141

142142
for id, requestSeqNum := range observation.Requests {
143-
if _, ok := prevOutcome.ObservedDonTimes[id]; !ok {
144-
prevOutcome.ObservedDonTimes[id] = &pb.ObservedDonTimes{}
143+
var currSeqNum int64
144+
if times, ok := prevOutcome.ObservedDonTimes[id]; ok {
145+
currSeqNum = int64(len(times.Timestamps))
145146
}
146147
// We only count requests for the next sequence number and ignore all other ones.
147-
currSeqNum := int64(len(prevOutcome.ObservedDonTimes[id].Timestamps))
148148
if requestSeqNum == currSeqNum {
149149
observationCounts[id]++
150150
} else if requestSeqNum > currSeqNum {
@@ -197,17 +197,25 @@ func (p *Plugin) Outcome(_ context.Context, outctx ocr3types.OutcomeContext, _ t
197197
}
198198
}
199199

200-
// Remove expired workflow executions
200+
// Remove expired and empty workflow executions
201201
for id, observedTimes := range outcome.ObservedDonTimes {
202-
if observedTimes != nil && len(observedTimes.Timestamps) > 0 {
203-
if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() {
204-
delete(outcome.ObservedDonTimes, id)
205-
p.store.deleteExecutionID(id)
206-
}
202+
if observedTimes == nil || len(observedTimes.Timestamps) == 0 {
203+
delete(outcome.ObservedDonTimes, id)
204+
p.store.deleteExecutionID(id)
205+
continue
206+
}
207+
if donTime >= observedTimes.Timestamps[0]+p.offChainConfig.ExecutionRemovalTime.AsDuration().Milliseconds() {
208+
delete(outcome.ObservedDonTimes, id)
209+
p.store.deleteExecutionID(id)
207210
}
208211
}
209212

210-
return proto.MarshalOptions{Deterministic: true}.Marshal(outcome)
213+
outcomeBytes, err := proto.MarshalOptions{Deterministic: true}.Marshal(outcome)
214+
p.lggr.Infow("Outcome computed",
215+
"observedDonTimesEntries", len(outcome.ObservedDonTimes),
216+
"outcomeSizeBytes", len(outcomeBytes),
217+
)
218+
return outcomeBytes, err
211219
}
212220

213221
func (p *Plugin) Reports(_ context.Context, _ uint64, outcome ocr3types.Outcome) ([]ocr3types.ReportPlus[[]byte], error) {

pkg/workflows/dontime/plugin_test.go

Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,210 @@ func TestPlugin_Outcome(t *testing.T) {
213213
require.Equal(t, []int64{timestamp}, outcomeProto.ObservedDonTimes[executionID].Timestamps)
214214
}
215215

216+
func TestPlugin_Outcome_SequenceNumberHandling(t *testing.T) {
217+
lggr := logger.Test(t)
218+
config, offchainCfg := newTestPluginConfig(t), newTestPluginOffchainConfig(t)
219+
ctx := t.Context()
220+
221+
makeObservations := func(t *testing.T, timestamp int64, requests map[string]int64, numNodes int) []types.AttributedObservation {
222+
t.Helper()
223+
aos := make([]types.AttributedObservation, numNodes)
224+
for i := 0; i < numNodes; i++ {
225+
obs := &pb.Observation{
226+
Timestamp: timestamp + int64(i),
227+
Requests: requests,
228+
}
229+
rawObs, err := proto.Marshal(obs)
230+
require.NoError(t, err)
231+
aos[i] = types.AttributedObservation{
232+
Observation: rawObs,
233+
Observer: commontypes.OracleID(i),
234+
}
235+
}
236+
return aos
237+
}
238+
239+
t.Run("new execution ID not in previous outcome defaults currSeqNum to 0", func(t *testing.T) {
240+
store := NewStore(DefaultRequestTimeout)
241+
plugin, err := NewPlugin(store, config, offchainCfg, lggr)
242+
require.NoError(t, err)
243+
244+
executionID := "new-workflow"
245+
_ = store.RequestDonTime(executionID, 0)
246+
247+
timestamp := time.Now().UnixMilli()
248+
aos := makeObservations(t, timestamp, map[string]int64{executionID: 0}, 4)
249+
250+
prevOutcome := &pb.Outcome{
251+
Timestamp: 0,
252+
ObservedDonTimes: map[string]*pb.ObservedDonTimes{},
253+
}
254+
prevOutcomeBytes, err := proto.Marshal(prevOutcome)
255+
require.NoError(t, err)
256+
257+
query, err := plugin.Query(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes})
258+
require.NoError(t, err)
259+
260+
outcome, err := plugin.Outcome(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes}, query, aos)
261+
require.NoError(t, err)
262+
263+
outcomeProto := &pb.Outcome{}
264+
err = proto.Unmarshal(outcome, outcomeProto)
265+
require.NoError(t, err)
266+
267+
require.Contains(t, outcomeProto.ObservedDonTimes, executionID)
268+
require.Len(t, outcomeProto.ObservedDonTimes[executionID].Timestamps, 1)
269+
})
270+
271+
t.Run("nil ObservedDonTimes in previous outcome does not panic", func(t *testing.T) {
272+
store := NewStore(DefaultRequestTimeout)
273+
plugin, err := NewPlugin(store, config, offchainCfg, lggr)
274+
require.NoError(t, err)
275+
276+
executionID := "nil-map-workflow"
277+
_ = store.RequestDonTime(executionID, 0)
278+
279+
timestamp := time.Now().UnixMilli()
280+
aos := makeObservations(t, timestamp, map[string]int64{executionID: 0}, 4)
281+
282+
prevOutcome := &pb.Outcome{
283+
Timestamp: 0,
284+
ObservedDonTimes: nil,
285+
}
286+
prevOutcomeBytes, err := proto.Marshal(prevOutcome)
287+
require.NoError(t, err)
288+
289+
query, err := plugin.Query(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes})
290+
require.NoError(t, err)
291+
292+
outcome, err := plugin.Outcome(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes}, query, aos)
293+
require.NoError(t, err)
294+
295+
outcomeProto := &pb.Outcome{}
296+
err = proto.Unmarshal(outcome, outcomeProto)
297+
require.NoError(t, err)
298+
299+
require.Contains(t, outcomeProto.ObservedDonTimes, executionID)
300+
require.Len(t, outcomeProto.ObservedDonTimes[executionID].Timestamps, 1)
301+
})
302+
303+
t.Run("existing execution ID uses len(Timestamps) as currSeqNum", func(t *testing.T) {
304+
store := NewStore(DefaultRequestTimeout)
305+
plugin, err := NewPlugin(store, config, offchainCfg, lggr)
306+
require.NoError(t, err)
307+
308+
executionID := "existing-workflow"
309+
_ = store.RequestDonTime(executionID, 1)
310+
311+
timestamp := time.Now().UnixMilli()
312+
aos := makeObservations(t, timestamp, map[string]int64{executionID: 1}, 4)
313+
314+
prevTimestamp := timestamp - 1000 // 1 second ago in millis
315+
prevOutcome := &pb.Outcome{
316+
Timestamp: prevTimestamp,
317+
ObservedDonTimes: map[string]*pb.ObservedDonTimes{
318+
executionID: {Timestamps: []int64{prevTimestamp}},
319+
},
320+
}
321+
prevOutcomeBytes, err := proto.Marshal(prevOutcome)
322+
require.NoError(t, err)
323+
324+
query, err := plugin.Query(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes})
325+
require.NoError(t, err)
326+
327+
outcome, err := plugin.Outcome(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes}, query, aos)
328+
require.NoError(t, err)
329+
330+
outcomeProto := &pb.Outcome{}
331+
err = proto.Unmarshal(outcome, outcomeProto)
332+
require.NoError(t, err)
333+
334+
require.Contains(t, outcomeProto.ObservedDonTimes, executionID)
335+
require.Len(t, outcomeProto.ObservedDonTimes[executionID].Timestamps, 2)
336+
})
337+
338+
t.Run("stale sequence number is ignored", func(t *testing.T) {
339+
store := NewStore(DefaultRequestTimeout)
340+
plugin, err := NewPlugin(store, config, offchainCfg, lggr)
341+
require.NoError(t, err)
342+
343+
executionID := "stale-workflow"
344+
345+
timestamp := time.Now().UnixMilli()
346+
// Observations report seqNum 0, but prevOutcome already has 2 timestamps (currSeqNum=2)
347+
aos := makeObservations(t, timestamp, map[string]int64{executionID: 0}, 4)
348+
349+
prevTimestamp := timestamp - 1000 // 1 second ago in millis
350+
prevOutcome := &pb.Outcome{
351+
Timestamp: prevTimestamp,
352+
ObservedDonTimes: map[string]*pb.ObservedDonTimes{
353+
executionID: {Timestamps: []int64{
354+
prevTimestamp - 1000,
355+
prevTimestamp,
356+
}},
357+
},
358+
}
359+
prevOutcomeBytes, err := proto.Marshal(prevOutcome)
360+
require.NoError(t, err)
361+
362+
query, err := plugin.Query(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes})
363+
require.NoError(t, err)
364+
365+
outcome, err := plugin.Outcome(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes}, query, aos)
366+
require.NoError(t, err)
367+
368+
outcomeProto := &pb.Outcome{}
369+
err = proto.Unmarshal(outcome, outcomeProto)
370+
require.NoError(t, err)
371+
372+
// Stale seqNum 0 should be ignored, so timestamps should remain unchanged at 2
373+
require.Len(t, outcomeProto.ObservedDonTimes[executionID].Timestamps, 2)
374+
})
375+
376+
t.Run("mix of new and existing execution IDs", func(t *testing.T) {
377+
store := NewStore(DefaultRequestTimeout)
378+
plugin, err := NewPlugin(store, config, offchainCfg, lggr)
379+
require.NoError(t, err)
380+
381+
existingID := "existing-workflow"
382+
newID := "new-workflow"
383+
_ = store.RequestDonTime(existingID, 1)
384+
_ = store.RequestDonTime(newID, 0)
385+
386+
timestamp := time.Now().UnixMilli()
387+
requests := map[string]int64{
388+
existingID: 1,
389+
newID: 0,
390+
}
391+
aos := makeObservations(t, timestamp, requests, 4)
392+
393+
prevTimestamp := timestamp - 1000 // 1 second ago in millis
394+
prevOutcome := &pb.Outcome{
395+
Timestamp: prevTimestamp,
396+
ObservedDonTimes: map[string]*pb.ObservedDonTimes{
397+
existingID: {Timestamps: []int64{prevTimestamp}},
398+
},
399+
}
400+
prevOutcomeBytes, err := proto.Marshal(prevOutcome)
401+
require.NoError(t, err)
402+
403+
query, err := plugin.Query(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes})
404+
require.NoError(t, err)
405+
406+
outcome, err := plugin.Outcome(ctx, ocr3types.OutcomeContext{PreviousOutcome: prevOutcomeBytes}, query, aos)
407+
require.NoError(t, err)
408+
409+
outcomeProto := &pb.Outcome{}
410+
err = proto.Unmarshal(outcome, outcomeProto)
411+
require.NoError(t, err)
412+
413+
require.Contains(t, outcomeProto.ObservedDonTimes, existingID)
414+
require.Len(t, outcomeProto.ObservedDonTimes[existingID].Timestamps, 2)
415+
require.Contains(t, outcomeProto.ObservedDonTimes, newID)
416+
require.Len(t, outcomeProto.ObservedDonTimes[newID].Timestamps, 1)
417+
})
418+
}
419+
216420
func TestPlugin_FinishedExecutions(t *testing.T) {
217421
lggr := logger.Test(t)
218422
store := NewStore(DefaultRequestTimeout)
@@ -280,11 +484,16 @@ func TestPlugin_FinishedExecutions(t *testing.T) {
280484
})
281485

282486
t.Run("Transmit: delete removed executionIDs", func(t *testing.T) {
487+
store.setDonTimes("workflow-123", []int64{time.Now().UnixMilli()})
488+
283489
r := ocr3types.ReportWithInfo[[]byte]{}
284490
r.Report, err = proto.Marshal(outcomeProto)
285491
require.NoError(t, err)
286492
err = transmitter.Transmit(ctx, types.ConfigDigest{}, 0, r, []types.AttributedOnchainSignature{})
287493
require.NoError(t, err)
494+
495+
_, err = store.GetDonTimes("workflow-123")
496+
require.ErrorContains(t, err, "no don time for executionID workflow-123")
288497
})
289498
}
290499

pkg/workflows/dontime/store.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,22 @@ func (s *Store) setDonTimes(executionID string, donTimes []int64) {
117117
s.donTimes[executionID] = donTimes
118118
}
119119

120+
func (s *Store) replaceDonTimes(donTimes map[string][]int64) {
121+
s.mu.Lock()
122+
defer s.mu.Unlock()
123+
124+
for executionID, timestamps := range donTimes {
125+
s.donTimes[executionID] = timestamps
126+
}
127+
128+
for executionID := range s.donTimes {
129+
if _, ok := donTimes[executionID]; !ok {
130+
delete(s.donTimes, executionID)
131+
delete(s.requests, executionID)
132+
}
133+
}
134+
}
135+
120136
func (s *Store) GetLastObservedDonTime() int64 {
121137
s.mu.Lock()
122138
defer s.mu.Unlock()

pkg/workflows/dontime/transmitter.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,11 @@ func (t *Transmitter) Transmit(_ context.Context, _ types.ConfigDigest, _ uint64
3333
return err
3434
}
3535

36+
currentDonTimes := make(map[string][]int64, len(outcome.ObservedDonTimes))
3637
for id, observedDonTimes := range outcome.ObservedDonTimes {
37-
t.store.setDonTimes(id, observedDonTimes.Timestamps)
38+
currentDonTimes[id] = observedDonTimes.Timestamps
3839
}
40+
t.store.replaceDonTimes(currentDonTimes)
3941
t.store.setLastObservedDonTime(outcome.Timestamp)
4042

4143
t.lggr.Infow("Transmitting timestamps", "lastObservedDonTime", outcome.Timestamp)

0 commit comments

Comments
 (0)