[feat] PIP-468: Namespace scalable-topics watcher (protocol + broker + V5 client) #25648
Conversation
…broker session
Foundation for multi-topic QueueConsumer / StreamConsumer subscriptions
filtered by topic properties. The wire layer plus the broker-side watcher
session that holds the metadata listener and pushes Snapshot / Diff frames
to the client.
Protocol (lightproto)
- New BaseCommand variants:
WATCH_SCALABLE_TOPICS (76) client -> broker: open watch
WATCH_SCALABLE_TOPICS_UPDATE (77) broker -> client: Snapshot or Diff
WATCH_SCALABLE_TOPICS_CLOSE (78) client -> broker: close
- Shapes:
CommandWatchScalableTopics { watch_id, namespace, property_filters,
consumer_name? }
CommandWatchScalableTopicsUpdate { watch_id, snapshot? | diff? | error }
ScalableTopicsSnapshot { topics: [string] }
ScalableTopicsDiff { added: [string], removed: [string] }
- Commands.java helpers + PulsarDecoder dispatch entries.
- consumer_name is carried from day one as a hook for a future
namespace-level subscription coordinator (see
multi-topic-consumer-design.md). Empty for now.
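The shapes above can be sketched in protobuf-style notation. This is a sketch only — field numbers, the `KeyValue` / `ServerError` types, and anything beyond the listed fields are assumptions, not the committed lightproto schema:

```protobuf
// Sketch of the PIP-468 wire shapes; field numbering is illustrative.
message CommandWatchScalableTopics {
    required uint64 watch_id = 1;
    required string namespace = 2;
    repeated KeyValue property_filters = 3;   // AND semantics
    optional string consumer_name = 4;        // future-coordinator hook, empty for now
}

message ScalableTopicsSnapshot {
    repeated string topics = 1;               // full filtered set
}

message ScalableTopicsDiff {
    repeated string added = 1;
    repeated string removed = 2;
}

message CommandWatchScalableTopicsUpdate {
    required uint64 watch_id = 1;
    optional ScalableTopicsSnapshot snapshot = 2;   // snapshot / diff / error:
    optional ScalableTopicsDiff diff = 3;           // at most one is set
    optional ServerError error = 4;
}
```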
Broker session (ScalableTopicsWatcherSession)
- Registers a metadata-store listener BEFORE computing the initial set, so
events arriving mid-snapshot are captured and replayed via the same diff
path. The redundant Add for a topic already in the snapshot is a no-op
(set semantics on the client).
- Filter evaluation server-side. Created/Modified events read the new value
to test against AND filters; Deleted events emit Removed if the topic was
in currentSet.
- Coalescing window (50 ms) folds back-to-back events into one Diff frame.
Add and Remove for the same topic in the same window cancel out (rapid
remove-then-add of the same name).
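The cancellation rule inside the coalescing window can be sketched as a pending-diff accumulator. This is a minimal illustration of the rule, not the broker's actual class; `PendingDiff` and its method names are invented here:

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Sketch: within one coalescing window, an Add and a Remove for the same
// topic cancel out, so rapid remove-then-add nets to an empty diff.
public class PendingDiff {
    private final Set<String> added = new LinkedHashSet<>();
    private final Set<String> removed = new LinkedHashSet<>();

    public void onAdded(String topic) {
        // A Remove queued earlier in this window is cancelled by the Add.
        if (!removed.remove(topic)) {
            added.add(topic);
        }
    }

    public void onRemoved(String topic) {
        // An Add queued earlier in this window is cancelled by the Remove.
        if (!added.remove(topic)) {
            removed.add(topic);
        }
    }

    public Set<String> added() { return added; }
    public Set<String> removed() { return removed; }
    public boolean isEmpty() { return added.isEmpty() && removed.isEmpty(); }

    public static void main(String[] args) {
        PendingDiff d = new PendingDiff();
        d.onRemoved("t1");
        d.onAdded("t1");   // cancels the queued Remove
        if (!d.isEmpty()) throw new AssertionError("remove-then-add should cancel");
        d.onAdded("t2");
        d.onRemoved("t3");
        if (!d.added().contains("t2") || !d.removed().contains("t3")) {
            throw new AssertionError();
        }
    }
}
```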
- Any broker can serve the role — every broker observes the same metadata
events, so no namespace-level coordinator needed.
ServerCnx wiring
- Per-cnx scalableTopicsWatchers registry; closed in bulk on disconnect.
- Authz at subscribe time: NamespaceOperation.GET_TOPICS, mirroring
listScalableTopics.
- Idempotent close.
Helpers
- ScalableTopicResources.namespacePath(ns) — used by the watcher to filter
notifications to direct children of a namespace's topic prefix.
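The direct-child filter can be illustrated as a prefix check that rejects deeper paths. A sketch only — the real metadata path layout and helper names may differ:

```java
// Sketch: only notifications for immediate children of the namespace's
// topic prefix pass; deeper paths (segment nodes, internals) are ignored.
public class NamespacePathFilter {
    public static boolean isDirectChild(String basePath, String notifiedPath) {
        if (!notifiedPath.startsWith(basePath + "/")) {
            return false;
        }
        String rest = notifiedPath.substring(basePath.length() + 1);
        return !rest.isEmpty() && rest.indexOf('/') < 0;
    }

    public static void main(String[] args) {
        String base = "/scalable-topics/public/default"; // hypothetical prefix
        if (!isDirectChild(base, base + "/orders")) throw new AssertionError();
        if (isDirectChild(base, base + "/orders/segment-0")) throw new AssertionError();
        if (isDirectChild(base, "/other/public/default/orders")) throw new AssertionError();
    }
}
```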
Adds the long-lived client session that opens a CommandWatchScalableTopics on
a broker connection, hands the initial Snapshot to start()'s future, and
forwards subsequent Snapshot / Diff frames to a Listener.
Client wiring
- New ScalableTopicsWatcherSession callback interface in pulsar-client
  (broker-side adds it to ClientCnx via cnx.scalableTopicsWatchers, parallel
  to dagWatchSessions / scalableConsumerSessions). Closed in bulk on
  channel-inactive; cleared in the same teardown path as the existing watch
  sessions.
- ClientCnx.handleCommandWatchScalableTopicsUpdate dispatches by Snapshot /
  Diff / Error (terminal). Exposes register/remove for the watcher.
- LightProto generates getAddedsCount / getRemovedsCount for the diff fields
  (always-append-s rule); commented at the call site.
V5 ScalableTopicsWatcher
- Mirrors DagWatchClient in shape: opens a connection to the service URL,
  registers itself on the cnx, sends WatchScalableTopics, awaits the first
  WatchScalableTopicsUpdate (Snapshot).
- start() resolves with the initial topic set so callers can populate their
  per-topic-consumer map before the listener attaches; subsequent Snapshots
  flow through onSnapshot() on the listener.
- Reconnect path: connectionClosed() schedules a retry with Backoff (100 ms
  initial, 30 s cap). On reconnect the broker pushes a fresh Snapshot which
  the listener applies as a full-state replacement — self-healing across any
  disconnect duration.
- Carries optional consumerName from day one as a hook for the future
  namespace-level subscription coordinator (see design doc).
Tests
- V5ScalableTopicsWatcherTest covers create/delete event delivery, AND
  property filtering (and that updating a topic out of the filter doesn't
  surface), and that the initial Snapshot reflects pre-existing topics.
  Drives the package-private ScalableTopicsWatcher via reflection (the
  multi-topic consumer wrappers come in a follow-up PR; once those land the
  test will go through the public builder API).
Optimisation suggested in the design review: instead of always re-sending the
full topic list when a multi-topic watcher reconnects, the client tracks a
hash of its current set and passes it back. The broker computes the same hash
over its freshly-evaluated set and stays silent when they match — the watch
is live, the client's local state is correct, and any future Diffs flow as
usual. For the common short-blip reconnect where membership didn't change,
the wire cost collapses to a single inbound WatchScalableTopics frame and
zero outbound.
Protocol
- CommandWatchScalableTopics gains an optional current_hash. First subscribe
  sends it absent (broker emits the initial Snapshot unconditionally);
  reconnect sends the cached hash.
- Hash function reuses TopicList.calculateHash (CRC32C of sorted topic names)
  for parity with the existing CommandGetTopicsOfNamespace watch.
Broker
- ScalableTopicsWatcherSession.start() takes the optional clientHash. If it
  equals the hash of the freshly computed initial set, start() skips emitting
  the Snapshot and just marks the snapshot phase done so deltas can flow.
Client
- ScalableTopicsWatcher mirrors the broker's view in a synchronised set,
  updated on every Snapshot replace and Diff apply. On reconnect it hands the
  hash of that set to the broker.
- attach() (first subscribe) keeps sending a null hash so the broker emits
  the initial Snapshot unconditionally — start()'s future depends on it.
Tests
- ScalableTopicsWatcherSessionHashTest covers all three branches: no hash →
  snapshot emitted; matching hash → no snapshot; differing hash → snapshot
  emitted. Uses mocked ServerCnx + LocalMemoryMetadataStore.
- Existing V5ScalableTopicsWatcherTest is unaffected — first-subscribe still
  passes a null hash and gets a Snapshot, which is what the start() future
  depends on.
Design doc updated to describe the hash short-circuit.
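The short-circuit depends only on client and broker computing the same function over the same sorted name list. A minimal sketch of that idea — not Pulsar's actual TopicList.calculateHash implementation, whose exact byte layout may differ:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.zip.CRC32C;

public class TopicSetHash {
    // CRC32C over the sorted topic names; order-insensitive by construction,
    // so client and broker agree whenever their sets are equal.
    public static long hash(Collection<String> topics) {
        List<String> sorted = new ArrayList<>(topics);
        Collections.sort(sorted);
        CRC32C crc = new CRC32C();
        for (String t : sorted) {
            crc.update(t.getBytes(StandardCharsets.UTF_8));
            crc.update('\n'); // separator so ["ab","c"] differs from ["a","bc"]
        }
        return crc.getValue();
    }

    public static void main(String[] args) {
        long a = hash(List.of("ns/t1", "ns/t2"));
        long b = hash(List.of("ns/t2", "ns/t1"));
        if (a != b) throw new AssertionError("hash must be order-insensitive");
        if (a == hash(List.of("ns/t1"))) throw new AssertionError("different sets must differ");
    }
}
```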
Address review on apache#25648:
- Remove the optional consumer_name field on CommandWatchScalableTopics.
  Originally carried as a future-coordinator hook, but YAGNI — dropping it
  now keeps the wire surface minimal; the eventual namespace-level
  coordinator can add the field back when it actually needs it.
- Switch ScalableTopicsSnapshot / ScalableTopicsDiff on
  CommandWatchScalableTopicsUpdate from two parallel optionals to a proto
  oneof, so the protocol enforces mutual exclusivity. Client-side dispatch
  becomes a switch on getEventCase() with explicit handling for SNAPSHOT,
  DIFF, and NOT_SET.
Knock-ons
- Commands.newWatchScalableTopics drops the consumerName parameter.
- ScalableTopicsWatcher's constructor matches.
- V5ScalableTopicsWatcherTest's openWatcher helper drops the consumerName
  plumbing.
A few points worth checking before this merges.
1. Metadata listener leak per ScalableTopicsWatcherSession. Suggested fix
   (primary) — mirror what TopicResources does for TopicListener. This is the
   established pattern in the codebase, and it removes both the leak and the
   linear dispatch cost without needing an unregisterListener on
   MetadataStore. Alternative — #24256 adds …
2. `consumer_name` described in the PR body but absent from the proto. The
   PR description's "Wire protocol" section states …
3. Reconnect backoff never reset on hash-matched reconnect.
lhotari
left a comment
LGTM, please check the local Claude Code review findings to see if they are real.
…reset
Two real bugs caught in the Claude Code review on apache#25648:
1. Metadata listener leak per ScalableTopicsWatcherSession. start() called
   resources.getStore().registerListener(...) for every watcher; close() only
   flipped a closed flag. MetadataStore exposes no unregisterListener, so
   every closed session left a stale listener registered for the broker's
   lifetime, and every metadata notification fanned out to all of them — a
   linear dispatch tax. The fix mirrors the TopicResources pattern for
   TopicListener: register one shared listener at ScalableTopicResources
   construction time and maintain a Map<NamespaceListener, NamespaceName>
   registry. start() registers the session via
   resources.registerNamespaceListener(this); close() deregisters. The single
   fan-out filters down to direct children of the listener's namespace base
   path before dispatching. ScalableTopicsWatcherSession now implements
   ScalableTopicResources.NamespaceListener so the resources-level fan-out
   can call back into it directly.
2. ScalableTopicsWatcher reconnect backoff never reset on hash-matched
   reconnect. reconnectBackoff.reset() ran only inside onSnapshot. For the
   hash-skip path (the common short-blip happy case), the broker emits no
   Snapshot, so the backoff stayed at its last value and successive short
   reconnects compounded into an unbounded delay before the next attempt.
   Reset on write-success of the reconnect frame instead — that's the moment
   we know the connection is healthy regardless of whether the broker will
   follow up with a Snapshot or stay silent.
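The ordering of that second fix can be sketched as follows. The names are illustrative (Pulsar's Backoff class has a similar reset()/next() shape, but this is not its code):

```java
import java.util.concurrent.TimeUnit;

public class ReconnectBackoff {
    // Minimal exponential backoff: 100 ms initial, capped at 30 s.
    private final long initialMs = 100;
    private final long maxMs = TimeUnit.SECONDS.toMillis(30);
    private long nextMs = initialMs;

    public long next() {
        long d = nextMs;
        nextMs = Math.min(nextMs * 2, maxMs);
        return d;
    }

    // Called on write-success of the reconnect frame, not on Snapshot
    // receipt, because the hash-skip path never delivers a Snapshot.
    public void reset() {
        nextMs = initialMs;
    }

    public static void main(String[] args) {
        ReconnectBackoff b = new ReconnectBackoff();
        if (b.next() != 100 || b.next() != 200) throw new AssertionError();
        b.reset(); // connection healthy: back to the floor
        if (b.next() != 100) throw new AssertionError();
    }
}
```

Resetting on write success keeps successive short blips at the 100 ms floor even when the broker stays silent.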
All good points. Fixed.
Builds on the namespace scalable-topics watcher (PR apache#25648) to land the
user-facing multi-topic consumer surface: subscribe to the union of scalable
topics in a namespace that match a (possibly empty) set of property filters,
follow the matching set live, and multiplex from every per-topic consumer
into one user-visible queue.
Builder API
- QueueConsumerBuilder / StreamConsumerBuilder gain:
  namespace(String namespace)
  namespace(String namespace, Map<String, String> propertyFilters)
  Mutually exclusive with .topic(name); subscribe() rejects both-set or
  neither-set.
- Drop the legacy multi-topic / pattern surface from v5: removed
  topics(List), topicsPattern(Pattern), topicsPattern(String),
  patternAutoDiscoveryPeriod. Tightened topic(String...) → topic(String).
MultiTopicQueueConsumer
- Wraps a ScalableTopicsWatcher; on Snapshot/Diff it opens / closes per-topic
  ScalableQueueConsumers.
- One pump thread per topic forwards messages from the per-topic queue into
  the shared mux queue, tagging each MessageV5 with the parent scalable topic
  via withTopicOverride. msg.topic() then surfaces the parent topic the user
  subscribed to (not the internal segment topic).
- Ack routing: MessageIdV5 carries a parentTopic field so acknowledge(msgId)
  finds the right per-topic consumer.
- Per-topic subscribe failure: retry forever with exponential backoff (100 ms
  initial, 30 min cap).
MultiTopicStreamConsumer
- Same wrapper shape, but uses a ScalableStreamConsumer per topic and carries
  a multi-topic position-vector snapshot per delivered message:
  Map<TopicName, Map<SegmentId, MessageId>> captured at enqueue time.
  acknowledgeCumulative(msg) fans out to every per-topic consumer with that
  topic's segment vector — same semantics as single-topic cumulative ack,
  lifted one level.
- ScalableStreamConsumer exposes ackUpToVector(Map<SegmentId, MessageId>) as
  the multi-topic ack hook.
- Topic Removed mid-stream: flush acks up to latestDelivered for that topic
  before closing the per-topic consumer.
Supporting changes
- MessageV5.topicOverride / withTopicOverride for parent-topic display.
- MessageIdV5 gets parentTopic + multiTopicVector fields, plus a 5-arg full
  constructor used by the multi-topic stream pump.
- New package-private QueueConsumerImpl interface so AsyncQueueConsumerV5 is
  shared between ScalableQueueConsumer and MultiTopicQueueConsumer.
- ScalableQueueConsumer / ScalableStreamConsumer expose createAsyncImpl
  variants returning the concrete impl type — used by the multi-topic
  wrappers that hold typed references to per-topic consumers.
Tests
- V5MultiTopicQueueConsumerTest: receives from all topics in the namespace;
  picks up a topic created after subscribe (Diff path); filters by property
  so only matching topics attach.
- V5MultiTopicStreamConsumerTest: same three flows plus
  cumulativeAckCoversEveryTopicSeenSoFar — drains both topics, calls
  acknowledgeCumulative on the last message, re-subscribes, and asserts no
  redelivery (proving the position vector reaches every topic).
All v5 single-topic regression tests still pass.
Summary
Foundation for multi-topic QueueConsumer / StreamConsumer subscriptions
filtered by topic properties: a long-lived broker → client watch session over
the union of scalable topics in a namespace that match a (possibly empty) set
of property filters. The broker pushes a full Snapshot on subscribe and an
incremental Diff (with adds + removes batched together) when membership
changes. Reliable across reconnects via a hash-skip optimisation — when the
client's hash of its current set matches the broker's freshly-computed one,
the broker stays silent.
This PR lands the wire layer, the broker session, and the V5 client watcher.
The multi-topic consumer wrappers themselves come in a follow-up PR.
Design
Goal
Let QueueConsumer and StreamConsumer subscribe to the union of scalable
topics in a namespace that match a (possibly empty) set of property filters.
The matching set must follow live: when topics enter or leave the filter, the
consumer attaches/detaches automatically. Reliable across reconnects and
broker bounces.
Out of scope: CheckpointConsumer (stays single-topic), partitioned and
non-partitioned legacy topics.
Wire protocol
CommandWatchScalableTopics carries watch_id, namespace, property_filters, and
an optional current_hash. CommandWatchScalableTopicsUpdate carries an event
proto oneof:
Snapshot { topics: [string] } — full set, sent on initial subscribe and on
reconnect when the hash differs;
Diff { added: [string], removed: [string] } — incremental changes, with
apply-removed-first semantics (covers rapid remove-then-add).
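Apply-removed-first on the receiving side can be sketched as two set operations in a fixed order. An illustrative helper, not the client's actual method:

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class DiffApply {
    // Removed is applied before added, so a Diff carrying both for the same
    // name nets out to "present" — covering rapid remove-then-add.
    public static Set<String> apply(Set<String> current,
                                    List<String> added, List<String> removed) {
        Set<String> next = new TreeSet<>(current);
        next.removeAll(removed);
        next.addAll(added);
        return next;
    }

    public static void main(String[] args) {
        Set<String> cur = new TreeSet<>(Set.of("t1"));
        // t1 was removed and re-added within one window; t2 is new.
        Set<String> next = apply(cur, List.of("t1", "t2"), List.of("t1"));
        if (!next.equals(Set.of("t1", "t2"))) throw new AssertionError();
    }
}
```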
Wire size: the snapshot is bounded by the 5 MB frame limit. Defer pagination —
revisit if a namespace's topic list ever approaches that limit.
Reliability
Resync-on-reconnect, no durable event log on the broker.
Subscribe. The broker registers as a per-namespace listener on the shared
ScalableTopicResources notification fan-out first, computes the initial
filtered set, emits Snapshot{topics}, and populates the server-side
currentSet. Events that arrived during snapshot computation are replayed
afterwards via the same dedup logic the deltas use.
Steady state. Each metadata event is filter-evaluated. If membership changes
relative to currentSet, the broker emits Diff{added, removed} and updates
currentSet. A server-side coalescing window (~50 ms) batches nearby events
into one Diff.
Disconnect. The broker drops the session and deregisters it from the
ScalableTopicResources listener registry.
Reconnect with hash short-circuit. The client maintains a hash over its
current set (CRC32C of sorted topic names — the same function used by
CommandGetTopicsOfNamespace). On reconnect it re-opens the watch and passes
that hash via CommandWatchScalableTopics.current_hash. The broker computes
the same hash over its freshly evaluated set:
Match → the broker stays silent: the watch is live, the client's local state
is correct, and future deltas flow as usual.
Mismatch → the broker emits a fresh Snapshot. The client reconciles locally —
anything in the new snapshot it didn't know about → open a per-topic
consumer; anything it knew about but now missing → close + flush.
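The reconcile step is two set differences over the known set and the incoming snapshot. A sketch with hypothetical callback names (`open`, `closeAndFlush`):

```java
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Consumer;

public class SnapshotReconcile {
    // Full-state replace: open what's new, close+flush what's gone,
    // and return the snapshot as the new known set.
    public static Set<String> reconcile(Set<String> known, Set<String> snapshot,
                                        Consumer<String> open,
                                        Consumer<String> closeAndFlush) {
        for (String t : snapshot) {
            if (!known.contains(t)) {
                open.accept(t);          // newly matching topic
            }
        }
        for (String t : known) {
            if (!snapshot.contains(t)) {
                closeAndFlush.accept(t); // fell out of the filter, or deleted
            }
        }
        return new TreeSet<>(snapshot);
    }

    public static void main(String[] args) {
        Set<String> opened = new TreeSet<>();
        Set<String> closed = new TreeSet<>();
        Set<String> next = reconcile(new TreeSet<>(Set.of("a", "b")),
                new TreeSet<>(Set.of("b", "c")), opened::add, closed::add);
        if (!opened.equals(Set.of("c")) || !closed.equals(Set.of("a"))) {
            throw new AssertionError();
        }
        if (!next.equals(Set.of("b", "c"))) throw new AssertionError();
    }
}
```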
First subscribe sends an empty / absent hash, which the broker treats as
"no prior state" and unconditionally emits the initial Snapshot. The
reconnect-backoff reset is keyed off write success of the watch frame, so
the hash-skip path (no inbound Snapshot) doesn't keep the backoff at its
peak across successive short blips.
Properties: idempotent (snapshot is full-set replace; diffs are set ops);
self-healing across any disconnect duration; no broker affinity (any broker
can serve, every broker has the same metadata events). For the common
short-blip reconnect where membership didn't change, the wire cost collapses
to one inbound WatchScalableTopics frame and zero outbound.
Filter evaluation: broker-side
The initial set is computed via the existing
findScalableTopicsByPropertiesAsync. Each metadata event:
Created/Modified: read the new value, evaluate the filter, emit a Diff if
membership changed vs currentSet.
Deleted: no new value; emit Removed if the topic was in currentSet.
Cost: one filter evaluation per metadata event per watcher. Filters are tiny;
fine.
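The per-event AND-filter evaluation is an all-entries-match check. A sketch; how topic properties are actually stored and read may differ:

```java
import java.util.Map;

public class PropertyFilter {
    // AND semantics: every filter entry must be present with an equal value.
    public static boolean matches(Map<String, String> topicProps,
                                  Map<String, String> filters) {
        for (Map.Entry<String, String> e : filters.entrySet()) {
            if (!e.getValue().equals(topicProps.get(e.getKey()))) {
                return false;
            }
        }
        return true; // an empty filter matches every topic
    }

    public static void main(String[] args) {
        Map<String, String> props = Map.of("team", "billing", "tier", "gold");
        if (!matches(props, Map.of("team", "billing"))) throw new AssertionError();
        if (matches(props, Map.of("team", "billing", "region", "eu"))) {
            throw new AssertionError();
        }
        if (!matches(props, Map.of())) throw new AssertionError();
    }
}
```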
Listener lifecycle
ScalableTopicResources registers a single shared metadata-store listener at
construction and exposes registerNamespaceListener /
deregisterNamespaceListener to per-watcher subscribers — the same pattern as
TopicResources uses for TopicListener. MetadataStore has no
unregisterListener, so per-session direct registration would leak listener
references for the broker's lifetime; the registry-and-fan-out approach
avoids both the leak and the linear per-event dispatch tax.
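The registry-and-fan-out shape can be sketched like this. Interface and method names are hypothetical; ScalableTopicResources' actual signatures may differ:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class NamespaceListenerRegistry {
    public interface NamespaceListener {
        void onTopicEvent(String topicPath);
    }

    // One shared metadata-store listener fans out through this registry;
    // sessions register on start() and deregister on close(), so nothing
    // leaks even though MetadataStore exposes no unregisterListener.
    private final Map<NamespaceListener, String> listeners = new ConcurrentHashMap<>();

    public void register(NamespaceListener l, String namespaceBasePath) {
        listeners.put(l, namespaceBasePath);
    }

    public void deregister(NamespaceListener l) {
        listeners.remove(l);
    }

    // Called once per metadata notification by the single shared listener;
    // each session only sees direct children of its own namespace prefix.
    public void dispatch(String notifiedPath) {
        listeners.forEach((l, base) -> {
            String prefix = base + "/";
            if (notifiedPath.startsWith(prefix)
                    && notifiedPath.indexOf('/', prefix.length()) < 0) {
                l.onTopicEvent(notifiedPath);
            }
        });
    }

    public static void main(String[] args) {
        NamespaceListenerRegistry reg = new NamespaceListenerRegistry();
        StringBuilder seen = new StringBuilder();
        NamespaceListener l = seen::append;
        reg.register(l, "/st/public/default");
        reg.dispatch("/st/public/default/orders");       // direct child: delivered
        reg.dispatch("/st/public/default/orders/seg-0"); // deeper: filtered out
        reg.deregister(l);
        reg.dispatch("/st/public/default/payments");     // after close: dropped
        if (!seen.toString().equals("/st/public/default/orders")) {
            throw new AssertionError();
        }
    }
}
```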
Cross-topic load balancing — deferred
Today: every multi-topic consumer in (namespace, filter, subscriptionName)
will subscribe to all matching topics. Each topic's per-topic
SubscriptionCoordinator independently picks one consumer per segment — for
StreamConsumer (Exclusive) the assignment is randomized across consumers, in
expectation roughly balanced but not deterministic.
Future: a namespace-level MultiTopicSubscriptionCoordinator per
(namespace, subscriptionName) that maintains a
Map<TopicName, ConsumerSession> for the group. When that lands it will add a
consumer_name (or similar identity) field to CommandWatchScalableTopics. Not
added speculatively in this PR.
Authz
CommandWatchScalableTopics: requires namespace-level READ
(NamespaceOperation.GET_TOPICS, same as listScalableTopics). The per-topic
subscribe that follows uses normal topic-level authz.
Test plan
ScalableTopicsWatcherSessionHashTest — broker session hash branches
(no hash → snapshot; matching → silent; differing → snapshot). Mocked
ServerCnx + LocalMemoryMetadataStore.
V5ScalableTopicsWatcherTest — end-to-end through a real shared cluster:
create / delete events flow as Diffs; AND property filters narrow the set
correctly; pre-existing topics surface in the initial Snapshot.