Skip to content

Commit 9f5b862

Browse files
NitinKumar004claude
andcommitted
fix: return handler error from subscriber instead of swallowing it
handleSubscription was logging handler errors but returning nil, preventing the retry logic in startSubscriber from triggering. Now the error is propagated so failed messages are retried. Fixes #3215 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d7bea67 commit 9f5b862

2 files changed

Lines changed: 49 additions & 1 deletion

File tree

pkg/gofr/subscriber.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func (s *SubscriptionManager) handleSubscription(ctx context.Context, topic stri
6969
if err != nil {
7070
s.container.Logger.Errorf("error in handler for topic %s: %v", topic, err)
7171

72-
return nil
72+
return err
7373
}
7474

7575
if msg.Committer != nil {

pkg/gofr/subscriber_test.go

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,12 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"testing"
78

9+
"github.com/stretchr/testify/assert"
10+
"github.com/stretchr/testify/require"
11+
12+
"gofr.dev/pkg/gofr/container"
813
"gofr.dev/pkg/gofr/datasource"
914
"gofr.dev/pkg/gofr/datasource/pubsub"
1015
"gofr.dev/pkg/gofr/datasource/pubsub/kafka"
@@ -56,3 +61,46 @@ func (mockSubscriber) Subscribe(ctx context.Context, topic string) (*pubsub.Mess
5661
func (mockSubscriber) Close() error {
5762
return nil
5863
}
64+
65+
var errHandler = errors.New("handler error")
66+
67+
func TestHandleSubscription_HandlerErrorReturned(t *testing.T) {
68+
testContainer, _ := container.NewMockContainer(t)
69+
testContainer.PubSub = mockSubscriber{}
70+
71+
sm := newSubscriptionManager(testContainer)
72+
73+
err := sm.handleSubscription(context.Background(), "test-topic", func(_ *Context) error {
74+
return errHandler
75+
})
76+
77+
require.Error(t, err)
78+
assert.ErrorIs(t, err, errHandler)
79+
}
80+
81+
func TestHandleSubscription_SuccessfulHandler(t *testing.T) {
82+
testContainer, _ := container.NewMockContainer(t)
83+
testContainer.PubSub = mockSubscriber{}
84+
85+
sm := newSubscriptionManager(testContainer)
86+
87+
err := sm.handleSubscription(context.Background(), "test-topic", func(_ *Context) error {
88+
return nil
89+
})
90+
91+
assert.NoError(t, err)
92+
}
93+
94+
func TestHandleSubscription_SubscribeError(t *testing.T) {
95+
testContainer, _ := container.NewMockContainer(t)
96+
testContainer.PubSub = mockSubscriber{}
97+
98+
sm := newSubscriptionManager(testContainer)
99+
100+
err := sm.handleSubscription(context.Background(), "test-err", func(_ *Context) error {
101+
return nil
102+
})
103+
104+
require.Error(t, err)
105+
assert.ErrorIs(t, err, kafka.ErrConsumerGroupNotProvided)
106+
}

0 commit comments

Comments
 (0)