Skip to content

Commit e8eaeb7

Browse files
Direct send if retransmit disabled (#1897)
* Direct send if retransmit disabled * Extract sendToInbox * Update base_trigger.go * Move log
1 parent 8651b60 commit e8eaeb7

2 files changed

Lines changed: 102 additions & 28 deletions

File tree

pkg/capabilities/base_trigger.go

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package capabilities
22

33
import (
44
"context"
5+
"fmt"
56
"sync"
67
"time"
78

@@ -103,9 +104,18 @@ func NewBaseTriggerCapability[T proto.Message](
103104
}
104105
}
105106

107+
func (b *BaseTriggerCapability[T]) retransmitEnabled() bool {
108+
return b.tRetransmit > 0
109+
}
110+
106111
func (b *BaseTriggerCapability[T]) Start(ctx context.Context) error {
107112
b.lggr.Info("starting base trigger")
108113

114+
if !b.retransmitEnabled() {
115+
b.lggr.Warn("retransmits disabled (tRetransmit <= 0), events will be delivered once without persistence or ACK tracking")
116+
return nil
117+
}
118+
109119
recs, err := b.store.List(ctx)
110120
if err != nil {
111121
b.lggr.Errorf("failed to load persisted trigger events")
@@ -169,6 +179,10 @@ func (b *BaseTriggerCapability[T]) DeliverEvent(
169179
te TriggerEvent,
170180
triggerID string,
171181
) error {
182+
if !b.retransmitEnabled() {
183+
return b.sendToInbox(triggerID, te.ID, te.Payload.GetValue())
184+
}
185+
172186
rec := PendingEvent{
173187
TriggerId: triggerID,
174188
EventId: te.ID,
@@ -192,8 +206,38 @@ func (b *BaseTriggerCapability[T]) DeliverEvent(
192206
return nil
193207
}
194208

209+
// sendToInbox unmarshals the payload and delivers it to the registered inbox channel.
210+
func (b *BaseTriggerCapability[T]) sendToInbox(triggerID, eventID string, payload []byte) error {
211+
b.mu.Lock()
212+
sendCh, ok := b.inboxes[triggerID]
213+
b.mu.Unlock()
214+
215+
if !ok {
216+
b.metrics.IncInboxMissing(triggerID)
217+
return fmt.Errorf("no inbox registered for trigger %s", triggerID)
218+
}
219+
220+
msg := b.newMsg()
221+
if err := proto.Unmarshal(payload, msg); err != nil {
222+
return fmt.Errorf("failed to unmarshal payload: %w", err)
223+
}
224+
225+
wrapped := TriggerAndId[T]{Trigger: msg, Id: eventID}
226+
if !safeSend(sendCh, wrapped) {
227+
b.metrics.IncInboxFull(triggerID)
228+
return fmt.Errorf("inbox full or closed for trigger %s", triggerID)
229+
}
230+
231+
b.lggr.Infof("event dispatched: capability=%s trigger=%s event=%s",
232+
b.capabilityId, triggerID, eventID)
233+
return nil
234+
}
235+
195236
func (b *BaseTriggerCapability[T]) AckEvent(ctx context.Context, triggerId string, eventId string) error {
196-
b.lggr.Infof("Event ACK (triggerID: %s, eventID %s)", triggerId, eventId)
237+
b.lggr.Infow("Event ACK", "triggerID", triggerId, "eventID", eventId)
238+
if !b.retransmitEnabled() {
239+
return nil
240+
}
197241

198242
var (
199243
attempts int
@@ -297,7 +341,6 @@ func (b *BaseTriggerCapability[T]) scanPending() {
297341
// It updates Attempts and LastSentAt on every attempt locally. Success is determined
298342
// later by an AckEvent call.
299343
func (b *BaseTriggerCapability[T]) trySend(event PendingEvent) {
300-
b.lggr.Infof("resending event (triggerID: %s, eventID: %s)", event.TriggerId, event.EventId)
301344
b.mu.Lock()
302345
eventsForTrigger, ok := b.pending[event.TriggerId]
303346
if !ok || eventsForTrigger == nil {
@@ -314,9 +357,7 @@ func (b *BaseTriggerCapability[T]) trySend(event PendingEvent) {
314357
rec.Attempts++
315358
rec.LastSentAt = time.Now()
316359

317-
typeURL := rec.AnyTypeURL
318360
payloadCopy := append([]byte(nil), rec.Payload...)
319-
sendCh, inboxOk := b.inboxes[event.TriggerId]
320361
attempts := rec.Attempts
321362
lastSent := rec.LastSentAt
322363
b.mu.Unlock()
@@ -326,31 +367,10 @@ func (b *BaseTriggerCapability[T]) trySend(event PendingEvent) {
326367
b.lggr.Errorf("failed to persist delivery update for trigger=%s event=%s: %v", event.TriggerId, event.EventId, err)
327368
}
328369

329-
if !inboxOk {
330-
b.metrics.IncInboxMissing(event.TriggerId)
331-
b.lggr.Errorf("no inbox registered for trigger %s", event.TriggerId)
332-
return
333-
}
334-
335-
msg := b.newMsg()
336-
if err := proto.Unmarshal(payloadCopy, msg); err != nil {
337-
b.lggr.Errorf("failed to unmarshal payload to message type (typeURL=%s): %v", typeURL, err)
338-
return
339-
}
340-
341-
wrapped := TriggerAndId[T]{
342-
Trigger: msg,
343-
Id: event.EventId,
344-
}
345-
346-
if !safeSend(sendCh, wrapped) {
347-
b.metrics.IncInboxFull(event.TriggerId)
348-
b.lggr.Warnf("inbox full or closed for trigger %s", event.TriggerId)
370+
if err := b.sendToInbox(event.TriggerId, event.EventId, payloadCopy); err != nil {
371+
b.lggr.Errorf("trySend failed: %v", err)
349372
return
350373
}
351-
352-
b.lggr.Infof("event dispatched: capability =%s trigger=%s event=%s attempt=%d",
353-
b.capabilityId, event.TriggerId, event.EventId, attempts)
354374
}
355375

356376
func safeSend[T any](ch chan<- T, val T) (sent bool) {

pkg/capabilities/base_trigger_test.go

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,10 +13,14 @@ import (
1313
)
1414

1515
func newBase(t *testing.T, store EventStore) *BaseTriggerCapability[*wrapperspb.BytesValue] {
16+
return newBaseWithRetransmit(t, store, 100*time.Millisecond)
17+
}
18+
19+
func newBaseWithRetransmit(t *testing.T, store EventStore, tRetransmit time.Duration) *BaseTriggerCapability[*wrapperspb.BytesValue] {
1620
lggr, err := logger.New()
1721
require.NoError(t, err)
1822
return NewBaseTriggerCapability(store, func() *wrapperspb.BytesValue { return &wrapperspb.BytesValue{} }, lggr,
19-
"testCap", 100*time.Millisecond, 0, 0)
23+
"testCap", tRetransmit, 0, 0)
2024
}
2125

2226
func ctxWithCancel(t *testing.T) (context.Context, context.CancelFunc) {
@@ -252,3 +256,53 @@ func TestBaseTrigger_UndeliveredStateAlerting(t *testing.T) {
252256
})
253257
}
254258
}
259+
260+
func TestRetransmitDisabled_DeliversOnceWithoutPersistence(t *testing.T) {
261+
store := NewMemEventStore()
262+
sendCh := make(chan TriggerAndId[*wrapperspb.BytesValue], 10)
263+
264+
b := newBaseWithRetransmit(t, store, 0)
265+
ctx := t.Context()
266+
267+
b.RegisterTrigger("trigA", sendCh)
268+
269+
require.NoError(t, b.Start(ctx))
270+
t.Cleanup(func() {
271+
b.Stop()
272+
b.UnregisterTrigger("trigA")
273+
})
274+
275+
te := makeTE(t, "trigA", "e1", []byte("payload"))
276+
require.NoError(t, b.DeliverEvent(ctx, te, "trigA"))
277+
278+
// Should receive the event once
279+
select {
280+
case got := <-sendCh:
281+
require.Equal(t, "e1", got.Id)
282+
case <-time.After(time.Second):
283+
t.Fatal("expected event delivery")
284+
}
285+
286+
// Store should be empty (no persistence)
287+
recs, err := store.List(ctx)
288+
require.NoError(t, err)
289+
require.Empty(t, recs)
290+
291+
// Wait a bit and confirm no retransmits
292+
time.Sleep(200 * time.Millisecond)
293+
select {
294+
case got := <-sendCh:
295+
t.Fatalf("unexpected retransmit: %+v", got)
296+
default:
297+
}
298+
}
299+
300+
func TestRetransmitDisabled_AckIsNoop(t *testing.T) {
301+
store := NewMemEventStore()
302+
b := newBaseWithRetransmit(t, store, 0)
303+
304+
require.NoError(t, b.Start(t.Context()))
305+
t.Cleanup(func() { b.Stop() })
306+
307+
require.NoError(t, b.AckEvent(t.Context(), "anyTrigger", "anyEvent"))
308+
}

0 commit comments

Comments
 (0)