From 9223156f49bf0ec75171066a14267f0b112b4b30 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 1 May 2026 14:39:25 -0700 Subject: [PATCH 1/3] [feat][client] PIP-468: Multi-topic QueueConsumer / StreamConsumer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Builds on the namespace scalable-topics watcher (PR #25648) to land the user-facing multi-topic consumer surface: subscribe to the union of scalable topics in a namespace that match a (possibly empty) set of property filters, follow the matching set live, multiplex from every per-topic consumer into one user-visible queue. Builder API - QueueConsumerBuilder / StreamConsumerBuilder gain: namespace(String namespace) namespace(String namespace, Map<String, String> propertyFilters) Mutually exclusive with .topic(name); subscribe() rejects setting both or neither. - Drop the legacy multi-topic / pattern surface from v5: removed topics(List<String>), topicsPattern(Pattern), topicsPattern(String), patternAutoDiscoveryPeriod. Tightened topic(String...) → topic(String). MultiTopicQueueConsumer - Wraps a ScalableTopicsWatcher; on Snapshot/Diff opens / closes per-topic ScalableQueueConsumers. - One pump thread per topic forwards messages from the per-topic queue into the shared mux queue, tagging each MessageV5 with the parent scalable topic via withTopicOverride. msg.topic() then surfaces the parent topic the user subscribed to (not the internal segment topic). - Ack routing: MessageIdV5 carries a parentTopic field so acknowledge(msgId) finds the right per-topic consumer. - Per-topic subscribe failure: retry forever with exp backoff (100 ms initial, 30 min cap). MultiTopicStreamConsumer - Same wrapper shape, but uses ScalableStreamConsumer per topic and carries a multi-topic position-vector snapshot per delivered message: Map<String, Map<Long, MessageId>> captured at enqueue time. 
acknowledgeCumulative(msg) fans out to every per-topic consumer with that topic's segment vector — same semantics as single-topic cumulative ack, lifted one level. - ScalableStreamConsumer exposes ackUpToVector(Map<Long, MessageId>) as the multi-topic ack hook. - Topic Removed mid-stream: flush acks up to latestDelivered for that topic before closing the per-topic consumer. Supporting changes - MessageV5.topicOverride / withTopicOverride for parent-topic display. - MessageIdV5 gets parentTopic + multiTopicVector fields, plus a 5-arg full constructor used by the multi-topic stream pump. - New package-private QueueConsumerImpl interface so AsyncQueueConsumerV5 is shared between ScalableQueueConsumer and MultiTopicQueueConsumer. - ScalableQueueConsumer / ScalableStreamConsumer expose createAsyncImpl variants returning the concrete impl type — used by the multi-topic wrappers that hold typed references to per-topic consumers. Tests - V5MultiTopicQueueConsumerTest: receives from all topics in namespace; picks up topic created after subscribe (Diff path); filters by property so only matching topics attach. - V5MultiTopicStreamConsumerTest: same three flows plus cumulativeAckCoversEveryTopicSeenSoFar — drains both topics, calls acknowledgeCumulative on the last message, re-subscribes, asserts no redelivery (proves the position vector reaches every topic). All v5 single-topic regression tests still pass. 
--- .../api/v5/V5MultiTopicQueueConsumerTest.java | 149 +++++ .../v5/V5MultiTopicStreamConsumerTest.java | 208 +++++++ .../client/api/v5/QueueConsumerBuilder.java | 45 +- .../client/api/v5/StreamConsumerBuilder.java | 29 +- .../client/impl/v5/AsyncQueueConsumerV5.java | 4 +- .../pulsar/client/impl/v5/MessageIdV5.java | 262 +++++++-- .../pulsar/client/impl/v5/MessageV5.java | 43 +- .../impl/v5/MultiTopicQueueConsumer.java | 426 ++++++++++++++ .../impl/v5/MultiTopicStreamConsumer.java | 542 ++++++++++++++++++ .../impl/v5/QueueConsumerBuilderV5.java | 49 +- .../client/impl/v5/QueueConsumerImpl.java | 35 ++ .../client/impl/v5/ScalableQueueConsumer.java | 46 +- .../impl/v5/ScalableStreamConsumer.java | 53 +- .../impl/v5/StreamConsumerBuilderV5.java | 38 +- .../client/impl/v5/MessageIdV5Test.java | 58 ++ 15 files changed, 1860 insertions(+), 127 deletions(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java create mode 100644 pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java create mode 100644 pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java create mode 100644 pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java new file mode 100644 index 0000000000000..4cf5f0ff666e9 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.testng.annotations.Test; + +/** + * End-to-end tests for {@link QueueConsumerBuilder#namespace}: a multi-topic + * QueueConsumer follows the matching set live, multiplexes from every per-topic + * consumer into one user-visible queue, and routes individual acks back to the + * right topic for redelivery purposes. 
+ */ +public class V5MultiTopicQueueConsumerTest extends V5ClientBaseTest { + + private String topicName(String suffix) { + return "topic://" + getNamespace() + "/" + suffix + "-" + + UUID.randomUUID().toString().substring(0, 8); + } + + @Test + public void receivesFromAllTopicsInNamespace() throws Exception { + String topicA = topicName("a"); + String topicB = topicName("b"); + admin.scalableTopics().createScalableTopic(topicA, 1); + admin.scalableTopics().createScalableTopic(topicB, 1); + + @Cleanup + Producer pa = v5Client.newProducer(Schema.string()).topic(topicA).create(); + @Cleanup + Producer pb = v5Client.newProducer(Schema.string()).topic(topicB).create(); + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-q") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + // Send to both topics; the multi-topic consumer must receive both sets. + Set expected = new HashSet<>(); + for (int i = 0; i < 5; i++) { + String va = "a-" + i; + String vb = "b-" + i; + pa.newMessage().value(va).send(); + pb.newMessage().value(vb).send(); + expected.add(va); + expected.add(vb); + } + + Set received = new HashSet<>(); + long deadline = System.currentTimeMillis() + 30_000L; + while (received.size() < expected.size() && System.currentTimeMillis() < deadline) { + Message msg = consumer.receive(Duration.ofSeconds(1)); + if (msg != null) { + received.add(msg.value()); + consumer.acknowledge(msg.id()); + } + } + assertEquals(received, expected, "should receive every message produced to either topic"); + } + + @Test + public void picksUpTopicCreatedAfterSubscribe() throws Exception { + // Fresh namespace, no topics yet — initial snapshot is empty. Earliest so the + // race between "topic created" and "per-topic consumer attached via Diff" can't + // drop messages produced in that window. 
+ @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-q-late") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + // Create a topic AFTER subscribe; the watcher's Diff event must trigger the + // consumer to attach. + String lateTopic = topicName("late"); + admin.scalableTopics().createScalableTopic(lateTopic, 1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(lateTopic).create(); + producer.newMessage().value("late-message").send(); + + Message msg = consumer.receive(Duration.ofSeconds(15)); + assertTrue(msg != null, "expected to receive message from late-added topic"); + assertEquals(msg.value(), "late-message"); + assertEquals(msg.topic(), lateTopic, "topic() should surface the parent scalable topic"); + consumer.acknowledge(msg.id()); + } + + @Test + public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception { + String aliceTopic = topicName("alice"); + String bobTopic = topicName("bob"); + admin.scalableTopics().createScalableTopic(aliceTopic, 1, Map.of("owner", "alice")); + admin.scalableTopics().createScalableTopic(bobTopic, 1, Map.of("owner", "bob")); + + @Cleanup + Producer pa = v5Client.newProducer(Schema.string()).topic(aliceTopic).create(); + @Cleanup + Producer pb = v5Client.newProducer(Schema.string()).topic(bobTopic).create(); + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .namespace(getNamespace(), Map.of("owner", "alice")) + .subscriptionName("multi-q-filter") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + pa.newMessage().value("alice-msg").send(); + pb.newMessage().value("bob-msg").send(); + + // Only alice's message reaches this consumer. 
+ Message got = consumer.receive(Duration.ofSeconds(10)); + assertTrue(got != null, "expected one message"); + assertEquals(got.value(), "alice-msg"); + consumer.acknowledge(got.id()); + + // Confirm bob's message never arrives within a generous window. + Message empty = consumer.receive(Duration.ofSeconds(2)); + assertTrue(empty == null, "bob's message must be filtered out, got " + empty); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java new file mode 100644 index 0000000000000..f3299b1c74bee --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.testng.annotations.Test; + +/** + * End-to-end tests for {@link StreamConsumerBuilder#namespace}: a multi-topic + * StreamConsumer follows the matching set live, multiplexes from every per-topic + * consumer into one user-visible queue, and supports cumulative ack across + * topics via per-message position vectors. + */ +public class V5MultiTopicStreamConsumerTest extends V5ClientBaseTest { + + private String topicName(String suffix) { + return "topic://" + getNamespace() + "/" + suffix + "-" + + UUID.randomUUID().toString().substring(0, 8); + } + + @Test + public void receivesFromAllTopicsInNamespace() throws Exception { + String topicA = topicName("a"); + String topicB = topicName("b"); + admin.scalableTopics().createScalableTopic(topicA, 1); + admin.scalableTopics().createScalableTopic(topicB, 1); + + @Cleanup + Producer pa = v5Client.newProducer(Schema.string()).topic(topicA).create(); + @Cleanup + Producer pb = v5Client.newProducer(Schema.string()).topic(topicB).create(); + + @Cleanup + StreamConsumer consumer = v5Client.newStreamConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-stream") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + Set expected = new HashSet<>(); + for (int i = 0; i < 5; i++) { + String va = "a-" + i; + String vb = "b-" + i; + pa.newMessage().value(va).send(); + pb.newMessage().value(vb).send(); + expected.add(va); + expected.add(vb); + } + + Set received = new HashSet<>(); + MessageId last = 
null; + long deadline = System.currentTimeMillis() + 30_000L; + while (received.size() < expected.size() && System.currentTimeMillis() < deadline) { + Message msg = consumer.receive(Duration.ofSeconds(1)); + if (msg != null) { + received.add(msg.value()); + last = msg.id(); + } + } + assertEquals(received, expected, "should receive every message produced to either topic"); + // Cumulative ack must succeed across both per-topic consumers — exercises the + // multi-topic position vector embedded in the message id. + assertNotNull(last); + consumer.acknowledgeCumulative(last); + } + + @Test + public void picksUpTopicCreatedAfterSubscribe() throws Exception { + @Cleanup + StreamConsumer consumer = v5Client.newStreamConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-stream-late") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + String lateTopic = topicName("late"); + admin.scalableTopics().createScalableTopic(lateTopic, 1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(lateTopic).create(); + producer.newMessage().value("late-message").send(); + + Message msg = consumer.receive(Duration.ofSeconds(15)); + assertTrue(msg != null, "expected to receive message from late-added topic"); + assertEquals(msg.value(), "late-message"); + assertEquals(msg.topic(), lateTopic, "topic() should surface the parent scalable topic"); + consumer.acknowledgeCumulative(msg.id()); + } + + @Test + public void cumulativeAckCoversEveryTopicSeenSoFar() throws Exception { + // Two topics, interleaved producers. After we cumulatively ack the LAST message, + // closing and re-subscribing must NOT redeliver any of the previous messages — + // that would only happen if the per-topic ack didn't fire for the topic that's + // not the message's own. 
+ String topicA = topicName("a"); + String topicB = topicName("b"); + admin.scalableTopics().createScalableTopic(topicA, 1); + admin.scalableTopics().createScalableTopic(topicB, 1); + + @Cleanup + Producer pa = v5Client.newProducer(Schema.string()).topic(topicA).create(); + @Cleanup + Producer pb = v5Client.newProducer(Schema.string()).topic(topicB).create(); + + StreamConsumer first = v5Client.newStreamConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-stream-cumulative") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + int n = 5; + for (int i = 0; i < n; i++) { + pa.newMessage().value("a-" + i).send(); + pb.newMessage().value("b-" + i).send(); + } + + // Drain everything, remember the last message id. + Set drained = new HashSet<>(); + MessageId last = null; + long deadline = System.currentTimeMillis() + 20_000L; + while (drained.size() < 2 * n && System.currentTimeMillis() < deadline) { + Message msg = first.receive(Duration.ofSeconds(1)); + if (msg != null) { + drained.add(msg.value()); + last = msg.id(); + } + } + assertEquals(drained.size(), 2 * n, "first consumer should drain every message"); + assertNotNull(last); + // Cumulative ack — should fan out to BOTH topics' per-segment acks. + first.acknowledgeCumulative(last); + // Block briefly so the async ack flushes through to the broker before we close. + Thread.sleep(500); + first.close(); + + // Re-subscribe with the same name. If the cumulative ack covered both topics, + // there's nothing to re-deliver. If it only acked the message's OWN topic, the + // other topic would re-deliver from the start. 
+ @Cleanup + StreamConsumer second = v5Client.newStreamConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-stream-cumulative") + .subscribe(); + + Message stale = second.receive(Duration.ofSeconds(2)); + assertTrue(stale == null, + "cumulative ack should have covered every topic; got redelivery: " + + (stale != null ? stale.value() : "")); + } + + @Test + public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception { + String aliceTopic = topicName("alice"); + String bobTopic = topicName("bob"); + admin.scalableTopics().createScalableTopic(aliceTopic, 1, Map.of("owner", "alice")); + admin.scalableTopics().createScalableTopic(bobTopic, 1, Map.of("owner", "bob")); + + @Cleanup + Producer pa = v5Client.newProducer(Schema.string()).topic(aliceTopic).create(); + @Cleanup + Producer pb = v5Client.newProducer(Schema.string()).topic(bobTopic).create(); + + @Cleanup + StreamConsumer consumer = v5Client.newStreamConsumer(Schema.string()) + .namespace(getNamespace(), Map.of("owner", "alice")) + .subscriptionName("multi-stream-filter") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + pa.newMessage().value("alice-msg").send(); + pb.newMessage().value("bob-msg").send(); + + Message got = consumer.receive(Duration.ofSeconds(10)); + assertTrue(got != null, "expected one message"); + assertEquals(got.value(), "alice-msg"); + + Message empty = consumer.receive(Duration.ofSeconds(2)); + assertTrue(empty == null, "bob's message must be filtered out, got " + empty); + } +} diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java index a6249efaca3f0..db03f3c546913 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java +++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java @@ -19,10 +19,8 @@ package org.apache.pulsar.client.api.v5; import java.time.Duration; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; import org.apache.pulsar.client.api.v5.config.BackoffPolicy; import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy; import org.apache.pulsar.client.api.v5.config.EncryptionPolicy; @@ -52,38 +50,37 @@ public interface QueueConsumerBuilder { CompletableFuture> subscribeAsync(); // --- Topic selection --- + // Either {@link #topic(String)} or {@link #namespace} must be set, not both. /** - * The topic(s) to subscribe to. + * Subscribe to a single scalable topic by name. * - * @param topicNames one or more topic names + * @param topicName the fully-qualified topic name (e.g. {@code topic://tenant/ns/name}) * @return this builder instance for chaining */ - QueueConsumerBuilder topic(String... topicNames); + QueueConsumerBuilder topic(String topicName); /** - * The topics to subscribe to. + * Subscribe to every scalable topic under a namespace. The matching set follows + * live: when topics are created in or deleted from the namespace, the consumer + * attaches / detaches automatically. * - * @param topicNames the list of topic names + * @param namespace the namespace in {@code tenant/namespace} form * @return this builder instance for chaining */ - QueueConsumerBuilder topics(List topicNames); + QueueConsumerBuilder namespace(String namespace); /** - * Subscribe to all topics matching a regex pattern. + * Subscribe to scalable topics under a namespace whose properties match every + * key/value pair in {@code propertyFilters} (AND semantics). An empty map is + * equivalent to {@link #namespace(String)} — every topic in the namespace. + * The matching set follows live as topic properties change. 
* - * @param pattern the compiled regex pattern to match topic names against + * @param namespace the namespace in {@code tenant/namespace} form + * @param propertyFilters property name/value pairs that all must match * @return this builder instance for chaining */ - QueueConsumerBuilder topicsPattern(Pattern pattern); - - /** - * Subscribe to all topics matching a regex pattern (string form). - * - * @param regex the regex pattern string to match topic names against - * @return this builder instance for chaining - */ - QueueConsumerBuilder topicsPattern(String regex); + QueueConsumerBuilder namespace(String namespace, Map propertyFilters); // --- Subscription --- @@ -190,16 +187,6 @@ public interface QueueConsumerBuilder { */ QueueConsumerBuilder deadLetterPolicy(DeadLetterPolicy policy); - // --- Pattern subscription --- - - /** - * How often to re-discover topics matching the pattern. - * - * @param interval the auto-discovery interval - * @return this builder instance for chaining - */ - QueueConsumerBuilder patternAutoDiscoveryPeriod(Duration interval); - // --- Encryption --- /** diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java index ad6f0df0da6d6..0963f79203523 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java @@ -48,14 +48,37 @@ public interface StreamConsumerBuilder { CompletableFuture> subscribeAsync(); // --- Required --- + // Either {@link #topic(String)} or {@link #namespace} must be set, not both. /** - * The topic(s) to subscribe to. + * Subscribe to a single scalable topic by name. * - * @param topicNames one or more topic names + * @param topicName the fully-qualified topic name (e.g. 
{@code topic://tenant/ns/name}) * @return this builder instance for chaining */ - StreamConsumerBuilder topic(String... topicNames); + StreamConsumerBuilder topic(String topicName); + + /** + * Subscribe to every scalable topic under a namespace. The matching set follows + * live: when topics are created in or deleted from the namespace, the consumer + * attaches / detaches automatically. + * + * @param namespace the namespace in {@code tenant/namespace} form + * @return this builder instance for chaining + */ + StreamConsumerBuilder namespace(String namespace); + + /** + * Subscribe to scalable topics under a namespace whose properties match every + * key/value pair in {@code propertyFilters} (AND semantics). An empty map is + * equivalent to {@link #namespace(String)} — every topic in the namespace. + * The matching set follows live as topic properties change. + * + * @param namespace the namespace in {@code tenant/namespace} form + * @param propertyFilters property name/value pairs that all must match + * @return this builder instance for chaining + */ + StreamConsumerBuilder namespace(String namespace, Map propertyFilters); /** * The subscription name. 
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java index f66e12277945f..7242b5e53f772 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java @@ -29,9 +29,9 @@ */ final class AsyncQueueConsumerV5 implements AsyncQueueConsumer { - private final ScalableQueueConsumer consumer; + private final QueueConsumerImpl consumer; - AsyncQueueConsumerV5(ScalableQueueConsumer consumer) { + AsyncQueueConsumerV5(QueueConsumerImpl consumer) { this.consumer = consumer; } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java index 3a39986f3c74d..97a47eff60f46 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java @@ -20,9 +20,16 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.Map; import java.util.Objects; -import org.apache.pulsar.client.api.v5.MessageId; +import org.apache.pulsar.client.api.MessageId; + +// The v5 MessageId interface and the underlying v4 MessageId class share the same simple +// name in different packages — a real conflict that Java imports can't resolve. We import +// the v4 type as `MessageId` (used heavily) and refer to v5 via FQN in the few places it +// appears (the `implements` clause and `compareTo`). /** * V5 MessageId implementation that wraps a v4 MessageId and includes a position @@ -36,17 +43,20 @@ *

For non-cumulative consumers (QueueConsumer) and readers (CheckpointConsumer), * only the single segment ID and v4 message ID are needed; the position vector is * empty. + * + *

Multi-topic consumer wrappers additionally tag the id with the parent scalable + * topic (for ack routing) and a cross-topic position vector (for cumulative ack + * across topics). Both fields are optional and round-trip through + * {@link #toByteArray} / {@link #fromByteArray}. */ -public final class MessageIdV5 implements MessageId { +public final class MessageIdV5 implements org.apache.pulsar.client.api.v5.MessageId { static final long NO_SEGMENT = -1; - static final MessageIdV5 EARLIEST = new MessageIdV5( - org.apache.pulsar.client.api.MessageId.earliest, NO_SEGMENT, Map.of()); - static final MessageIdV5 LATEST = new MessageIdV5( - org.apache.pulsar.client.api.MessageId.latest, NO_SEGMENT, Map.of()); + static final MessageIdV5 EARLIEST = new MessageIdV5(MessageId.earliest, NO_SEGMENT, Map.of()); + static final MessageIdV5 LATEST = new MessageIdV5(MessageId.latest, NO_SEGMENT, Map.of()); - private final org.apache.pulsar.client.api.MessageId v4MessageId; + private final MessageId v4MessageId; private final long segmentId; /** @@ -54,30 +64,73 @@ public final class MessageIdV5 implements MessageId { * taken at the moment this message was delivered to the application. * Used by StreamConsumer for cumulative ack across all segments. */ - private final Map positionVector; + private final Map positionVector; + + /** + * Parent scalable topic this message was delivered from. Set only by multi-topic + * consumer wrappers so they can route a subsequent ack back to the right + * per-topic consumer; {@code null} for single-topic consumers (where the consumer + * already knows its topic). + */ + private final String parentTopic; + + /** + * Cross-topic position vector for multi-topic StreamConsumer cumulative ack. + * Maps parent topic name → (segment id → latest-delivered msgId at the moment + * this message entered the multiplexed queue). {@code null} for single-topic + * messages — use {@link #positionVector()} instead. 
+ */ + private final Map> multiTopicVector; /** - * Create a MessageIdV5 with a position vector for cumulative ack support. + * Create a MessageIdV5 with a position vector for single-topic cumulative ack. */ - public MessageIdV5(org.apache.pulsar.client.api.MessageId v4MessageId, + public MessageIdV5(MessageId v4MessageId, long segmentId, - Map positionVector) { - this.v4MessageId = Objects.requireNonNull(v4MessageId); - this.segmentId = segmentId; - this.positionVector = Map.copyOf(positionVector); + Map positionVector) { + this(v4MessageId, segmentId, positionVector, null, null); + } + + /** + * Constructor for multi-topic queue consumer messages: parent topic tag for ack + * routing; no cross-topic position vector (queue consumers don't do cumulative + * ack). + */ + public MessageIdV5(MessageId v4MessageId, + long segmentId, + Map positionVector, + String parentTopic) { + this(v4MessageId, segmentId, positionVector, parentTopic, null); } /** * Create a MessageIdV5 without a position vector (for individual ack / reader use). */ - public MessageIdV5(org.apache.pulsar.client.api.MessageId v4MessageId, long segmentId) { - this(v4MessageId, segmentId, Map.of()); + public MessageIdV5(MessageId v4MessageId, long segmentId) { + this(v4MessageId, segmentId, Map.of(), null, null); + } + + /** + * Full constructor for multi-topic StreamConsumer messages: parent topic + the + * cross-topic position vector captured at enqueue time. Used by the multi-topic + * stream consumer's pump. + */ + public MessageIdV5(MessageId v4MessageId, + long segmentId, + Map positionVector, + String parentTopic, + Map> multiTopicVector) { + this.v4MessageId = Objects.requireNonNull(v4MessageId); + this.segmentId = segmentId; + this.positionVector = Map.copyOf(positionVector); + this.parentTopic = parentTopic; + this.multiTopicVector = multiTopicVector; } /** * Get the underlying v4 MessageId. Package-private for internal use. 
*/ - org.apache.pulsar.client.api.MessageId v4MessageId() { + MessageId v4MessageId() { return v4MessageId; } @@ -92,36 +145,123 @@ long segmentId() { * Get the position vector — the latest delivered message ID per segment at the * time this message was delivered. Used by StreamConsumer for cumulative ack. */ - Map positionVector() { + Map positionVector() { return positionVector; } + /** + * Parent scalable topic when this message was delivered through a multi-topic + * consumer; {@code null} for single-topic consumers. Package-private — used by + * multi-topic ack routing only. + */ + String parentTopic() { + return parentTopic; + } + + Map> multiTopicVector() { + return multiTopicVector; + } + + /** + * Wire format. All sections are length-prefixed so the reader can detect + * absent trailing sections (older serialised forms wrote only sections 1-3). + * + *

+     * 1. segmentId               : 8 bytes
+     * 2. v4MessageId             : 4-byte length + bytes
+     * 3. positionVector          : 4-byte count + repeating { 8-byte segId,
+     *                                                          4-byte len, idBytes }
+     * 4. parentTopic             : 4-byte length (-1 = null) + UTF-8 bytes
+     * 5. multiTopicVector        : 4-byte count (-1 = null) + repeating {
+     *                                4-byte topic name length, UTF-8 bytes,
+     *                                4-byte segCount, repeating { 8-byte segId,
+     *                                                              4-byte len, idBytes } }
+     * 
+ * + *

Sections 4 and 5 are present in every new id; older serialisations from a + * pre-multi-topic build are still readable — the reader treats the missing + * sections as null. + */ @Override public byte[] toByteArray() { byte[] v4Bytes = v4MessageId.toByteArray(); - // Format: [8 bytes segmentId] [4 bytes v4Length] [v4Bytes] - // [4 bytes numPositions] [for each: [8 bytes segId] [4 bytes idLen] [idBytes]] - int totalSize = 8 + 4 + v4Bytes.length + 4; - var serializedPositions = new java.util.HashMap(); - for (var entry : positionVector.entrySet()) { - byte[] idBytes = entry.getValue().toByteArray(); - serializedPositions.put(entry.getKey(), idBytes); - totalSize += 8 + 4 + idBytes.length; + + // Pre-serialise position-vector entries so we can size the buffer. + Map serializedPositions = serializeSegmentVector(positionVector); + int positionBytes = 4; // count + for (var entry : serializedPositions.entrySet()) { + positionBytes += 8 + 4 + entry.getValue().length; + } + + // Section 4: parent topic. + byte[] parentTopicBytes = parentTopic == null + ? null : parentTopic.getBytes(StandardCharsets.UTF_8); + + // Section 5: pre-serialise the multi-topic vector tree. + Map> serializedMulti = null; + int multiBytes = 4; // count or -1 + if (multiTopicVector != null) { + serializedMulti = new HashMap<>(multiTopicVector.size()); + for (var entry : multiTopicVector.entrySet()) { + byte[] topicBytes = entry.getKey().getBytes(StandardCharsets.UTF_8); + Map inner = serializeSegmentVector(entry.getValue()); + serializedMulti.put(topicBytes, inner); + multiBytes += 4 + topicBytes.length + 4; + for (var seg : inner.entrySet()) { + multiBytes += 8 + 4 + seg.getValue().length; + } + } } + int totalSize = 8 + 4 + v4Bytes.length + positionBytes + + 4 + (parentTopicBytes == null ? 
0 : parentTopicBytes.length) + + multiBytes; + ByteBuffer buf = ByteBuffer.allocate(totalSize); buf.putLong(segmentId); buf.putInt(v4Bytes.length); buf.put(v4Bytes); - buf.putInt(positionVector.size()); + + buf.putInt(serializedPositions.size()); for (var entry : serializedPositions.entrySet()) { buf.putLong(entry.getKey()); buf.putInt(entry.getValue().length); buf.put(entry.getValue()); } + + if (parentTopicBytes == null) { + buf.putInt(-1); + } else { + buf.putInt(parentTopicBytes.length); + buf.put(parentTopicBytes); + } + + if (serializedMulti == null) { + buf.putInt(-1); + } else { + buf.putInt(serializedMulti.size()); + for (var entry : serializedMulti.entrySet()) { + buf.putInt(entry.getKey().length); + buf.put(entry.getKey()); + buf.putInt(entry.getValue().size()); + for (var seg : entry.getValue().entrySet()) { + buf.putLong(seg.getKey()); + buf.putInt(seg.getValue().length); + buf.put(seg.getValue()); + } + } + } return buf.array(); } + private static Map serializeSegmentVector(Map vector) { + Map out = new HashMap<>(vector.size()); + for (var entry : vector.entrySet()) { + out.put(entry.getKey(), entry.getValue().toByteArray()); + } + return out; + } + static MessageIdV5 fromByteArray(byte[] data) throws IOException { if (data == null || data.length < 12) { throw new IOException("Invalid MessageIdV5 data: too short"); @@ -134,30 +274,66 @@ static MessageIdV5 fromByteArray(byte[] data) throws IOException { } byte[] v4Bytes = new byte[v4Length]; buf.get(v4Bytes); - org.apache.pulsar.client.api.MessageId v4Id = - org.apache.pulsar.client.api.MessageId.fromByteArray(v4Bytes); + MessageId v4Id = MessageId.fromByteArray(v4Bytes); + + // Section 3: position vector (single-topic / per-segment). + Map positions = Map.of(); + if (buf.hasRemaining()) { + positions = readSegmentVector(buf); + } + + // Section 4: parent topic. Length -1 sentinel means "absent". 
+ String parentTopic = null; + if (buf.hasRemaining()) { + int parentLen = buf.getInt(); + if (parentLen >= 0) { + if (parentLen > buf.remaining()) { + throw new IOException("Invalid MessageIdV5 data: bad parent-topic length"); + } + byte[] parentBytes = new byte[parentLen]; + buf.get(parentBytes); + parentTopic = new String(parentBytes, StandardCharsets.UTF_8); + } + } - // Read position vector if present - Map positions = Map.of(); + // Section 5: cross-topic vector. Count -1 sentinel means "absent". + Map> multiTopic = null; if (buf.hasRemaining()) { - int numPositions = buf.getInt(); - var posMap = new java.util.HashMap(); - for (int i = 0; i < numPositions; i++) { - long posSegId = buf.getLong(); - int idLen = buf.getInt(); - byte[] idBytes = new byte[idLen]; - buf.get(idBytes); - posMap.put(posSegId, - org.apache.pulsar.client.api.MessageId.fromByteArray(idBytes)); + int topicCount = buf.getInt(); + if (topicCount >= 0) { + multiTopic = new HashMap<>(topicCount); + for (int i = 0; i < topicCount; i++) { + int topicLen = buf.getInt(); + byte[] topicBytes = new byte[topicLen]; + buf.get(topicBytes); + String topic = new String(topicBytes, StandardCharsets.UTF_8); + Map inner = readSegmentVector(buf); + multiTopic.put(topic, inner); + } } - positions = posMap; } - return new MessageIdV5(v4Id, segmentId, positions); + return new MessageIdV5(v4Id, segmentId, positions, parentTopic, multiTopic); + } + + private static Map readSegmentVector(ByteBuffer buf) throws IOException { + int count = buf.getInt(); + Map out = new HashMap<>(count); + for (int i = 0; i < count; i++) { + long segId = buf.getLong(); + int idLen = buf.getInt(); + if (idLen < 0 || idLen > buf.remaining()) { + throw new IOException("Invalid MessageIdV5 data: bad inner id length"); + } + byte[] idBytes = new byte[idLen]; + buf.get(idBytes); + out.put(segId, MessageId.fromByteArray(idBytes)); + } + return out; } @Override - public int compareTo(MessageId other) { + public int 
compareTo(org.apache.pulsar.client.api.v5.MessageId other) { if (!(other instanceof MessageIdV5 o)) { throw new IllegalArgumentException("Cannot compare with " + other.getClass()); } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java index 910bb114e5165..1a9444911f46d 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java @@ -31,13 +31,19 @@ final class MessageV5 implements Message { private final org.apache.pulsar.client.api.Message v4Message; private final MessageIdV5 messageId; + /** + * Optional override for {@link #topic()}. Set by multi-topic consumer wrappers to + * the parent scalable topic name (the v4 message's own topic is the segment topic, + * which is internal). Null for single-topic consumers — {@code topic()} falls back + * to the v4 message topic in that case. + */ + private final String topicOverride; /** * Create with a simple segment ID (for queue consumer, checkpoint consumer, producer). */ MessageV5(org.apache.pulsar.client.api.Message v4Message, long segmentId) { - this.v4Message = v4Message; - this.messageId = new MessageIdV5(v4Message.getMessageId(), segmentId); + this(v4Message, new MessageIdV5(v4Message.getMessageId(), segmentId), null); } /** @@ -45,8 +51,36 @@ final class MessageV5 implements Message { * (for stream consumer cumulative ack support). */ MessageV5(org.apache.pulsar.client.api.Message v4Message, MessageIdV5 messageId) { + this(v4Message, messageId, null); + } + + /** + * Create with an explicit topic override — used by multi-topic consumer wrappers + * to surface the parent scalable topic via {@link #topic()} instead of the + * underlying segment topic. {@code topicOverride} may be {@code null}. 
+ */ + MessageV5(org.apache.pulsar.client.api.Message v4Message, MessageIdV5 messageId, + String topicOverride) { this.v4Message = v4Message; this.messageId = messageId; + this.topicOverride = topicOverride; + } + + /** + * Re-brand this message with a parent scalable topic. Used by multi-topic consumer + * wrappers when forwarding from a per-topic consumer's queue into the shared + * multiplexed queue: the message id picks up the parent for ack routing, and + * {@link #topic()} starts returning the parent. + */ + MessageV5 withTopicOverride(String parentTopic) { + MessageIdV5 newId = new MessageIdV5(messageId.v4MessageId(), messageId.segmentId(), + messageId.positionVector(), parentTopic); + return new MessageV5<>(v4Message, newId, parentTopic); + } + + /** Underlying v4 message — exposed to multi-topic wrappers that re-build with a new id. */ + org.apache.pulsar.client.api.Message v4Message() { + return v4Message; } @Override @@ -98,7 +132,10 @@ public Optional producerName() { @Override public String topic() { - return v4Message.getTopicName(); + // Multi-topic consumer wrappers set topicOverride to the parent scalable topic + // so the user-visible topic() matches the topic they subscribed to (the + // v4 message carries the internal segment topic). + return topicOverride != null ? topicOverride : v4Message.getTopicName(); } @Override diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java new file mode 100644 index 0000000000000..3b929f63cb908 --- /dev/null +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.v5; + +import io.github.merlimat.slog.Logger; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.client.api.v5.Message; +import org.apache.pulsar.client.api.v5.MessageId; +import org.apache.pulsar.client.api.v5.PulsarClientException; +import org.apache.pulsar.client.api.v5.QueueConsumer; +import org.apache.pulsar.client.api.v5.Transaction; +import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; + +/** + * Multi-topic {@link QueueConsumer} that subscribes to every scalable topic in a + * namespace matching a (possibly empty) set of property filters. 
The matching set + * follows live: when topics enter or leave the filter, the consumer attaches / + * detaches automatically via a long-lived {@link ScalableTopicsWatcher} session + * to the broker. + * + *

Internals: + *

    + *
  • One {@link ScalableQueueConsumer} per matched topic.
  • + *
  • Each per-topic consumer forwards its messages straight into the shared + * multiplexed queue through a sink callback (no pump thread), tagging each + * message with the parent topic so the subsequent ack can be routed back.
  • + *
  • The watcher's {@code Snapshot} replaces the active set; {@code Diff} + * applies removals (flushing acks first) before additions to handle a + * rapid remove-then-add of the same topic name.
  • + *
  • Per-topic add failures retry forever with exponential backoff (100 ms + * initial, 30 min cap).
  • + *
+ */ +final class MultiTopicQueueConsumer implements QueueConsumerImpl { + + private static final Logger LOG = Logger.get(MultiTopicQueueConsumer.class); + /** + * Cap for per-topic subscribe retries. Matches v4 consumer reconnect semantics — + * the consumer never gives up; the user just sees no messages from the topic + * until the broker / topic recovers. + */ + private static final Duration RETRY_MAX = Duration.ofMinutes(30); + private final Logger log; + + private final PulsarClientV5 client; + private final Schema v5Schema; + private final ConsumerConfigurationData consumerConf; + private final NamespaceName namespace; + private final Map propertyFilters; + private final String subscriptionName; + + private final ScalableTopicsWatcher watcher; + private final ConcurrentHashMap> perTopic = new ConcurrentHashMap<>(); + private final LinkedTransferQueue> mux = new LinkedTransferQueue<>(); + + private volatile boolean closed = false; + private final AsyncQueueConsumerV5 asyncView; + + private MultiTopicQueueConsumer(PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + NamespaceName namespace, + Map propertyFilters, + ScalableTopicsWatcher watcher) { + this.client = client; + this.v5Schema = v5Schema; + this.consumerConf = consumerConf; + this.namespace = namespace; + this.propertyFilters = propertyFilters; + this.subscriptionName = consumerConf.getSubscriptionName(); + this.watcher = watcher; + this.log = LOG.with() + .attr("namespace", namespace) + .attr("subscription", subscriptionName) + .attr("filters", propertyFilters) + .build(); + this.asyncView = new AsyncQueueConsumerV5<>(this); + } + + static CompletableFuture> createAsync(PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + NamespaceName namespace, + Map propertyFilters) { + ScalableTopicsWatcher watcher = new ScalableTopicsWatcher( + client.v4Client(), namespace, propertyFilters); + MultiTopicQueueConsumer consumer = new 
MultiTopicQueueConsumer<>( + client, v5Schema, consumerConf, namespace, propertyFilters, watcher); + return watcher.start() + .thenCompose(initial -> consumer.openInitial(initial)) + .thenApply(__ -> { + watcher.setListener(consumer.new WatcherListener()); + return (QueueConsumer) consumer; + }) + .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, ___) -> { + throw ex instanceof CompletionException ce ? ce : new CompletionException(ex); + })); + } + + /** + * Open one per-topic consumer per topic in the initial snapshot. Block on every + * future so {@code subscribeAsync} only resolves once the consumer is fully + * attached — gives the user the same all-or-nothing semantics as the + * single-topic builder. + */ + private CompletableFuture openInitial(List topics) { + if (topics.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + List> opens = new ArrayList<>(topics.size()); + for (String t : topics) { + opens.add(openTopic(t, /* retry= */ false)); + } + return CompletableFuture.allOf(opens.toArray(CompletableFuture[]::new)); + } + + /** + * Subscribe to one topic. When {@code retry} is true, failures schedule a + * background retry with exponential backoff; the returned future completes as + * soon as the first attempt finishes (success or failure) so we don't hold up + * Snapshot / Diff processing. + */ + private CompletableFuture openTopic(String topicName, boolean retry) { + if (closed) { + return CompletableFuture.completedFuture(null); + } + if (perTopic.containsKey(topicName)) { + return CompletableFuture.completedFuture(null); + } + TopicName topic = V5Utils.asScalableTopicName(topicName); + DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic); + // Per-topic message sink: tag each delivered message with the parent scalable + // topic for ack routing + display, and forward into the shared mux. No pump + // thread; per-segment v4 receive loops fire this sink directly. 
+ java.util.function.Consumer> sink = msg -> { + if (!closed) { + mux.add(msg.withTopicOverride(topicName)); + } + }; + return dagWatch.start() + .thenCompose(layout -> ScalableQueueConsumer.createAsyncImpl( + client, v5Schema, perTopicConf(topicName), dagWatch, layout, sink)) + .thenAccept(qc -> { + if (closed) { + qc.closeAsync(); + return; + } + PerTopicState state = new PerTopicState<>(topicName, qc); + PerTopicState existing = perTopic.putIfAbsent(topicName, state); + if (existing != null) { + // Concurrent open; drop the dup. + qc.closeAsync(); + return; + } + log.info().attr("topic", topicName).log("Per-topic consumer attached"); + }) + .exceptionally(ex -> { + Throwable cause = ex instanceof CompletionException ce && ce.getCause() != null + ? ce.getCause() : ex; + if (retry && !closed) { + scheduleRetry(topicName); + } + log.warn().attr("topic", topicName).exceptionMessage(cause) + .log("Per-topic subscribe failed"); + return null; + }); + } + + private void scheduleRetry(String topicName) { + long delayMs = nextBackoff(topicName); + log.info().attr("topic", topicName).attr("delayMs", delayMs) + .log("Retrying per-topic subscribe after backoff"); + client.v4Client().timer().newTimeout(timeout -> openTopic(topicName, /* retry= */ true), + delayMs, TimeUnit.MILLISECONDS); + } + + private final ConcurrentHashMap retryDelays = new ConcurrentHashMap<>(); + + /** Returns the next exponential-backoff delay (ms) for a topic and updates the state. */ + private long nextBackoff(String topicName) { + AtomicLong al = retryDelays.computeIfAbsent(topicName, t -> new AtomicLong(100)); + long current = al.get(); + long next = Math.min(current * 2, RETRY_MAX.toMillis()); + al.set(next); + return current; + } + + private void resetBackoff(String topicName) { + retryDelays.remove(topicName); + } + + /** + * Clone the user's consumer config for a per-topic consumer. 
Each per-topic + * consumer needs a unique {@code consumerName} so the broker can disambiguate + * them on the same subscription; we suffix with the topic name. + */ + private ConsumerConfigurationData perTopicConf(String topicName) { + var conf = consumerConf.clone(); + if (consumerConf.getConsumerName() != null) { + // Disambiguate across topics — the broker side cares about uniqueness on + // the same Shared subscription per segment. + String localName = TopicName.get(topicName).getLocalName(); + conf.setConsumerName(consumerConf.getConsumerName() + "-" + localName); + } + return conf; + } + + /** + * Close per-topic consumer for a topic that has dropped out of the matching set. + * No explicit ack flush — Shared subscription acks are independent per message + * and the per-topic consumer's existing close already flushes pending acks via + * its v4 segment consumers. + */ + private CompletableFuture closeTopic(String topicName) { + retryDelays.remove(topicName); + PerTopicState state = perTopic.remove(topicName); + if (state == null) { + return CompletableFuture.completedFuture(null); + } + return state.consumer.closeAsync() + .thenRun(() -> log.info().attr("topic", topicName) + .log("Per-topic consumer detached")); + } + + // --- QueueConsumer --- + + @Override + public String topic() { + return "namespace://" + namespace; + } + + @Override + public String subscription() { + return subscriptionName; + } + + @Override + public String consumerName() { + return consumerConf.getConsumerName(); + } + + @Override + public Message receive() throws PulsarClientException { + try { + return mux.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException("Receive interrupted", e); + } + } + + @Override + public Message receive(Duration timeout) throws PulsarClientException { + try { + return mux.poll(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + 
Thread.currentThread().interrupt(); + throw new PulsarClientException("Receive interrupted", e); + } + } + + @Override + public void acknowledge(MessageId messageId) { + routeAck(messageId, ptc -> ptc.acknowledge(messageId)); + } + + @Override + public void acknowledge(MessageId messageId, Transaction txn) { + routeAck(messageId, ptc -> ptc.acknowledge(messageId, txn)); + } + + @Override + public void negativeAcknowledge(MessageId messageId) { + routeAck(messageId, ptc -> ptc.negativeAcknowledge(messageId)); + } + + /** Look up the per-topic consumer via the parent topic tag and delegate. */ + private void routeAck(MessageId messageId, java.util.function.Consumer> action) { + if (!(messageId instanceof MessageIdV5 id)) { + throw new IllegalArgumentException("Expected MessageIdV5, got: " + messageId.getClass()); + } + String parent = id.parentTopic(); + if (parent == null) { + throw new IllegalStateException("MessageIdV5 missing parent topic — was the message" + + " delivered through a multi-topic consumer?"); + } + PerTopicState state = perTopic.get(parent); + if (state == null) { + // Topic was removed between deliver and ack. Fine — broker has dropped the + // session for that topic. Drop the ack silently. 
+ log.debug().attr("topic", parent) + .log("Ack for removed topic; dropping"); + return; + } + action.accept(state.consumer); + } + + @Override + public AsyncQueueConsumer async() { + return asyncView; + } + + @Override + public void close() throws PulsarClientException { + try { + closeAsync().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException("Close interrupted", e); + } catch (ExecutionException e) { + throw new PulsarClientException(e.getCause()); + } + } + + @Override + public CompletableFuture> receiveAsync() { + return CompletableFuture.supplyAsync(() -> { + try { + return receive(); + } catch (PulsarClientException e) { + throw new CompletionException(e); + } + }); + } + + @Override + public CompletableFuture closeAsync() { + if (closed) { + return CompletableFuture.completedFuture(null); + } + closed = true; + watcher.close(); + List> closes = new ArrayList<>(); + for (var topic : new HashSet<>(perTopic.keySet())) { + closes.add(closeTopic(topic)); + } + return CompletableFuture.allOf(closes.toArray(CompletableFuture[]::new)); + } + + // --- Watcher listener --- + + private final class WatcherListener implements ScalableTopicsWatcher.Listener { + @Override + public void onSnapshot(List topics) { + // Reconcile: open anything new, close anything missing. Same as Diff but + // computed from the full snapshot — used on reconnect when broker hash + // differs from ours. + Set target = new HashSet<>(topics); + Set current = new HashSet<>(perTopic.keySet()); + // Remove first so a rapid remove-then-add of same name closes-then-reopens. + for (String t : current) { + if (!target.contains(t)) { + closeTopic(t); + } + } + for (String t : target) { + if (!current.contains(t)) { + openTopic(t, /* retry= */ true); + resetBackoff(t); + } + } + } + + @Override + public void onDiff(List added, List removed) { + // Apply removed before added — covers rapid remove-then-add of same name. 
+ for (String t : removed) { + closeTopic(t); + } + for (String t : added) { + openTopic(t, /* retry= */ true); + resetBackoff(t); + } + } + } + + // --- Per-topic state --- + + /** + * Per-topic bookkeeping. Messages flow directly into the shared mux via the + * sink the wrapper installed on the per-topic consumer at create-time, so + * there's no pump thread to start/stop here — just hold a reference to the + * underlying consumer for ack routing and clean shutdown. + */ + private static final class PerTopicState { + private final String parentTopic; + private final ScalableQueueConsumer consumer; + + PerTopicState(String parentTopic, ScalableQueueConsumer consumer) { + this.parentTopic = parentTopic; + this.consumer = consumer; + } + } +} diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java new file mode 100644 index 0000000000000..e49b755cca471 --- /dev/null +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl.v5; + +import io.github.merlimat.slog.Logger; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.client.api.v5.Message; +import org.apache.pulsar.client.api.v5.MessageId; +import org.apache.pulsar.client.api.v5.Messages; +import org.apache.pulsar.client.api.v5.PulsarClientException; +import org.apache.pulsar.client.api.v5.StreamConsumer; +import org.apache.pulsar.client.api.v5.Transaction; +import org.apache.pulsar.client.api.v5.async.AsyncStreamConsumer; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.api.proto.ScalableConsumerType; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; + +/** + * Multi-topic {@link StreamConsumer} over the union of scalable topics in a + * namespace matching a (possibly empty) set of property filters. + * + *

Cumulative ack across topics works via a per-message position-vector + * snapshot: each message that enters the multiplexed queue carries + * {@code Map<String, Map<Long, MessageId>>} captured at enqueue time. + * On {@code acknowledgeCumulative(msg)}, the wrapper fans out to every + * per-topic consumer with the right segment vector — same semantics as the + * single-topic case, just lifted one level. + * + *

For Removed-mid-stream topics we flush acks up to {@code latestDelivered} + * for that topic before closing the per-topic consumer, so the user's + * processing-acked invariant is preserved if the topic is later re-added. + */ +final class MultiTopicStreamConsumer implements StreamConsumer { + + private static final Logger LOG = Logger.get(MultiTopicStreamConsumer.class); + /** + * Cap for per-topic subscribe retries. Matches v4 consumer reconnect semantics. + */ + private static final Duration RETRY_MAX = Duration.ofMinutes(30); + + private final Logger log; + + private final PulsarClientV5 client; + private final Schema v5Schema; + private final ConsumerConfigurationData consumerConf; + private final NamespaceName namespace; + private final Map propertyFilters; + private final String subscriptionName; + + private final ScalableTopicsWatcher watcher; + private final ConcurrentHashMap> perTopic = new ConcurrentHashMap<>(); + private final LinkedTransferQueue> mux = new LinkedTransferQueue<>(); + + /** + * Tracks the latest delivered message id per (parent topic, segment id) across + * every per-topic consumer. Snapshotted at enqueue time for each delivered + * message so cumulative ack covers everything visible up to that message. 
+ */ + private final ConcurrentHashMap> + latestDeliveredPerTopicSegment = new ConcurrentHashMap<>(); + + private volatile boolean closed = false; + private final AsyncStreamConsumerV5Multi asyncView; + + private MultiTopicStreamConsumer(PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + NamespaceName namespace, + Map propertyFilters, + ScalableTopicsWatcher watcher) { + this.client = client; + this.v5Schema = v5Schema; + this.consumerConf = consumerConf; + this.namespace = namespace; + this.propertyFilters = propertyFilters; + this.subscriptionName = consumerConf.getSubscriptionName(); + this.watcher = watcher; + this.log = LOG.with() + .attr("namespace", namespace) + .attr("subscription", subscriptionName) + .attr("filters", propertyFilters) + .build(); + this.asyncView = new AsyncStreamConsumerV5Multi(); + } + + static CompletableFuture> createAsync(PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + NamespaceName namespace, + Map propertyFilters) { + ScalableTopicsWatcher watcher = new ScalableTopicsWatcher( + client.v4Client(), namespace, propertyFilters); + MultiTopicStreamConsumer consumer = new MultiTopicStreamConsumer<>( + client, v5Schema, consumerConf, namespace, propertyFilters, watcher); + return watcher.start() + .thenCompose(initial -> consumer.openInitial(initial)) + .thenApply(__ -> { + watcher.setListener(consumer.new WatcherListener()); + return (StreamConsumer) consumer; + }) + .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, ___) -> { + throw ex instanceof CompletionException ce ? 
ce : new CompletionException(ex); + })); + } + + private CompletableFuture openInitial(List topics) { + if (topics.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + List> opens = new ArrayList<>(topics.size()); + for (String t : topics) { + opens.add(openTopic(t, /* retry= */ false)); + } + return CompletableFuture.allOf(opens.toArray(CompletableFuture[]::new)); + } + + private CompletableFuture openTopic(String topicName, boolean retry) { + if (closed) { + return CompletableFuture.completedFuture(null); + } + if (perTopic.containsKey(topicName)) { + return CompletableFuture.completedFuture(null); + } + TopicName topic = V5Utils.asScalableTopicName(topicName); + // One ScalableConsumerClient session per topic, same as the single-topic builder. + ScalableConsumerClient session = new ScalableConsumerClient( + client.v4Client(), topic, + consumerConf.getSubscriptionName(), + perTopicConsumerName(topicName), + ScalableConsumerType.STREAM); + + // Per-topic message sink: each delivered message arrives with its + // single-topic positionVector (computed by ScalableStreamConsumer). Update + // our cross-topic latestDelivered map, snapshot the full cross-topic vector, + // and forward to the shared mux. No pump thread. + java.util.function.Consumer> sink = msg -> + onPerTopicMessage(topicName, msg); + + return session.start() + .thenCompose(initialAssignment -> ScalableStreamConsumer.createAsyncImpl( + client, v5Schema, perTopicConf(topicName), session, + topicName, initialAssignment, sink)) + .thenAccept(sc -> { + if (closed) { + sc.closeAsync(); + return; + } + PerTopic state = new PerTopic<>(topicName, sc); + PerTopic existing = perTopic.putIfAbsent(topicName, state); + if (existing != null) { + sc.closeAsync(); + return; + } + log.info().attr("topic", topicName).log("Per-topic stream consumer attached"); + }) + .exceptionally(ex -> { + Throwable cause = ex instanceof CompletionException ce && ce.getCause() != null + ? 
ce.getCause() : ex; + if (retry && !closed) { + scheduleRetry(topicName); + } + log.warn().attr("topic", topicName).exceptionMessage(cause) + .log("Per-topic stream subscribe failed"); + return null; + }); + } + + private void scheduleRetry(String topicName) { + long delayMs = nextBackoff(topicName); + log.info().attr("topic", topicName).attr("delayMs", delayMs) + .log("Retrying per-topic stream subscribe"); + client.v4Client().timer().newTimeout(timeout -> openTopic(topicName, /* retry= */ true), + delayMs, TimeUnit.MILLISECONDS); + } + + private final ConcurrentHashMap retryDelays = new ConcurrentHashMap<>(); + + private long nextBackoff(String topicName) { + AtomicLong al = retryDelays.computeIfAbsent(topicName, t -> new AtomicLong(100)); + long current = al.get(); + long next = Math.min(current * 2, RETRY_MAX.toMillis()); + al.set(next); + return current; + } + + private void resetBackoff(String topicName) { + retryDelays.remove(topicName); + } + + /** + * Per-topic consumer name. Each topic gets a distinct name so the broker's per-topic + * coordinator can register them as separate consumers (same Exclusive-per-segment + * semantics, no cross-topic identity coupling). + */ + private String perTopicConsumerName(String topicName) { + String localName = TopicName.get(topicName).getLocalName(); + if (consumerConf.getConsumerName() != null) { + return consumerConf.getConsumerName() + "-" + localName; + } + return "v5-stream-" + V5RandomIds.randomAlphanumeric(8) + "-" + localName; + } + + private ConsumerConfigurationData perTopicConf(String topicName) { + var conf = consumerConf.clone(); + conf.setConsumerName(perTopicConsumerName(topicName)); + return conf; + } + + /** + * Close per-topic consumer, flushing pending cumulative acks up to whatever was + * last delivered for that topic. If the topic later re-appears (re-Added), a + * fresh consumer subscribes and resumes from the broker-side cursor — already + * advanced past the messages we've delivered to the user. 
+     */
+    private CompletableFuture<Void> closeTopic(String topicName) {
+        return closeTopic(topicName, /* flushAcks= */ true);
+    }
+
+    /**
+     * Detaches the per-topic consumer registered for {@code topicName}, if any.
+     *
+     * @param topicName parent scalable topic to detach
+     * @param flushAcks when {@code true}, cumulatively ack everything that was
+     *                  enqueued for this topic before closing (topic-Removed path);
+     *                  when {@code false}, leave the subscription position exactly
+     *                  where the user's own acks put it (consumer-close path)
+     * @return a future that completes once the per-topic consumer has closed, or an
+     *         already-completed future when no consumer was registered
+     */
+    private CompletableFuture<Void> closeTopic(String topicName, boolean flushAcks) {
+        retryDelays.remove(topicName);
+        var state = perTopic.remove(topicName);
+        if (state == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        var latest = latestDeliveredPerTopicSegment.remove(topicName);
+        if (flushAcks && latest != null && !latest.isEmpty()) {
+            try {
+                // Flush: ack everything we delivered for this topic.
+                state.consumer.ackUpToVector(new HashMap<>(latest));
+            } catch (RuntimeException e) {
+                // A failed flush must not prevent the consumer from being closed.
+                log.warn().attr("topic", topicName).exceptionMessage(e)
+                        .log("Failed to flush cumulative acks for per-topic stream consumer");
+            }
+        }
+        return state.consumer.closeAsync()
+                .thenRun(() -> log.info().attr("topic", topicName)
+                        .log("Per-topic stream consumer detached"));
+    }
+
+    // --- StreamConsumer ---
+
+    /** Returns a synthetic {@code namespace://<namespace>} label; there is no single topic. */
+    @Override
+    public String topic() {
+        return "namespace://" + namespace;
+    }
+
+    @Override
+    public String subscription() {
+        return subscriptionName;
+    }
+
+    @Override
+    public String consumerName() {
+        return consumerConf.getConsumerName();
+    }
+
+    /** Blocks until a message from any attached topic is available in the shared queue. */
+    @Override
+    public Message<T> receive() throws PulsarClientException {
+        try {
+            return mux.take();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Receive interrupted", e);
+        }
+    }
+
+    /** Waits up to {@code timeout} for a message; returns {@code null} when none arrives in time. */
+    @Override
+    public Message<T> receive(Duration timeout) throws PulsarClientException {
+        try {
+            return mux.poll(timeout.toMillis(), TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Receive interrupted", e);
+        }
+    }
+
+    /**
+     * Collects up to {@code maxNumMessages} messages from the shared queue.
+     *
+     * <p>Polls until either the batch is full or {@code timeout} has elapsed — a
+     * batch that is not yet full waits out the remaining time — and then performs
+     * one non-blocking drain of whatever is already queued. With a zero or negative
+     * timeout only the non-blocking drain runs.
+     *
+     * <p>NOTE(review): if the intended contract is "return as soon as the first
+     * message has arrived plus whatever is immediately available", the loop below
+     * should stop after its first successful poll — confirm against the
+     * single-topic StreamConsumer.
+     */
+    @Override
+    public Messages<T> receiveMulti(int maxNumMessages, Duration timeout) throws PulsarClientException {
+        long deadline = System.nanoTime() + timeout.toNanos();
+        List<Message<T>> batch = new ArrayList<>();
+        try {
+            long remaining = deadline - System.nanoTime();
+            while (batch.size() < maxNumMessages && remaining > 0) {
+                MessageV5<T> msg = mux.poll(remaining, TimeUnit.NANOSECONDS);
+                if (msg == null) {
+                    break;
+                }
+                batch.add(msg);
+                remaining = deadline - System.nanoTime();
+            }
+            // Opportunistic drain of anything else already queued.
+            List<MessageV5<T>> tail = new ArrayList<>();
+            mux.drainTo(tail, maxNumMessages - batch.size());
+            batch.addAll(tail);
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Receive interrupted", e);
+        }
+        return new MessagesV5<>(batch);
+    }
+
+    @Override
+    public void acknowledgeCumulative(MessageId messageId) {
+        fanOutCumulativeAck(messageId);
+    }
+
+    @Override
+    public void acknowledgeCumulative(MessageId messageId, Transaction txn) {
+        // NOTE: `txn` is NOT forwarded. The per-topic hook ackUpToVector(vector) has no
+        // transactional variant, so this performs the same plain cumulative ack on every
+        // topic as the non-transactional overload. See note in the design doc.
+        fanOutCumulativeAck(messageId);
+    }
+
+    /**
+     * For a cumulative ack on a multi-topic message, look up its multi-topic vector
+     * and ack every parent topic that is still attached up to that topic's slice
+     * of the vector. Topics that are no longer attached are skipped.
+     *
+     * @throws IllegalArgumentException if {@code messageId} is {@code null} or not a
+     *                                  {@code MessageIdV5}
+     * @throws IllegalStateException    if the id carries no multi-topic vector
+     */
+    private void fanOutCumulativeAck(MessageId messageId) {
+        if (!(messageId instanceof MessageIdV5 id)) {
+            // Guard the null case explicitly: calling getClass() on null would raise a
+            // NullPointerException instead of the intended IllegalArgumentException.
+            throw new IllegalArgumentException("Expected MessageIdV5, got: "
+                    + (messageId == null ? "null" : messageId.getClass()));
+        }
+        var vector = id.multiTopicVector();
+        if (vector == null) {
+            throw new IllegalStateException("MessageIdV5 missing multi-topic vector — was the"
+                    + " message delivered through a multi-topic stream consumer?");
+        }
+        for (var entry : vector.entrySet()) {
+            var state = perTopic.get(entry.getKey());
+            if (state == null) {
+                // Topic was Removed since enqueue; closeTopic already flushed.
+                continue;
+            }
+            state.consumer.ackUpToVector(entry.getValue());
+        }
+    }
+
+    @Override
+    public AsyncStreamConsumer<T> async() {
+        return asyncView;
+    }
+
+    /** Blocking counterpart of {@link #closeAsync()}. */
+    @Override
+    public void close() throws PulsarClientException {
+        try {
+            closeAsync().get();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            throw new PulsarClientException("Close interrupted", e);
+        } catch (ExecutionException e) {
+            throw new PulsarClientException(e.getCause());
+        }
+    }
+
+    /**
+     * Stops the namespace watcher and closes every attached per-topic consumer.
+     * Calling it again after the first call returns an already-completed future.
+     *
+     * <p>Closing does not acknowledge anything on the user's behalf: messages that
+     * were prefetched into the shared queue but never acked stay unacked, so they are
+     * redelivered to the next consumer of the subscription.
+     */
+    CompletableFuture<Void> closeAsync() {
+        if (closed) {
+            return CompletableFuture.completedFuture(null);
+        }
+        closed = true;
+        watcher.close();
+        List<CompletableFuture<Void>> closes = new ArrayList<>();
+        // Iterate over a copy: closeTopic removes entries from perTopic.
+        for (var topic : new HashSet<>(perTopic.keySet())) {
+            closes.add(closeTopic(topic, /* flushAcks= */ false));
+        }
+        return CompletableFuture.allOf(closes.toArray(CompletableFuture[]::new));
+    }
+
+    // --- Watcher listener ---
+
+    /** Reconciles the set of attached per-topic consumers with what the namespace watcher reports. */
+    private final class WatcherListener implements ScalableTopicsWatcher.Listener {
+        /** Full view: detach topics that are no longer listed, attach the ones that are new. */
+        @Override
+        public void onSnapshot(List<String> topics) {
+            if (closed) {
+                return;
+            }
+            Set<String> target = new HashSet<>(topics);
+            Set<String> current = new HashSet<>(perTopic.keySet());
+            for (String t : current) {
+                if (!target.contains(t)) {
+                    closeTopic(t);
+                }
+            }
+            for (String t : target) {
+                if (!current.contains(t)) {
+                    // Reset first so a failure inside openTopic keeps its own backoff state.
+                    resetBackoff(t);
+                    openTopic(t, /* retry= */ true);
+                }
+            }
+        }
+
+        /** Incremental view: detach {@code removed}, then attach {@code added}. */
+        @Override
+        public void onDiff(List<String> added, List<String> removed) {
+            if (closed) {
+                return;
+            }
+            for (String t : removed) {
+                closeTopic(t);
+            }
+            for (String t : added) {
+                // Reset first so a failure inside openTopic keeps its own backoff state.
+                resetBackoff(t);
+                openTopic(t, /* retry= */ true);
+            }
+        }
+    }
+
+    /**
+     * Per-topic message handler installed as the sink on each per-topic
+     * {@link ScalableStreamConsumer}. The single-topic consumer has already
+     * computed its per-segment position vector and stored it on the inbound
+     * message id; we adopt that as the per-topic slice of our cross-topic
+     * vector, snapshot the full map, and forward into the shared mux.
+     *
+     *

<p>Invoked from the per-segment receive callbacks of the per-topic consumers,
+     * possibly on several threads at once. Updating this topic's slice, copying
+     * the cross-topic vector and enqueueing the message happen inside one
+     * critical section, so the vector attached to a message covers exactly the
+     * messages that sit at or before it in the shared queue. A cumulative ack on
+     * that message therefore can never acknowledge something the user has not
+     * been handed yet.
+     */
+    private void onPerTopicMessage(String parentTopic, MessageV5<T> msg) {
+        if (closed) {
+            return;
+        }
+        MessageIdV5 origId = (MessageIdV5) msg.id();
+
+        synchronized (latestDeliveredPerTopicSegment) {
+            // Adopt the message's own positionVector as our per-topic latest-delivered
+            // slice. ScalableStreamConsumer maintained the increasing invariant on
+            // each segment id; merging via putAll keeps the property cross-topic.
+            latestDeliveredPerTopicSegment
+                    .computeIfAbsent(parentTopic, k -> new ConcurrentHashMap<>())
+                    .putAll(origId.positionVector());
+
+            // Copy while still holding the lock so no other delivery can slip its
+            // positions in between our update and our snapshot.
+            var snapshot = copyCrossTopicVector(latestDeliveredPerTopicSegment);
+
+            MessageIdV5 newId = new MessageIdV5(
+                    origId.v4MessageId(), origId.segmentId(),
+                    origId.positionVector(), parentTopic, snapshot);
+            // Enqueue under the same lock: queue order must match vector order.
+            mux.add(new MessageV5<>(msg.v4Message(), newId, parentTopic));
+        }
+    }
+
+    /**
+     * Returns a two-level copy of {@code source}: a new outer map whose values are
+     * new {@link HashMap}s holding the same entries as the corresponding inner maps.
+     * Later changes to {@code source} or its inner maps do not show up in the copy.
+     */
+    private static <K, S, V> Map<K, Map<S, V>> copyCrossTopicVector(
+            Map<K, ? extends Map<S, V>> source) {
+        Map<K, Map<S, V>> copy = new HashMap<>(source.size());
+        for (var e : source.entrySet()) {
+            copy.put(e.getKey(), new HashMap<>(e.getValue()));
+        }
+        return copy;
+    }
+
+    // --- Per-topic state ---
+
+    /**
+     * Per-topic bookkeeping. Messages flow into the shared mux directly via the
+     * sink installed on the per-topic consumer at create-time, so there's no
+     * pump thread to manage — this is just a holder for ack routing and clean
+     * shutdown.
+ */
+ private static final class PerTopic {
+ // Name of the parent scalable topic this entry belongs to.
+ private final String parentTopic;
+ // Per-topic consumer that acks are routed to and that is closed on detach.
+ private final ScalableStreamConsumer consumer;
+
+ PerTopic(String parentTopic, ScalableStreamConsumer consumer) {
+ this.parentTopic = parentTopic;
+ this.consumer = consumer;
+ }
+ }
+
+ // --- Async view ---
+
+ /**
+ * Asynchronous facade over the enclosing consumer. Every receive variant hands
+ * the corresponding blocking call to {@code CompletableFuture.supplyAsync} and
+ * wraps a {@code PulsarClientException} in a {@code CompletionException}; acks and
+ * close delegate directly.
+ *
+ * NOTE(review): supplyAsync is called without an executor, so the blocking
+ * receive runs on the default async pool (normally ForkJoinPool.commonPool()).
+ * The no-timeout receive() parks that thread until a message arrives —
+ * confirm that tying up shared pool threads this way is acceptable, or pass a
+ * dedicated executor.
+ */
+ private final class AsyncStreamConsumerV5Multi implements AsyncStreamConsumer {
+ @Override
+ public CompletableFuture> receive() {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return MultiTopicStreamConsumer.this.receive();
+ } catch (PulsarClientException e) {
+ throw new CompletionException(e);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture> receive(Duration timeout) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ return MultiTopicStreamConsumer.this.receive(timeout);
+ } catch (PulsarClientException e) {
+ throw new CompletionException(e);
+ }
+ });
+ }
+
+ @Override
+ public CompletableFuture>> receiveMulti(int maxNumMessages, Duration timeout) {
+ return CompletableFuture.supplyAsync(() -> {
+ try {
+ Messages ms = MultiTopicStreamConsumer.this.receiveMulti(maxNumMessages, timeout);
+ // Copy the batch into a plain list, preserving iteration order.
+ List> out = new ArrayList<>();
+ for (Message m : ms) {
+ out.add(m);
+ }
+ return out;
+ } catch (PulsarClientException e) {
+ throw new CompletionException(e);
+ }
+ });
+ }
+
+ @Override
+ public void acknowledgeCumulative(MessageId messageId) {
+ MultiTopicStreamConsumer.this.acknowledgeCumulative(messageId);
+ }
+
+ @Override
+ public void acknowledgeCumulative(MessageId messageId, Transaction txn) {
+ MultiTopicStreamConsumer.this.acknowledgeCumulative(messageId, txn);
+ }
+
+ @Override
+ public CompletableFuture close() {
+ return MultiTopicStreamConsumer.this.closeAsync();
+ }
+ }
+}
diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java
index a762964c7d75e..a268e70219b9d 100644
--- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java @@ -19,11 +19,9 @@ package org.apache.pulsar.client.impl.v5; import java.time.Duration; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; import org.apache.pulsar.client.api.v5.PulsarClientException; import org.apache.pulsar.client.api.v5.QueueConsumer; import org.apache.pulsar.client.api.v5.QueueConsumerBuilder; @@ -34,6 +32,7 @@ import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; import org.apache.pulsar.client.api.v5.schema.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; /** @@ -44,7 +43,11 @@ final class QueueConsumerBuilderV5 implements QueueConsumerBuilder { private final PulsarClientV5 client; private final Schema v5Schema; private final ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + // Exactly one of {topicName, namespaceName} must be set at subscribe() time — + // single-topic vs multi-topic mode. 
private String topicName; + private NamespaceName namespaceName; + private Map propertyFilters; QueueConsumerBuilderV5(PulsarClientV5 client, Schema v5Schema) { this.client = client; @@ -65,48 +68,44 @@ public QueueConsumer subscribe() throws PulsarClientException { @Override public CompletableFuture> subscribeAsync() { - if (topicName == null || topicName.isEmpty()) { + boolean topicSet = topicName != null && !topicName.isEmpty(); + boolean namespaceSet = namespaceName != null; + if (topicSet == namespaceSet) { return CompletableFuture.failedFuture( - new PulsarClientException.InvalidConfigurationException("Topic name is required")); + new PulsarClientException.InvalidConfigurationException( + "Exactly one of .topic(name) or .namespace(...) must be set")); } if (conf.getSubscriptionName() == null || conf.getSubscriptionName().isEmpty()) { return CompletableFuture.failedFuture( new PulsarClientException.InvalidConfigurationException("Subscription name is required")); } + if (namespaceSet) { + return MultiTopicQueueConsumer.createAsync( + client, v5Schema, conf, namespaceName, propertyFilters); + } TopicName topic = V5Utils.asScalableTopicName(topicName); DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic); - return dagWatch.start() .thenCompose(initialLayout -> ScalableQueueConsumer.createAsync( client, v5Schema, conf, dagWatch, initialLayout)); } @Override - public QueueConsumerBuilderV5 topic(String... 
topicNames) { - if (topicNames.length > 0) { - this.topicName = topicNames[0]; - } + public QueueConsumerBuilderV5 topic(String topicName) { + this.topicName = topicName; return this; } @Override - public QueueConsumerBuilderV5 topics(List topicNames) { - if (!topicNames.isEmpty()) { - this.topicName = topicNames.get(0); - } - return this; + public QueueConsumerBuilderV5 namespace(String namespace) { + return namespace(namespace, Map.of()); } @Override - public QueueConsumerBuilderV5 topicsPattern(Pattern pattern) { - conf.setTopicsPattern(pattern); - return this; - } - - @Override - public QueueConsumerBuilderV5 topicsPattern(String regex) { - conf.setTopicsPattern(Pattern.compile(regex)); + public QueueConsumerBuilderV5 namespace(String namespace, Map propertyFilters) { + this.namespaceName = NamespaceName.get(namespace); + this.propertyFilters = propertyFilters == null ? Map.of() : Map.copyOf(propertyFilters); return this; } @@ -206,12 +205,6 @@ public QueueConsumerBuilderV5 deadLetterPolicy(DeadLetterPolicy policy) { return this; } - @Override - public QueueConsumerBuilderV5 patternAutoDiscoveryPeriod(Duration interval) { - conf.setPatternAutoDiscoveryPeriod((int) interval.getSeconds()); - return this; - } - @Override public QueueConsumerBuilderV5 encryptionPolicy(EncryptionPolicy policy) { conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader())); diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java new file mode 100644 index 0000000000000..61536f5b44283 --- /dev/null +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.v5; + +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.api.v5.Message; +import org.apache.pulsar.client.api.v5.QueueConsumer; + +/** + * Internal extension of {@link QueueConsumer} that exposes the async hooks + * needed by {@link AsyncQueueConsumerV5}. Implemented by both + * {@link ScalableQueueConsumer} (single-topic) and {@link MultiTopicQueueConsumer} + * so the async wrapper works against either without duplication. + */ +interface QueueConsumerImpl extends QueueConsumer { + CompletableFuture> receiveAsync(); + + CompletableFuture closeAsync(); +} diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java index 61f5e87644d5f..31cb556b70e32 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java @@ -51,7 +51,7 @@ * Individual acknowledgments and negative acknowledgments are routed to * the correct segment consumer via the segment ID in {@link MessageIdV5}. 
*/ -final class ScalableQueueConsumer implements QueueConsumer, DagWatchClient.LayoutChangeListener { +final class ScalableQueueConsumer implements QueueConsumerImpl, DagWatchClient.LayoutChangeListener { private static final Logger LOG = Logger.get(ScalableQueueConsumer.class); private final Logger log; @@ -72,6 +72,13 @@ final class ScalableQueueConsumer implements QueueConsumer, DagWatchClient private final ConcurrentHashMap>> segmentConsumers = new ConcurrentHashMap<>(); private final LinkedTransferQueue> messageQueue = new LinkedTransferQueue<>(); + /** + * Where each per-segment receive loop deposits a freshly-arrived message. Defaults + * to enqueueing on {@link #messageQueue} for the user's {@link #receive()} to pull; + * the multi-topic wrapper overrides this to forward directly into its shared + * multiplexed queue, so no per-topic pump thread is needed. + */ + private final java.util.function.Consumer> messageSink; private volatile boolean closed = false; private final AsyncQueueConsumerV5 asyncView; @@ -88,7 +95,8 @@ final class ScalableQueueConsumer implements QueueConsumer, DagWatchClient private ScalableQueueConsumer(PulsarClientV5 client, Schema v5Schema, ConsumerConfigurationData consumerConf, - DagWatchClient dagWatch) { + DagWatchClient dagWatch, + java.util.function.Consumer> messageSink) { this.client = client; this.v5Schema = v5Schema; this.v4Schema = SchemaAdapter.toV4(v5Schema); @@ -96,6 +104,10 @@ private ScalableQueueConsumer(PulsarClientV5 client, this.dagWatch = dagWatch; this.topicName = dagWatch.topicName().toString(); this.subscriptionName = consumerConf.getSubscriptionName(); + // Default sink enqueues on the local messageQueue for receive()/receive(timeout). + // Multi-topic mode passes a sink that forwards into the shared mux instead — no + // per-topic pump thread needed. + this.messageSink = messageSink != null ? 
messageSink : messageQueue::add; this.log = LOG.with().attr("topic", topicName).attr("subscription", subscriptionName).build(); this.asyncView = new AsyncQueueConsumerV5<>(this); } @@ -111,11 +123,29 @@ static CompletableFuture> createAsync(PulsarClientV5 client ConsumerConfigurationData consumerConf, DagWatchClient dagWatch, ClientSegmentLayout initialLayout) { - ScalableQueueConsumer consumer = new ScalableQueueConsumer<>(client, v5Schema, consumerConf, dagWatch); + return createAsyncImpl(client, v5Schema, consumerConf, dagWatch, initialLayout, null) + .thenApply(c -> c); + } + + /** + * Like {@link #createAsync} but resolves to the concrete impl type and accepts an + * optional external message sink. Used by {@link MultiTopicQueueConsumer}: it + * passes a sink that forwards into the shared multiplexed queue, so per-segment + * v4 receive loops deliver messages to the wrapper without any pump thread. + */ + static CompletableFuture> createAsyncImpl( + PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + DagWatchClient dagWatch, + ClientSegmentLayout initialLayout, + java.util.function.Consumer> messageSink) { + ScalableQueueConsumer consumer = new ScalableQueueConsumer<>( + client, v5Schema, consumerConf, dagWatch, messageSink); return consumer.subscribeSegments(initialLayout) .thenApply(__ -> { dagWatch.setListener(consumer); - return (QueueConsumer) consumer; + return consumer; }) .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, ___) -> { throw ex instanceof CompletionException ce ? 
ce : new CompletionException(ex); @@ -209,7 +239,8 @@ public void close() throws PulsarClientException { // --- Async internals --- - CompletableFuture> receiveAsync() { + @Override + public CompletableFuture> receiveAsync() { return CompletableFuture.supplyAsync(() -> { try { return receive(); @@ -219,7 +250,8 @@ CompletableFuture> receiveAsync() { }); } - CompletableFuture closeAsync() { + @Override + public CompletableFuture closeAsync() { closed = true; dagWatch.close(); @@ -358,7 +390,7 @@ private CompletableFuture> createSegmen private void startReceiveLoop(org.apache.pulsar.client.api.Consumer v4Consumer, long segmentId) { v4Consumer.receiveAsync().thenAccept(v4Msg -> { - messageQueue.add(new MessageV5<>(v4Msg, segmentId)); + messageSink.accept(new MessageV5<>(v4Msg, segmentId)); if (!closed) { startReceiveLoop(v4Consumer, segmentId); } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java index ddffea7d84d33..afd703ae9a445 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java @@ -86,6 +86,13 @@ final class ScalableStreamConsumer new ConcurrentHashMap<>(); private final LinkedTransferQueue> messageQueue = new LinkedTransferQueue<>(); + /** + * Where each per-segment receive loop deposits a freshly-arrived message. Defaults + * to enqueueing on {@link #messageQueue} for the user's {@link #receive()} to pull; + * the multi-topic wrapper overrides this to forward into its shared multiplexed + * queue, applying its own multi-topic position-vector capture in the process. 
+ */ + private final java.util.function.Consumer> messageSink; private volatile boolean closed = false; private final AsyncStreamConsumerV5 asyncView; @@ -94,7 +101,8 @@ private ScalableStreamConsumer(PulsarClientV5 client, Schema v5Schema, ConsumerConfigurationData consumerConf, ScalableConsumerClient session, - String topicName) { + String topicName, + java.util.function.Consumer> messageSink) { this.client = client; this.v5Schema = v5Schema; this.v4Schema = SchemaAdapter.toV4(v5Schema); @@ -102,6 +110,7 @@ private ScalableStreamConsumer(PulsarClientV5 client, this.session = session; this.topicName = topicName; this.subscriptionName = consumerConf.getSubscriptionName(); + this.messageSink = messageSink != null ? messageSink : messageQueue::add; this.log = LOG.with().attr("topic", topicName).attr("subscription", subscriptionName).build(); this.asyncView = new AsyncStreamConsumerV5<>(this); } @@ -118,18 +127,54 @@ static CompletableFuture> createAsync(PulsarClientV5 clien ScalableConsumerClient session, String topicName, List initialAssignment) { + return createAsyncImpl(client, v5Schema, consumerConf, session, topicName, initialAssignment, null) + .thenApply(c -> c); + } + + /** + * Like {@link #createAsync} but resolves to the concrete impl type and accepts an + * optional external message sink. Used by {@link MultiTopicStreamConsumer}: it + * passes a sink that forwards into the shared multiplexed queue, replacing the + * per-topic pump thread with direct delivery. 
+ */ + static CompletableFuture> createAsyncImpl( + PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + ScalableConsumerClient session, + String topicName, + List initialAssignment, + java.util.function.Consumer> messageSink) { ScalableStreamConsumer consumer = new ScalableStreamConsumer<>( - client, v5Schema, consumerConf, session, topicName); + client, v5Schema, consumerConf, session, topicName, messageSink); return consumer.subscribeAssigned(initialAssignment) .thenApply(__ -> { session.setListener(consumer); - return (StreamConsumer) consumer; + return consumer; }) .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, ___) -> { throw ex instanceof CompletionException ce ? ce : new CompletionException(ex); })); } + /** + * Multi-topic ack hook. Synthesises a {@link MessageIdV5} carrying the supplied + * vector and routes it through the regular cumulative-ack path so segments are + * acked up to the recorded positions. Used by {@link MultiTopicStreamConsumer} + * to fan out a cumulative ack across every per-topic consumer. + */ + void ackUpToVector(java.util.Map vector) { + if (vector == null || vector.isEmpty()) { + return; + } + // The constructed id only needs the positionVector; v4MessageId / segmentId are + // unused on the cumulative-ack path. Pick any value for the non-vector slots — + // earliest is convenient and won't accidentally satisfy a peer's check. 
+ var synthetic = new MessageIdV5(org.apache.pulsar.client.api.MessageId.earliest, + MessageIdV5.NO_SEGMENT, vector); + acknowledgeCumulative(synthetic); + } + @Override public String topic() { return topicName; @@ -358,7 +403,7 @@ private void startReceiveLoop(org.apache.pulsar.client.api.Consumer v4Consume // Create the V5 message with the position vector embedded in the ID var msgId = new MessageIdV5(v4Msg.getMessageId(), segmentId, positionVector); - messageQueue.add(new MessageV5<>(v4Msg, msgId)); + messageSink.accept(new MessageV5<>(v4Msg, msgId)); if (!closed) { startReceiveLoop(v4Consumer, segmentId); diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java index d3c356ecb554c..6b9f6b67e0736 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java @@ -42,7 +42,11 @@ final class StreamConsumerBuilderV5 implements StreamConsumerBuilder { private final PulsarClientV5 client; private final Schema v5Schema; private final ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + // Exactly one of {topicName, namespaceName} must be set at subscribe() time — + // single-topic vs multi-topic mode. 
private String topicName; + private org.apache.pulsar.common.naming.NamespaceName namespaceName; + private Map propertyFilters; StreamConsumerBuilderV5(PulsarClientV5 client, Schema v5Schema) { this.client = client; @@ -63,21 +67,29 @@ public StreamConsumer subscribe() throws PulsarClientException { @Override public CompletableFuture> subscribeAsync() { - if (topicName == null || topicName.isEmpty()) { + boolean topicSet = topicName != null && !topicName.isEmpty(); + boolean namespaceSet = namespaceName != null; + if (topicSet == namespaceSet) { return CompletableFuture.failedFuture( - new PulsarClientException.InvalidConfigurationException("Topic name is required")); + new PulsarClientException.InvalidConfigurationException( + "Exactly one of .topic(name) or .namespace(...) must be set")); } if (conf.getSubscriptionName() == null || conf.getSubscriptionName().isEmpty()) { return CompletableFuture.failedFuture( new PulsarClientException.InvalidConfigurationException("Subscription name is required")); } - - TopicName topic = V5Utils.asScalableTopicName(topicName); // Default the consumer name to a stable random when the user didn't set one — // ScalableConsumerClient uses it as the registration key with the controller. if (conf.getConsumerName() == null || conf.getConsumerName().isEmpty()) { conf.setConsumerName("v5-stream-" + V5RandomIds.randomAlphanumeric(8)); } + + if (namespaceSet) { + return MultiTopicStreamConsumer.createAsync( + client, v5Schema, conf, namespaceName, propertyFilters); + } + + TopicName topic = V5Utils.asScalableTopicName(topicName); ScalableConsumerClient session = new ScalableConsumerClient( client.v4Client(), topic, @@ -91,10 +103,20 @@ public CompletableFuture> subscribeAsync() { } @Override - public StreamConsumerBuilderV5 topic(String... 
topicNames) { - if (topicNames.length > 0) { - this.topicName = topicNames[0]; - } + public StreamConsumerBuilderV5 topic(String topicName) { + this.topicName = topicName; + return this; + } + + @Override + public StreamConsumerBuilderV5 namespace(String namespace) { + return namespace(namespace, Map.of()); + } + + @Override + public StreamConsumerBuilderV5 namespace(String namespace, Map propertyFilters) { + this.namespaceName = org.apache.pulsar.common.naming.NamespaceName.get(namespace); + this.propertyFilters = propertyFilters == null ? Map.of() : Map.copyOf(propertyFilters); return this; } diff --git a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java index 577023070adf8..ebdbf73c0b4c6 100644 --- a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java +++ b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java @@ -168,6 +168,64 @@ public void testFromByteArrayRejectsNull() { assertThrows(java.io.IOException.class, () -> MessageIdV5.fromByteArray(null)); } + @Test + public void testRoundtripWithParentTopic() throws Exception { + // Multi-topic queue consumer ids carry a parent-topic tag for ack routing + // but no cross-topic vector. Both must survive serialisation. + MessageIdV5 original = new MessageIdV5(v4(11, 22, 0), 3L, + Map.of(0L, v4(1, 2, 0)), + "topic://tenant/ns/my-topic"); + + MessageIdV5 decoded = MessageIdV5.fromByteArray(original.toByteArray()); + + assertEquals(decoded.segmentId(), 3L); + assertEquals(decoded.parentTopic(), "topic://tenant/ns/my-topic"); + assertEquals(decoded.positionVector().size(), 1); + // No multi-topic vector was set; round-trip must preserve null. 
+ assertEquals(decoded.multiTopicVector(), null); + } + + @Test + public void testRoundtripWithMultiTopicVector() throws Exception { + // Multi-topic stream consumer ids carry a cross-topic position vector and + // the parent topic. Whole tree must survive byte-level round trip so an + // application that serialises the id and sends it through can still call + // acknowledgeCumulative against the right per-topic / per-segment positions. + Map> multi = Map.of( + "topic://tenant/ns/a", Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2, 0)), + "topic://tenant/ns/b", Map.of(0L, v4(3, 3, 0))); + MessageIdV5 original = new MessageIdV5(v4(99, 100, 0), 5L, + Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2, 0)), + "topic://tenant/ns/a", + multi); + + MessageIdV5 decoded = MessageIdV5.fromByteArray(original.toByteArray()); + + assertEquals(decoded.segmentId(), 5L); + assertEquals(decoded.parentTopic(), "topic://tenant/ns/a"); + assertEquals(decoded.positionVector().size(), 2); + assertNotNull(decoded.multiTopicVector()); + assertEquals(decoded.multiTopicVector().keySet(), + java.util.Set.of("topic://tenant/ns/a", "topic://tenant/ns/b")); + assertEquals(decoded.multiTopicVector().get("topic://tenant/ns/a"), + Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2, 0))); + assertEquals(decoded.multiTopicVector().get("topic://tenant/ns/b"), + Map.of(0L, v4(3, 3, 0))); + } + + @Test + public void testRoundtripPreservesNullMultiTopicVector() throws Exception { + // Single-topic ids leave both new fields null; we must not accidentally + // hydrate them on decode. 
+ MessageIdV5 original = new MessageIdV5(v4(7, 8, 0), 1L, + Map.of(0L, v4(1, 2, 0))); + + MessageIdV5 decoded = MessageIdV5.fromByteArray(original.toByteArray()); + + assertEquals(decoded.parentTopic(), null); + assertEquals(decoded.multiTopicVector(), null); + } + @Test public void testFromByteArrayRejectsTooShort() { assertThrows(java.io.IOException.class, () -> MessageIdV5.fromByteArray(new byte[3])); From 4d32a98b6943250e68ce560b79c3e81220c7a767 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 1 May 2026 17:50:54 -0700 Subject: [PATCH 2/3] Fix typo in MultiTopicQueueConsumer license header --- .../apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java index 3b929f63cb908..102d6de6c526f 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java @@ -4,7 +4,7 @@ * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in contains + * "License"); you may not use this file except in compliance * with the License. 
You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 From 1c7db38cf1315375a5e606a5e147bb5216a24d87 Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Fri, 1 May 2026 18:09:58 -0700 Subject: [PATCH 3/3] Update Examples.java to use the new namespace builder API MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The CI build's compileTestJava on pulsar-client-api-v5 surfaced that the v5 Examples doc still referenced .topicsPattern + .patternAutoDiscoveryPeriod, which were dropped from the builder in this PR. Replace the example with .namespace(ns, propertyFilters) — the same shape the multi-topic QueueConsumer is meant to be used with. --- .../apache/pulsar/client/api/v5/Examples.java | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java index 2cc17d67b68f3..582a41a0574fd 100644 --- a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java +++ b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.v5.async.AsyncProducer; import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer; @@ -394,15 +395,21 @@ void delayedDelivery(PulsarClient client) throws Exception { } // ================================================================================== - // 8. Multi-topic queue consumer with pattern + // 8. Multi-topic queue consumer over a namespace, optionally filtered by property // ================================================================================== - /** Subscribe to all topics matching a pattern. 
*/ - void patternSubscription(PulsarClient client) throws Exception { + /** + * Subscribe to every scalable topic in a namespace whose properties match the + * given key/value pairs. The matching set is tracked live: when topics are created + * with matching properties, the consumer attaches automatically; when they're + * deleted or their properties stop matching the filter, it detaches. Pass + * {@code Map.of()} (or use the single-arg overload) to subscribe to every + * scalable topic in the namespace. + */ + void namespaceSubscription(PulsarClient client) throws Exception { try (var consumer = client.newQueueConsumer(Schema.string()) - .topicsPattern("persistent://public/default/events-.*") + .namespace("public/default", Map.of("kind", "events")) .subscriptionName("all-events") - .patternAutoDiscoveryPeriod(Duration.ofMinutes(1)) .subscribe()) { while (true) {