[feat] PIP-468: Multi-topic QueueConsumer / StreamConsumer#25651
Merged
lhotari merged 3 commits intoapache:masterfrom May 2, 2026
Merged
[feat] PIP-468: Multi-topic QueueConsumer / StreamConsumer#25651lhotari merged 3 commits intoapache:masterfrom
lhotari merged 3 commits intoapache:masterfrom
Conversation
Builds on the namespace scalable-topics watcher (PR apache#25648) to land the user-facing multi-topic consumer surface: subscribe to the union of scalable topics in a namespace that match a (possibly empty) set of property filters, follow the matching set live, multiplex from every per-topic consumer into one user-visible queue. Builder API - QueueConsumerBuilder / StreamConsumerBuilder gain: namespace(String namespace) namespace(String namespace, Map<String, String> propertyFilters) Mutually exclusive with .topic(name); subscribe() rejects either-both or neither. - Drop the legacy multi-topic / pattern surface from v5: removed topics(List), topicsPattern(Pattern), topicsPattern(String), patternAutoDiscoveryPeriod. Tightened topic(String...) → topic(String). MultiTopicQueueConsumer - Wraps a ScalableTopicsWatcher; on Snapshot/Diff opens / closes per-topic ScalableQueueConsumers. - One pump thread per topic forwards messages from the per-topic queue into the shared mux queue, tagging each MessageV5 with the parent scalable topic via withTopicOverride. msg.topic() then surfaces the parent topic the user subscribed to (not the internal segment topic). - Ack routing: MessageIdV5 carries a parentTopic field so acknowledge(msgId) finds the right per-topic consumer. - Per-topic subscribe failure: retry forever with exp backoff (100 ms initial, 30 min cap). MultiTopicStreamConsumer - Same wrapper shape, but uses ScalableStreamConsumer per topic and carries a multi-topic position-vector snapshot per delivered message: Map<TopicName, Map<SegmentId, MessageId>> captured at enqueue time. acknowledgeCumulative(msg) fans out to every per-topic consumer with that topic's segment vector — same semantics as single-topic cumulative ack, lifted one level. - ScalableStreamConsumer exposes ackUpToVector(Map<SegmentId, MessageId>) as the multi-topic ack hook. - Topic Removed mid-stream: flush acks up to latestDelivered for that topic before closing the per-topic consumer. Supporting changes - MessageV5.topicOverride / withTopicOverride for parent-topic display. - MessageIdV5 gets parentTopic + multiTopicVector fields, plus a 5-arg full constructor used by the multi-topic stream pump. - New package-private QueueConsumerImpl interface so AsyncQueueConsumerV5 is shared between ScalableQueueConsumer and MultiTopicQueueConsumer. - ScalableQueueConsumer / ScalableStreamConsumer expose createAsyncImpl variants returning the concrete impl type — used by the multi-topic wrappers that hold typed references to per-topic consumers. Tests - V5MultiTopicQueueConsumerTest: receives from all topics in namespace; picks up topic created after subscribe (Diff path); filters by property so only matching topics attach. - V5MultiTopicStreamConsumerTest: same three flows plus cumulativeAckCoversEveryTopicSeenSoFar — drains both topics, calls acknowledgeCumulative on the last message, re-subscribes, asserts no redelivery (proves the position vector reaches every topic). All v5 single-topic regression tests still pass.
9223156 to
fc25e74
Compare
fc25e74 to
4d32a98
Compare
The CI build's compileTestJava on pulsar-client-api-v5 surfaced that the v5 Examples doc still referenced .topicsPattern + .patternAutoDiscoveryPeriod, which were dropped from the builder in this PR. Replace the example with .namespace(ns, propertyFilters) — the same shape the multi-topic QueueConsumer is meant to be used with.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
User-facing follow-up to the namespace scalable-topics watcher (#25648):
subscribe to the union of scalable topics in a namespace that match a
(possibly empty) set of property filters, follow the matching set live,
multiplex from every per-topic consumer into one user-visible queue.
Builder API
QueueConsumerBuilder/StreamConsumerBuildergain:Mutually exclusive with
.topic(name)—subscribe()rejects either-bothor neither.
While there, dropped the legacy multi-topic / pattern surface from v5 (it
was never wired through to scalable topics):
topics(List),topicsPattern(Pattern),topicsPattern(String),patternAutoDiscoveryPeriod.Tightened
topic(String...)→topic(String)(single topic).MultiTopicQueueConsumer
ScalableTopicsWatcher; on Snapshot / Diff opens / closesper-topic
ScalableQueueConsumers.through an injectable message sink. The wrapper installs a sink that
tags each
MessageV5with the parent scalable topic viawithTopicOverrideand forwards into the shared mux queue. No pumpthread per topic — same chained-async pattern as v4
MultiTopicsConsumerImpl.MessageIdV5carries aparentTopicfield soacknowledge(msgId)finds the right per-topic consumer.
msg.topic()surfaces the parent scalable topic the user subscribedto (not the internal segment topic).
initial, 30 min cap).
MultiTopicStreamConsumer
Map<TopicName, Map<SegmentId, MessageId>>captured at enqueue time.acknowledgeCumulative(msg)fans out to every per-topic consumer withthat topic's segment vector — same semantics as the single-topic
cumulative ack, lifted one level.
ScalableStreamConsumer.ackUpToVector(Map<SegmentId, MessageId>)exposed as the multi-topic ack hook.
latestDeliveredfor thetopic before closing the per-topic consumer.
Supporting changes
MessageV5.topicOverride/withTopicOverridefor parent-topicdisplay;
v4Message()accessor for the wrapper to rebuild messageswith augmented ids.
MessageIdV5getsparentTopic+multiTopicVectorfields, plus a5-arg full constructor used by the multi-topic stream sink. Wire format
is length-prefixed by section so older serialised forms (without the
new fields) still decode and so the new fields round-trip cleanly.
ScalableQueueConsumer/ScalableStreamConsumeraccept an optionalexternal
Consumer<MessageV5<T>>sink. Default behaviour isunchanged (enqueue on the local
messageQueue); multi-topic modepasses a sink that forwards into the shared mux.
QueueConsumerImplinterface soAsyncQueueConsumerV5is shared betweenScalableQueueConsumerandMultiTopicQueueConsumer.createAsyncImplvariants on both per-topic consumers return theconcrete impl type and accept the optional sink — used by the
multi-topic wrappers that hold typed references.
Test plan
MessageIdV5Testadds three round-trip cases: parent-topic only,parent-topic + cross-topic vector, and the null-fields path (proves
the new sections don't accidentally hydrate for single-topic ids).
V5MultiTopicQueueConsumerTest(3 cases): receives from every topicin the namespace; picks up a topic created after subscribe via
the watcher's Diff event; property filter narrows the set so only
matching topics deliver messages.
V5MultiTopicStreamConsumerTest(4 cases): same three pluscumulativeAckCoversEveryTopicSeenSoFar— drains both topics, callsacknowledgeCumulativeon the last message, re-subscribes with thesame name, asserts no redelivery (proves the cross-topic position
vector reaches every per-topic consumer).
Matching PR(s) in forked repositories