[fix][broker] PIP-468: StreamConsumer / CheckpointConsumer DAG-replay with parent-drain ordering#25642

Merged
merlimat merged 4 commits into apache:master from merlimat:st-stream-dag-replay
May 1, 2026

@merlimat merlimat commented May 1, 2026

Summary

The Stream and Checkpoint consumer subscriptions are controller-driven: their
segment assignment comes from SubscriptionCoordinator. Previously
computeAssignment handed out only active segments, so a fresh EARLIEST consumer
joining after a split couldn't read messages produced before the split — those
live on the now-sealed parent and were silently orphaned. The QueueConsumer fix
in #25611 already extended the queue flow to active+sealed; this PR brings
StreamConsumer / CheckpointConsumer in line and adds a per-subscription parent-drain
ordering guarantee on top.

Behavior

  • Sealed segments are always included in the assignment. The client drains them
    via the existing per-segment v4 receive loop / Reader; an already-drained
    segment yields TopicTerminated immediately and the v4 consumer/Reader closes.

  • An active child of a split / merge is held back until every parent in the
    DAG has been drained for this subscription. Without this we'd hand a consumer
    the child of a just-split segment immediately, which breaks per-key ordering
    against unread messages still sitting in the parent. Initial active segments
    (no parents) are unaffected.

  • Parent-drain ordering only applies to STREAM consumers — they're the ones
    that need per-key ordering across a split. CHECKPOINT consumers track
    position client-side via Checkpoint and never create per-segment subscription
    cursors (they read via Readers), so the ordering machinery would block their
    children indefinitely. QUEUE consumers are shared and accept out-of-order
    delivery by design. The coordinator skips the ordering filter for both.
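
A minimal sketch of the assignment rule described above, assuming a simplified in-memory DAG. The `Segment`, `ConsumerType` and `assignable` names are hypothetical and only illustrate the filter; they are not the actual SubscriptionCoordinator code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Illustrative sketch only: these types are hypothetical, not Pulsar classes.
final class ParentDrainFilterSketch {
    enum ConsumerType { STREAM, CHECKPOINT, QUEUE }

    // One node of the split/merge DAG; initial segments have no parents.
    static final class Segment {
        final long id;
        final boolean sealed;
        final List<Long> parentIds;

        Segment(long id, boolean sealed, List<Long> parentIds) {
            this.id = id;
            this.sealed = sealed;
            this.parentIds = parentIds;
        }
    }

    // Returns the segment ids that may be handed to consumers of one subscription.
    static List<Long> assignable(List<Segment> dag, Set<Long> drained, ConsumerType type) {
        boolean enforceOrdering = type == ConsumerType.STREAM;
        List<Long> result = new ArrayList<>();
        for (Segment s : dag) {
            // Sealed segments are always included so their backlog can be replayed.
            // An active child waits (STREAM only) until every parent is drained.
            if (s.sealed || !enforceOrdering || drained.containsAll(s.parentIds)) {
                result.add(s.id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<Segment> dag = List.of(
                new Segment(0, true, List.of()),     // sealed parent
                new Segment(1, false, List.of(0L)),  // active child
                new Segment(2, false, List.of(0L))); // active child
        System.out.println(assignable(dag, Set.of(), ConsumerType.STREAM));     // [0]
        System.out.println(assignable(dag, Set.of(0L), ConsumerType.STREAM));   // [0, 1, 2]
        System.out.println(assignable(dag, Set.of(), ConsumerType.CHECKPOINT)); // [0, 1, 2]
    }
}
```

An initial active segment has an empty parent list, so `containsAll` is trivially true and the segment is never held back.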

Implementation

  • New SegmentDrainChecker interface — async predicate for "is segment X drained
    for subscription S".

  • SubscriptionCoordinator gets the checker + an exponential Backoff (2s
    initial, 15min max). Tracks drainedSegmentIds in-memory, runs a self-rescheduling
    poll that queries every undrained sealed segment and rebalances when state
    changes. The backoff resets on every progress event (drain detected, layout
    change, fresh consumer registration); the poller stops entirely once all
    sealed segments are drained.

  • ScalableTopicController.isSegmentDrained calls the new admin endpoint
    GET /segments/{tenant}/{ns}/{topic}/{descriptor}/subscription/{sub}/backlog,
    which the v4 admin routes to the segment topic's owner — works whether the
    controller and the segment colocate or not. 404 (topic / subscription not yet
    loaded) is treated as "not drained, retry on the next poll".

  • ScalableConsumerType plumbed from ServerCnx through ScalableTopicService →
    ScalableTopicController → SubscriptionCoordinator. The controller
    passes a real drain checker only for STREAM; for CHECKPOINT / QUEUE the
    coordinator gets a null checker and skips parent-drain entirely.

  • ScalableTopicController.close stops every coordinator's drain poller.
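
As a rough illustration of the drain check, here is a sketch of an async predicate that maps a zero backlog to "drained" and a 404 to "not drained, retry". The interface shape, `NotFoundException` and the `backlogLookup` function are assumptions made for this example; the real check goes through the admin client and may differ.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.BiFunction;

// Illustrative sketch only: none of these names are the actual Pulsar API.
final class DrainCheckSketch {
    // Hypothetical stand-in for an HTTP 404 from the backlog endpoint.
    static final class NotFoundException extends RuntimeException {
        NotFoundException(String message) {
            super(message);
        }
    }

    // Async predicate: "is segment X drained for subscription S".
    interface SegmentDrainChecker {
        CompletableFuture<Boolean> isDrained(long segmentId, String subscription);
    }

    // backlogLookup is an assumed async call returning the subscription backlog in entries.
    static SegmentDrainChecker fromBacklog(
            BiFunction<Long, String, CompletableFuture<Long>> backlogLookup) {
        return (segmentId, subscription) -> backlogLookup.apply(segmentId, subscription)
                .thenApply(backlog -> backlog == 0L)
                .exceptionally(ex -> {
                    Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                            ? ex.getCause() : ex;
                    if (cause instanceof NotFoundException) {
                        return false; // not loaded yet: treat as "not drained, retry on the next poll"
                    }
                    throw new CompletionException(cause); // other failures still surface
                });
    }

    public static void main(String[] args) {
        SegmentDrainChecker checker = fromBacklog((segmentId, sub) -> {
            if (segmentId == 0L) {
                return CompletableFuture.completedFuture(0L); // empty backlog
            }
            CompletableFuture<Long> notLoaded = new CompletableFuture<>();
            notLoaded.completeExceptionally(new NotFoundException("segment not loaded"));
            return notLoaded;
        });
        System.out.println(checker.isDrained(0, "sub").join()); // true
        System.out.println(checker.isDrained(1, "sub").join()); // false
    }
}
```

Only the 404 case is swallowed here; any other failure is rethrown so the caller can decide how to handle it.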

Tests

V5StreamConsumerDagReplayTest (new)

  • testEarliestSubscribePostSplitReadsSealedBacklog — fresh EARLIEST consumer
    reads pre-split sealed backlog AND post-split active.
  • testEarliestSubscribeAfterTwoSplitsReadsAllBacklog — two-deep DAG; fresh
    consumer reads every message across both sealed generations.
  • testSealedBacklogNotRedeliveredAfterFirstConsumerDrains — second consumer
    joining after the first drained sees nothing (cursor at end on the sealed
    segment).

V5CheckpointConsumerDagReplayTest (new)

Mirror of the Stream test for both unmanaged and managed (consumerGroup) paths,
single-split and two-splits scenarios. Four tests total.

SubscriptionCoordinatorTest.testActiveChildrenBlockedUntilParentDrained (new)

Controllable mock checker; asserts children of segment 0 are excluded until
segment 0 is marked drained, then included after the next poll.

Matching PR(s) in forked repositories

  • area/broker
  • area/test

…dering

The StreamConsumer subscription is controller-driven: its segment
assignment comes from SubscriptionCoordinator. Previously
computeAssignment handed out only active segments, so a fresh EARLIEST
consumer joining after a split couldn't read messages produced before
the split — those live on the now-sealed parent and were silently
orphaned. The QueueConsumer fix in apache#25611 already extended the queue
flow to active+sealed; this change brings StreamConsumer in line and
adds a per-subscription ordering guarantee on top.

Behavior:

- Sealed segments are always included in the assignment. The client
  drains them via the existing per-segment v4 receive loop; an
  already-drained segment yields TopicTerminated immediately and the
  v4 consumer closes.
- An active child of a split / merge is held back until *every* parent
  in the DAG has been drained for this subscription. Without this we'd
  hand a consumer the child of a just-split segment immediately, which
  breaks per-key ordering against unread messages still sitting in the
  parent. Initial active segments (no parents) are unaffected.

Implementation:

- New SegmentDrainChecker interface — async predicate for "is segment
  X drained for subscription S".
- SubscriptionCoordinator gets the checker + a poll interval; tracks
  drainedSegmentIds in-memory; runs a periodic poll (2s default) that
  queries every undrained sealed segment and rebalances when state
  changes. Simple test constructors pass null and bypass the ordering
  logic entirely (preserves existing unit-test contracts).
- ScalableTopicController.isSegmentDrained looks the segment topic up
  locally and reads numberOfEntriesInBacklog == 0. If the segment is
  not loaded on this broker, the check returns false and we retry on
  the next poll. Multi-broker drain when the controller and the
  segment owner are on different brokers needs a follow-up
  segment-aware stats admin endpoint — flagged in a code comment.
- Controller.close stops every coordinator's drain poller.

Tests:

- SubscriptionCoordinatorTest.testActiveChildrenBlockedUntilParentDrained
  — controllable mock checker; asserts children of segment 0 are
  excluded until segment 0 is marked drained, then included after the
  next poll.
- V5StreamConsumerDagReplayTest (new):
  - testEarliestSubscribePostSplitReadsSealedBacklog — fresh EARLIEST
    consumer reads pre-split sealed backlog AND post-split active.
  - testEarliestSubscribeAfterTwoSplitsReadsAllBacklog — two-deep DAG;
    fresh consumer reads every message across both sealed generations.
  - testSealedBacklogNotRedeliveredAfterFirstConsumerDrains — second
    consumer joining after the first drained sees nothing (cursor at
    end on the sealed segment).
…backoff

Two follow-ups on the StreamConsumer DAG-replay change:

1. The drain check used by SubscriptionCoordinator was a local-only lookup
   (brokerService.getTopicIfExists), which silently returns "not drained"
   when the segment topic is owned by a different broker — children would
   then never unblock on a multi-broker cluster.

   Add a new admin endpoint
       GET /segments/{tenant}/{namespace}/{topic}/{descriptor}/subscription/{sub}/backlog
   that the v4 admin routes to the segment topic's owner; the controller
   now calls it via getAdminClient().scalableTopics()
   .getSegmentSubscriptionBacklogAsync(...). Returns 404 (treated as
   "not drained, retry") when the topic / subscription isn't loaded yet.

2. The drain poller was a fixed-cadence scheduleWithFixedDelay (every 2s)
   — too aggressive on long-tail backlogs (a stalled consumer can keep a
   sealed parent un-drained for hours and we shouldn't be polling its
   owner every 2s for the whole window).

   Replace with an exponential `Backoff` (initial 2s, max 15min). Reset
   on every progress event: a segment is observed drained, the layout
   changes (new sealed segments to look at), or a fresh consumer joins.
   Each iteration self-schedules the next via the backoff; once every
   sealed segment is drained the poller stops and is re-armed only when
   one of the events above happens.
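
The delay schedule in item 2 can be sketched as a plain doubling backoff. This is an assumption-laden illustration: `DrainBackoffSketch` is a hypothetical class with no jitter, and it does not reproduce the `Backoff` class used in the change.

```java
// Illustrative sketch only: a plain doubling backoff without jitter.
final class DrainBackoffSketch {
    private final long initialMs;
    private final long maxMs;
    private long nextMs;

    DrainBackoffSketch(long initialMs, long maxMs) {
        this.initialMs = initialMs;
        this.maxMs = maxMs;
        this.nextMs = initialMs;
    }

    // Delay to use for the next poll; doubles on each call up to the cap.
    synchronized long next() {
        long delay = nextMs;
        nextMs = Math.min(maxMs, nextMs * 2);
        return delay;
    }

    // Called on a progress event: drain observed, layout change, new consumer.
    synchronized void reset() {
        nextMs = initialMs;
    }

    public static void main(String[] args) {
        // 2s initial, 15min cap, as in the description above.
        DrainBackoffSketch backoff = new DrainBackoffSketch(2_000, 15 * 60_000);
        for (int i = 0; i < 10; i++) {
            System.out.println(backoff.next()); // 2000, 4000, ... capped at 900000
        }
        backoff.reset();
        System.out.println(backoff.next()); // 2000 again after a progress event
    }
}
```

With these numbers the cap is reached on the tenth poll, so a parent that stays un-drained for hours is queried at most once every 15 minutes after roughly the first 17 minutes.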

Mirror V5StreamConsumerDagReplayTest for CheckpointConsumer. Two paths:

- Unmanaged (no consumerGroup) — uses DagWatchClient and reads active +
  sealed segments client-side. Tests verify a fresh Checkpoint.earliest()
  consumer reads the pre-split sealed backlog after a single split, and
  again across a chain of two splits.

- Managed (consumerGroup) — goes through SubscriptionCoordinator. Without
  the change in this commit the parent-drain ordering would block every
  child of a sealed segment forever, because checkpoint consumers track
  position client-side via Checkpoints and never create the per-segment
  cursor that the drain check looks at. Same two scenarios exercised on
  this path.

Plumb the existing ScalableConsumerType through the broker so the
SubscriptionCoordinator can decide whether to enforce parent-drain
ordering: STREAM enforces (per-key ordering across split), CHECKPOINT and
QUEUE skip it (no broker-side cursor / shared delivery semantics).

- ServerCnx.handleCommandScalableTopicSubscribe now reads the type from
  the wire command and forwards it.
- ScalableTopicService.registerConsumer / ScalableTopicController.registerConsumer
  gain a ScalableConsumerType parameter (deprecated overloads keep the
  old signature working with a STREAM default for unit-test mocks and the
  failover-restore path).
- ScalableTopicController.createCoordinator picks the right
  SegmentDrainChecker (real one for STREAM, null/disabled for the rest).
@lhotari lhotari added this to the 5.0.0-M1 milestone May 1, 2026

lhotari commented May 1, 2026

Code review (Claude Code)

Posting three correctness concerns surfaced by a local review with Claude Code.

1. [BUG] Backoff reset is ineffective if a poll is already scheduled

In SubscriptionCoordinator (registerConsumer, onLayoutChange, markSegmentsDrained):

drainBackoff.reset();
ensureDrainPollerRunning();

ensureDrainPollerRunning is a no-op when drainPollTask is still scheduled and not yet cancelled/done. If the poller had backed off to the 15-minute cap during a quiet period, a fresh EARLIEST consumer joining (or a layout change, or an observed drain) will still wait up to 15 minutes for the next drain check — directly contradicting the comment ("kick off drain checks at the shortest delay") and the docstring on DEFAULT_DRAIN_INITIAL_DELAY ("a freshly subscribed EARLIEST consumer doesn't wait"). Fix: cancel the existing scheduled task before re-arming, or have these progress events explicitly reschedule with the reset delay.

2. [BUG] close() can race with an in-flight poll and leak a scheduled task

Also in SubscriptionCoordinator: pollDrainStatus nullifies drainPollTask at the top of its synchronized block, then the async whenComplete callback calls ensureDrainPollerRunning() which schedules a new task. If close() runs in that window, it sees drainPollTask == null and does nothing; the subsequent re-arm then schedules a task that fires after the controller has gone away. There is no closed flag checked in ensureDrainPollerRunning or in the whenComplete rearm path. Fix: add a closed boolean (set in close()) and short-circuit ensureDrainPollerRunning / the whenComplete rearm on it.

3. [BUG] Restore path defaults every recovered subscription to STREAM, deadlocking CHECKPOINT/QUEUE on leader handoff

In ScalableTopicController.createCoordinator(String):

private SubscriptionCoordinator createCoordinator(String subscription) {
    return createCoordinator(subscription, ScalableConsumerType.STREAM);
}

restoreSubscription calls this — meaning after any controller restart / leader change, every restored subscription gets a coordinator with parent-drain enforcement, regardless of the consumer type that originally created it. The Javadoc on registerConsumer confirms the policy is then locked: "The coordinator's setting is fixed at first registration; subsequent registers with a different type still work but won't change the ordering policy."

For a CHECKPOINT subscription on a topic with sealed parents, the parent will never report drained (CHECKPOINT doesn't create per-segment cursors — explicitly noted in the PR body), so children remain blocked indefinitely after a restart. The "conservative default" comment is inverted — STREAM is the blocking mode, so it's the least-conservative default in this context. Fix: persist the subscription type in metadata alongside the subscription so it can be restored faithfully, or use a null checker (no enforcement) on the restore path until a real consumer re-registers and reveals the type.
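
To make the second suggested fix concrete, here is a small sketch of a coordinator that is restored without enforcement and only gains a drain checker when a STREAM consumer registers. All names (`Coordinator`, `restore`, `onFirstRegister`, `DrainChecker`) are hypothetical and chosen for this illustration.

```java
// Illustrative sketch only: hypothetical names, not the Pulsar classes.
final class RestoreSketch {
    enum ConsumerType { STREAM, CHECKPOINT, QUEUE }

    interface DrainChecker {
        boolean isDrained(long segmentId);
    }

    static final class Coordinator {
        private DrainChecker checker; // null means no parent-drain enforcement

        Coordinator(DrainChecker checker) {
            this.checker = checker;
        }

        // Installs a checker once; later calls do not replace an existing one.
        synchronized void installDrainChecker(DrainChecker newChecker) {
            if (checker == null) {
                checker = newChecker;
            }
        }

        synchronized boolean enforcesOrdering() {
            return checker != null;
        }
    }

    // Restore path: the consumer type is unknown, so start without enforcement.
    static Coordinator restore() {
        return new Coordinator(null);
    }

    // First register after restore: only STREAM turns enforcement on.
    static void onFirstRegister(Coordinator coordinator, ConsumerType type, DrainChecker real) {
        if (type == ConsumerType.STREAM) {
            coordinator.installDrainChecker(real);
        }
    }

    public static void main(String[] args) {
        Coordinator checkpoint = restore();
        onFirstRegister(checkpoint, ConsumerType.CHECKPOINT, id -> false);
        System.out.println(checkpoint.enforcesOrdering()); // false: children stay unblocked

        Coordinator stream = restore();
        onFirstRegister(stream, ConsumerType.STREAM, id -> false);
        System.out.println(stream.enforcesOrdering()); // true: parent-drain ordering applies
    }
}
```

The trade-off of this approach is a short window after restore in which a STREAM subscription has no enforcement until its first consumer re-registers; persisting the type in metadata would close that window.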

@lhotari lhotari left a comment

LGTM, however Claude Code highlighted some correctness issues (please check the comment to see if these are valid).

…se race, restore default

Three bugs flagged in the Claude Code review on PR apache#25642 (thanks @lhotari):

1. Backoff reset was ineffective if a poll was already scheduled.
   ensureDrainPollerRunning is a no-op when drainPollTask is in flight, so
   resetting drainBackoff after registerConsumer / onLayoutChange /
   markSegmentsDrained didn't actually shorten the next check — a fresh
   EARLIEST consumer joining during a quiet period could wait up to the 15-min
   cap. Fix: cancel the pending task before re-arming.

2. close() raced with an in-flight pollDrainStatus and could leak a scheduled
   task. The poll's whenComplete rearm path called ensureDrainPollerRunning
   without checking whether close() had already run, which would schedule a
   new task after the controller had gone away. Fix: closed flag, checked in
   ensureDrainPollerRunning and pollDrainStatus.

3. The restore path defaulted every recovered subscription to STREAM. Picking
   STREAM as the "conservative default" actually deadlocks CHECKPOINT and
   QUEUE — those types never drain parents (no per-segment cursor), so their
   children would stay blocked forever after a leader handoff. Fix: restore
   creates the coordinator with no drain checker; on the first register
   the controller installs one if and only if the consumer type is STREAM
   (new SubscriptionCoordinator.installDrainChecker).
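
Fixes 1 and 2 can be pictured with the following simplified poller. It is only a sketch under stated assumptions: the drain check is a synchronous `Runnable` rather than an async call, the backoff is a plain doubling delay, and every name except the idea of a `closed` flag and cancel-before-rearm is hypothetical.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Illustrative sketch only: a simplified, synchronous stand-in for the drain poller.
final class DrainPollerSketch {
    private final ScheduledExecutorService scheduler;
    private final Runnable drainCheck;
    private final long initialDelayMs;
    private final long maxDelayMs;
    private long nextDelayMs;
    private long lastScheduledDelayMs = -1;
    private ScheduledFuture<?> drainPollTask;
    private boolean closed;

    DrainPollerSketch(ScheduledExecutorService scheduler, Runnable drainCheck,
                      long initialDelayMs, long maxDelayMs) {
        this.scheduler = scheduler;
        this.drainCheck = drainCheck;
        this.initialDelayMs = initialDelayMs;
        this.maxDelayMs = maxDelayMs;
        this.nextDelayMs = initialDelayMs;
    }

    // Progress event: reset the backoff and replace any pending poll (fix 1).
    synchronized void resetAndRearm() {
        if (closed) {
            return;
        }
        nextDelayMs = initialDelayMs;
        scheduleNext();
    }

    // One poll iteration; re-arms itself unless close() already ran (fix 2).
    synchronized void poll() {
        if (closed) {
            return;
        }
        drainCheck.run();
        scheduleNext();
    }

    synchronized void close() {
        closed = true;
        cancelPending();
    }

    synchronized boolean hasPendingPoll() {
        return drainPollTask != null && !drainPollTask.isDone();
    }

    synchronized long lastScheduledDelayMs() {
        return lastScheduledDelayMs;
    }

    private void cancelPending() {
        if (drainPollTask != null) {
            drainPollTask.cancel(false);
            drainPollTask = null;
        }
    }

    // Caller holds the lock. Keeps at most one pending task.
    private void scheduleNext() {
        cancelPending();
        lastScheduledDelayMs = nextDelayMs;
        drainPollTask = scheduler.schedule(this::poll, nextDelayMs, TimeUnit.MILLISECONDS);
        nextDelayMs = Math.min(maxDelayMs, nextDelayMs * 2);
    }

    public static void main(String[] args) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        DrainPollerSketch poller = new DrainPollerSketch(scheduler, () -> { }, 2_000, 900_000);
        poller.resetAndRearm();
        poller.close();
        poller.resetAndRearm(); // no-op after close
        System.out.println(poller.hasPendingPoll()); // false
        scheduler.shutdownNow();
    }
}
```

Because both `poll` and `close` take the same lock and `poll` checks `closed` before re-arming, a poll that loses the race with `close` simply returns without scheduling anything.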

Tests added in SubscriptionCoordinatorTest:

- testFreshRegisterCancelsBackedOffPollAndRearmsImmediately — exercises (1)
  by letting the backoff grow, then asserting a fresh register triggers a
  check well within the initial-delay window.
- testCloseRaceWithInFlightPollDoesNotLeakRearm — blocks a drain check
  in-flight, calls close(), releases the check, asserts no further checks
  fire afterward.
- testRestoredCoordinatorStartsWithoutParentDrainOrdering — restored
  coordinator hands out every DAG segment immediately (no enforcement).
- testInstallDrainCheckerAfterRestoreEnablesOrdering — installing a checker
  on the coordinator post-restore re-enables parent-drain filtering on the
  next assignment.

merlimat commented May 1, 2026

@lhotari thanks for the review — addressed all three in 300b3dc:

  1. Backoff reset on a pending task — added resetAndRearmDrainPoll() that cancels the in-flight drainPollTask before resetting and re-arming. Used everywhere a progress event happens (register, layout change, observed drain).

  2. close() race with in-flight poll — added a closed flag set by close(), checked in both ensureDrainPollerRunning and pollDrainStatus so the whenComplete rearm path becomes a no-op once the coordinator has been closed.

  3. Restore-path STREAM default — restore now creates the coordinator with null drain checker (no enforcement). On first register-after-restore, ScalableTopicController installs a real checker via the new SubscriptionCoordinator.installDrainChecker(...) if and only if the consumer type is STREAM. CHECKPOINT / QUEUE subscriptions stay un-blocked. Persisting the type in metadata is the proper long-term fix and noted in the code comment as a follow-up.

Four new unit tests in SubscriptionCoordinatorTest covering each fix:

  • testFreshRegisterCancelsBackedOffPollAndRearmsImmediately
  • testCloseRaceWithInFlightPollDoesNotLeakRearm
  • testRestoredCoordinatorStartsWithoutParentDrainOrdering
  • testInstallDrainCheckerAfterRestoreEnablesOrdering

All tests pass; checkstyle clean.

@merlimat merlimat merged commit 7b3835c into apache:master May 1, 2026
43 checks passed