Skip to content

[fix][broker] Handle synchronous schema lookup failures in replication#26108

Open
Denovo1998 wants to merge 3 commits into
apache:masterfrom
Denovo1998:handle_synchronous_schema_lookup_failures_in_replication
Open

[fix][broker] Handle synchronous schema lookup failures in replication#26108
Denovo1998 wants to merge 3 commits into
apache:masterfrom
Denovo1998:handle_synchronous_schema_lookup_failures_in_replication

Conversation

@Denovo1998

@Denovo1998 Denovo1998 commented Jun 29, 2026

Copy link
Copy Markdown
Contributor

Motivation

Geo replication pauses and rewinds the cursor when a replicated message needs schema information that is not immediately available. However, if the local schema lookup throws synchronously before returning a future, the current entry is not cleaned up through the schema-fetch path.

This can leave the in-flight task permit incomplete and skip releasing the entry resources for the failed message.

Modifications

  • Catch synchronous failures from getSchemaInfo(msg) in GeoPersistentReplicator.
  • Release the current entry, retained payload buffer, and recycled message on that failure path.
  • Mark the current in-flight entry as completed before rewinding the cursor.
  • Add a regression test covering synchronous schema lookup failure cleanup.

Verifying this change

  • Make sure that the change passes the CI checks.

  • gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.GeoPersistentReplicatorTest -PtestRetryCount=0`

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

@void-ptr974 void-ptr974 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the fix. The cleanup path makes sense to me.

I left a few comments around exception handling, retry behavior, and test coverage.

CompletableFuture<SchemaInfo> schemaFuture;
try {
schemaFuture = getSchemaInfo(msg);
} catch (Exception e) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to narrow this catch to the expected exception type? Since getSchemaInfo only declares ExecutionException, catching all Exceptions could accidentally turn unrelated bugs into schema retry loops. Another option might be to normalize getSchemaInfo to return a failed future and then reuse the existing schemaFuture.isCompletedExceptionally() path.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point. I narrowed the catch to ExecutionException and normalize that synchronous schema lookup failure into a failed future, so unexpected exceptions are no longer converted into schema retry loops while the existing schema future cleanup path is reused.

headersAndPayload.release();
msg.recycle();
skipRemainingMessages = true;
doRewindCursor(false);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This path can immediately rewind and re-read the same entry if the synchronous schema lookup failure persists. replicateEntries() returns false, so readEntriesComplete() may call readMoreEntries() right away.

One way to avoid a tight retry loop is to keep the replicator in the cursor-rewinding wait state and schedule doRewindCursor(true) after a small backoff instead of rewinding immediately.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed. The exceptional schema future path now keeps the replicator in the cursor-rewinding wait state and schedules doRewindCursor(true) after MESSAGE_RATE_BACKOFF_MS. Successful schema fetches still rewind immediately.

return null;
}).when(entry).release();

List<Entry> entries = List.of(entry);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test only covers the current entry cleanup. It does not verify the batch behavior after skipRemainingMessages is set.

Please extend it to use a multi-entry batch and verify that the remaining entries are skipped/released, completedEntries reaches the full batch size, and the cursor is rewound.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extended the regression test to use a multi-entry batch. It now verifies the remaining entry is skipped and released, completedEntries reaches the full batch size, and cursor rewind/read retry is triggered only by the scheduled backoff task.

@void-ptr974

Copy link
Copy Markdown
Contributor

Thanks for the update. The main concerns look addressed.

One small follow-up: after this path calls beforeTerminateOrCursorRewinding(...), replicateEntries() returns false, so readEntriesComplete() will still call readMoreEntries(). That call should not start a read while waitForCursorRewindingRefCnf > 0, but it may schedule another delayed retry. Could we skip the outer readMoreEntries() when the replicator is already waiting for cursor rewind, and let the scheduled doRewindCursor(true) resume reads instead?

@Denovo1998

Copy link
Copy Markdown
Contributor Author

Thanks for the update. The main concerns look addressed.

One small follow-up: after this path calls beforeTerminateOrCursorRewinding(...), replicateEntries() returns false, so readEntriesComplete() will still call readMoreEntries(). That call should not start a read while waitForCursorRewindingRefCnf > 0, but it may schedule another delayed retry. Could we skip the outer readMoreEntries() when the replicator is already waiting for cursor rewind, and let the scheduled doRewindCursor(true) resume reads instead?

I added a guard in readEntriesComplete() to skip the outer readMoreEntries() while the replicator is already waiting for cursor rewind. This leaves the scheduled doRewindCursor(true) as the path that resumes reads. I also updated the regression test to exercise readEntriesComplete() end-to-end and verify no extra read is triggered before the scheduled rewind runs.

@void-ptr974 void-ptr974 left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Thanks for addressing the comments.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants