
[feat] PIP-468: Namespace scalable-topics watcher (protocol + broker + V5 client) #25648

Merged
merlimat merged 6 commits into apache:master from
merlimat:st-scalable-topics-watcher
May 1, 2026
Conversation

@merlimat
Contributor

@merlimat merlimat commented May 1, 2026

Summary

Foundation for multi-topic QueueConsumer / StreamConsumer subscriptions
filtered by topic properties: a long-lived broker → client watch session over
the union of scalable topics in a namespace that match a (possibly empty) set
of property filters. The broker pushes a full Snapshot on subscribe and an
incremental Diff (with adds + removes batched together) when membership
changes. The watch stays reliable across reconnects by resyncing, with a
hash-skip optimisation: when the client's hash of its current set matches the
broker's freshly computed one, the broker stays silent.

This PR lands the wire layer, the broker session, and the V5 client watcher.
The multi-topic consumer wrappers themselves come in a follow-up PR.

Design

Goal

Let QueueConsumer and StreamConsumer subscribe to the union of scalable
topics in a namespace that match a (possibly empty) set of property filters.
The matching set must follow live: when topics enter or leave the filter, the
consumer attaches/detaches automatically, and the watch must stay reliable
across reconnects and broker bounces.

Out of scope: CheckpointConsumer (stays single-topic), partitioned and
non-partitioned legacy topics.

Wire protocol

WATCH_SCALABLE_TOPICS         (76)  client -> broker  open watch
WATCH_SCALABLE_TOPICS_UPDATE  (77)  broker -> client  Snapshot or Diff
WATCH_SCALABLE_TOPICS_CLOSE   (78)  client -> broker  close watch

CommandWatchScalableTopics carries watch_id, namespace,
property_filters, and optional current_hash.

CommandWatchScalableTopicsUpdate carries an event proto oneof:

  • Snapshot { topics: [string] } — full set, sent on initial subscribe and
    on reconnect when the hash differs;
  • Diff { added: [string], removed: [string] } — incremental changes, with
    apply-removed-first semantics (covers rapid remove-then-add; see the sketch
    below).
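
A minimal sketch (Java) of how a client might fold these two update shapes
into its local view; the class and method names are illustrative, not from
this PR — only the Snapshot/Diff semantics come from the protocol above.

  import java.util.HashSet;
  import java.util.List;
  import java.util.Set;

  // Illustrative client-side view of the matching set. Names are hypothetical.
  class LocalTopicView {
      private final Set<String> topics = new HashSet<>();

      // Snapshot is a full-set replace.
      synchronized void onSnapshot(List<String> snapshot) {
          topics.clear();
          topics.addAll(snapshot);
      }

      // Diff applies removes before adds, so a remove-then-add of the same
      // topic name batched into one frame nets out to "present".
      synchronized void onDiff(List<String> added, List<String> removed) {
          topics.removeAll(removed);
          topics.addAll(added);
      }

      synchronized Set<String> current() {
          return Set.copyOf(topics);
      }
  }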

Wire size: the snapshot is bounded by the 5 MB frame limit. Pagination is
deferred — revisit if a namespace ever exceeds the limit.

Reliability

Resync-on-reconnect, no durable event log on the broker.

  1. Subscribe. The broker first registers as a per-namespace listener on the
    shared ScalableTopicResources notification fan-out, then computes the
    initial filtered set, emits Snapshot{topics}, and populates the server-side
    currentSet. Events that arrived during snapshot computation are replayed
    afterwards via the same dedup logic the deltas use.

  2. Steady state. Each metadata event is filter-evaluated. If membership
    changes relative to currentSet, broker emits Diff{added, removed} and
    updates currentSet. Server-side coalescing window (~50 ms) batches
    nearby events into one Diff.

  3. Disconnect. Broker drops the session and deregisters from the
    ScalableTopicResources listener registry.

  4. Reconnect with hash short-circuit. Client maintains a hash over its
    current set (CRC32C of sorted topic names — same function used by
    CommandGetTopicsOfNamespace). On reconnect it re-opens the watch and
    passes that hash via CommandWatchScalableTopics.current_hash. The broker
    computes the same hash over its freshly evaluated set:

    • Hash matches → broker emits nothing. The watch is live, the
      client's local state is correct, future deltas flow as usual.
    • Hash differs → broker emits a fresh Snapshot. The client reconciles
      locally — anything in the new snapshot it didn't know about → open a
      per-topic consumer; anything it knew about but absent from the snapshot
      → close + flush. Both the hash check and the reconcile are sketched
      below.

    First subscribe sends an empty / absent hash, which the broker treats as
    "no prior state" and unconditionally emits the initial Snapshot. The
    reconnect-backoff reset is keyed off write success of the watch frame, so
    the hash-skip path (no inbound Snapshot) doesn't keep the backoff at its
    peak across successive short blips.
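
A sketch of the hash short-circuit, using CRC32C over the sorted topic names
as a stand-in for TopicList.calculateHash (the broker reuses that existing
function; the comma join here is an assumption made for this sketch only).

  import java.nio.charset.StandardCharsets;
  import java.util.List;
  import java.util.stream.Collectors;
  import java.util.zip.CRC32C;

  // Stand-in for TopicList.calculateHash: CRC32C over the sorted topic names.
  final class HashSkip {
      static long hashOf(List<String> topics) {
          String joined = topics.stream().sorted().collect(Collectors.joining(","));
          CRC32C crc = new CRC32C();
          crc.update(joined.getBytes(StandardCharsets.UTF_8));
          return crc.getValue();
      }

      // Broker-side decision on (re)subscribe: a missing client hash means
      // "no prior state", so the initial Snapshot is always emitted.
      static boolean shouldEmitSnapshot(Long clientHash, List<String> freshSet) {
          return clientHash == null || clientHash != hashOf(freshSet);
      }
  }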

Properties: idempotent (snapshot is full-set replace; diffs are set ops);
self-healing across any disconnect duration; no broker affinity (any broker
can serve, every broker has the same metadata events). For the common
short-blip reconnect where membership didn't change, the wire cost collapses
to one inbound WatchScalableTopics frame and zero outbound.
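
For the hash-differs case, the client-side reconcile reduces to two set
differences plus a full-set replace; the open/close hooks below are
hypothetical placeholders for the per-topic consumer lifecycle.

  import java.util.HashSet;
  import java.util.Set;
  import java.util.function.Consumer;

  // Sketch of the reconcile applied when a fresh Snapshot arrives after
  // reconnect. openConsumer / closeConsumer are hypothetical hooks.
  final class SnapshotReconcile {
      static void apply(Set<String> current, Set<String> snapshot,
                        Consumer<String> openConsumer, Consumer<String> closeConsumer) {
          Set<String> toOpen = new HashSet<>(snapshot);
          toOpen.removeAll(current);              // new in snapshot -> open
          Set<String> toClose = new HashSet<>(current);
          toClose.removeAll(snapshot);            // gone from snapshot -> close + flush

          toClose.forEach(closeConsumer);
          toOpen.forEach(openConsumer);

          current.clear();                        // snapshot is a full-set replace
          current.addAll(snapshot);
      }
  }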

Filter evaluation: broker-side

Initial set is computed via the existing findScalableTopicsByPropertiesAsync.
Each metadata event:

  • Created / Modified: read the new value, evaluate filter, emit Diff if
    membership changed vs currentSet.
  • Deleted: no new value; emit Removed if topic was in currentSet.

Cost: one filter evaluation per metadata event per watcher. Filters are tiny,
so this is cheap; the matching is sketched below.
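
Assuming each filter entry is an exact key/value match and all entries must
hold (AND semantics, per the test plan below) — an assumption of this sketch,
not a statement of the implementation — the evaluation is a one-liner; an
empty filter map matches every scalable topic in the namespace.

  import java.util.Map;

  // Sketch of per-topic filter evaluation. Exact-value equality per key is an
  // assumption of this sketch; an empty filter map matches everything.
  final class PropertyFilterMatch {
      static boolean matches(Map<String, String> filters, Map<String, String> topicProperties) {
          return filters.entrySet().stream()
                  .allMatch(e -> e.getValue().equals(topicProperties.get(e.getKey())));
      }
  }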

Listener lifecycle

ScalableTopicResources registers a single shared metadata-store listener at
construction and exposes registerNamespaceListener /
deregisterNamespaceListener to per-watcher subscribers — same pattern as
TopicResources for TopicListener. MetadataStore has no
unregisterListener, so per-session direct registration would leak listener
references for the broker's lifetime; the registry-and-fan-out approach
avoids both the leak and the linear per-event dispatch tax.
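
The registry-and-fan-out shape, reduced to its essentials; the listener
interface and path handling here are simplified stand-ins, not the actual
ScalableTopicResources API.

  import java.util.Map;
  import java.util.concurrent.ConcurrentHashMap;

  // Simplified stand-in for the shared fan-out inside ScalableTopicResources.
  // One listener is registered with the MetadataStore at construction time;
  // per-watcher sessions only ever touch this registry, so closing a session
  // never leaks a metadata-store listener.
  class NamespaceListenerRegistry {
      interface NamespaceListener {
          void onNotification(String path);
      }

      private final Map<NamespaceListener, String> listeners = new ConcurrentHashMap<>();

      void registerNamespaceListener(NamespaceListener listener, String namespacePath) {
          listeners.put(listener, namespacePath);
      }

      void deregisterNamespaceListener(NamespaceListener listener) {
          listeners.remove(listener);
      }

      // Called from the single shared metadata-store listener: dispatch only
      // to sessions whose namespace topic prefix matches the notified path.
      void handleNotification(String notifiedPath) {
          listeners.forEach((listener, nsPath) -> {
              if (notifiedPath.startsWith(nsPath)) {
                  listener.onNotification(notifiedPath);
              }
          });
      }
  }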

Cross-topic load balancing — deferred

Today: every multi-topic consumer in (namespace, filter, subscriptionName)
will subscribe to all matching topics. Each topic's per-topic
SubscriptionCoordinator independently picks one consumer per segment — for
StreamConsumer (Exclusive) the assignment is randomized across consumers, in
expectation roughly balanced but not deterministic.

Future: namespace-level MultiTopicSubscriptionCoordinator per
(namespace, subscriptionName) that:

  • Persists Map<TopicName, ConsumerSession> for the group.
  • On consumer attach: assigns a slice of the matching topic set.
  • On topic Added/Removed: rebalances.
  • Server-side filters the watcher's emitted set to the consumer's slice.

When that lands it'll add a consumer_name (or similar identity) field to
CommandWatchScalableTopics. Not added speculatively in this PR.

Authz

CommandWatchScalableTopics: requires namespace-level READ
(NamespaceOperation.GET_TOPICS, same as listScalableTopics). Per-topic
subscribe that follows uses normal topic-level authz.

Test plan

  • ScalableTopicsWatcherSessionHashTest — broker session hash branches
    (no hash → snapshot; matching → silent; differing → snapshot). Mocked
    ServerCnx + LocalMemoryMetadataStore.
  • V5ScalableTopicsWatcherTest — end-to-end through a real shared cluster:
    create / delete events flow as Diffs; AND property filters narrow the set
    correctly; pre-existing topics surface in the initial Snapshot.

Matching PR(s) in forked repositories

  • area/broker
  • area/client

merlimat added 5 commits May 1, 2026 09:10
…broker session

Foundation for multi-topic QueueConsumer / StreamConsumer subscriptions
filtered by topic properties. The wire layer plus the broker-side watcher
session that holds the metadata listener and pushes Snapshot / Diff frames
to the client.

Protocol (lightproto)
- New BaseCommand variants:
    WATCH_SCALABLE_TOPICS         (76)  client -> broker: open watch
    WATCH_SCALABLE_TOPICS_UPDATE  (77)  broker -> client: Snapshot or Diff
    WATCH_SCALABLE_TOPICS_CLOSE   (78)  client -> broker: close
- Shapes:
    CommandWatchScalableTopics { watch_id, namespace, property_filters,
                                 consumer_name? }
    CommandWatchScalableTopicsUpdate { watch_id, snapshot? | diff? | error }
    ScalableTopicsSnapshot { topics: [string] }
    ScalableTopicsDiff     { added: [string], removed: [string] }
- Commands.java helpers + PulsarDecoder dispatch entries.
- consumer_name is carried from day one as a hook for a future
  namespace-level subscription coordinator (see
  multi-topic-consumer-design.md). Empty for now.

Broker session (ScalableTopicsWatcherSession)
- Registers a metadata-store listener BEFORE computing the initial set, so
  events arriving mid-snapshot are captured and replayed via the same diff
  path. The redundant Add for a topic already in the snapshot is a no-op
  (set semantics on the client).
- Filter evaluation server-side. Created/Modified events read the new value
  to test against AND filters; Deleted events emit Removed if the topic was
  in currentSet.
- Coalescing window (50 ms) folds back-to-back events into one Diff frame.
  Add and Remove for the same topic in the same window cancel out (rapid
  remove-then-add of the same name).
- Any broker can serve the role — every broker observes the same metadata
  events, so no namespace-level coordinator needed.

ServerCnx wiring
- Per-cnx scalableTopicsWatchers registry; closed in bulk on disconnect.
- Authz at subscribe time: NamespaceOperation.GET_TOPICS, mirroring
  listScalableTopics.
- Idempotent close.

Helpers
- ScalableTopicResources.namespacePath(ns) — used by the watcher to filter
  notifications to direct children of a namespace's topic prefix.
Adds the long-lived client session that opens a CommandWatchScalableTopics
on a broker connection, hands the initial Snapshot to start()'s future, and
forwards subsequent Snapshot / Diff frames to a Listener.

Client wiring
- New ScalableTopicsWatcherSession callback interface in pulsar-client
  (broker-side adds it to ClientCnx via cnx.scalableTopicsWatchers, parallel
  to dagWatchSessions / scalableConsumerSessions). Closed in bulk on
  channel-inactive; cleared in the same teardown path as the existing
  watch sessions.
- ClientCnx.handleCommandWatchScalableTopicsUpdate dispatches by Snapshot /
  Diff / Error (terminal). Exposes register/remove for the watcher.
- LightProto generates getAddedsCount / getRemovedsCount for the diff
  fields (always-append-s rule); commented at the call site.

V5 ScalableTopicsWatcher
- Mirrors DagWatchClient in shape: opens connection to service URL,
  registers itself on the cnx, sends WatchScalableTopics, awaits the first
  WatchScalableTopicsUpdate (Snapshot).
- start() resolves with the initial topic set so callers can populate their
  per-topic-consumer map before the listener attaches; subsequent Snapshots
  flow through onSnapshot() on the listener.
- Reconnect path: connectionClosed() schedules a retry with Backoff
  (100 ms initial, 30 s cap). On reconnect the broker pushes a fresh
  Snapshot which the listener applies as a full-state replacement —
  self-healing across any disconnect duration.
- Carries optional consumerName from day one as a hook for the future
  namespace-level subscription coordinator (see design doc).

Tests
- V5ScalableTopicsWatcherTest covers create/delete event delivery, AND
  property filtering (and that updating a topic out of the filter doesn't
  surface), and that the initial Snapshot reflects pre-existing topics.
  Drives the package-private ScalableTopicsWatcher via reflection (the
  multi-topic consumer wrappers come in a follow-up PR; once those land
  the test will go through the public builder API).
Optimisation suggested in the design review: instead of always re-sending
the full topic list when a multi-topic watcher reconnects, the client
tracks a hash of its current set and passes it back. The broker computes
the same hash over its freshly-evaluated set and stays silent when they
match — the watch is live, the client's local state is correct, and any
future Diffs flow as usual. For the common short-blip reconnect where
membership didn't change, the wire cost collapses to a single inbound
WatchScalableTopics frame and zero outbound.

Protocol
- CommandWatchScalableTopics gains optional current_hash. First subscribe
  sends it absent (broker emits initial Snapshot unconditionally);
  reconnect sends the cached hash.
- Hash function reuses TopicList.calculateHash (CRC32C of sorted topic
  names) for parity with the existing CommandGetTopicsOfNamespace watch.

Broker
- ScalableTopicsWatcherSession.start() takes the optional clientHash. If
  it equals the hash of the freshly computed initial set, start() skips
  emitting the Snapshot and just marks the snapshot phase done so deltas
  can flow.

Client
- ScalableTopicsWatcher mirrors the broker's view in a synchronised set,
  updated on every Snapshot replace and Diff apply. On reconnect it hands
  the hash of that set to the broker.
- attach() (first subscribe) keeps sending null hash so the broker emits
  the initial Snapshot unconditionally — start()'s future depends on it.

Tests
- ScalableTopicsWatcherSessionHashTest covers all three branches: no hash
  → snapshot emitted; matching hash → no snapshot; differing hash →
  snapshot emitted. Uses mocked ServerCnx + LocalMemoryMetadataStore.
- Existing V5ScalableTopicsWatcherTest is unaffected — first-subscribe
  still passes null and gets a Snapshot, which is what the start()
  future depends on.

Design doc updated to describe the hash short-circuit.
Address review on apache#25648:

- Remove the optional consumer_name field on CommandWatchScalableTopics.
  Originally carried as a future-coordinator hook, but YAGNI — dropping
  it now keeps the wire surface minimal; the eventual namespace-level
  coordinator can add the field back when it actually needs it.
- Switch ScalableTopicsSnapshot / ScalableTopicsDiff on
  CommandWatchScalableTopicsUpdate from two parallel optionals to a
  proto oneof, so the protocol enforces mutual exclusivity. Client-side
  dispatch becomes a switch on getEventCase() with explicit handling
  for SNAPSHOT, DIFF, and NOT_SET.

Knock-ons
- Commands.newWatchScalableTopics drops the consumerName parameter.
- ScalableTopicsWatcher's constructor matches.
- V5ScalableTopicsWatcherTest's openWatcher helper drops the
  consumerName plumbing.
@lhotari
Member

lhotari commented May 1, 2026

Note: this is analysis by Claude Code, reviewed by me before posting.

A few points worth checking before this merges.

1. Metadata listener leak per ScalableTopicsWatcherSession (ScalableTopicsWatcherSession.start():305, close():466-472)

start() calls resources.getStore().registerListener(this::onNotification) per session, and close() only flips a closed flag — MetadataStore exposes no unregisterListener. So every closed session leaves a stale Consumer<Notification> registered, and every metadata notification fans out to all stale listeners over the broker's lifetime (each short-circuits on closed.get(), but the dispatch cost still scales linearly with total sessions ever opened). Not just memory growth — a real per-event throughput tax for long-running brokers serving many namespace watches.

Suggested fix (primary) — mirror what TopicResources does for TopicListService:

  • Have ScalableTopicResources register one handleNotification listener at construction time (TopicResources.java:52).
  • Maintain a Map<ScalableTopicNamespaceListener, NamespaceName> (or similar) inside ScalableTopicResources, with register…Listener(...) / deregister…Listener(...) methods (TopicResources.java:136-142).
  • ScalableTopicsWatcherSession.start() calls register…, close() calls deregister…. The single fan-out filters by the watcher's namespace base path.

This is the established pattern in the codebase, and it removes both the leak and the linear dispatch cost without needing a MetadataStore API change.

Alternative: #24256 adds registerCancellableListener returning a handle that callers cancel on shutdown. With that landed, ScalableTopicsWatcherSession could store the handle and cancel it in close(). Works, but keeps one metadata-store listener per session; the fan-out approach above is cheaper at runtime and consistent with TopicListService.

2. `consumer_name` described in PR body but absent from the proto (PulsarApi.proto:1558-1569)

The PR description's "Wire protocol" section states CommandWatchScalableTopics carries consumer_name, and the "Cross-topic load balancing — deferred" section says "`consumer_name` is part of `CommandWatchScalableTopics` now, so the future coordinator has the identity it needs." The proto in this PR has only watch_id, namespace, property_filters, current_hash. Either add the field now (cheap — optional string consumer_name = 5;) or remove the design language so we don't need another wire bump for the future coordinator.

3. Reconnect backoff never reset on hash-matched reconnect (ScalableTopicsWatcher.java:1037)

reconnectBackoff.reset() runs only inside onSnapshot. The hash-skip optimisation is the common short-blip happy path — the broker emits no Snapshot, so the backoff stays at its last value across successive successful reconnects. The next disconnect then waits much longer than expected. Suggest resetting on successful reconnect (after the watch frame's write completes, or on any successful broker→client traffic).

Member

@lhotari lhotari left a comment


LGTM, please check the local Claude Code review findings to see if they are real.

…reset

Two real bugs caught in the Claude Code review on apache#25648:

1. Metadata listener leak per ScalableTopicsWatcherSession.
   start() called resources.getStore().registerListener(...) for every
   watcher; close() only flipped a closed flag. MetadataStore exposes no
   unregisterListener, so every closed session left a stale listener
   registered for the broker's lifetime, and every metadata notification
   fanned out to all of them — linear dispatch tax.

   Fix mirrors the TopicResources pattern for TopicListener: register one
   shared listener at ScalableTopicResources construction time, maintain
   a Map<NamespaceListener, NamespaceName> registry. start() registers
   the session via resources.registerNamespaceListener(this); close()
   deregisters. The single fan-out filters down to direct children of
   the listener's namespace base path before dispatching.

   ScalableTopicsWatcherSession now implements
   ScalableTopicResources.NamespaceListener so the resources-level
   fan-out can call back into it directly.

2. ScalableTopicsWatcher reconnect backoff never reset on hash-matched
   reconnect.
   reconnectBackoff.reset() ran only inside onSnapshot. For the
   hash-skip path (the common short-blip happy case), the broker emits
   no Snapshot, so the backoff stayed at its last value and successive
   short reconnects compounded into an unbounded delay before the next
   attempt.

   Reset on write-success of the reconnect frame instead — that's the
   moment we know the connection is healthy regardless of whether the
   broker will follow up with a Snapshot or stay silent.
@merlimat
Contributor Author

merlimat commented May 1, 2026

All good points. Fixed

@merlimat merlimat merged commit d0e557f into apache:master May 1, 2026
44 checks passed
@merlimat merlimat deleted the st-scalable-topics-watcher branch May 1, 2026 22:18
merlimat added a commit to merlimat/pulsar that referenced this pull request May 2, 2026
Builds on the namespace scalable-topics watcher (PR apache#25648) to land the
user-facing multi-topic consumer surface: subscribe to the union of
scalable topics in a namespace that match a (possibly empty) set of
property filters, follow the matching set live, multiplex from every
per-topic consumer into one user-visible queue.

Builder API
- QueueConsumerBuilder / StreamConsumerBuilder gain:
    namespace(String namespace)
    namespace(String namespace, Map<String, String> propertyFilters)
  Mutually exclusive with .topic(name); subscribe() rejects either-both
  or neither.
- Drop the legacy multi-topic / pattern surface from v5: removed
  topics(List), topicsPattern(Pattern), topicsPattern(String),
  patternAutoDiscoveryPeriod. Tightened topic(String...) → topic(String).

MultiTopicQueueConsumer
- Wraps a ScalableTopicsWatcher; on Snapshot/Diff opens / closes
  per-topic ScalableQueueConsumers.
- One pump thread per topic forwards messages from the per-topic queue
  into the shared mux queue, tagging each MessageV5 with the parent
  scalable topic via withTopicOverride. msg.topic() then surfaces the
  parent topic the user subscribed to (not the internal segment topic).
- Ack routing: MessageIdV5 carries a parentTopic field so
  acknowledge(msgId) finds the right per-topic consumer.
- Per-topic subscribe failure: retry forever with exp backoff
  (100 ms initial, 30 min cap).

MultiTopicStreamConsumer
- Same wrapper shape, but uses ScalableStreamConsumer per topic and
  carries a multi-topic position-vector snapshot per delivered message:
    Map<TopicName, Map<SegmentId, MessageId>>
  captured at enqueue time. acknowledgeCumulative(msg) fans out to every
  per-topic consumer with that topic's segment vector — same semantics
  as single-topic cumulative ack, lifted one level.
- ScalableStreamConsumer exposes ackUpToVector(Map<SegmentId, MessageId>)
  as the multi-topic ack hook.
- Topic Removed mid-stream: flush acks up to latestDelivered for that
  topic before closing the per-topic consumer.

Supporting changes
- MessageV5.topicOverride / withTopicOverride for parent-topic display.
- MessageIdV5 gets parentTopic + multiTopicVector fields, plus a 5-arg
  full constructor used by the multi-topic stream pump.
- New package-private QueueConsumerImpl interface so AsyncQueueConsumerV5
  is shared between ScalableQueueConsumer and MultiTopicQueueConsumer.
- ScalableQueueConsumer / ScalableStreamConsumer expose createAsyncImpl
  variants returning the concrete impl type — used by the multi-topic
  wrappers that hold typed references to per-topic consumers.

Tests
- V5MultiTopicQueueConsumerTest: receives from all topics in namespace;
  picks up topic created after subscribe (Diff path); filters by
  property so only matching topics attach.
- V5MultiTopicStreamConsumerTest: same three flows plus
  cumulativeAckCoversEveryTopicSeenSoFar — drains both topics, calls
  acknowledgeCumulative on the last message, re-subscribes, asserts no
  redelivery (proves the position vector reaches every topic).

All v5 single-topic regression tests still pass.