[feat] PIP-468: Multi-topic QueueConsumer / StreamConsumer #25651

Merged
lhotari merged 3 commits into apache:master from merlimat:st-multi-topic-consumer
May 2, 2026

@merlimat merlimat commented May 1, 2026

Summary

User-facing follow-up to the namespace scalable-topics watcher (#25648):
subscribe to the union of scalable topics in a namespace that match a
(possibly empty) set of property filters, follow the matching set live,
multiplex from every per-topic consumer into one user-visible queue.

Builder API

QueueConsumerBuilder / StreamConsumerBuilder gain:

.namespace(String namespace)
.namespace(String namespace, Map<String, String> propertyFilters)

Mutually exclusive with .topic(name); subscribe() rejects a builder that
sets both or neither.
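
The mutual-exclusion rule can be pictured with a minimal sketch. ConsumerTargetSketch is a hypothetical stand-in, not the actual v5 builder; the exception type and the string returned by subscribe() are assumptions made only for illustration.

```java
import java.util.Map;

// Hypothetical stand-in for the builder's target selection; not the real v5 class.
final class ConsumerTargetSketch {
    private String topic;
    private String namespace;
    private Map<String, String> propertyFilters = Map.of();

    ConsumerTargetSketch topic(String topic) {
        this.topic = topic;
        return this;
    }

    ConsumerTargetSketch namespace(String namespace) {
        return namespace(namespace, Map.of());
    }

    ConsumerTargetSketch namespace(String namespace, Map<String, String> propertyFilters) {
        this.namespace = namespace;
        this.propertyFilters = Map.copyOf(propertyFilters);
        return this;
    }

    // Exactly one of topic / namespace must be set; both or neither is rejected.
    // The exception type is an assumption of this sketch.
    String subscribe() {
        if ((topic == null) == (namespace == null)) {
            throw new IllegalStateException("set exactly one of topic(...) or namespace(...)");
        }
        return topic != null ? "topic:" + topic : "namespace:" + namespace + propertyFilters;
    }
}
```

Checking at subscribe() time rather than in each setter keeps the setters order-independent, which is presumably why the rule is enforced there.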

While there, dropped the legacy multi-topic / pattern surface from v5 (it
was never wired through to scalable topics): topics(List),
topicsPattern(Pattern), topicsPattern(String), patternAutoDiscoveryPeriod.
Tightened topic(String...) → topic(String) (single topic).

MultiTopicQueueConsumer

  • Wraps a ScalableTopicsWatcher; on Snapshot / Diff opens / closes
    per-topic ScalableQueueConsumers.
  • Per-segment v4 receive loops in each per-topic consumer dispatch
    through an injectable message sink. The wrapper installs a sink that
    tags each MessageV5 with the parent scalable topic via
    withTopicOverride and forwards into the shared mux queue. No pump
    thread per topic: same chained-async pattern as v4
    MultiTopicsConsumerImpl.
  • MessageIdV5 carries a parentTopic field so acknowledge(msgId)
    finds the right per-topic consumer.
  • msg.topic() surfaces the parent scalable topic the user subscribed
    to (not the internal segment topic).
  • Per-topic subscribe failure: retry forever with exp backoff (100 ms
    initial, 30 min cap).
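
The sink-based multiplexing and parent-topic ack routing described above can be sketched roughly as follows. MuxSketch, Msg and PerTopicConsumer are hypothetical simplifications (plain strings instead of MessageV5 / MessageIdV5, and no watcher); only the shape of the technique is taken from the description.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

final class MuxSketch {
    // Hypothetical message shape: payload plus the parent topic used for ack routing.
    static final class Msg {
        final String parentTopic;
        final String payload;

        Msg(String parentTopic, String payload) {
            this.parentTopic = parentTopic;
            this.payload = payload;
        }
    }

    // Hypothetical per-topic consumer: dispatches into whatever sink it was given.
    static final class PerTopicConsumer {
        final List<String> acked = new ArrayList<>();
        private final Consumer<String> sink;

        PerTopicConsumer(Consumer<String> sink) {
            this.sink = sink;
        }

        // Called from the receive loop; no dedicated pump thread is involved.
        void onMessage(String payload) {
            sink.accept(payload);
        }

        void acknowledge(String payload) {
            acked.add(payload);
        }
    }

    private final BlockingQueue<Msg> shared = new LinkedBlockingQueue<>();
    private final Map<String, PerTopicConsumer> perTopic = new ConcurrentHashMap<>();

    // Opens a per-topic consumer whose sink tags each message with the parent topic
    // and forwards it into the shared queue.
    PerTopicConsumer open(String topic) {
        return perTopic.computeIfAbsent(topic,
                t -> new PerTopicConsumer(payload -> shared.add(new Msg(t, payload))));
    }

    Msg poll() {
        return shared.poll();
    }

    // Routes the ack to the per-topic consumer named by the message's parent topic.
    void acknowledge(Msg msg) {
        PerTopicConsumer consumer = perTopic.get(msg.parentTopic);
        if (consumer != null) {
            consumer.acknowledge(msg.payload);
        }
    }
}
```

Because the sink is invoked directly by each receive loop, the wrapper needs no extra thread per topic in this sketch; the parent topic travels with the message so the ack path is a single map lookup.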

MultiTopicStreamConsumer

  • Same wrapper shape, with a per-message cross-topic position vector:
    Map<TopicName, Map<SegmentId, MessageId>> captured at enqueue time.
  • acknowledgeCumulative(msg) fans out to every per-topic consumer with
    that topic's segment vector — same semantics as the single-topic
    cumulative ack, lifted one level.
  • ScalableStreamConsumer.ackUpToVector(Map<SegmentId, MessageId>)
    exposed as the multi-topic ack hook.
  • Topic Removed mid-stream: flush acks up to latestDelivered for the
    topic before closing the per-topic consumer.
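
As a rough illustration of the cross-topic position vector, the sketch below captures a snapshot at enqueue time and fans it out on cumulative ack. CumulativeAckSketch is hypothetical: topics are strings, segment ids are integers, positions are longs, and the ackedUpTo map stands in for calling ackUpToVector on each per-topic consumer.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

final class CumulativeAckSketch {
    // Hypothetical simplification: topic -> segment id -> latest delivered position.
    private final Map<String, Map<Integer, Long>> latestDelivered = new TreeMap<>();

    // Stand-in for ackUpToVector: what each per-topic consumer was asked to ack up to.
    final Map<String, Map<Integer, Long>> ackedUpTo = new HashMap<>();

    // Records a delivery and returns the cross-topic snapshot captured at enqueue time.
    Map<String, Map<Integer, Long>> enqueue(String topic, int segment, long position) {
        latestDelivered.computeIfAbsent(topic, t -> new TreeMap<>()).put(segment, position);
        Map<String, Map<Integer, Long>> snapshot = new TreeMap<>();
        latestDelivered.forEach((t, segments) -> snapshot.put(t, new TreeMap<>(segments)));
        return snapshot;
    }

    // Fans the snapshot out: every topic in the vector receives its own segment vector.
    void acknowledgeCumulative(Map<String, Map<Integer, Long>> snapshot) {
        snapshot.forEach((topic, segments) -> ackedUpTo.put(topic, new TreeMap<>(segments)));
    }
}
```

Copying the vector at enqueue time means a cumulative ack covers exactly what had been delivered when that message was queued, not anything delivered afterwards.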

Supporting changes

  • MessageV5.topicOverride / withTopicOverride for parent-topic
    display; v4Message() accessor for the wrapper to rebuild messages
    with augmented ids.
  • MessageIdV5 gets parentTopic + multiTopicVector fields, plus a
    5-arg full constructor used by the multi-topic stream sink. Wire format
    is length-prefixed by section so older serialised forms (without the
    new fields) still decode and so the new fields round-trip cleanly.
  • ScalableQueueConsumer / ScalableStreamConsumer accept an optional
    external Consumer<MessageV5<T>> sink. Default behaviour is
    unchanged (enqueue on the local messageQueue); multi-topic mode
    passes a sink that forwards into the shared mux.
  • New package-private QueueConsumerImpl interface so
    AsyncQueueConsumerV5 is shared between ScalableQueueConsumer and
    MultiTopicQueueConsumer.
  • createAsyncImpl variants on both per-topic consumers return the
    concrete impl type and accept the optional sink — used by the
    multi-topic wrappers that hold typed references.
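
One way to picture a section-length-prefixed encoding that still decodes older forms is sketched below. The layout is an assumption for illustration only (a 4-byte length before each UTF-8 section, with the optional section simply omitted); it is not the actual MessageIdV5 wire format, and SectionedIdSketch is a hypothetical class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class SectionedIdSketch {
    final String base;         // stand-in for the pre-existing id fields
    final String parentTopic;  // new optional field; null for single-topic ids

    SectionedIdSketch(String base, String parentTopic) {
        this.base = base;
        this.parentTopic = parentTopic;
    }

    // Writes each section as [int length][bytes]; the optional section is omitted when null.
    byte[] toBytes() {
        byte[] b = base.getBytes(StandardCharsets.UTF_8);
        byte[] p = parentTopic == null ? null : parentTopic.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocate(4 + b.length + (p == null ? 0 : 4 + p.length));
        buf.putInt(b.length).put(b);
        if (p != null) {
            buf.putInt(p.length).put(p);
        }
        return buf.array();
    }

    static SectionedIdSketch fromBytes(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        String base = readSection(buf);
        // Older payloads end after the first section, so the new field stays null.
        String parent = buf.hasRemaining() ? readSection(buf) : null;
        return new SectionedIdSketch(base, parent);
    }

    private static String readSection(ByteBuffer buf) {
        byte[] raw = new byte[buf.getInt()];
        buf.get(raw);
        return new String(raw, StandardCharsets.UTF_8);
    }
}
```

Under this assumed layout a decoder never has to guess where a section ends, so trailing sections can be added later without disturbing ids that were serialised before they existed.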

Test plan

  • MessageIdV5Test adds three round-trip cases: parent-topic only,
    parent-topic + cross-topic vector, and the null-fields path (proves
    the new sections don't accidentally hydrate for single-topic ids).
  • V5MultiTopicQueueConsumerTest (3 cases): receives from every topic
    in the namespace; picks up a topic created after subscribe via
    the watcher's Diff event; property filter narrows the set so only
    matching topics deliver messages.
  • V5MultiTopicStreamConsumerTest (4 cases): same three plus
    cumulativeAckCoversEveryTopicSeenSoFar — drains both topics, calls
    acknowledgeCumulative on the last message, re-subscribes with the
    same name, asserts no redelivery (proves the cross-topic position
    vector reaches every per-topic consumer).
  • All v5 single-topic regression tests still pass.

Matching PR(s) in forked repositories

  • area/client

Builds on the namespace scalable-topics watcher (PR apache#25648) to land the
user-facing multi-topic consumer surface: subscribe to the union of
scalable topics in a namespace that match a (possibly empty) set of
property filters, follow the matching set live, multiplex from every
per-topic consumer into one user-visible queue.

Builder API
- QueueConsumerBuilder / StreamConsumerBuilder gain:
    namespace(String namespace)
    namespace(String namespace, Map<String, String> propertyFilters)
  Mutually exclusive with .topic(name); subscribe() rejects either-both
  or neither.
- Drop the legacy multi-topic / pattern surface from v5: removed
  topics(List), topicsPattern(Pattern), topicsPattern(String),
  patternAutoDiscoveryPeriod. Tightened topic(String...) → topic(String).

MultiTopicQueueConsumer
- Wraps a ScalableTopicsWatcher; on Snapshot/Diff opens / closes
  per-topic ScalableQueueConsumers.
- One pump thread per topic forwards messages from the per-topic queue
  into the shared mux queue, tagging each MessageV5 with the parent
  scalable topic via withTopicOverride. msg.topic() then surfaces the
  parent topic the user subscribed to (not the internal segment topic).
- Ack routing: MessageIdV5 carries a parentTopic field so
  acknowledge(msgId) finds the right per-topic consumer.
- Per-topic subscribe failure: retry forever with exp backoff
  (100 ms initial, 30 min cap).
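
The retry schedule can be sketched as below. The 100 ms initial delay and 30 min cap come from the description; the doubling factor and the absence of jitter are assumptions of this sketch, and BackoffSketch is a hypothetical helper rather than the class used in the PR.

```java
final class BackoffSketch {
    static final long INITIAL_MS = 100L;
    static final long MAX_MS = 30L * 60L * 1000L;  // 30 minutes

    // Delay before retry number `attempt` (0-based). Doubling per attempt is an
    // assumption; the result is clamped at the cap, so it never overflows.
    static long delayMs(int attempt) {
        long delay = INITIAL_MS;
        for (int i = 0; i < attempt && delay < MAX_MS; i++) {
            delay = Math.min(delay * 2, MAX_MS);
        }
        return delay;
    }
}
```

With doubling, the cap is reached on the sixteenth attempt (index 15), after which every retry waits the full 30 minutes.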

MultiTopicStreamConsumer
- Same wrapper shape, but uses ScalableStreamConsumer per topic and
  carries a multi-topic position-vector snapshot per delivered message:
    Map<TopicName, Map<SegmentId, MessageId>>
  captured at enqueue time. acknowledgeCumulative(msg) fans out to every
  per-topic consumer with that topic's segment vector — same semantics
  as single-topic cumulative ack, lifted one level.
- ScalableStreamConsumer exposes ackUpToVector(Map<SegmentId, MessageId>)
  as the multi-topic ack hook.
- Topic Removed mid-stream: flush acks up to latestDelivered for that
  topic before closing the per-topic consumer.

Supporting changes
- MessageV5.topicOverride / withTopicOverride for parent-topic display.
- MessageIdV5 gets parentTopic + multiTopicVector fields, plus a 5-arg
  full constructor used by the multi-topic stream pump.
- New package-private QueueConsumerImpl interface so AsyncQueueConsumerV5
  is shared between ScalableQueueConsumer and MultiTopicQueueConsumer.
- ScalableQueueConsumer / ScalableStreamConsumer expose createAsyncImpl
  variants returning the concrete impl type — used by the multi-topic
  wrappers that hold typed references to per-topic consumers.

Tests
- V5MultiTopicQueueConsumerTest: receives from all topics in namespace;
  picks up topic created after subscribe (Diff path); filters by
  property so only matching topics attach.
- V5MultiTopicStreamConsumerTest: same three flows plus
  cumulativeAckCoversEveryTopicSeenSoFar — drains both topics, calls
  acknowledgeCumulative on the last message, re-subscribes, asserts no
  redelivery (proves the position vector reaches every topic).

All v5 single-topic regression tests still pass.
@merlimat merlimat force-pushed the st-multi-topic-consumer branch from 9223156 to fc25e74 May 2, 2026 00:51
@merlimat merlimat force-pushed the st-multi-topic-consumer branch from fc25e74 to 4d32a98 May 2, 2026 00:52
The CI build's compileTestJava on pulsar-client-api-v5 surfaced that the
v5 Examples doc still referenced .topicsPattern + .patternAutoDiscoveryPeriod,
which were dropped from the builder in this PR. Replace the example with
.namespace(ns, propertyFilters) — the same shape the multi-topic
QueueConsumer is meant to be used with.

@lhotari lhotari left a comment

LGTM

@lhotari lhotari merged commit 7c7a7df into apache:master May 2, 2026
43 checks passed