consumer: fix pulsar consumer stuck after broker restart#4904
consumer: fix pulsar consumer stuck after broker restart#4904
Conversation
|
[APPROVALNOTIFIER] This PR is NOT APPROVED This pull-request has been approved by: The full list of commands accepted by this bot can be found here. DetailsNeeds approval from an approver in each of these files:Approvers can indicate their approval by writing |
|
Warning Rate limit exceeded
Your organization is not enrolled in usage-based pricing. Contact your admin to enable usage-based pricing to continue reviews beyond the rate limit, or try again in 58 minutes and 33 seconds. ⌛ How to resolve this issue?After the wait time has elapsed, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout. Please see our FAQ for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Organization UI Review profile: CHILL Plan: Pro Run ID: 📒 Files selected for processing (1)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
@wk989898: The following test failed, say
Full PR test history. Your PR dashboard. DetailsInstructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here. |
There was a problem hiding this comment.
Code Review
This pull request refactors the message consumption logic in cmd/pulsar-consumer/consumer.go by replacing the channel-based selection with a direct call to pulsarConsumer.Receive(ctx). Feedback suggests that this change causes context cancellations to be logged as errors rather than informational messages, which could be misleading during graceful shutdowns. It is recommended to check for context cancellation specifically to maintain the previous logging behavior.
| if err != nil { | ||
| log.Error("Receive message error", zap.Error(err)) | ||
| return errors.Trace(err) | ||
| } |
There was a problem hiding this comment.
The current implementation logs an error when the context is cancelled (e.g., during a graceful shutdown), as pulsarConsumer.Receive(ctx) returns context.Canceled. This is misleading because context cancellation is an expected way to stop the consumer. It's better to check for this specific error and log it at the Info level, consistent with the previous logic.
if err != nil {
if errors.Cause(err) == context.Canceled {
log.Info("terminating: context cancelled")
return errors.Trace(err)
}
log.Error("Receive message error", zap.Error(err))
return errors.Trace(err)
}
What problem does this PR solve?
Issue Number: close #4834
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note