[fix][broker] PIP-468: StreamConsumer / CheckpointConsumer DAG-replay with parent-drain ordering#25642
Conversation
…dering The StreamConsumer subscription is controller-driven: its segment assignment comes from SubscriptionCoordinator. Previously computeAssignment hands out only active segments, so a fresh EARLIEST consumer joining after a split couldn't read messages produced before the split — those live on the now-sealed parent and were silently orphaned. The QueueConsumer fix in apache#25611 already extended the queue flow to active+sealed; this change brings StreamConsumer in line and adds a per-subscription ordering guarantee on top. Behavior: - Sealed segments are always included in the assignment. The client drains them via the existing per-segment v4 receive loop; an already-drained segment yields TopicTerminated immediately and the v4 consumer closes. - An active child of a split / merge is held back until *every* parent in the DAG has been drained for this subscription. Without this we'd hand a consumer the child of a just-split segment immediately, which breaks per-key ordering against unread messages still sitting in the parent. Initial active segments (no parents) are unaffected. Implementation: - New SegmentDrainChecker interface — async predicate for "is segment X drained for subscription S". - SubscriptionCoordinator gets the checker + a poll interval; tracks drainedSegmentIds in-memory; runs a periodic poll (2s default) that queries every undrained sealed segment and rebalances when state changes. Simple test constructors pass null and bypass the ordering logic entirely (preserves existing unit-test contracts). - ScalableTopicController.isSegmentDrained looks the segment topic up locally and reads numberOfEntriesInBacklog == 0. If the segment is not loaded on this broker, the check returns false and we retry on the next poll. Multi-broker drain when the controller and the segment owner are on different brokers needs a follow-up segment-aware stats admin endpoint — flagged in a code comment. - Controller.close stops every coordinator's drain poller. Tests: - SubscriptionCoordinatorTest.testActiveChildrenBlockedUntilParentDrained — controllable mock checker; asserts children of segment 0 are excluded until segment 0 is marked drained, then included after the next poll. - V5StreamConsumerDagReplayTest (new): - testEarliestSubscribePostSplitReadsSealedBacklog — fresh EARLIEST consumer reads pre-split sealed backlog AND post-split active. - testEarliestSubscribeAfterTwoSplitsReadsAllBacklog — two-deep DAG; fresh consumer reads every message across both sealed generations. - testSealedBacklogNotRedeliveredAfterFirstConsumerDrains — second consumer joining after the first drained sees nothing (cursor at end on the sealed segment).
…backoff
Two follow-ups on the StreamConsumer DAG-replay change:
1. The drain check used by SubscriptionCoordinator was a local-only lookup
(brokerService.getTopicIfExists), which silently returns "not drained"
when the segment topic is owned by a different broker — children would
then never unblock on a multi-broker cluster.
Add a new admin endpoint
GET /segments/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{sub}/backlog
that the v4 admin routes to the segment topic's owner; the controller
now calls it via getAdminClient().scalableTopics()
.getSegmentSubscriptionBacklogAsync(...). Returns 404 (treated as
"not drained, retry") when the topic / subscription isn't loaded yet.
2. The drain poller was a fixed-cadence scheduleWithFixedDelay (every 2s)
— too aggressive on long-tail backlogs (a stalled consumer can keep a
sealed parent un-drained for hours and we shouldn't be polling its
owner every 2s for the whole window).
Replace with an exponential `Backoff` (initial 2s, max 15min). Reset
on every progress event: a segment is observed drained, the layout
changes (new sealed segments to look at), or a fresh consumer joins.
Each iteration self-schedules the next via the backoff; once every
sealed segment is drained the poller stops and is re-armed only when
one of the events above happens.
Mirror V5StreamConsumerDagReplayTest for CheckpointConsumer. Two paths: - Unmanaged (no consumerGroup) — uses DagWatchClient and reads active + sealed segments client-side. Tests verify a fresh Checkpoint.earliest() consumer reads the pre-split sealed backlog after a single split, and again across a chain of two splits. - Managed (consumerGroup) — goes through SubscriptionCoordinator. Without the change in this commit the parent-drain ordering would block every child of a sealed segment forever, because checkpoint consumers track position client-side via Checkpoints and never create the per-segment cursor that the drain check looks at. Same two scenarios exercised on this path. Plumb the existing ScalableConsumerType through the broker so the SubscriptionCoordinator can decide whether to enforce parent-drain ordering: STREAM enforces (per-key ordering across split), CHECKPOINT and QUEUE skip it (no broker-side cursor / shared delivery semantics). - ServerCnx.handleCommandScalableTopicSubscribe now reads the type from the wire command and forwards it. - ScalableTopicService.registerConsumer / ScalableTopicController.registerConsumer gain a ScalableConsumerType parameter (deprecated overloads keep the old signature working with a STREAM default for unit-test mocks and the failover-restore path). - ScalableTopicController.createCoordinator picks the right SegmentDrainChecker (real one for STREAM, null/disabled for the rest).
|
Code review (Claude Code) Posting three correctness concerns surfaced by a local review with Claude Code. 1. [BUG] Backoff reset is ineffective if a poll is already scheduledIn drainBackoff.reset();
ensureDrainPollerRunning();
2. [BUG]
|
lhotari
left a comment
There was a problem hiding this comment.
LGTM, however Claude Code highlighted some correctness issues (please check the comment to see if these are valid).
…se race, restore default Three bugs flagged in the Claude Code review on PR apache#25642 (thanks @lhotari): 1. Backoff reset was ineffective if a poll was already scheduled. ensureDrainPollerRunning is a no-op when drainPollTask is in flight, so resetting drainBackoff after registerConsumer / onLayoutChange / markSegmentsDrained didn't actually shorten the next check — a fresh EARLIEST consumer joining during a quiet period could wait up to the 15-min cap. Fix: cancel the pending task before re-arming. 2. close() raced with an in-flight pollDrainStatus and could leak a scheduled task. The poll's whenComplete rearm path called ensureDrainPollerRunning without checking whether close() had already run, which would schedule a new task after the controller had gone away. Fix: closed flag, checked in ensureDrainPollerRunning and pollDrainStatus. 3. The restore path defaulted every recovered subscription to STREAM. Picking STREAM as the "conservative default" actually deadlocks CHECKPOINT and QUEUE — those types never drain parents (no per-segment cursor), so their children would stay blocked forever after a leader handoff. Fix: restore creates the coordinator with no drain checker; on the first register the controller installs one if and only if the consumer type is STREAM (new SubscriptionCoordinator.installDrainChecker). Tests added in SubscriptionCoordinatorTest: - testFreshRegisterCancelsBackedOffPollAndRearmsImmediately — exercises (1) by letting the backoff grow, then asserting a fresh register triggers a check well within the initial-delay window. - testCloseRaceWithInFlightPollDoesNotLeakRearm — blocks a drain check in-flight, calls close(), releases the check, asserts no further checks fire afterward. - testRestoredCoordinatorStartsWithoutParentDrainOrdering — restored coordinator hands out every DAG segment immediately (no enforcement). - testInstallDrainCheckerAfterRestoreEnablesOrdering — installing a checker on the coordinator post-restore re-enables parent-drain filtering on the next assignment.
|
@lhotari thanks for the review — addressed all three in 300b3dc:
Four new unit tests in
All tests pass; checkstyle clean. |
Summary
The Stream and Checkpoint consumer subscriptions are controller-driven: their
segment assignment comes from
SubscriptionCoordinator. PreviouslycomputeAssignmenthanded out only active segments, so a fresh EARLIEST consumerjoining after a split couldn't read messages produced before the split — those
live on the now-sealed parent and were silently orphaned. The QueueConsumer fix
in #25611 already extended the queue flow to active+sealed; this PR brings
StreamConsumer / CheckpointConsumer in line and adds a per-subscription parent-drain
ordering guarantee on top.
Behavior
Sealed segments are always included in the assignment. The client drains them
via the existing per-segment v4 receive loop / Reader; an already-drained
segment yields
TopicTerminatedimmediately and the v4 consumer/Reader closes.An active child of a split / merge is held back until every parent in the
DAG has been drained for this subscription. Without this we'd hand a consumer
the child of a just-split segment immediately, which breaks per-key ordering
against unread messages still sitting in the parent. Initial active segments
(no parents) are unaffected.
Parent-drain ordering only applies to STREAM consumers — they're the ones
that need per-key ordering across a split. CHECKPOINT consumers track
position client-side via
Checkpointand never create per-segment subscriptioncursors (they read via Readers), so the ordering machinery would block their
children indefinitely. QUEUE consumers are shared and accept out-of-order
delivery by design. The coordinator skips the ordering filter for both.
Implementation
New
SegmentDrainCheckerinterface — async predicate for "is segment X drainedfor subscription S".
SubscriptionCoordinatorgets the checker + an exponentialBackoff(2sinitial, 15min max). Tracks
drainedSegmentIdsin-memory, runs a self-reschedulingpoll that queries every undrained sealed segment and rebalances when state
changes. The backoff resets on every progress event (drain detected, layout
change, fresh consumer registration); the poller stops entirely once all
sealed segments are drained.
ScalableTopicController.isSegmentDrainedcalls the new admin endpointGET /segments/{tenant}/{ns}/{topic}/{descriptor}/subscription/{sub}/backlog,which the v4 admin routes to the segment topic's owner — works whether the
controller and the segment colocate or not. 404 (topic / subscription not yet
loaded) is treated as "not drained, retry on the next poll".
ScalableConsumerTypeplumbed fromServerCnxthroughScalableTopicService→
ScalableTopicController→SubscriptionCoordinator. The controllerpasses a real drain checker only for STREAM; for CHECKPOINT / QUEUE the
coordinator gets a
nullchecker and skips parent-drain entirely.ScalableTopicController.closestops every coordinator's drain poller.Tests
V5StreamConsumerDagReplayTest(new)testEarliestSubscribePostSplitReadsSealedBacklog— fresh EARLIEST consumerreads pre-split sealed backlog AND post-split active.
testEarliestSubscribeAfterTwoSplitsReadsAllBacklog— two-deep DAG; freshconsumer reads every message across both sealed generations.
testSealedBacklogNotRedeliveredAfterFirstConsumerDrains— second consumerjoining after the first drained sees nothing (cursor at end on the sealed
segment).
V5CheckpointConsumerDagReplayTest(new)Mirror of the Stream test for both unmanaged and managed (consumerGroup) paths,
single-split and two-splits scenarios. Four tests total.
SubscriptionCoordinatorTest.testActiveChildrenBlockedUntilParentDrained(new)Controllable mock checker; asserts children of segment 0 are excluded until
segment 0 is marked drained, then included after the next poll.
Matching PR(s) in forked repositories