@@ -22,9 +22,11 @@ import (
2222)
2323
2424type pluginMetrics struct {
25- donTime metric.Int64Gauge
26- donTimeEntries metric.Int64Gauge
27- outcomeSize metric.Int64Gauge
25+ donTime metric.Int64Gauge
26+ donTimeEntries metric.Int64Gauge
27+ outcomeSize metric.Int64Gauge
28+ observationBatchOverflow metric.Int64Gauge
29+ outcomeBatchOverflow metric.Int64Gauge
2830}
2931
3032func newPluginMetrics () (pluginMetrics , error ) {
@@ -54,10 +56,28 @@ func newPluginMetrics() (pluginMetrics, error) {
5456 return pluginMetrics {}, fmt .Errorf ("failed to create outcome_size gauge: %w" , err )
5557 }
5658
59+ observationBatchOverflow , err := meter .Int64Gauge ("platform_dontime_observation_batch_overflow" ,
60+ metric .WithDescription ("Number of pending requests excluded from the observation due to batch size limit" ),
61+ metric .WithUnit ("{request}" ),
62+ )
63+ if err != nil {
64+ return pluginMetrics {}, fmt .Errorf ("failed to create observation_batch_overflow gauge: %w" , err )
65+ }
66+
67+ outcomeBatchOverflow , err := meter .Int64Gauge ("platform_dontime_outcome_batch_overflow" ,
68+ metric .WithDescription ("Number of workflow execution entries removed from the outcome due to batch size limit" ),
69+ metric .WithUnit ("{entry}" ),
70+ )
71+ if err != nil {
72+ return pluginMetrics {}, fmt .Errorf ("failed to create outcome_batch_overflow gauge: %w" , err )
73+ }
74+
5775 return pluginMetrics {
58- donTime : donTime ,
59- donTimeEntries : donTimeEntries ,
60- outcomeSize : outcomeSize ,
76+ donTime : donTime ,
77+ donTimeEntries : donTimeEntries ,
78+ outcomeSize : outcomeSize ,
79+ observationBatchOverflow : observationBatchOverflow ,
80+ outcomeBatchOverflow : outcomeBatchOverflow ,
6181 }, nil
6282}
6383
@@ -106,14 +126,33 @@ func (p *Plugin) Query(_ context.Context, _ ocr3types.OutcomeContext) (types.Que
106126 return nil , nil
107127}
108128
109- func (p * Plugin ) Observation (_ context.Context , outctx ocr3types.OutcomeContext , query types.Query ) (types.Observation , error ) {
129+ func sortedRequests (requests map [string ]* Request ) []* Request {
130+ if len (requests ) == 0 {
131+ return nil
132+ }
133+
134+ ids := make ([]string , 0 , len (requests ))
135+ for id := range requests {
136+ ids = append (ids , id )
137+ }
138+ slices .Sort (ids )
139+
140+ sorted := make ([]* Request , 0 , len (ids ))
141+ for _ , id := range ids {
142+ sorted = append (sorted , requests [id ])
143+ }
144+ return sorted
145+ }
146+
147+ func (p * Plugin ) Observation (ctx context.Context , outctx ocr3types.OutcomeContext , query types.Query ) (types.Observation , error ) {
110148 previousOutcome := & pb.Outcome {}
111149 if err := proto .Unmarshal (outctx .PreviousOutcome , previousOutcome ); err != nil {
112150 p .lggr .Errorf ("failed to unmarshal previous outcome in Observation phase" )
113151 }
114152
153+ sortedRequests := sortedRequests (p .store .GetRequests ())
115154 requests := map [string ]int64 {} // Maps executionID --> seqNum
116- for _ , req := range p . store . GetRequests () {
155+ for _ , req := range sortedRequests {
117156 // Validate request sequence number
118157 numObservedDonTimes := 0
119158 times , ok := previousOutcome .ObservedDonTimes [req .WorkflowExecutionID ]
@@ -135,12 +174,27 @@ func (p *Plugin) Observation(_ context.Context, outctx ocr3types.OutcomeContext,
135174 }
136175
137176 requests [req .WorkflowExecutionID ] = int64 (req .SeqNum )
177+ if len (requests ) >= p .batchSize {
178+ break
179+ }
138180 }
139181
182+ overflowCount := len (sortedRequests ) - len (requests )
183+ p .lggr .Debugw ("Observation batch processed" ,
184+ "inputRequests" , len (sortedRequests ),
185+ "batchSize" , p .batchSize ,
186+ "includedRequests" , len (requests ),
187+ "overflowRequests" , overflowCount ,
188+ )
189+ if overflowCount > 0 {
190+ p .lggr .Warnw ("Observation batch overflow" , "overflowRequests" , overflowCount )
191+ }
192+ p .metrics .observationBatchOverflow .Record (ctx , int64 (overflowCount ))
193+
140194 observation := & pb.Observation {
141- Timestamp : time .Now ().UTC ().UnixMilli (),
142- Requests : requests ,
143- PruneExecutions : true ,
195+ Timestamp : time .Now ().UTC ().UnixMilli (),
196+ Requests : requests ,
197+ LimitByBatchSizeFlag : true ,
144198 }
145199
146200 return proto.MarshalOptions {Deterministic : true }.Marshal (observation )
@@ -162,6 +216,7 @@ func (p *Plugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, _
162216 OffsetFromMedian int64
163217 }
164218 var timestampNodePairs []timestampNodePair
219+ limitByBatchSizeFlagEnabled := true
165220
166221 prevOutcome := & pb.Outcome {}
167222 if err := proto .Unmarshal (outctx .PreviousOutcome , prevOutcome ); err != nil {
@@ -178,6 +233,10 @@ func (p *Plugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, _
178233 continue
179234 }
180235
236+ if ! observation .GetLimitByBatchSizeFlag () {
237+ limitByBatchSizeFlagEnabled = false
238+ }
239+
181240 for id , requestSeqNum := range observation .Requests {
182241 var currSeqNum int64
183242 if times , ok := prevOutcome .ObservedDonTimes [id ]; ok {
@@ -249,13 +308,31 @@ func (p *Plugin) Outcome(ctx context.Context, outctx ocr3types.OutcomeContext, _
249308 }
250309 }
251310
311+ var outcomeBatchOverflowCount int64
312+ if len (outcome .ObservedDonTimes ) > p .batchSize && limitByBatchSizeFlagEnabled {
313+ ids := make ([]string , 0 , len (outcome .ObservedDonTimes ))
314+ for id := range outcome .ObservedDonTimes {
315+ ids = append (ids , id )
316+ }
317+ slices .Sort (ids )
318+ outcomeBatchOverflowCount = int64 (len (ids ) - p .batchSize )
319+ for _ , id := range ids [p .batchSize :] {
320+ delete (outcome .ObservedDonTimes , id )
321+ }
322+ p .lggr .Warnw ("Trimmed outcome observed don times to batch size" ,
323+ "batchSize" , p .batchSize ,
324+ "removedEntries" , outcomeBatchOverflowCount ,
325+ )
326+ }
327+
252328 outcomeBytes , err := proto.MarshalOptions {Deterministic : true }.Marshal (outcome )
253329 p .lggr .Infow ("Outcome computed" ,
254330 "observedDonTimesEntries" , len (outcome .ObservedDonTimes ),
255331 "outcomeSizeBytes" , len (outcomeBytes ),
256332 )
257333 p .metrics .donTime .Record (ctx , outcome .Timestamp )
258334 p .metrics .donTimeEntries .Record (ctx , int64 (len (outcome .ObservedDonTimes )))
335+ p .metrics .outcomeBatchOverflow .Record (ctx , outcomeBatchOverflowCount )
259336 p .metrics .outcomeSize .Record (ctx , int64 (len (outcomeBytes )))
260337 return outcomeBytes , err
261338}
0 commit comments