diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java new file mode 100644 index 0000000000000..4cf5f0ff666e9 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicQueueConsumerTest.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.testng.annotations.Test; + +/** + * End-to-end tests for {@link QueueConsumerBuilder#namespace}: a multi-topic + * QueueConsumer follows the matching set live, multiplexes from every per-topic + * consumer into one user-visible queue, and routes individual acks back to the + * right topic for redelivery purposes. 
+ */ +public class V5MultiTopicQueueConsumerTest extends V5ClientBaseTest { + + private String topicName(String suffix) { + return "topic://" + getNamespace() + "/" + suffix + "-" + + UUID.randomUUID().toString().substring(0, 8); + } + + @Test + public void receivesFromAllTopicsInNamespace() throws Exception { + String topicA = topicName("a"); + String topicB = topicName("b"); + admin.scalableTopics().createScalableTopic(topicA, 1); + admin.scalableTopics().createScalableTopic(topicB, 1); + + @Cleanup + Producer pa = v5Client.newProducer(Schema.string()).topic(topicA).create(); + @Cleanup + Producer pb = v5Client.newProducer(Schema.string()).topic(topicB).create(); + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-q") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + // Send to both topics; the multi-topic consumer must receive both sets. + Set expected = new HashSet<>(); + for (int i = 0; i < 5; i++) { + String va = "a-" + i; + String vb = "b-" + i; + pa.newMessage().value(va).send(); + pb.newMessage().value(vb).send(); + expected.add(va); + expected.add(vb); + } + + Set received = new HashSet<>(); + long deadline = System.currentTimeMillis() + 30_000L; + while (received.size() < expected.size() && System.currentTimeMillis() < deadline) { + Message msg = consumer.receive(Duration.ofSeconds(1)); + if (msg != null) { + received.add(msg.value()); + consumer.acknowledge(msg.id()); + } + } + assertEquals(received, expected, "should receive every message produced to either topic"); + } + + @Test + public void picksUpTopicCreatedAfterSubscribe() throws Exception { + // Fresh namespace, no topics yet — initial snapshot is empty. Earliest so the + // race between "topic created" and "per-topic consumer attached via Diff" can't + // drop messages produced in that window. 
+ @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-q-late") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + // Create a topic AFTER subscribe; the watcher's Diff event must trigger the + // consumer to attach. + String lateTopic = topicName("late"); + admin.scalableTopics().createScalableTopic(lateTopic, 1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(lateTopic).create(); + producer.newMessage().value("late-message").send(); + + Message msg = consumer.receive(Duration.ofSeconds(15)); + assertTrue(msg != null, "expected to receive message from late-added topic"); + assertEquals(msg.value(), "late-message"); + assertEquals(msg.topic(), lateTopic, "topic() should surface the parent scalable topic"); + consumer.acknowledge(msg.id()); + } + + @Test + public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception { + String aliceTopic = topicName("alice"); + String bobTopic = topicName("bob"); + admin.scalableTopics().createScalableTopic(aliceTopic, 1, Map.of("owner", "alice")); + admin.scalableTopics().createScalableTopic(bobTopic, 1, Map.of("owner", "bob")); + + @Cleanup + Producer pa = v5Client.newProducer(Schema.string()).topic(aliceTopic).create(); + @Cleanup + Producer pb = v5Client.newProducer(Schema.string()).topic(bobTopic).create(); + + @Cleanup + QueueConsumer consumer = v5Client.newQueueConsumer(Schema.string()) + .namespace(getNamespace(), Map.of("owner", "alice")) + .subscriptionName("multi-q-filter") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + pa.newMessage().value("alice-msg").send(); + pb.newMessage().value("bob-msg").send(); + + // Only alice's message reaches this consumer. 
+ Message got = consumer.receive(Duration.ofSeconds(10)); + assertTrue(got != null, "expected one message"); + assertEquals(got.value(), "alice-msg"); + consumer.acknowledge(got.id()); + + // Confirm bob's message never arrives within a generous window. + Message empty = consumer.receive(Duration.ofSeconds(2)); + assertTrue(empty == null, "bob's message must be filtered out, got " + empty); + } +} diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java new file mode 100644 index 0000000000000..f3299b1c74bee --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/v5/V5MultiTopicStreamConsumerTest.java @@ -0,0 +1,208 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.api.v5; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertTrue; +import java.time.Duration; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import lombok.Cleanup; +import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.testng.annotations.Test; + +/** + * End-to-end tests for {@link StreamConsumerBuilder#namespace}: a multi-topic + * StreamConsumer follows the matching set live, multiplexes from every per-topic + * consumer into one user-visible queue, and supports cumulative ack across + * topics via per-message position vectors. + */ +public class V5MultiTopicStreamConsumerTest extends V5ClientBaseTest { + + private String topicName(String suffix) { + return "topic://" + getNamespace() + "/" + suffix + "-" + + UUID.randomUUID().toString().substring(0, 8); + } + + @Test + public void receivesFromAllTopicsInNamespace() throws Exception { + String topicA = topicName("a"); + String topicB = topicName("b"); + admin.scalableTopics().createScalableTopic(topicA, 1); + admin.scalableTopics().createScalableTopic(topicB, 1); + + @Cleanup + Producer pa = v5Client.newProducer(Schema.string()).topic(topicA).create(); + @Cleanup + Producer pb = v5Client.newProducer(Schema.string()).topic(topicB).create(); + + @Cleanup + StreamConsumer consumer = v5Client.newStreamConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-stream") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + Set expected = new HashSet<>(); + for (int i = 0; i < 5; i++) { + String va = "a-" + i; + String vb = "b-" + i; + pa.newMessage().value(va).send(); + pb.newMessage().value(vb).send(); + expected.add(va); + expected.add(vb); + } + + Set received = new HashSet<>(); + MessageId last = 
null; + long deadline = System.currentTimeMillis() + 30_000L; + while (received.size() < expected.size() && System.currentTimeMillis() < deadline) { + Message msg = consumer.receive(Duration.ofSeconds(1)); + if (msg != null) { + received.add(msg.value()); + last = msg.id(); + } + } + assertEquals(received, expected, "should receive every message produced to either topic"); + // Cumulative ack must succeed across both per-topic consumers — exercises the + // multi-topic position vector embedded in the message id. + assertNotNull(last); + consumer.acknowledgeCumulative(last); + } + + @Test + public void picksUpTopicCreatedAfterSubscribe() throws Exception { + @Cleanup + StreamConsumer consumer = v5Client.newStreamConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-stream-late") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + String lateTopic = topicName("late"); + admin.scalableTopics().createScalableTopic(lateTopic, 1); + + @Cleanup + Producer producer = v5Client.newProducer(Schema.string()) + .topic(lateTopic).create(); + producer.newMessage().value("late-message").send(); + + Message msg = consumer.receive(Duration.ofSeconds(15)); + assertTrue(msg != null, "expected to receive message from late-added topic"); + assertEquals(msg.value(), "late-message"); + assertEquals(msg.topic(), lateTopic, "topic() should surface the parent scalable topic"); + consumer.acknowledgeCumulative(msg.id()); + } + + @Test + public void cumulativeAckCoversEveryTopicSeenSoFar() throws Exception { + // Two topics, interleaved producers. After we cumulatively ack the LAST message, + // closing and re-subscribing must NOT redeliver any of the previous messages — + // that would only happen if the per-topic ack didn't fire for the topic that's + // not the message's own. 
+ String topicA = topicName("a"); + String topicB = topicName("b"); + admin.scalableTopics().createScalableTopic(topicA, 1); + admin.scalableTopics().createScalableTopic(topicB, 1); + + @Cleanup + Producer pa = v5Client.newProducer(Schema.string()).topic(topicA).create(); + @Cleanup + Producer pb = v5Client.newProducer(Schema.string()).topic(topicB).create(); + + StreamConsumer first = v5Client.newStreamConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-stream-cumulative") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + int n = 5; + for (int i = 0; i < n; i++) { + pa.newMessage().value("a-" + i).send(); + pb.newMessage().value("b-" + i).send(); + } + + // Drain everything, remember the last message id. + Set drained = new HashSet<>(); + MessageId last = null; + long deadline = System.currentTimeMillis() + 20_000L; + while (drained.size() < 2 * n && System.currentTimeMillis() < deadline) { + Message msg = first.receive(Duration.ofSeconds(1)); + if (msg != null) { + drained.add(msg.value()); + last = msg.id(); + } + } + assertEquals(drained.size(), 2 * n, "first consumer should drain every message"); + assertNotNull(last); + // Cumulative ack — should fan out to BOTH topics' per-segment acks. + first.acknowledgeCumulative(last); + // Block briefly so the async ack flushes through to the broker before we close. + Thread.sleep(500); + first.close(); + + // Re-subscribe with the same name. If the cumulative ack covered both topics, + // there's nothing to re-deliver. If it only acked the message's OWN topic, the + // other topic would re-deliver from the start. 
+ @Cleanup + StreamConsumer second = v5Client.newStreamConsumer(Schema.string()) + .namespace(getNamespace()) + .subscriptionName("multi-stream-cumulative") + .subscribe(); + + Message stale = second.receive(Duration.ofSeconds(2)); + assertTrue(stale == null, + "cumulative ack should have covered every topic; got redelivery: " + + (stale != null ? stale.value() : "")); + } + + @Test + public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception { + String aliceTopic = topicName("alice"); + String bobTopic = topicName("bob"); + admin.scalableTopics().createScalableTopic(aliceTopic, 1, Map.of("owner", "alice")); + admin.scalableTopics().createScalableTopic(bobTopic, 1, Map.of("owner", "bob")); + + @Cleanup + Producer pa = v5Client.newProducer(Schema.string()).topic(aliceTopic).create(); + @Cleanup + Producer pb = v5Client.newProducer(Schema.string()).topic(bobTopic).create(); + + @Cleanup + StreamConsumer consumer = v5Client.newStreamConsumer(Schema.string()) + .namespace(getNamespace(), Map.of("owner", "alice")) + .subscriptionName("multi-stream-filter") + .subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST) + .subscribe(); + + pa.newMessage().value("alice-msg").send(); + pb.newMessage().value("bob-msg").send(); + + Message got = consumer.receive(Duration.ofSeconds(10)); + assertTrue(got != null, "expected one message"); + assertEquals(got.value(), "alice-msg"); + + Message empty = consumer.receive(Duration.ofSeconds(2)); + assertTrue(empty == null, "bob's message must be filtered out, got " + empty); + } +} diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java index a6249efaca3f0..db03f3c546913 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java +++ 
b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/QueueConsumerBuilder.java @@ -19,10 +19,8 @@ package org.apache.pulsar.client.api.v5; import java.time.Duration; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; -import java.util.regex.Pattern; import org.apache.pulsar.client.api.v5.config.BackoffPolicy; import org.apache.pulsar.client.api.v5.config.DeadLetterPolicy; import org.apache.pulsar.client.api.v5.config.EncryptionPolicy; @@ -52,38 +50,37 @@ public interface QueueConsumerBuilder { CompletableFuture> subscribeAsync(); // --- Topic selection --- + // Either {@link #topic(String)} or {@link #namespace} must be set, not both. /** - * The topic(s) to subscribe to. + * Subscribe to a single scalable topic by name. * - * @param topicNames one or more topic names + * @param topicName the fully-qualified topic name (e.g. {@code topic://tenant/ns/name}) * @return this builder instance for chaining */ - QueueConsumerBuilder topic(String... topicNames); + QueueConsumerBuilder topic(String topicName); /** - * The topics to subscribe to. + * Subscribe to every scalable topic under a namespace. The matching set follows + * live: when topics are created in or deleted from the namespace, the consumer + * attaches / detaches automatically. * - * @param topicNames the list of topic names + * @param namespace the namespace in {@code tenant/namespace} form * @return this builder instance for chaining */ - QueueConsumerBuilder topics(List topicNames); + QueueConsumerBuilder namespace(String namespace); /** - * Subscribe to all topics matching a regex pattern. + * Subscribe to scalable topics under a namespace whose properties match every + * key/value pair in {@code propertyFilters} (AND semantics). An empty map is + * equivalent to {@link #namespace(String)} — every topic in the namespace. + * The matching set follows live as topic properties change. 
* - * @param pattern the compiled regex pattern to match topic names against + * @param namespace the namespace in {@code tenant/namespace} form + * @param propertyFilters property name/value pairs that all must match * @return this builder instance for chaining */ - QueueConsumerBuilder topicsPattern(Pattern pattern); - - /** - * Subscribe to all topics matching a regex pattern (string form). - * - * @param regex the regex pattern string to match topic names against - * @return this builder instance for chaining - */ - QueueConsumerBuilder topicsPattern(String regex); + QueueConsumerBuilder namespace(String namespace, Map propertyFilters); // --- Subscription --- @@ -190,16 +187,6 @@ public interface QueueConsumerBuilder { */ QueueConsumerBuilder deadLetterPolicy(DeadLetterPolicy policy); - // --- Pattern subscription --- - - /** - * How often to re-discover topics matching the pattern. - * - * @param interval the auto-discovery interval - * @return this builder instance for chaining - */ - QueueConsumerBuilder patternAutoDiscoveryPeriod(Duration interval); - // --- Encryption --- /** diff --git a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java index ad6f0df0da6d6..0963f79203523 100644 --- a/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java +++ b/pulsar-client-api-v5/src/main/java/org/apache/pulsar/client/api/v5/StreamConsumerBuilder.java @@ -48,14 +48,37 @@ public interface StreamConsumerBuilder { CompletableFuture> subscribeAsync(); // --- Required --- + // Either {@link #topic(String)} or {@link #namespace} must be set, not both. /** - * The topic(s) to subscribe to. + * Subscribe to a single scalable topic by name. * - * @param topicNames one or more topic names + * @param topicName the fully-qualified topic name (e.g. 
{@code topic://tenant/ns/name}) * @return this builder instance for chaining */ - StreamConsumerBuilder topic(String... topicNames); + StreamConsumerBuilder topic(String topicName); + + /** + * Subscribe to every scalable topic under a namespace. The matching set follows + * live: when topics are created in or deleted from the namespace, the consumer + * attaches / detaches automatically. + * + * @param namespace the namespace in {@code tenant/namespace} form + * @return this builder instance for chaining + */ + StreamConsumerBuilder namespace(String namespace); + + /** + * Subscribe to scalable topics under a namespace whose properties match every + * key/value pair in {@code propertyFilters} (AND semantics). An empty map is + * equivalent to {@link #namespace(String)} — every topic in the namespace. + * The matching set follows live as topic properties change. + * + * @param namespace the namespace in {@code tenant/namespace} form + * @param propertyFilters property name/value pairs that all must match + * @return this builder instance for chaining + */ + StreamConsumerBuilder namespace(String namespace, Map propertyFilters); /** * The subscription name. 
diff --git a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java index 2cc17d67b68f3..582a41a0574fd 100644 --- a/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java +++ b/pulsar-client-api-v5/src/test/java/org/apache/pulsar/client/api/v5/Examples.java @@ -20,6 +20,7 @@ import java.time.Duration; import java.time.Instant; +import java.util.Map; import java.util.concurrent.CompletableFuture; import org.apache.pulsar.client.api.v5.async.AsyncProducer; import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer; @@ -394,15 +395,21 @@ void delayedDelivery(PulsarClient client) throws Exception { } // ================================================================================== - // 8. Multi-topic queue consumer with pattern + // 8. Multi-topic queue consumer over a namespace, optionally filtered by property // ================================================================================== - /** Subscribe to all topics matching a pattern. */ - void patternSubscription(PulsarClient client) throws Exception { + /** + * Subscribe to every scalable topic in a namespace whose properties match the + * given key/value pairs. The matching set follows live: when topics are created + * with matching properties, the consumer attaches automatically; when they're + * deleted or change properties out of the filter, it detaches. Pass + * {@code Map.of()} (or use the single-arg overload) to subscribe to every + * scalable topic in the namespace. 
+ */ + void namespaceSubscription(PulsarClient client) throws Exception { try (var consumer = client.newQueueConsumer(Schema.string()) - .topicsPattern("persistent://public/default/events-.*") + .namespace("public/default", Map.of("kind", "events")) .subscriptionName("all-events") - .patternAutoDiscoveryPeriod(Duration.ofMinutes(1)) .subscribe()) { while (true) { diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java index f66e12277945f..7242b5e53f772 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/AsyncQueueConsumerV5.java @@ -29,9 +29,9 @@ */ final class AsyncQueueConsumerV5 implements AsyncQueueConsumer { - private final ScalableQueueConsumer consumer; + private final QueueConsumerImpl consumer; - AsyncQueueConsumerV5(ScalableQueueConsumer consumer) { + AsyncQueueConsumerV5(QueueConsumerImpl consumer) { this.consumer = consumer; } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java index 3a39986f3c74d..97a47eff60f46 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageIdV5.java @@ -20,9 +20,16 @@ import java.io.IOException; import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; import java.util.Map; import java.util.Objects; -import org.apache.pulsar.client.api.v5.MessageId; +import org.apache.pulsar.client.api.MessageId; + +// The v5 MessageId interface and the underlying v4 MessageId class share the same simple +// name in different packages — a real conflict that Java imports can't resolve. 
We import +// the v4 type as `MessageId` (used heavily) and refer to v5 via FQN in the few places it +// appears (the `implements` clause and `compareTo`). /** * V5 MessageId implementation that wraps a v4 MessageId and includes a position @@ -36,17 +43,20 @@ *

For non-cumulative consumers (QueueConsumer) and readers (CheckpointConsumer), * only the single segment ID and v4 message ID are needed; the position vector is * empty. + * + *

Multi-topic consumer wrappers additionally tag the id with the parent scalable + * topic (for ack routing) and a cross-topic position vector (for cumulative ack + * across topics). Both fields are optional and round-trip through + * {@link #toByteArray} / {@link #fromByteArray}. */ -public final class MessageIdV5 implements MessageId { +public final class MessageIdV5 implements org.apache.pulsar.client.api.v5.MessageId { static final long NO_SEGMENT = -1; - static final MessageIdV5 EARLIEST = new MessageIdV5( - org.apache.pulsar.client.api.MessageId.earliest, NO_SEGMENT, Map.of()); - static final MessageIdV5 LATEST = new MessageIdV5( - org.apache.pulsar.client.api.MessageId.latest, NO_SEGMENT, Map.of()); + static final MessageIdV5 EARLIEST = new MessageIdV5(MessageId.earliest, NO_SEGMENT, Map.of()); + static final MessageIdV5 LATEST = new MessageIdV5(MessageId.latest, NO_SEGMENT, Map.of()); - private final org.apache.pulsar.client.api.MessageId v4MessageId; + private final MessageId v4MessageId; private final long segmentId; /** @@ -54,30 +64,73 @@ public final class MessageIdV5 implements MessageId { * taken at the moment this message was delivered to the application. * Used by StreamConsumer for cumulative ack across all segments. */ - private final Map positionVector; + private final Map positionVector; + + /** + * Parent scalable topic this message was delivered from. Set only by multi-topic + * consumer wrappers so they can route a subsequent ack back to the right + * per-topic consumer; {@code null} for single-topic consumers (where the consumer + * already knows its topic). + */ + private final String parentTopic; + + /** + * Cross-topic position vector for multi-topic StreamConsumer cumulative ack. + * Maps parent topic name → (segment id → latest-delivered msgId at the moment + * this message entered the multiplexed queue). {@code null} for single-topic + * messages — use {@link #positionVector()} instead. 
+ */ + private final Map> multiTopicVector; /** - * Create a MessageIdV5 with a position vector for cumulative ack support. + * Create a MessageIdV5 with a position vector for single-topic cumulative ack. */ - public MessageIdV5(org.apache.pulsar.client.api.MessageId v4MessageId, + public MessageIdV5(MessageId v4MessageId, long segmentId, - Map positionVector) { - this.v4MessageId = Objects.requireNonNull(v4MessageId); - this.segmentId = segmentId; - this.positionVector = Map.copyOf(positionVector); + Map positionVector) { + this(v4MessageId, segmentId, positionVector, null, null); + } + + /** + * Constructor for multi-topic queue consumer messages: parent topic tag for ack + * routing; no cross-topic position vector (queue consumers don't do cumulative + * ack). + */ + public MessageIdV5(MessageId v4MessageId, + long segmentId, + Map positionVector, + String parentTopic) { + this(v4MessageId, segmentId, positionVector, parentTopic, null); } /** * Create a MessageIdV5 without a position vector (for individual ack / reader use). */ - public MessageIdV5(org.apache.pulsar.client.api.MessageId v4MessageId, long segmentId) { - this(v4MessageId, segmentId, Map.of()); + public MessageIdV5(MessageId v4MessageId, long segmentId) { + this(v4MessageId, segmentId, Map.of(), null, null); + } + + /** + * Full constructor for multi-topic StreamConsumer messages: parent topic + the + * cross-topic position vector captured at enqueue time. Used by the multi-topic + * stream consumer's pump. + */ + public MessageIdV5(MessageId v4MessageId, + long segmentId, + Map positionVector, + String parentTopic, + Map> multiTopicVector) { + this.v4MessageId = Objects.requireNonNull(v4MessageId); + this.segmentId = segmentId; + this.positionVector = Map.copyOf(positionVector); + this.parentTopic = parentTopic; + this.multiTopicVector = multiTopicVector; } /** * Get the underlying v4 MessageId. Package-private for internal use. 
*/ - org.apache.pulsar.client.api.MessageId v4MessageId() { + MessageId v4MessageId() { return v4MessageId; } @@ -92,36 +145,123 @@ long segmentId() { * Get the position vector — the latest delivered message ID per segment at the * time this message was delivered. Used by StreamConsumer for cumulative ack. */ - Map positionVector() { + Map positionVector() { return positionVector; } + /** + * Parent scalable topic when this message was delivered through a multi-topic + * consumer; {@code null} for single-topic consumers. Package-private — used by + * multi-topic ack routing only. + */ + String parentTopic() { + return parentTopic; + } + + Map> multiTopicVector() { + return multiTopicVector; + } + + /** + * Wire format. All sections are length-prefixed so the reader can detect + * absent trailing sections (older serialised forms wrote only sections 1-3). + * + *

+     * 1. segmentId               : 8 bytes
+     * 2. v4MessageId             : 4-byte length + bytes
+     * 3. positionVector          : 4-byte count + repeating { 8-byte segId,
+     *                                                          4-byte len, idBytes }
+     * 4. parentTopic             : 4-byte length (-1 = null) + UTF-8 bytes
+     * 5. multiTopicVector        : 4-byte count (-1 = null) + repeating {
+     *                                4-byte topic name length, UTF-8 bytes,
+     *                                4-byte segCount, repeating { 8-byte segId,
+     *                                                              4-byte len, idBytes } }
+     * 
+ * + *

Sections 4 and 5 are present in every new id; older serialisations from a + * pre-multi-topic build are still readable — the reader treats the missing + * sections as null. + */ @Override public byte[] toByteArray() { byte[] v4Bytes = v4MessageId.toByteArray(); - // Format: [8 bytes segmentId] [4 bytes v4Length] [v4Bytes] - // [4 bytes numPositions] [for each: [8 bytes segId] [4 bytes idLen] [idBytes]] - int totalSize = 8 + 4 + v4Bytes.length + 4; - var serializedPositions = new java.util.HashMap(); - for (var entry : positionVector.entrySet()) { - byte[] idBytes = entry.getValue().toByteArray(); - serializedPositions.put(entry.getKey(), idBytes); - totalSize += 8 + 4 + idBytes.length; + + // Pre-serialise position-vector entries so we can size the buffer. + Map serializedPositions = serializeSegmentVector(positionVector); + int positionBytes = 4; // count + for (var entry : serializedPositions.entrySet()) { + positionBytes += 8 + 4 + entry.getValue().length; + } + + // Section 4: parent topic. + byte[] parentTopicBytes = parentTopic == null + ? null : parentTopic.getBytes(StandardCharsets.UTF_8); + + // Section 5: pre-serialise the multi-topic vector tree. + Map> serializedMulti = null; + int multiBytes = 4; // count or -1 + if (multiTopicVector != null) { + serializedMulti = new HashMap<>(multiTopicVector.size()); + for (var entry : multiTopicVector.entrySet()) { + byte[] topicBytes = entry.getKey().getBytes(StandardCharsets.UTF_8); + Map inner = serializeSegmentVector(entry.getValue()); + serializedMulti.put(topicBytes, inner); + multiBytes += 4 + topicBytes.length + 4; + for (var seg : inner.entrySet()) { + multiBytes += 8 + 4 + seg.getValue().length; + } + } } + int totalSize = 8 + 4 + v4Bytes.length + positionBytes + + 4 + (parentTopicBytes == null ? 
0 : parentTopicBytes.length) + + multiBytes; + ByteBuffer buf = ByteBuffer.allocate(totalSize); buf.putLong(segmentId); buf.putInt(v4Bytes.length); buf.put(v4Bytes); - buf.putInt(positionVector.size()); + + buf.putInt(serializedPositions.size()); for (var entry : serializedPositions.entrySet()) { buf.putLong(entry.getKey()); buf.putInt(entry.getValue().length); buf.put(entry.getValue()); } + + if (parentTopicBytes == null) { + buf.putInt(-1); + } else { + buf.putInt(parentTopicBytes.length); + buf.put(parentTopicBytes); + } + + if (serializedMulti == null) { + buf.putInt(-1); + } else { + buf.putInt(serializedMulti.size()); + for (var entry : serializedMulti.entrySet()) { + buf.putInt(entry.getKey().length); + buf.put(entry.getKey()); + buf.putInt(entry.getValue().size()); + for (var seg : entry.getValue().entrySet()) { + buf.putLong(seg.getKey()); + buf.putInt(seg.getValue().length); + buf.put(seg.getValue()); + } + } + } return buf.array(); } + private static Map serializeSegmentVector(Map vector) { + Map out = new HashMap<>(vector.size()); + for (var entry : vector.entrySet()) { + out.put(entry.getKey(), entry.getValue().toByteArray()); + } + return out; + } + static MessageIdV5 fromByteArray(byte[] data) throws IOException { if (data == null || data.length < 12) { throw new IOException("Invalid MessageIdV5 data: too short"); @@ -134,30 +274,66 @@ static MessageIdV5 fromByteArray(byte[] data) throws IOException { } byte[] v4Bytes = new byte[v4Length]; buf.get(v4Bytes); - org.apache.pulsar.client.api.MessageId v4Id = - org.apache.pulsar.client.api.MessageId.fromByteArray(v4Bytes); + MessageId v4Id = MessageId.fromByteArray(v4Bytes); + + // Section 3: position vector (single-topic / per-segment). + Map positions = Map.of(); + if (buf.hasRemaining()) { + positions = readSegmentVector(buf); + } + + // Section 4: parent topic. Length -1 sentinel means "absent". 
+ String parentTopic = null; + if (buf.hasRemaining()) { + int parentLen = buf.getInt(); + if (parentLen >= 0) { + if (parentLen > buf.remaining()) { + throw new IOException("Invalid MessageIdV5 data: bad parent-topic length"); + } + byte[] parentBytes = new byte[parentLen]; + buf.get(parentBytes); + parentTopic = new String(parentBytes, StandardCharsets.UTF_8); + } + } - // Read position vector if present - Map positions = Map.of(); + // Section 5: cross-topic vector. Count -1 sentinel means "absent". + Map> multiTopic = null; if (buf.hasRemaining()) { - int numPositions = buf.getInt(); - var posMap = new java.util.HashMap(); - for (int i = 0; i < numPositions; i++) { - long posSegId = buf.getLong(); - int idLen = buf.getInt(); - byte[] idBytes = new byte[idLen]; - buf.get(idBytes); - posMap.put(posSegId, - org.apache.pulsar.client.api.MessageId.fromByteArray(idBytes)); + int topicCount = buf.getInt(); + if (topicCount >= 0) { + multiTopic = new HashMap<>(topicCount); + for (int i = 0; i < topicCount; i++) { + int topicLen = buf.getInt(); + byte[] topicBytes = new byte[topicLen]; + buf.get(topicBytes); + String topic = new String(topicBytes, StandardCharsets.UTF_8); + Map inner = readSegmentVector(buf); + multiTopic.put(topic, inner); + } } - positions = posMap; } - return new MessageIdV5(v4Id, segmentId, positions); + return new MessageIdV5(v4Id, segmentId, positions, parentTopic, multiTopic); + } + + private static Map readSegmentVector(ByteBuffer buf) throws IOException { + int count = buf.getInt(); + Map out = new HashMap<>(count); + for (int i = 0; i < count; i++) { + long segId = buf.getLong(); + int idLen = buf.getInt(); + if (idLen < 0 || idLen > buf.remaining()) { + throw new IOException("Invalid MessageIdV5 data: bad inner id length"); + } + byte[] idBytes = new byte[idLen]; + buf.get(idBytes); + out.put(segId, MessageId.fromByteArray(idBytes)); + } + return out; } @Override - public int compareTo(MessageId other) { + public int 
compareTo(org.apache.pulsar.client.api.v5.MessageId other) { if (!(other instanceof MessageIdV5 o)) { throw new IllegalArgumentException("Cannot compare with " + other.getClass()); } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java index 910bb114e5165..1a9444911f46d 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MessageV5.java @@ -31,13 +31,19 @@ final class MessageV5 implements Message { private final org.apache.pulsar.client.api.Message v4Message; private final MessageIdV5 messageId; + /** + * Optional override for {@link #topic()}. Set by multi-topic consumer wrappers to + * the parent scalable topic name (the v4 message's own topic is the segment topic, + * which is internal). Null for single-topic consumers — {@code topic()} falls back + * to the v4 message topic in that case. + */ + private final String topicOverride; /** * Create with a simple segment ID (for queue consumer, checkpoint consumer, producer). */ MessageV5(org.apache.pulsar.client.api.Message v4Message, long segmentId) { - this.v4Message = v4Message; - this.messageId = new MessageIdV5(v4Message.getMessageId(), segmentId); + this(v4Message, new MessageIdV5(v4Message.getMessageId(), segmentId), null); } /** @@ -45,8 +51,36 @@ final class MessageV5 implements Message { * (for stream consumer cumulative ack support). */ MessageV5(org.apache.pulsar.client.api.Message v4Message, MessageIdV5 messageId) { + this(v4Message, messageId, null); + } + + /** + * Create with an explicit topic override — used by multi-topic consumer wrappers + * to surface the parent scalable topic via {@link #topic()} instead of the + * underlying segment topic. {@code topicOverride} may be {@code null}. 
+ */ + MessageV5(org.apache.pulsar.client.api.Message v4Message, MessageIdV5 messageId, + String topicOverride) { this.v4Message = v4Message; this.messageId = messageId; + this.topicOverride = topicOverride; + } + + /** + * Re-brand this message with a parent scalable topic. Used by multi-topic consumer + * wrappers when forwarding from a per-topic consumer's queue into the shared + * multiplexed queue: the message id picks up the parent for ack routing, and + * {@link #topic()} starts returning the parent. + */ + MessageV5 withTopicOverride(String parentTopic) { + MessageIdV5 newId = new MessageIdV5(messageId.v4MessageId(), messageId.segmentId(), + messageId.positionVector(), parentTopic); + return new MessageV5<>(v4Message, newId, parentTopic); + } + + /** Underlying v4 message — exposed to multi-topic wrappers that re-build with a new id. */ + org.apache.pulsar.client.api.Message v4Message() { + return v4Message; } @Override @@ -98,7 +132,10 @@ public Optional producerName() { @Override public String topic() { - return v4Message.getTopicName(); + // Multi-topic consumer wrappers set topicOverride to the parent scalable topic + // so the user-visible topic() matches the topic they subscribed to (the + // v4 message carries the internal segment topic). + return topicOverride != null ? topicOverride : v4Message.getTopicName(); } @Override diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java new file mode 100644 index 0000000000000..102d6de6c526f --- /dev/null +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicQueueConsumer.java @@ -0,0 +1,426 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.v5; + +import io.github.merlimat.slog.Logger; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.client.api.v5.Message; +import org.apache.pulsar.client.api.v5.MessageId; +import org.apache.pulsar.client.api.v5.PulsarClientException; +import org.apache.pulsar.client.api.v5.QueueConsumer; +import org.apache.pulsar.client.api.v5.Transaction; +import org.apache.pulsar.client.api.v5.async.AsyncQueueConsumer; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; + +/** + * Multi-topic {@link QueueConsumer} that subscribes to every scalable topic in a + * namespace matching a (possibly empty) set of property filters. 
The matching set + * follows live: when topics enter or leave the filter, the consumer attaches / + * detaches automatically via a long-lived {@link ScalableTopicsWatcher} session + * to the broker. + * + *

Internals: + *

    + *
  • One {@link ScalableQueueConsumer} per matched topic.
  • + *
  • Each per-topic consumer is created with a sink callback that forwards every + * delivered message directly into the shared multiplexed queue (there is no + * pump thread), tagging each message with the parent topic so the + * subsequent ack can be routed back.
  • + *
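  • The watcher's {@code Snapshot} is reconciled against the active set (topics + * no longer present are closed, new ones are opened); {@code Diff} applies + * removals before additions to handle a rapid remove-then-add of the same + * topic name. Removal performs no explicit ack flush; it relies on the + * per-topic consumer's own close.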
  • + *
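  • Per-topic add failures reported by the watcher ({@code Snapshot} / {@code Diff}) + * retry forever with exponential backoff (100 ms initial, 30 min cap); + * topics in the initial snapshot are attempted once, without retry.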
  • + *
+ */ +final class MultiTopicQueueConsumer implements QueueConsumerImpl { + + private static final Logger LOG = Logger.get(MultiTopicQueueConsumer.class); + /** + * Cap for per-topic subscribe retries. Matches v4 consumer reconnect semantics — + * the consumer never gives up; the user just sees no messages from the topic + * until the broker / topic recovers. + */ + private static final Duration RETRY_MAX = Duration.ofMinutes(30); + private final Logger log; + + private final PulsarClientV5 client; + private final Schema v5Schema; + private final ConsumerConfigurationData consumerConf; + private final NamespaceName namespace; + private final Map propertyFilters; + private final String subscriptionName; + + private final ScalableTopicsWatcher watcher; + private final ConcurrentHashMap> perTopic = new ConcurrentHashMap<>(); + private final LinkedTransferQueue> mux = new LinkedTransferQueue<>(); + + private volatile boolean closed = false; + private final AsyncQueueConsumerV5 asyncView; + + private MultiTopicQueueConsumer(PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + NamespaceName namespace, + Map propertyFilters, + ScalableTopicsWatcher watcher) { + this.client = client; + this.v5Schema = v5Schema; + this.consumerConf = consumerConf; + this.namespace = namespace; + this.propertyFilters = propertyFilters; + this.subscriptionName = consumerConf.getSubscriptionName(); + this.watcher = watcher; + this.log = LOG.with() + .attr("namespace", namespace) + .attr("subscription", subscriptionName) + .attr("filters", propertyFilters) + .build(); + this.asyncView = new AsyncQueueConsumerV5<>(this); + } + + static CompletableFuture> createAsync(PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + NamespaceName namespace, + Map propertyFilters) { + ScalableTopicsWatcher watcher = new ScalableTopicsWatcher( + client.v4Client(), namespace, propertyFilters); + MultiTopicQueueConsumer consumer = new 
MultiTopicQueueConsumer<>( + client, v5Schema, consumerConf, namespace, propertyFilters, watcher); + return watcher.start() + .thenCompose(initial -> consumer.openInitial(initial)) + .thenApply(__ -> { + watcher.setListener(consumer.new WatcherListener()); + return (QueueConsumer) consumer; + }) + .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, ___) -> { + throw ex instanceof CompletionException ce ? ce : new CompletionException(ex); + })); + } + + /** + * Open one per-topic consumer per topic in the initial snapshot. Block on every + * future so {@code subscribeAsync} only resolves once the consumer is fully + * attached — gives the user the same all-or-nothing semantics as the + * single-topic builder. + */ + private CompletableFuture openInitial(List topics) { + if (topics.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + List> opens = new ArrayList<>(topics.size()); + for (String t : topics) { + opens.add(openTopic(t, /* retry= */ false)); + } + return CompletableFuture.allOf(opens.toArray(CompletableFuture[]::new)); + } + + /** + * Subscribe to one topic. When {@code retry} is true, failures schedule a + * background retry with exponential backoff; the returned future completes as + * soon as the first attempt finishes (success or failure) so we don't hold up + * Snapshot / Diff processing. + */ + private CompletableFuture openTopic(String topicName, boolean retry) { + if (closed) { + return CompletableFuture.completedFuture(null); + } + if (perTopic.containsKey(topicName)) { + return CompletableFuture.completedFuture(null); + } + TopicName topic = V5Utils.asScalableTopicName(topicName); + DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic); + // Per-topic message sink: tag each delivered message with the parent scalable + // topic for ack routing + display, and forward into the shared mux. No pump + // thread; per-segment v4 receive loops fire this sink directly. 
+ java.util.function.Consumer> sink = msg -> { + if (!closed) { + mux.add(msg.withTopicOverride(topicName)); + } + }; + return dagWatch.start() + .thenCompose(layout -> ScalableQueueConsumer.createAsyncImpl( + client, v5Schema, perTopicConf(topicName), dagWatch, layout, sink)) + .thenAccept(qc -> { + if (closed) { + qc.closeAsync(); + return; + } + PerTopicState state = new PerTopicState<>(topicName, qc); + PerTopicState existing = perTopic.putIfAbsent(topicName, state); + if (existing != null) { + // Concurrent open; drop the dup. + qc.closeAsync(); + return; + } + log.info().attr("topic", topicName).log("Per-topic consumer attached"); + }) + .exceptionally(ex -> { + Throwable cause = ex instanceof CompletionException ce && ce.getCause() != null + ? ce.getCause() : ex; + if (retry && !closed) { + scheduleRetry(topicName); + } + log.warn().attr("topic", topicName).exceptionMessage(cause) + .log("Per-topic subscribe failed"); + return null; + }); + } + + private void scheduleRetry(String topicName) { + long delayMs = nextBackoff(topicName); + log.info().attr("topic", topicName).attr("delayMs", delayMs) + .log("Retrying per-topic subscribe after backoff"); + client.v4Client().timer().newTimeout(timeout -> openTopic(topicName, /* retry= */ true), + delayMs, TimeUnit.MILLISECONDS); + } + + private final ConcurrentHashMap retryDelays = new ConcurrentHashMap<>(); + + /** Returns the next exponential-backoff delay (ms) for a topic and updates the state. */ + private long nextBackoff(String topicName) { + AtomicLong al = retryDelays.computeIfAbsent(topicName, t -> new AtomicLong(100)); + long current = al.get(); + long next = Math.min(current * 2, RETRY_MAX.toMillis()); + al.set(next); + return current; + } + + private void resetBackoff(String topicName) { + retryDelays.remove(topicName); + } + + /** + * Clone the user's consumer config for a per-topic consumer. 
Each per-topic + * consumer needs a unique {@code consumerName} so the broker can disambiguate + * them on the same subscription; we suffix with the topic name. + */ + private ConsumerConfigurationData perTopicConf(String topicName) { + var conf = consumerConf.clone(); + if (consumerConf.getConsumerName() != null) { + // Disambiguate across topics — the broker side cares about uniqueness on + // the same Shared subscription per segment. + String localName = TopicName.get(topicName).getLocalName(); + conf.setConsumerName(consumerConf.getConsumerName() + "-" + localName); + } + return conf; + } + + /** + * Close per-topic consumer for a topic that has dropped out of the matching set. + * No explicit ack flush — Shared subscription acks are independent per message + * and the per-topic consumer's existing close already flushes pending acks via + * its v4 segment consumers. + */ + private CompletableFuture closeTopic(String topicName) { + retryDelays.remove(topicName); + PerTopicState state = perTopic.remove(topicName); + if (state == null) { + return CompletableFuture.completedFuture(null); + } + return state.consumer.closeAsync() + .thenRun(() -> log.info().attr("topic", topicName) + .log("Per-topic consumer detached")); + } + + // --- QueueConsumer --- + + @Override + public String topic() { + return "namespace://" + namespace; + } + + @Override + public String subscription() { + return subscriptionName; + } + + @Override + public String consumerName() { + return consumerConf.getConsumerName(); + } + + @Override + public Message receive() throws PulsarClientException { + try { + return mux.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException("Receive interrupted", e); + } + } + + @Override + public Message receive(Duration timeout) throws PulsarClientException { + try { + return mux.poll(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + 
Thread.currentThread().interrupt(); + throw new PulsarClientException("Receive interrupted", e); + } + } + + @Override + public void acknowledge(MessageId messageId) { + routeAck(messageId, ptc -> ptc.acknowledge(messageId)); + } + + @Override + public void acknowledge(MessageId messageId, Transaction txn) { + routeAck(messageId, ptc -> ptc.acknowledge(messageId, txn)); + } + + @Override + public void negativeAcknowledge(MessageId messageId) { + routeAck(messageId, ptc -> ptc.negativeAcknowledge(messageId)); + } + + /** Look up the per-topic consumer via the parent topic tag and delegate. */ + private void routeAck(MessageId messageId, java.util.function.Consumer> action) { + if (!(messageId instanceof MessageIdV5 id)) { + throw new IllegalArgumentException("Expected MessageIdV5, got: " + messageId.getClass()); + } + String parent = id.parentTopic(); + if (parent == null) { + throw new IllegalStateException("MessageIdV5 missing parent topic — was the message" + + " delivered through a multi-topic consumer?"); + } + PerTopicState state = perTopic.get(parent); + if (state == null) { + // Topic was removed between deliver and ack. Fine — broker has dropped the + // session for that topic. Drop the ack silently. 
+ log.debug().attr("topic", parent) + .log("Ack for removed topic; dropping"); + return; + } + action.accept(state.consumer); + } + + @Override + public AsyncQueueConsumer async() { + return asyncView; + } + + @Override + public void close() throws PulsarClientException { + try { + closeAsync().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException("Close interrupted", e); + } catch (ExecutionException e) { + throw new PulsarClientException(e.getCause()); + } + } + + @Override + public CompletableFuture> receiveAsync() { + return CompletableFuture.supplyAsync(() -> { + try { + return receive(); + } catch (PulsarClientException e) { + throw new CompletionException(e); + } + }); + } + + @Override + public CompletableFuture closeAsync() { + if (closed) { + return CompletableFuture.completedFuture(null); + } + closed = true; + watcher.close(); + List> closes = new ArrayList<>(); + for (var topic : new HashSet<>(perTopic.keySet())) { + closes.add(closeTopic(topic)); + } + return CompletableFuture.allOf(closes.toArray(CompletableFuture[]::new)); + } + + // --- Watcher listener --- + + private final class WatcherListener implements ScalableTopicsWatcher.Listener { + @Override + public void onSnapshot(List topics) { + // Reconcile: open anything new, close anything missing. Same as Diff but + // computed from the full snapshot — used on reconnect when broker hash + // differs from ours. + Set target = new HashSet<>(topics); + Set current = new HashSet<>(perTopic.keySet()); + // Remove first so a rapid remove-then-add of same name closes-then-reopens. + for (String t : current) { + if (!target.contains(t)) { + closeTopic(t); + } + } + for (String t : target) { + if (!current.contains(t)) { + openTopic(t, /* retry= */ true); + resetBackoff(t); + } + } + } + + @Override + public void onDiff(List added, List removed) { + // Apply removed before added — covers rapid remove-then-add of same name. 
+ for (String t : removed) { + closeTopic(t); + } + for (String t : added) { + openTopic(t, /* retry= */ true); + resetBackoff(t); + } + } + } + + // --- Per-topic state --- + + /** + * Per-topic bookkeeping. Messages flow directly into the shared mux via the + * sink the wrapper installed on the per-topic consumer at create-time, so + * there's no pump thread to start/stop here — just hold a reference to the + * underlying consumer for ack routing and clean shutdown. + */ + private static final class PerTopicState { + private final String parentTopic; + private final ScalableQueueConsumer consumer; + + PerTopicState(String parentTopic, ScalableQueueConsumer consumer) { + this.parentTopic = parentTopic; + this.consumer = consumer; + } + } +} diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java new file mode 100644 index 0000000000000..e49b755cca471 --- /dev/null +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/MultiTopicStreamConsumer.java @@ -0,0 +1,542 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pulsar.client.impl.v5; + +import io.github.merlimat.slog.Logger; +import java.time.Duration; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.pulsar.client.api.v5.Message; +import org.apache.pulsar.client.api.v5.MessageId; +import org.apache.pulsar.client.api.v5.Messages; +import org.apache.pulsar.client.api.v5.PulsarClientException; +import org.apache.pulsar.client.api.v5.StreamConsumer; +import org.apache.pulsar.client.api.v5.Transaction; +import org.apache.pulsar.client.api.v5.async.AsyncStreamConsumer; +import org.apache.pulsar.client.api.v5.schema.Schema; +import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.api.proto.ScalableConsumerType; +import org.apache.pulsar.common.naming.NamespaceName; +import org.apache.pulsar.common.naming.TopicName; + +/** + * Multi-topic {@link StreamConsumer} over the union of scalable topics in a + * namespace matching a (possibly empty) set of property filters. + * + *

Cumulative ack across topics works via a per-message position-vector + * snapshot: each message that enters the multiplexed queue carries + * {@code Map<String, Map<Long, MessageId>>} captured at enqueue time. + * On {@code acknowledgeCumulative(msg)}, the wrapper fans out to every + * per-topic consumer with the right segment vector — same semantics as the + * single-topic case, just lifted one level. + * + *

For Removed-mid-stream topics we flush acks up to {@code latestDelivered} + * for that topic before closing the per-topic consumer, so the user's + * processing-acked invariant is preserved if the topic is later re-added. + */ +final class MultiTopicStreamConsumer implements StreamConsumer { + + private static final Logger LOG = Logger.get(MultiTopicStreamConsumer.class); + /** + * Cap for per-topic subscribe retries. Matches v4 consumer reconnect semantics. + */ + private static final Duration RETRY_MAX = Duration.ofMinutes(30); + + private final Logger log; + + private final PulsarClientV5 client; + private final Schema v5Schema; + private final ConsumerConfigurationData consumerConf; + private final NamespaceName namespace; + private final Map propertyFilters; + private final String subscriptionName; + + private final ScalableTopicsWatcher watcher; + private final ConcurrentHashMap> perTopic = new ConcurrentHashMap<>(); + private final LinkedTransferQueue> mux = new LinkedTransferQueue<>(); + + /** + * Tracks the latest delivered message id per (parent topic, segment id) across + * every per-topic consumer. Snapshotted at enqueue time for each delivered + * message so cumulative ack covers everything visible up to that message. 
+ */ + private final ConcurrentHashMap> + latestDeliveredPerTopicSegment = new ConcurrentHashMap<>(); + + private volatile boolean closed = false; + private final AsyncStreamConsumerV5Multi asyncView; + + private MultiTopicStreamConsumer(PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + NamespaceName namespace, + Map propertyFilters, + ScalableTopicsWatcher watcher) { + this.client = client; + this.v5Schema = v5Schema; + this.consumerConf = consumerConf; + this.namespace = namespace; + this.propertyFilters = propertyFilters; + this.subscriptionName = consumerConf.getSubscriptionName(); + this.watcher = watcher; + this.log = LOG.with() + .attr("namespace", namespace) + .attr("subscription", subscriptionName) + .attr("filters", propertyFilters) + .build(); + this.asyncView = new AsyncStreamConsumerV5Multi(); + } + + static CompletableFuture> createAsync(PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + NamespaceName namespace, + Map propertyFilters) { + ScalableTopicsWatcher watcher = new ScalableTopicsWatcher( + client.v4Client(), namespace, propertyFilters); + MultiTopicStreamConsumer consumer = new MultiTopicStreamConsumer<>( + client, v5Schema, consumerConf, namespace, propertyFilters, watcher); + return watcher.start() + .thenCompose(initial -> consumer.openInitial(initial)) + .thenApply(__ -> { + watcher.setListener(consumer.new WatcherListener()); + return (StreamConsumer) consumer; + }) + .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, ___) -> { + throw ex instanceof CompletionException ce ? 
ce : new CompletionException(ex); + })); + } + + private CompletableFuture openInitial(List topics) { + if (topics.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + List> opens = new ArrayList<>(topics.size()); + for (String t : topics) { + opens.add(openTopic(t, /* retry= */ false)); + } + return CompletableFuture.allOf(opens.toArray(CompletableFuture[]::new)); + } + + private CompletableFuture openTopic(String topicName, boolean retry) { + if (closed) { + return CompletableFuture.completedFuture(null); + } + if (perTopic.containsKey(topicName)) { + return CompletableFuture.completedFuture(null); + } + TopicName topic = V5Utils.asScalableTopicName(topicName); + // One ScalableConsumerClient session per topic, same as the single-topic builder. + ScalableConsumerClient session = new ScalableConsumerClient( + client.v4Client(), topic, + consumerConf.getSubscriptionName(), + perTopicConsumerName(topicName), + ScalableConsumerType.STREAM); + + // Per-topic message sink: each delivered message arrives with its + // single-topic positionVector (computed by ScalableStreamConsumer). Update + // our cross-topic latestDelivered map, snapshot the full cross-topic vector, + // and forward to the shared mux. No pump thread. + java.util.function.Consumer> sink = msg -> + onPerTopicMessage(topicName, msg); + + return session.start() + .thenCompose(initialAssignment -> ScalableStreamConsumer.createAsyncImpl( + client, v5Schema, perTopicConf(topicName), session, + topicName, initialAssignment, sink)) + .thenAccept(sc -> { + if (closed) { + sc.closeAsync(); + return; + } + PerTopic state = new PerTopic<>(topicName, sc); + PerTopic existing = perTopic.putIfAbsent(topicName, state); + if (existing != null) { + sc.closeAsync(); + return; + } + log.info().attr("topic", topicName).log("Per-topic stream consumer attached"); + }) + .exceptionally(ex -> { + Throwable cause = ex instanceof CompletionException ce && ce.getCause() != null + ? 
ce.getCause() : ex; + if (retry && !closed) { + scheduleRetry(topicName); + } + log.warn().attr("topic", topicName).exceptionMessage(cause) + .log("Per-topic stream subscribe failed"); + return null; + }); + } + + private void scheduleRetry(String topicName) { + long delayMs = nextBackoff(topicName); + log.info().attr("topic", topicName).attr("delayMs", delayMs) + .log("Retrying per-topic stream subscribe"); + client.v4Client().timer().newTimeout(timeout -> openTopic(topicName, /* retry= */ true), + delayMs, TimeUnit.MILLISECONDS); + } + + private final ConcurrentHashMap retryDelays = new ConcurrentHashMap<>(); + + private long nextBackoff(String topicName) { + AtomicLong al = retryDelays.computeIfAbsent(topicName, t -> new AtomicLong(100)); + long current = al.get(); + long next = Math.min(current * 2, RETRY_MAX.toMillis()); + al.set(next); + return current; + } + + private void resetBackoff(String topicName) { + retryDelays.remove(topicName); + } + + /** + * Per-topic consumer name. Each topic gets a distinct name so the broker's per-topic + * coordinator can register them as separate consumers (same Exclusive-per-segment + * semantics, no cross-topic identity coupling). + */ + private String perTopicConsumerName(String topicName) { + String localName = TopicName.get(topicName).getLocalName(); + if (consumerConf.getConsumerName() != null) { + return consumerConf.getConsumerName() + "-" + localName; + } + return "v5-stream-" + V5RandomIds.randomAlphanumeric(8) + "-" + localName; + } + + private ConsumerConfigurationData perTopicConf(String topicName) { + var conf = consumerConf.clone(); + conf.setConsumerName(perTopicConsumerName(topicName)); + return conf; + } + + /** + * Close per-topic consumer, flushing pending cumulative acks up to whatever was + * last delivered for that topic. If the topic later re-appears (re-Added), a + * fresh consumer subscribes and resumes from the broker-side cursor — already + * advanced past the messages we've delivered to the user. 
+ */ + private CompletableFuture closeTopic(String topicName) { + retryDelays.remove(topicName); + PerTopic state = perTopic.remove(topicName); + if (state == null) { + return CompletableFuture.completedFuture(null); + } + // Flush: ack everything we delivered for this topic. + ConcurrentHashMap latest = + latestDeliveredPerTopicSegment.remove(topicName); + if (latest != null && !latest.isEmpty()) { + state.consumer.ackUpToVector(new HashMap<>(latest)); + } + return state.consumer.closeAsync() + .thenRun(() -> log.info().attr("topic", topicName) + .log("Per-topic stream consumer detached")); + } + + // --- StreamConsumer --- + + @Override + public String topic() { + return "namespace://" + namespace; + } + + @Override + public String subscription() { + return subscriptionName; + } + + @Override + public String consumerName() { + return consumerConf.getConsumerName(); + } + + @Override + public Message receive() throws PulsarClientException { + try { + return mux.take(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException("Receive interrupted", e); + } + } + + @Override + public Message receive(Duration timeout) throws PulsarClientException { + try { + return mux.poll(timeout.toMillis(), TimeUnit.MILLISECONDS); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException("Receive interrupted", e); + } + } + + @Override + public Messages receiveMulti(int maxNumMessages, Duration timeout) throws PulsarClientException { + // Block for up to `timeout` waiting for the first message, then drain whatever + // else is immediately available up to maxNumMessages. Same shape as the single + // topic StreamConsumer. 
+ long deadline = System.nanoTime() + timeout.toNanos(); + List> batch = new ArrayList<>(); + try { + long remaining = deadline - System.nanoTime(); + while (batch.size() < maxNumMessages && remaining > 0) { + MessageV5 msg = mux.poll(remaining, TimeUnit.NANOSECONDS); + if (msg == null) { + break; + } + batch.add(msg); + remaining = deadline - System.nanoTime(); + } + // Opportunistic drain of anything else already queued. + List> tail = new ArrayList<>(); + mux.drainTo(tail, maxNumMessages - batch.size()); + batch.addAll(tail); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException("Receive interrupted", e); + } + return new MessagesV5<>(batch); + } + + @Override + public void acknowledgeCumulative(MessageId messageId) { + fanOutCumulativeAck(messageId, (sc, vector) -> sc.ackUpToVector(vector)); + } + + @Override + public void acknowledgeCumulative(MessageId messageId, Transaction txn) { + // Transactions on multi-topic are best-effort across per-topic consumers — each + // per-topic ack is independently transactional. See note in the design doc. + fanOutCumulativeAck(messageId, (sc, vector) -> sc.ackUpToVector(vector)); + } + + /** + * For a cumulative ack on a multi-topic message, look up its multi-topic vector + * and invoke the per-topic ack on every parent topic. + */ + private void fanOutCumulativeAck(MessageId messageId, + java.util.function.BiConsumer, + Map> action) { + if (!(messageId instanceof MessageIdV5 id)) { + throw new IllegalArgumentException("Expected MessageIdV5, got: " + messageId.getClass()); + } + Map> vector = id.multiTopicVector(); + if (vector == null) { + throw new IllegalStateException("MessageIdV5 missing multi-topic vector — was the" + + " message delivered through a multi-topic stream consumer?"); + } + for (var entry : vector.entrySet()) { + PerTopic state = perTopic.get(entry.getKey()); + if (state == null) { + // Topic was Removed since enqueue; closeTopic already flushed. 
+ continue; + } + action.accept(state.consumer, entry.getValue()); + } + } + + @Override + public AsyncStreamConsumer async() { + return asyncView; + } + + @Override + public void close() throws PulsarClientException { + try { + closeAsync().get(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new PulsarClientException("Close interrupted", e); + } catch (ExecutionException e) { + throw new PulsarClientException(e.getCause()); + } + } + + CompletableFuture closeAsync() { + if (closed) { + return CompletableFuture.completedFuture(null); + } + closed = true; + watcher.close(); + List> closes = new ArrayList<>(); + for (var topic : new HashSet<>(perTopic.keySet())) { + closes.add(closeTopic(topic)); + } + return CompletableFuture.allOf(closes.toArray(CompletableFuture[]::new)); + } + + // --- Watcher listener --- + + private final class WatcherListener implements ScalableTopicsWatcher.Listener { + @Override + public void onSnapshot(List topics) { + Set target = new HashSet<>(topics); + Set current = new HashSet<>(perTopic.keySet()); + for (String t : current) { + if (!target.contains(t)) { + closeTopic(t); + } + } + for (String t : target) { + if (!current.contains(t)) { + openTopic(t, /* retry= */ true); + resetBackoff(t); + } + } + } + + @Override + public void onDiff(List added, List removed) { + for (String t : removed) { + closeTopic(t); + } + for (String t : added) { + openTopic(t, /* retry= */ true); + resetBackoff(t); + } + } + } + + /** + * Per-topic message handler installed as the sink on each per-topic + * {@link ScalableStreamConsumer}. The single-topic consumer has already + * computed its per-segment position vector and stored it on the inbound + * message id; we adopt that as the per-topic slice of our cross-topic + * vector, snapshot the full map, and forward into the shared mux. + * + *

Runs on the netty IO thread that delivered the per-segment message — + * the only contention is the synchronized snapshot block which guards + * against torn cross-topic views during concurrent deliveries. + */ + private void onPerTopicMessage(String parentTopic, MessageV5 msg) { + if (closed) { + return; + } + MessageIdV5 origId = (MessageIdV5) msg.id(); + + // Adopt the message's own positionVector as our per-topic latest-delivered + // slice. ScalableStreamConsumer maintained the increasing invariant on + // each segment id; merging via putAll keeps the property cross-topic. + ConcurrentHashMap ours = + latestDeliveredPerTopicSegment.computeIfAbsent(parentTopic, + k -> new ConcurrentHashMap<>()); + ours.putAll(origId.positionVector()); + + // Snapshot the cross-topic vector under lock so concurrent deliveries + // can't observe a torn view. + Map> snapshot; + synchronized (latestDeliveredPerTopicSegment) { + snapshot = new HashMap<>(latestDeliveredPerTopicSegment.size()); + for (var e : latestDeliveredPerTopicSegment.entrySet()) { + snapshot.put(e.getKey(), new HashMap<>(e.getValue())); + } + } + + MessageIdV5 newId = new MessageIdV5( + origId.v4MessageId(), origId.segmentId(), + origId.positionVector(), parentTopic, snapshot); + mux.add(new MessageV5<>(msg.v4Message(), newId, parentTopic)); + } + + // --- Per-topic state --- + + /** + * Per-topic bookkeeping. Messages flow into the shared mux directly via the + * sink installed on the per-topic consumer at create-time, so there's no + * pump thread to manage — this is just a holder for ack routing and clean + * shutdown. 
+ */ + private static final class PerTopic { + private final String parentTopic; + private final ScalableStreamConsumer consumer; + + PerTopic(String parentTopic, ScalableStreamConsumer consumer) { + this.parentTopic = parentTopic; + this.consumer = consumer; + } + } + + // --- Async view --- + + private final class AsyncStreamConsumerV5Multi implements AsyncStreamConsumer { + @Override + public CompletableFuture> receive() { + return CompletableFuture.supplyAsync(() -> { + try { + return MultiTopicStreamConsumer.this.receive(); + } catch (PulsarClientException e) { + throw new CompletionException(e); + } + }); + } + + @Override + public CompletableFuture> receive(Duration timeout) { + return CompletableFuture.supplyAsync(() -> { + try { + return MultiTopicStreamConsumer.this.receive(timeout); + } catch (PulsarClientException e) { + throw new CompletionException(e); + } + }); + } + + @Override + public CompletableFuture>> receiveMulti(int maxNumMessages, Duration timeout) { + return CompletableFuture.supplyAsync(() -> { + try { + Messages ms = MultiTopicStreamConsumer.this.receiveMulti(maxNumMessages, timeout); + List> out = new ArrayList<>(); + for (Message m : ms) { + out.add(m); + } + return out; + } catch (PulsarClientException e) { + throw new CompletionException(e); + } + }); + } + + @Override + public void acknowledgeCumulative(MessageId messageId) { + MultiTopicStreamConsumer.this.acknowledgeCumulative(messageId); + } + + @Override + public void acknowledgeCumulative(MessageId messageId, Transaction txn) { + MultiTopicStreamConsumer.this.acknowledgeCumulative(messageId, txn); + } + + @Override + public CompletableFuture close() { + return MultiTopicStreamConsumer.this.closeAsync(); + } + } +} diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java index a762964c7d75e..a268e70219b9d 100644 --- 
a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerBuilderV5.java @@ -19,11 +19,9 @@ package org.apache.pulsar.client.impl.v5; import java.time.Duration; -import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; -import java.util.regex.Pattern; import org.apache.pulsar.client.api.v5.PulsarClientException; import org.apache.pulsar.client.api.v5.QueueConsumer; import org.apache.pulsar.client.api.v5.QueueConsumerBuilder; @@ -34,6 +32,7 @@ import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition; import org.apache.pulsar.client.api.v5.schema.Schema; import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; +import org.apache.pulsar.common.naming.NamespaceName; import org.apache.pulsar.common.naming.TopicName; /** @@ -44,7 +43,11 @@ final class QueueConsumerBuilderV5 implements QueueConsumerBuilder { private final PulsarClientV5 client; private final Schema v5Schema; private final ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + // Exactly one of {topicName, namespaceName} must be set at subscribe() time — + // single-topic vs multi-topic mode. 
private String topicName; + private NamespaceName namespaceName; + private Map propertyFilters; QueueConsumerBuilderV5(PulsarClientV5 client, Schema v5Schema) { this.client = client; @@ -65,48 +68,44 @@ public QueueConsumer subscribe() throws PulsarClientException { @Override public CompletableFuture> subscribeAsync() { - if (topicName == null || topicName.isEmpty()) { + boolean topicSet = topicName != null && !topicName.isEmpty(); + boolean namespaceSet = namespaceName != null; + if (topicSet == namespaceSet) { return CompletableFuture.failedFuture( - new PulsarClientException.InvalidConfigurationException("Topic name is required")); + new PulsarClientException.InvalidConfigurationException( + "Exactly one of .topic(name) or .namespace(...) must be set")); } if (conf.getSubscriptionName() == null || conf.getSubscriptionName().isEmpty()) { return CompletableFuture.failedFuture( new PulsarClientException.InvalidConfigurationException("Subscription name is required")); } + if (namespaceSet) { + return MultiTopicQueueConsumer.createAsync( + client, v5Schema, conf, namespaceName, propertyFilters); + } TopicName topic = V5Utils.asScalableTopicName(topicName); DagWatchClient dagWatch = new DagWatchClient(client.v4Client(), topic); - return dagWatch.start() .thenCompose(initialLayout -> ScalableQueueConsumer.createAsync( client, v5Schema, conf, dagWatch, initialLayout)); } @Override - public QueueConsumerBuilderV5 topic(String... 
topicNames) { - if (topicNames.length > 0) { - this.topicName = topicNames[0]; - } + public QueueConsumerBuilderV5 topic(String topicName) { + this.topicName = topicName; return this; } @Override - public QueueConsumerBuilderV5 topics(List topicNames) { - if (!topicNames.isEmpty()) { - this.topicName = topicNames.get(0); - } - return this; + public QueueConsumerBuilderV5 namespace(String namespace) { + return namespace(namespace, Map.of()); } @Override - public QueueConsumerBuilderV5 topicsPattern(Pattern pattern) { - conf.setTopicsPattern(pattern); - return this; - } - - @Override - public QueueConsumerBuilderV5 topicsPattern(String regex) { - conf.setTopicsPattern(Pattern.compile(regex)); + public QueueConsumerBuilderV5 namespace(String namespace, Map propertyFilters) { + this.namespaceName = NamespaceName.get(namespace); + this.propertyFilters = propertyFilters == null ? Map.of() : Map.copyOf(propertyFilters); return this; } @@ -206,12 +205,6 @@ public QueueConsumerBuilderV5 deadLetterPolicy(DeadLetterPolicy policy) { return this; } - @Override - public QueueConsumerBuilderV5 patternAutoDiscoveryPeriod(Duration interval) { - conf.setPatternAutoDiscoveryPeriod((int) interval.getSeconds()); - return this; - } - @Override public QueueConsumerBuilderV5 encryptionPolicy(EncryptionPolicy policy) { conf.setCryptoKeyReader(CryptoKeyReaderAdapter.wrap(policy.keyReader())); diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java new file mode 100644 index 0000000000000..61536f5b44283 --- /dev/null +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/QueueConsumerImpl.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.client.impl.v5; + +import java.util.concurrent.CompletableFuture; +import org.apache.pulsar.client.api.v5.Message; +import org.apache.pulsar.client.api.v5.QueueConsumer; + +/** + * Internal extension of {@link QueueConsumer} that exposes the async hooks + * needed by {@link AsyncQueueConsumerV5}. Implemented by both + * {@link ScalableQueueConsumer} (single-topic) and {@link MultiTopicQueueConsumer} + * so the async wrapper works against either without duplication. + */ +interface QueueConsumerImpl extends QueueConsumer { + CompletableFuture> receiveAsync(); + + CompletableFuture closeAsync(); +} diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java index 61f5e87644d5f..31cb556b70e32 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableQueueConsumer.java @@ -51,7 +51,7 @@ * Individual acknowledgments and negative acknowledgments are routed to * the correct segment consumer via the segment ID in {@link MessageIdV5}. 
*/ -final class ScalableQueueConsumer implements QueueConsumer, DagWatchClient.LayoutChangeListener { +final class ScalableQueueConsumer implements QueueConsumerImpl, DagWatchClient.LayoutChangeListener { private static final Logger LOG = Logger.get(ScalableQueueConsumer.class); private final Logger log; @@ -72,6 +72,13 @@ final class ScalableQueueConsumer implements QueueConsumer, DagWatchClient private final ConcurrentHashMap>> segmentConsumers = new ConcurrentHashMap<>(); private final LinkedTransferQueue> messageQueue = new LinkedTransferQueue<>(); + /** + * Where each per-segment receive loop deposits a freshly-arrived message. Defaults + * to enqueueing on {@link #messageQueue} for the user's {@link #receive()} to pull; + * the multi-topic wrapper overrides this to forward directly into its shared + * multiplexed queue, so no per-topic pump thread is needed. + */ + private final java.util.function.Consumer> messageSink; private volatile boolean closed = false; private final AsyncQueueConsumerV5 asyncView; @@ -88,7 +95,8 @@ final class ScalableQueueConsumer implements QueueConsumer, DagWatchClient private ScalableQueueConsumer(PulsarClientV5 client, Schema v5Schema, ConsumerConfigurationData consumerConf, - DagWatchClient dagWatch) { + DagWatchClient dagWatch, + java.util.function.Consumer> messageSink) { this.client = client; this.v5Schema = v5Schema; this.v4Schema = SchemaAdapter.toV4(v5Schema); @@ -96,6 +104,10 @@ private ScalableQueueConsumer(PulsarClientV5 client, this.dagWatch = dagWatch; this.topicName = dagWatch.topicName().toString(); this.subscriptionName = consumerConf.getSubscriptionName(); + // Default sink enqueues on the local messageQueue for receive()/receive(timeout). + // Multi-topic mode passes a sink that forwards into the shared mux instead — no + // per-topic pump thread needed. + this.messageSink = messageSink != null ? 
messageSink : messageQueue::add; this.log = LOG.with().attr("topic", topicName).attr("subscription", subscriptionName).build(); this.asyncView = new AsyncQueueConsumerV5<>(this); } @@ -111,11 +123,29 @@ static CompletableFuture> createAsync(PulsarClientV5 client ConsumerConfigurationData consumerConf, DagWatchClient dagWatch, ClientSegmentLayout initialLayout) { - ScalableQueueConsumer consumer = new ScalableQueueConsumer<>(client, v5Schema, consumerConf, dagWatch); + return createAsyncImpl(client, v5Schema, consumerConf, dagWatch, initialLayout, null) + .thenApply(c -> c); + } + + /** + * Like {@link #createAsync} but resolves to the concrete impl type and accepts an + * optional external message sink. Used by {@link MultiTopicQueueConsumer}: it + * passes a sink that forwards into the shared multiplexed queue, so per-segment + * v4 receive loops deliver messages to the wrapper without any pump thread. + */ + static CompletableFuture> createAsyncImpl( + PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + DagWatchClient dagWatch, + ClientSegmentLayout initialLayout, + java.util.function.Consumer> messageSink) { + ScalableQueueConsumer consumer = new ScalableQueueConsumer<>( + client, v5Schema, consumerConf, dagWatch, messageSink); return consumer.subscribeSegments(initialLayout) .thenApply(__ -> { dagWatch.setListener(consumer); - return (QueueConsumer) consumer; + return consumer; }) .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, ___) -> { throw ex instanceof CompletionException ce ? 
ce : new CompletionException(ex); @@ -209,7 +239,8 @@ public void close() throws PulsarClientException { // --- Async internals --- - CompletableFuture> receiveAsync() { + @Override + public CompletableFuture> receiveAsync() { return CompletableFuture.supplyAsync(() -> { try { return receive(); @@ -219,7 +250,8 @@ CompletableFuture> receiveAsync() { }); } - CompletableFuture closeAsync() { + @Override + public CompletableFuture closeAsync() { closed = true; dagWatch.close(); @@ -358,7 +390,7 @@ private CompletableFuture> createSegmen private void startReceiveLoop(org.apache.pulsar.client.api.Consumer v4Consumer, long segmentId) { v4Consumer.receiveAsync().thenAccept(v4Msg -> { - messageQueue.add(new MessageV5<>(v4Msg, segmentId)); + messageSink.accept(new MessageV5<>(v4Msg, segmentId)); if (!closed) { startReceiveLoop(v4Consumer, segmentId); } diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java index ddffea7d84d33..afd703ae9a445 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/ScalableStreamConsumer.java @@ -86,6 +86,13 @@ final class ScalableStreamConsumer new ConcurrentHashMap<>(); private final LinkedTransferQueue> messageQueue = new LinkedTransferQueue<>(); + /** + * Where each per-segment receive loop deposits a freshly-arrived message. Defaults + * to enqueueing on {@link #messageQueue} for the user's {@link #receive()} to pull; + * the multi-topic wrapper overrides this to forward into its shared multiplexed + * queue, applying its own multi-topic position-vector capture in the process. 
+ */ + private final java.util.function.Consumer> messageSink; private volatile boolean closed = false; private final AsyncStreamConsumerV5 asyncView; @@ -94,7 +101,8 @@ private ScalableStreamConsumer(PulsarClientV5 client, Schema v5Schema, ConsumerConfigurationData consumerConf, ScalableConsumerClient session, - String topicName) { + String topicName, + java.util.function.Consumer> messageSink) { this.client = client; this.v5Schema = v5Schema; this.v4Schema = SchemaAdapter.toV4(v5Schema); @@ -102,6 +110,7 @@ private ScalableStreamConsumer(PulsarClientV5 client, this.session = session; this.topicName = topicName; this.subscriptionName = consumerConf.getSubscriptionName(); + this.messageSink = messageSink != null ? messageSink : messageQueue::add; this.log = LOG.with().attr("topic", topicName).attr("subscription", subscriptionName).build(); this.asyncView = new AsyncStreamConsumerV5<>(this); } @@ -118,18 +127,54 @@ static CompletableFuture> createAsync(PulsarClientV5 clien ScalableConsumerClient session, String topicName, List initialAssignment) { + return createAsyncImpl(client, v5Schema, consumerConf, session, topicName, initialAssignment, null) + .thenApply(c -> c); + } + + /** + * Like {@link #createAsync} but resolves to the concrete impl type and accepts an + * optional external message sink. Used by {@link MultiTopicStreamConsumer}: it + * passes a sink that forwards into the shared multiplexed queue, replacing the + * per-topic pump thread with direct delivery. 
+ */ + static CompletableFuture> createAsyncImpl( + PulsarClientV5 client, + Schema v5Schema, + ConsumerConfigurationData consumerConf, + ScalableConsumerClient session, + String topicName, + List initialAssignment, + java.util.function.Consumer> messageSink) { ScalableStreamConsumer consumer = new ScalableStreamConsumer<>( - client, v5Schema, consumerConf, session, topicName); + client, v5Schema, consumerConf, session, topicName, messageSink); return consumer.subscribeAssigned(initialAssignment) .thenApply(__ -> { session.setListener(consumer); - return (StreamConsumer) consumer; + return consumer; }) .exceptionallyCompose(ex -> consumer.closeAsync().handle((__, ___) -> { throw ex instanceof CompletionException ce ? ce : new CompletionException(ex); })); } + /** + * Multi-topic ack hook. Synthesises a {@link MessageIdV5} carrying the supplied + * vector and routes it through the regular cumulative-ack path so segments are + * acked up to the recorded positions. Used by {@link MultiTopicStreamConsumer} + * to fan out a cumulative ack across every per-topic consumer. + */ + void ackUpToVector(java.util.Map vector) { + if (vector == null || vector.isEmpty()) { + return; + } + // The constructed id only needs the positionVector; v4MessageId / segmentId are + // unused on the cumulative-ack path. Pick any value for the non-vector slots — + // earliest is convenient and won't accidentally satisfy a peer's check. 
+ var synthetic = new MessageIdV5(org.apache.pulsar.client.api.MessageId.earliest, + MessageIdV5.NO_SEGMENT, vector); + acknowledgeCumulative(synthetic); + } + @Override public String topic() { return topicName; @@ -358,7 +403,7 @@ private void startReceiveLoop(org.apache.pulsar.client.api.Consumer v4Consume // Create the V5 message with the position vector embedded in the ID var msgId = new MessageIdV5(v4Msg.getMessageId(), segmentId, positionVector); - messageQueue.add(new MessageV5<>(v4Msg, msgId)); + messageSink.accept(new MessageV5<>(v4Msg, msgId)); if (!closed) { startReceiveLoop(v4Consumer, segmentId); diff --git a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java index d3c356ecb554c..6b9f6b67e0736 100644 --- a/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java +++ b/pulsar-client-v5/src/main/java/org/apache/pulsar/client/impl/v5/StreamConsumerBuilderV5.java @@ -42,7 +42,11 @@ final class StreamConsumerBuilderV5 implements StreamConsumerBuilder { private final PulsarClientV5 client; private final Schema v5Schema; private final ConsumerConfigurationData conf = new ConsumerConfigurationData<>(); + // Exactly one of {topicName, namespaceName} must be set at subscribe() time — + // single-topic vs multi-topic mode. 
private String topicName; + private org.apache.pulsar.common.naming.NamespaceName namespaceName; + private Map propertyFilters; StreamConsumerBuilderV5(PulsarClientV5 client, Schema v5Schema) { this.client = client; @@ -63,21 +67,29 @@ public StreamConsumer subscribe() throws PulsarClientException { @Override public CompletableFuture> subscribeAsync() { - if (topicName == null || topicName.isEmpty()) { + boolean topicSet = topicName != null && !topicName.isEmpty(); + boolean namespaceSet = namespaceName != null; + if (topicSet == namespaceSet) { return CompletableFuture.failedFuture( - new PulsarClientException.InvalidConfigurationException("Topic name is required")); + new PulsarClientException.InvalidConfigurationException( + "Exactly one of .topic(name) or .namespace(...) must be set")); } if (conf.getSubscriptionName() == null || conf.getSubscriptionName().isEmpty()) { return CompletableFuture.failedFuture( new PulsarClientException.InvalidConfigurationException("Subscription name is required")); } - - TopicName topic = V5Utils.asScalableTopicName(topicName); // Default the consumer name to a stable random when the user didn't set one — // ScalableConsumerClient uses it as the registration key with the controller. if (conf.getConsumerName() == null || conf.getConsumerName().isEmpty()) { conf.setConsumerName("v5-stream-" + V5RandomIds.randomAlphanumeric(8)); } + + if (namespaceSet) { + return MultiTopicStreamConsumer.createAsync( + client, v5Schema, conf, namespaceName, propertyFilters); + } + + TopicName topic = V5Utils.asScalableTopicName(topicName); ScalableConsumerClient session = new ScalableConsumerClient( client.v4Client(), topic, @@ -91,10 +103,20 @@ public CompletableFuture> subscribeAsync() { } @Override - public StreamConsumerBuilderV5 topic(String... 
topicNames) { - if (topicNames.length > 0) { - this.topicName = topicNames[0]; - } + public StreamConsumerBuilderV5 topic(String topicName) { + this.topicName = topicName; + return this; + } + + @Override + public StreamConsumerBuilderV5 namespace(String namespace) { + return namespace(namespace, Map.of()); + } + + @Override + public StreamConsumerBuilderV5 namespace(String namespace, Map propertyFilters) { + this.namespaceName = org.apache.pulsar.common.naming.NamespaceName.get(namespace); + this.propertyFilters = propertyFilters == null ? Map.of() : Map.copyOf(propertyFilters); return this; } diff --git a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java index 577023070adf8..ebdbf73c0b4c6 100644 --- a/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java +++ b/pulsar-client-v5/src/test/java/org/apache/pulsar/client/impl/v5/MessageIdV5Test.java @@ -168,6 +168,64 @@ public void testFromByteArrayRejectsNull() { assertThrows(java.io.IOException.class, () -> MessageIdV5.fromByteArray(null)); } + @Test + public void testRoundtripWithParentTopic() throws Exception { + // Multi-topic queue consumer ids carry a parent-topic tag for ack routing + // but no cross-topic vector. Both must survive serialisation. + MessageIdV5 original = new MessageIdV5(v4(11, 22, 0), 3L, + Map.of(0L, v4(1, 2, 0)), + "topic://tenant/ns/my-topic"); + + MessageIdV5 decoded = MessageIdV5.fromByteArray(original.toByteArray()); + + assertEquals(decoded.segmentId(), 3L); + assertEquals(decoded.parentTopic(), "topic://tenant/ns/my-topic"); + assertEquals(decoded.positionVector().size(), 1); + // No multi-topic vector was set; round-trip must preserve null. 
+ assertEquals(decoded.multiTopicVector(), null); + } + + @Test + public void testRoundtripWithMultiTopicVector() throws Exception { + // Multi-topic stream consumer ids carry a cross-topic position vector and + // the parent topic. Whole tree must survive byte-level round trip so an + // application that serialises the id and sends it through can still call + // acknowledgeCumulative against the right per-topic / per-segment positions. + Map> multi = Map.of( + "topic://tenant/ns/a", Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2, 0)), + "topic://tenant/ns/b", Map.of(0L, v4(3, 3, 0))); + MessageIdV5 original = new MessageIdV5(v4(99, 100, 0), 5L, + Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2, 0)), + "topic://tenant/ns/a", + multi); + + MessageIdV5 decoded = MessageIdV5.fromByteArray(original.toByteArray()); + + assertEquals(decoded.segmentId(), 5L); + assertEquals(decoded.parentTopic(), "topic://tenant/ns/a"); + assertEquals(decoded.positionVector().size(), 2); + assertNotNull(decoded.multiTopicVector()); + assertEquals(decoded.multiTopicVector().keySet(), + java.util.Set.of("topic://tenant/ns/a", "topic://tenant/ns/b")); + assertEquals(decoded.multiTopicVector().get("topic://tenant/ns/a"), + Map.of(0L, v4(1, 1, 0), 1L, v4(2, 2, 0))); + assertEquals(decoded.multiTopicVector().get("topic://tenant/ns/b"), + Map.of(0L, v4(3, 3, 0))); + } + + @Test + public void testRoundtripPreservesNullMultiTopicVector() throws Exception { + // Single-topic ids leave both new fields null; we must not accidentally + // hydrate them on decode. + MessageIdV5 original = new MessageIdV5(v4(7, 8, 0), 1L, + Map.of(0L, v4(1, 2, 0))); + + MessageIdV5 decoded = MessageIdV5.fromByteArray(original.toByteArray()); + + assertEquals(decoded.parentTopic(), null); + assertEquals(decoded.multiTopicVector(), null); + } + @Test public void testFromByteArrayRejectsTooShort() { assertThrows(java.io.IOException.class, () -> MessageIdV5.fromByteArray(new byte[3]));