3131import org .apache .pulsar .broker .resources .ScalableTopicResources ;
3232import org .apache .pulsar .broker .service .BrokerService ;
3333import org .apache .pulsar .broker .service .TransportCnx ;
34+ import org .apache .pulsar .common .api .proto .ScalableConsumerType ;
3435import org .apache .pulsar .common .naming .TopicName ;
3536import org .apache .pulsar .common .scalable .HashRange ;
3637import org .apache .pulsar .common .scalable .SegmentInfo ;
@@ -155,7 +156,32 @@ private CompletableFuture<Void> restoreSubscription(String subscription) {
155156 });
156157 }
157158
159+ /**
160+ * Restore-path entry: consumer type isn't persisted in metadata yet, so we don't
161+ * know whether the original subscription was STREAM (needs parent-drain ordering)
162+ * or CHECKPOINT / QUEUE (mustn't have it — CHECKPOINT never drains parents because
163+ * it doesn't create per-segment cursors). Default to <em>no enforcement</em>; on the
164+ * first register-after-restore the controller calls
165+ * {@link SubscriptionCoordinator#installDrainChecker} if the type is STREAM.
166+ */
158167 private SubscriptionCoordinator createCoordinator (String subscription ) {
168+ return createCoordinator (subscription , null );
169+ }
170+
171+ private SubscriptionCoordinator createCoordinator (String subscription ,
172+ ScalableConsumerType consumerType ) {
173+ // Parent-drain ordering matters only for STREAM consumers (Exclusive per-segment
174+ // subscription with broker-tracked cursors → preserving per-key order across a
175+ // split requires waiting for the parent to drain before handing out children).
176+ // CHECKPOINT consumers track position client-side via Checkpoints and don't even
177+ // create per-segment cursors — their parent never reports as drained, so the
178+ // ordering machinery would block their children indefinitely. QUEUE consumers
179+ // are shared and accept out-of-order delivery by design. Null type (restore
180+ // path) starts without a checker; it's installed lazily on first STREAM
181+ // register.
182+ SegmentDrainChecker checker =
183+ consumerType == ScalableConsumerType .STREAM ? this ::isSegmentDrained : null ;
184+
159185 // Defensive: PulsarService.getConfig() is null in some unit-test mocks. Fall
160186 // back to the SubscriptionCoordinator's default grace period in that case.
161187 var config = brokerService .getPulsar ().getConfig ();
@@ -175,7 +201,42 @@ private SubscriptionCoordinator createCoordinator(String subscription) {
175201 currentLayout ,
176202 resources ,
177203 brokerService .getPulsar ().getExecutor (),
178- gracePeriod );
204+ gracePeriod ,
205+ checker ,
206+ SubscriptionCoordinator .DEFAULT_DRAIN_INITIAL_DELAY ,
207+ SubscriptionCoordinator .DEFAULT_DRAIN_MAX_DELAY );
208+ }
209+
210+ /**
211+ * Drain check used by every {@link SubscriptionCoordinator} on this topic. Asks the
212+ * segment topic's owning broker for the per-subscription backlog via the
213+ * {@code /segments/.../subscription/.../backlog} admin endpoint, which redirects to
214+ * the topic owner — works whether the controller and the segment colocate or not.
215+ *
216+ * <p>Returns {@code false} if the segment topic or subscription is not yet loaded
217+ * (the admin endpoint replies 404). The next poll will succeed once the consumer's
218+ * subscribe lands the topic on its owning broker.
219+ */
220+ private CompletableFuture <Boolean > isSegmentDrained (SegmentInfo segment , String subscription ) {
221+ String segmentTopicName = toSegmentPersistentName (segment );
222+ try {
223+ return brokerService .getPulsar ().getAdminClient ()
224+ .scalableTopics ()
225+ .getSegmentSubscriptionBacklogAsync (segmentTopicName , subscription )
226+ .thenApply (backlog -> backlog != null && backlog <= 0 )
227+ .exceptionally (ex -> {
228+ Throwable cause =
229+ org .apache .pulsar .common .util .FutureUtil .unwrapCompletionException (ex );
230+ if (cause instanceof org .apache .pulsar .client .admin .PulsarAdminException .NotFoundException ) {
231+ // Topic or subscription not loaded yet — try again on the
232+ // next poll. The consumer's subscribe will materialize it.
233+ return false ;
234+ }
235+ throw org .apache .pulsar .common .util .FutureUtil .wrapToCompletionException (cause );
236+ });
237+ } catch (PulsarServerException e ) {
238+ return CompletableFuture .failedFuture (e );
239+ }
179240 }
180241
181242 private CompletableFuture <Void > electLeader () {
@@ -309,14 +370,42 @@ public CompletableFuture<SegmentLayout> mergeSegments(long segmentId1, long segm
309370 * <p>If a session with the same {@code consumerName} already exists (for example
310371 * because the consumer is reconnecting within the grace period), the existing
311372 * assignment is reused and no rebalance occurs.
373+ *
374+ * <p>The {@code consumerType} is used at coordinator creation time to decide whether
375+ * to enforce parent-drain ordering on assignments — see
 * {@link SubscriptionCoordinator}. A coordinator created for a non-STREAM type (or on
 * the restore path) starts without enforcement; a later STREAM registration installs
 * the drain checker, and that call is a no-op once a checker is already present.
379+ */
380+ /**
381+ * @deprecated Defaults to {@link ScalableConsumerType#STREAM}
382+ * for backward compatibility. New callers should pass the explicit type.
312383 */
384+ @ Deprecated
313385 public CompletableFuture <ConsumerAssignment > registerConsumer (String subscription ,
314386 String consumerName ,
315387 long consumerId ,
316388 TransportCnx cnx ) {
389+ return registerConsumer (subscription , consumerName , consumerId ,
390+ ScalableConsumerType .STREAM , cnx );
391+ }
392+
393+ public CompletableFuture <ConsumerAssignment > registerConsumer (String subscription ,
394+ String consumerName ,
395+ long consumerId ,
396+ ScalableConsumerType
397+ consumerType ,
398+ TransportCnx cnx ) {
317399 checkLeader ();
318400 SubscriptionCoordinator coordinator = subscriptions .computeIfAbsent (
319- subscription , this ::createCoordinator );
401+ subscription , sub -> createCoordinator (sub , consumerType ));
402+ // The coordinator may have been created on the failover-restore path (consumer
403+ // type unknown then; we defaulted to "no parent-drain enforcement"). Now that we
404+ // know the type, upgrade if it's STREAM. installDrainChecker is a no-op if the
405+ // coordinator already has a checker, so safe to call unconditionally.
406+ if (consumerType == ScalableConsumerType .STREAM ) {
407+ coordinator .installDrainChecker (this ::isSegmentDrained );
408+ }
320409 return coordinator .registerConsumer (consumerName , consumerId , cnx )
321410 .thenApply (assignments -> {
322411 // Look up by name since the key may have been an existing session
@@ -525,6 +614,9 @@ public CompletableFuture<org.apache.pulsar.common.policies.data.ScalableTopicSta
525614
526615 public CompletableFuture <Void > close () {
527616 closed = true ;
617+ // Stop each coordinator's drain poller before clearing — otherwise the scheduler
618+ // task keeps running after the controller goes away.
619+ subscriptions .values ().forEach (SubscriptionCoordinator ::close );
528620 subscriptions .clear ();
529621 return leaderElection .asyncClose ();
530622 }
0 commit comments