Skip to content

Commit 2dd27f1

Browse files
BaseTrigger Pending events gauge (#1952)
* Pending events gauge * Add stuck event metrics * Update base_trigger.go
1 parent 720567e commit 2dd27f1

2 files changed

Lines changed: 77 additions & 2 deletions

File tree

pkg/capabilities/base_trigger.go

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ type BaseTriggerMetrics interface {
4545
IncAckError(reason string)
4646
// IncAckMemoryOutcome records how an ACK related to the in-memory pending map: hit, miss_no_trigger_bucket, miss_no_event, miss_nil_record.
4747
IncAckMemoryOutcome(outcome string)
48+
// AddPendingEvents adjusts the live gauge of events awaiting ACK. Positive on insert, negative on ACK/unregister.
49+
AddPendingEvents(delta int64)
50+
// IncStuckEvent increments the live gauge of events stuck past the critical undelivered threshold.
51+
// Keyed by (capability_id, trigger_id, event_id) so you can see exactly which events are stuck.
52+
IncStuckEvent(triggerID, eventID string)
53+
// DecStuckEvent decrements the stuck-event gauge when a previously-critical event is ACKed or unregistered.
54+
DecStuckEvent(triggerID, eventID string)
4855
}
4956

5057
type undeliveredState struct {
@@ -185,6 +192,10 @@ func (b *BaseTriggerCapability[T]) Start(ctx context.Context) error {
185192
}
186193
b.mu.Unlock()
187194

195+
if n := int64(len(recs)); n > 0 {
196+
b.metrics.AddPendingEvents(n)
197+
}
198+
188199
b.wg.Add(1)
189200
go func() {
190201
defer b.wg.Done()
@@ -212,14 +223,32 @@ func (b *BaseTriggerCapability[T]) RegisterTrigger(triggerID string, sendCh chan
212223
func (b *BaseTriggerCapability[T]) UnregisterTrigger(triggerID string) {
213224
b.mu.Lock()
214225
_, existed := b.inboxes[triggerID]
226+
pendingCount := int64(len(b.pending[triggerID]))
227+
228+
var criticalEvents []string
229+
if m, ok := b.undeliveredAlertStates[triggerID]; ok {
230+
for eventID, s := range m {
231+
if s != nil && s.emittedCritical {
232+
criticalEvents = append(criticalEvents, eventID)
233+
}
234+
}
235+
}
236+
215237
delete(b.inboxes, triggerID)
216238
delete(b.pending, triggerID)
217239
delete(b.undeliveredAlertStates, triggerID)
218240
b.mu.Unlock()
219241

242+
for _, eventID := range criticalEvents {
243+
b.metrics.DecStuckEvent(triggerID, eventID)
244+
}
245+
220246
if existed {
221247
b.metrics.DecActiveTriggers()
222248
}
249+
if pendingCount > 0 {
250+
b.metrics.AddPendingEvents(-pendingCount)
251+
}
223252

224253
if err := b.store.DeleteEventsForTrigger(b.ctx, triggerID); err != nil {
225254
b.lggr.Errorf("Failed to delete events for trigger (TriggerID=%s): %v", triggerID, err)
@@ -258,6 +287,7 @@ func (b *BaseTriggerCapability[T]) DeliverEvent(
258287
b.pending[triggerID][te.ID] = &rec
259288
b.mu.Unlock()
260289

290+
b.metrics.AddPendingEvents(1)
261291
b.trySend(rec)
262292
return nil
263293
}
@@ -327,14 +357,22 @@ func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId strin
327357
b.metrics.IncAckMemoryOutcome("miss_no_trigger_bucket")
328358
}
329359

360+
var wasCritical bool
330361
if m, ok := b.undeliveredAlertStates[triggerId]; ok {
362+
if s, exists := m[eventId]; exists && s != nil && s.emittedCritical {
363+
wasCritical = true
364+
}
331365
delete(m, eventId)
332366
if len(m) == 0 {
333367
delete(b.undeliveredAlertStates, triggerId)
334368
}
335369
}
336370
b.mu.Unlock()
337371

372+
if wasCritical {
373+
b.metrics.DecStuckEvent(triggerId, eventId)
374+
}
375+
338376
switch {
339377
case found:
340378
b.lggr.Infow("base trigger ACK matched in-memory pending event",
@@ -343,6 +381,7 @@ func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId strin
343381
b.metrics.IncAckMemoryOutcome("hit")
344382
b.metrics.IncAck(triggerId, eventId)
345383
b.metrics.ObserveTimeToAck(triggerId, eventId, time.Since(firstAt), attempts)
384+
b.metrics.AddPendingEvents(-1)
346385
case hadNilPendingRecord:
347386
b.lggr.Warnw("base trigger ACK: pending map had nil record for event (treating as miss; reconciling store)",
348387
"capabilityID", b.capabilityId, "triggerID", triggerId, "eventID", eventId)
@@ -396,8 +435,8 @@ func (b *BaseTriggerCapability[T]) scanPending() {
396435
return
397436
}
398437

399-
warnThreshold := 5 * interval
400-
critThreshold := 20 * interval
438+
warnThreshold := 1 * interval
439+
critThreshold := 3 * interval
401440

402441
b.mu.Lock()
403442
toResend := make([]PendingEvent, 0, len(b.pending))
@@ -433,6 +472,7 @@ func (b *BaseTriggerCapability[T]) scanPending() {
433472

434473
if critThreshold > 0 && !state.emittedCritical && age >= critThreshold {
435474
b.metrics.EmitUndeliveredCritical(triggerID, eventID)
475+
b.metrics.IncStuckEvent(triggerID, eventID)
436476
state.emittedCritical = true
437477
}
438478
}

pkg/capabilities/base_trigger_metrics.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ type BaseTriggerBeholderMetrics struct {
2323
timeToAckMs metric.Int64Histogram
2424
ackAttempts metric.Int64Histogram // attempts distribution at ACK time
2525
activeRegistrations metric.Int64UpDownCounter
26+
pendingEvents metric.Int64UpDownCounter
27+
stuckEvents metric.Int64UpDownCounter
2628
}
2729

2830
var _ BaseTriggerMetrics = &BaseTriggerBeholderMetrics{}
@@ -75,6 +77,16 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err
7577
return nil, err
7678
}
7779

80+
pendingEvents, err := beholder.GetMeter().Int64UpDownCounter("capabilities_base_trigger_pending_events")
81+
if err != nil {
82+
return nil, err
83+
}
84+
85+
stuckEvents, err := beholder.GetMeter().Int64UpDownCounter("capabilities_base_trigger_stuck_events")
86+
if err != nil {
87+
return nil, err
88+
}
89+
7890
return &BaseTriggerBeholderMetrics{
7991
capabilityID: capabilityID,
8092
retryCount: retryCount,
@@ -88,6 +100,8 @@ func NewBaseTriggerBeholderMetrics(capabilityID string) (BaseTriggerMetrics, err
88100
timeToAckMs: timeToAckMs,
89101
ackAttempts: ackAttempts,
90102
activeRegistrations: activeRegistrations,
103+
pendingEvents: pendingEvents,
104+
stuckEvents: stuckEvents,
91105
}, nil
92106
}
93107

@@ -180,6 +194,24 @@ func (m *BaseTriggerBeholderMetrics) EmitUndeliveredCritical(triggerID, eventID
180194
)
181195
}
182196

197+
// AddPendingEvents applies delta to the pendingEvents up/down counter, tagged
// with this instance's capability_id. Callers pass a positive delta when events
// start awaiting ACK and a negative delta when they are ACKed or unregistered.
// The method takes no context, so the measurement is recorded with
// context.Background().
func (m *BaseTriggerBeholderMetrics) AddPendingEvents(delta int64) {
198+
m.pendingEvents.Add(context.Background(), delta,
199+
metric.WithAttributes(attribute.String("capability_id", m.capabilityID)),
200+
)
201+
}
202+
203+
// IncStuckEvent adds 1 to the stuckEvents up/down counter, using the attribute
// set that m.attrs builds from triggerID and eventID. Each increment is expected
// to be balanced by a DecStuckEvent call with the same IDs.
func (m *BaseTriggerBeholderMetrics) IncStuckEvent(triggerID, eventID string) {
204+
m.stuckEvents.Add(context.Background(), 1,
205+
metric.WithAttributes(m.attrs(triggerID, eventID)...),
206+
)
207+
}
208+
209+
// DecStuckEvent adds -1 to the stuckEvents up/down counter, using the attribute
// set that m.attrs builds from triggerID and eventID. It must be called with the
// same IDs as the matching IncStuckEvent so the series for that event nets to zero.
func (m *BaseTriggerBeholderMetrics) DecStuckEvent(triggerID, eventID string) {
210+
m.stuckEvents.Add(context.Background(), -1,
211+
metric.WithAttributes(m.attrs(triggerID, eventID)...),
212+
)
213+
}
214+
183215
type noopBaseTriggerMetrics struct{}
184216

185217
var _ BaseTriggerMetrics = &noopBaseTriggerMetrics{}
@@ -195,3 +227,6 @@ func (noopBaseTriggerMetrics) EmitUndeliveredWarning(string, string)
195227
func (noopBaseTriggerMetrics) EmitUndeliveredCritical(string, string) {}
196228
func (noopBaseTriggerMetrics) IncAckError(string) {}
197229
func (noopBaseTriggerMetrics) IncAckMemoryOutcome(string) {}
230+
// AddPendingEvents is a no-op; the delta is discarded.
func (noopBaseTriggerMetrics) AddPendingEvents(int64) {}
231+
// IncStuckEvent is a no-op; nothing is recorded for the given IDs.
func (noopBaseTriggerMetrics) IncStuckEvent(string, string) {}
232+
// DecStuckEvent is a no-op; nothing is recorded for the given IDs.
func (noopBaseTriggerMetrics) DecStuckEvent(string, string) {}

0 commit comments

Comments
 (0)