Skip to content

Commit 3ba9e30

Browse files
committed
Update base_trigger.go
1 parent 41d9453 commit 3ba9e30

1 file changed

Lines changed: 28 additions & 4 deletions

File tree

pkg/capabilities/base_trigger.go

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ func (b *BaseTriggerCapability) Start(ctx context.Context) error {
108108

109109
func (b *BaseTriggerCapability) Stop() {
110110
b.cancel()
111+
b.wg.Wait()
111112
}
112113

113114
func (b *BaseTriggerCapability) DeliverEvent(
@@ -174,15 +175,38 @@ func (b *BaseTriggerCapability) retransmitLoop() {
174175
func (b *BaseTriggerCapability) scanPending() {
175176
now := time.Now()
176177

177-
for _, rec := range b.pending {
178+
b.mu.Lock()
179+
toResend := make([]PendingEvent, 0, len(b.pending))
180+
toLost := make([]PendingEvent, 0)
181+
for k, rec := range b.pending {
182+
// LOST: exceeded max time without ACK
178183
if now.Sub(rec.FirstAt) >= b.tMax {
179-
_ = b.AckEvent(b.ctx, rec.TriggerId, rec.WorkflowId, rec.EventId)
180-
b.lost(b.ctx, *rec)
184+
toLost = append(toLost, *rec)
185+
delete(b.pending, k)
181186
continue
182187
}
188+
189+
// RESEND: hasn't been sent recently enough
183190
if rec.LastSentAt.IsZero() || now.Sub(rec.LastSentAt) >= b.tRetransmit {
184-
_ = b.trySend(b.ctx, rec.TriggerId, rec.WorkflowId, rec.EventId)
191+
toResend = append(toResend, PendingEvent{
192+
TriggerId: rec.TriggerId,
193+
WorkflowId: rec.WorkflowId,
194+
EventId: rec.EventId,
195+
})
196+
}
197+
}
198+
b.mu.Unlock()
199+
200+
for _, rec := range toLost {
201+
err := b.store.Delete(b.ctx, rec.TriggerId, rec.WorkflowId, rec.EventId)
202+
if err != nil {
203+
b.lggr.Errorw("failed to delete event from store")
185204
}
205+
b.lost(b.ctx, rec)
206+
}
207+
208+
for _, k := range toResend {
209+
_ = b.trySend(b.ctx, k.TriggerId, k.WorkflowId, k.EventId)
186210
}
187211
}
188212

0 commit comments

Comments
 (0)