[fix][broker] Handle synchronous schema lookup failures in replication #26108
Denovo1998 wants to merge 3 commits into
void-ptr974
left a comment
Thanks for the fix. The cleanup path makes sense to me.
I left a few comments around exception handling, retry behavior, and test coverage.
CompletableFuture<SchemaInfo> schemaFuture;
try {
    schemaFuture = getSchemaInfo(msg);
} catch (Exception e) {
Would it be better to narrow this catch to the expected exception type? Since getSchemaInfo only declares ExecutionException, catching all Exceptions could accidentally turn unrelated bugs into schema retry loops. Another option might be to normalize getSchemaInfo to return a failed future and then reuse the existing schemaFuture.isCompletedExceptionally() path.
Good point. I narrowed the catch to ExecutionException and normalize that synchronous schema lookup failure into a failed future, so unexpected exceptions are no longer converted into schema retry loops while the existing schema future cleanup path is reused.
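For illustration, the normalization described in this reply could look roughly like the sketch below. It is not the code from the PR: `SchemaLookup`, `lookupOrFailedFuture`, and the `String` payload are hypothetical stand-ins for `getSchemaInfo(msg)` and `SchemaInfo`. Only `ExecutionException` is converted into a failed future, so any other exception still propagates to the caller.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class SchemaLookupSketch {
    /** Hypothetical stand-in for a lookup that may throw before it returns a future. */
    interface SchemaLookup {
        CompletableFuture<String> get() throws ExecutionException;
    }

    /**
     * Converts a synchronous ExecutionException into a failed future so the caller
     * can reuse its existing isCompletedExceptionally() handling.
     * Unchecked exceptions are deliberately not caught here.
     */
    static CompletableFuture<String> lookupOrFailedFuture(SchemaLookup lookup) {
        try {
            return lookup.get();
        } catch (ExecutionException e) {
            return CompletableFuture.failedFuture(e);
        }
    }
}
```

With this shape, the caller has a single failure path to clean up (the failed future), while an unrelated bug such as an `IllegalArgumentException` surfaces instead of being retried.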
headersAndPayload.release();
msg.recycle();
skipRemainingMessages = true;
doRewindCursor(false);
This path can immediately rewind and re-read the same entry if the synchronous schema lookup failure persists. replicateEntries() returns false, so readEntriesComplete() may call readMoreEntries() right away.
One way to avoid a tight retry loop is to keep the replicator in the cursor-rewinding wait state and schedule doRewindCursor(true) after a small backoff instead of rewinding immediately.
Agreed. The exceptional schema future path now keeps the replicator in the cursor-rewinding wait state and schedules doRewindCursor(true) after MESSAGE_RATE_BACKOFF_MS. Successful schema fetches still rewind immediately.
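A rough sketch of the backoff idea discussed here, under stated assumptions: `DelayedExecutor`, `RewindBackoffSketch`, and the `BACKOFF_MS` value are hypothetical, and `rewindAndResume()` only stands in for `doRewindCursor(true)`. The scheduler is injected as a small interface so the behavior can be checked without real timers.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

final class RewindBackoffSketch {
    /** Hypothetical scheduling hook; a real broker would back this with a scheduled executor. */
    interface DelayedExecutor {
        void schedule(Runnable task, long delayMs);
    }

    // Assumed value for illustration only; the PR uses MESSAGE_RATE_BACKOFF_MS.
    static final long BACKOFF_MS = 50;

    final AtomicBoolean waitingForRewind = new AtomicBoolean(false);
    final AtomicInteger rewinds = new AtomicInteger();
    private final DelayedExecutor executor;

    RewindBackoffSketch(DelayedExecutor executor) {
        this.executor = executor;
    }

    /** Called when the schema future completed exceptionally. */
    void onSchemaLookupFailed() {
        // Enter the wait state once; repeated failures do not schedule extra rewinds.
        if (waitingForRewind.compareAndSet(false, true)) {
            executor.schedule(this::rewindAndResume, BACKOFF_MS);
        }
    }

    /** Stand-in for doRewindCursor(true): rewind, then leave the wait state. */
    private void rewindAndResume() {
        rewinds.incrementAndGet();
        waitingForRewind.set(false);
    }
}
```

Keeping the wait flag set until the delayed task runs is what prevents an immediate re-read of the same entry while the lookup is still failing.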
    return null;
}).when(entry).release();

List<Entry> entries = List.of(entry);
This test only covers the current entry cleanup. It does not verify the batch behavior after skipRemainingMessages is set.
Please extend it to use a multi-entry batch and verify that the remaining entries are skipped/released, completedEntries reaches the full batch size, and the cursor is rewound.
Extended the regression test to use a multi-entry batch. It now verifies the remaining entry is skipped and released, completedEntries reaches the full batch size, and cursor rewind/read retry is triggered only by the scheduled backoff task.
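The batch behavior being tested can be pictured with a simplified model. This is only a sketch: `FakeEntry`, `Result`, and `replicate` are hypothetical, and sending is simulated by a counter, whereas the real replicator releases sent entries asynchronously.

```java
import java.util.List;
import java.util.function.Predicate;

final class SkipRemainingSketch {
    /** Hypothetical stand-in for an entry that holds a releasable buffer. */
    static final class FakeEntry {
        final String id;
        boolean released;

        FakeEntry(String id) {
            this.id = id;
        }

        void release() {
            released = true;
        }
    }

    /** Counts of entries marked complete and entries actually sent. */
    static final class Result {
        final int completed;
        final int sent;

        Result(int completed, int sent) {
            this.completed = completed;
            this.sent = sent;
        }
    }

    /** Once one entry fails its schema lookup, the rest of the batch is skipped but still released. */
    static Result replicate(List<FakeEntry> entries, Predicate<FakeEntry> schemaAvailable) {
        boolean skipRemaining = false;
        int completed = 0;
        int sent = 0;
        for (FakeEntry entry : entries) {
            if (!skipRemaining && schemaAvailable.test(entry)) {
                sent++; // simulated send
            } else {
                skipRemaining = true; // this entry and all later ones are skipped
            }
            entry.release();
            completed++; // every entry counts toward batch completion
        }
        return new Result(completed, sent);
    }
}
```

In this model, a three-entry batch whose second entry fails yields one sent entry, three completed entries, and three released buffers, which mirrors the assertions the reviewer asked for.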
Thanks for the update. The main concerns look addressed. One small follow-up: after this path calls
I added a guard in readEntriesComplete() to skip the outer readMoreEntries() while the replicator is already waiting for cursor rewind. This leaves the scheduled doRewindCursor(true) as the path that resumes reads. I also updated the regression test to exercise readEntriesComplete() end-to-end and verify no extra read is triggered before the scheduled rewind runs.
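The guard described in this reply might be sketched as follows. All names are hypothetical simplifications: `readEntriesComplete` takes a boolean in place of the real callback arguments, and `scheduledRewind()` stands in for the delayed `doRewindCursor(true)` task.

```java
final class ReadGuardSketch {
    boolean waitingForCursorRewind;
    int readsIssued;

    /** Stand-in for issuing the next cursor read. */
    void readMoreEntries() {
        readsIssued++;
    }

    /**
     * Simplified read callback. When the batch hit a schema lookup failure,
     * the replicator enters the wait state and does not read again here.
     */
    void readEntriesComplete(boolean schemaLookupFailed) {
        if (schemaLookupFailed) {
            waitingForCursorRewind = true;
        }
        if (waitingForCursorRewind) {
            return; // the scheduled rewind is the only path that resumes reads
        }
        readMoreEntries();
    }

    /** Stand-in for the delayed doRewindCursor(true) task. */
    void scheduledRewind() {
        waitingForCursorRewind = false;
        readMoreEntries();
    }
}
```

The point of the early return is that exactly one read is issued per failure, and only after the backoff has elapsed.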
void-ptr974
left a comment
LGTM. Thanks for addressing the comments.
Motivation
Geo replication pauses and rewinds the cursor when a replicated message needs schema information that is not immediately available. However, if the local schema lookup throws synchronously before returning a future, the current entry is not cleaned up through the schema-fetch path.
This can leave the in-flight task permit incomplete and skip releasing the entry resources for the failed message.
Modifications
getSchemaInfo(msg) in GeoPersistentReplicator.
Verifying this change
Make sure that the change passes the CI checks.
`gradlew :pulsar-broker:test --tests org.apache.pulsar.broker.service.persistent.GeoPersistentReplicatorTest -PtestRetryCount=0`
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes