Skip to content

[fix][client] Fix failed to close consumer because of the error: param memorySize is a negative value#25805

Open
poorbarcode wants to merge 4 commits into
apache:masterfrom
poorbarcode:fix/msg_lost_2
Open

[fix][client] Fix failed to close consumer because of the error: param memorySize is a negative value#25805
poorbarcode wants to merge 4 commits into
apache:masterfrom
poorbarcode:fix/msg_lost_2

Conversation

@poorbarcode
Copy link
Copy Markdown
Contributor

@poorbarcode poorbarcode commented May 18, 2026

Motivation

Time/Threads consumer.receiveMessage Close Pulsar Source Receive messages from Server
1 push a message into consumer.incomingMessages
2 pop a message from consumer.incomingMessages
3 reduce incomingMessagesSize: 0 -> -45
4 Close Pulsar Source
5 Close producer
6 Failed due to Try to reserve/release memory failed, the param memorySize is a negative value
7 plus consumer.incomingMessagesSize

Modifications

  • Let increasing incomingMessagesSize and decreasing incomingMessagesSize run in the same thread
    • Since it is hard to add a test to reproduce the issue, I skipped writing the test
  • Improve logs

Other things to do

The accuracy of consumer.incomingMessagesSize(which was only used for batch receive API) still has issues, but this PR does not involve any related fixes

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

…ose consumer because of the error: param memorySize is a negative value
@poorbarcode poorbarcode self-assigned this May 18, 2026
@poorbarcode poorbarcode added this to the 5.0.0-M1 milestone May 18, 2026
@poorbarcode poorbarcode added the type/bug The PR fixed a bug or issue reported a bug label May 18, 2026
if (receivedFuture == null) {
if (getState() != State.Closing && getState() != State.Closed) {
log.error().attr("message", message)
.attr("receivedFuture-polled-out", null)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the meaning of this?

                    .attr("receivedFuture-polled-out", null)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ready-to-test release/4.0.11 release/4.2.2 type/bug The PR fixed a bug or issue reported a bug

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants