Skip to content

Commit 6a6435d

Browse files
committed
fix: return handler error from subscriber instead of swallowing it
handleSubscription was logging handler errors but returning nil, preventing the retry logic in startSubscriber from triggering. Now the error is propagated so failed messages are retried. Fixes #3215
1 parent d7bea67 commit 6a6435d

2 files changed

Lines changed: 49 additions & 1 deletion

File tree

pkg/gofr/subscriber.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (s *SubscriptionManager) handleSubscription(ctx context.Context, topic stri
6969
if err != nil {
7070
s.container.Logger.Errorf("error in handler for topic %s: %v", topic, err)
7171

72-
return nil
72+
return err
7373
}
7474

7575
if msg.Committer != nil {

pkg/gofr/subscriber_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"testing"
78

9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
12+
"gofr.dev/pkg/gofr/container"
813
"gofr.dev/pkg/gofr/datasource"
914
"gofr.dev/pkg/gofr/datasource/pubsub"
1015
"gofr.dev/pkg/gofr/datasource/pubsub/kafka"
@@ -56,3 +61,46 @@ func (mockSubscriber) Subscribe(ctx context.Context, topic string) (*pubsub.Mess
5661
func (mockSubscriber) Close() error {
5762
return nil
5863
}
64+
65+
var errHandler = errors.New("handler error")
66+
67+
func TestHandleSubscription_HandlerErrorReturned(t *testing.T) {
68+
testContainer, _ := container.NewMockContainer(t)
69+
testContainer.PubSub = mockSubscriber{}
70+
71+
sm := newSubscriptionManager(testContainer)
72+
73+
err := sm.handleSubscription(context.Background(), "test-topic", func(_ *Context) error {
74+
return errHandler
75+
})
76+
77+
require.Error(t, err)
78+
assert.ErrorIs(t, err, errHandler)
79+
}
80+
81+
func TestHandleSubscription_SuccessfulHandler(t *testing.T) {
82+
testContainer, _ := container.NewMockContainer(t)
83+
testContainer.PubSub = mockSubscriber{}
84+
85+
sm := newSubscriptionManager(testContainer)
86+
87+
err := sm.handleSubscription(context.Background(), "test-topic", func(_ *Context) error {
88+
return nil
89+
})
90+
91+
assert.NoError(t, err)
92+
}
93+
94+
func TestHandleSubscription_SubscribeError(t *testing.T) {
95+
testContainer, _ := container.NewMockContainer(t)
96+
testContainer.PubSub = mockSubscriber{}
97+
98+
sm := newSubscriptionManager(testContainer)
99+
100+
err := sm.handleSubscription(context.Background(), "test-err", func(_ *Context) error {
101+
return nil
102+
})
103+
104+
require.Error(t, err)
105+
assert.ErrorIs(t, err, kafka.ErrConsumerGroupNotProvided)
106+
}

0 commit comments

Comments
 (0)