From 567d3696fddccd640dbe266a2d284f8d1121cc18 Mon Sep 17 00:00:00 2001 From: Nitin Kumar Date: Thu, 14 May 2026 17:56:53 +0530 Subject: [PATCH] fix(subscriber): propagate handler error to engage startSubscriber backoff MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously, when a handler returned a non-panic error, handleSubscription logged it and returned nil. That kept startSubscriber's loop tight: it never tripped the err != nil branch, so the 2s backoff never engaged and the loop hot-spun. This change keeps the existing error log and returns the error so startSubscriber backs off for 2s before re-subscribing. The recovered- panic sentinel (errSubscriberHandlerPanic, added in #3424) still returns nil so panicRecovery's log isn't duplicated. Also reset the loop's delay to 0 after a successful iteration — without that, a single failure pinned subsequent successful iterations at the 2s cadence forever. Note: this does not change commit semantics. The handler-error path never reached msg.Commit() before; the broker still redelivers. --- pkg/gofr/subscriber.go | 6 +++++- pkg/gofr/subscriber_test.go | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/gofr/subscriber.go b/pkg/gofr/subscriber.go index ee5bbd99c..ef019eaed 100644 --- a/pkg/gofr/subscriber.go +++ b/pkg/gofr/subscriber.go @@ -43,7 +43,11 @@ func (s *SubscriptionManager) startSubscriber(ctx context.Context, topic string, s.container.Logger.Errorf("error in subscription for topic %s: %v", topic, err) delay = time.Second * 2 + + continue } + + delay = 0 } } } @@ -80,7 +84,7 @@ func (s *SubscriptionManager) handleSubscription(ctx context.Context, topic stri } if err != nil { s.container.Logger.Errorf("error in handler for topic %s: %v", topic, err) - return nil + return err } if msg.Committer != nil { diff --git a/pkg/gofr/subscriber_test.go b/pkg/gofr/subscriber_test.go index d9bcd677f..47be84ad3 100644 --- a/pkg/gofr/subscriber_test.go +++ b/pkg/gofr/subscriber_test.go @@ -148,7 +148,7 @@ func TestSubscriptionManager_handleSubscription_HandlerErrorDoesNotCommit(t *tes s := SubscriptionManager{container: c} err := s.handleSubscription(t.Context(), topicHandleSubErr, func(*Context) error { return errHandlerFail }) - require.NoError(t, err) + require.ErrorIs(t, err, errHandlerFail, "handler error must propagate so startSubscriber can back off") require.NotNil(t, ps.lastCommitter) require.Zero(t, ps.lastCommitter.n, "handler error must not commit") }