Skip to content

Commit fc25e74

Browse files
committed
[feat][client] PIP-468: Multi-topic QueueConsumer / StreamConsumer
Builds on the namespace scalable-topics watcher (PR #25648) to land the user-facing multi-topic consumer surface: subscribe to the union of scalable topics in a namespace that match a (possibly empty) set of property filters, follow the matching set live, multiplex from every per-topic consumer into one user-visible queue. Builder API - QueueConsumerBuilder / StreamConsumerBuilder gain: namespace(String namespace) namespace(String namespace, Map<String, String> propertyFilters) Mutually exclusive with .topic(name); subscribe() rejects either-both or neither. - Drop the legacy multi-topic / pattern surface from v5: removed topics(List), topicsPattern(Pattern), topicsPattern(String), patternAutoDiscoveryPeriod. Tightened topic(String...) → topic(String). MultiTopicQueueConsumer - Wraps a ScalableTopicsWatcher; on Snapshot/Diff opens / closes per-topic ScalableQueueConsumers. - One pump thread per topic forwards messages from the per-topic queue into the shared mux queue, tagging each MessageV5 with the parent scalable topic via withTopicOverride. msg.topic() then surfaces the parent topic the user subscribed to (not the internal segment topic). - Ack routing: MessageIdV5 carries a parentTopic field so acknowledge(msgId) finds the right per-topic consumer. - Per-topic subscribe failure: retry forever with exp backoff (100 ms initial, 30 min cap). MultiTopicStreamConsumer - Same wrapper shape, but uses ScalableStreamConsumer per topic and carries a multi-topic position-vector snapshot per delivered message: Map<TopicName, Map<SegmentId, MessageId>> captured at enqueue time. acknowledgeCumulative(msg) fans out to every per-topic consumer with that topic's segment vector — same semantics as single-topic cumulative ack, lifted one level. - ScalableStreamConsumer exposes ackUpToVector(Map<SegmentId, MessageId>) as the multi-topic ack hook. - Topic Removed mid-stream: flush acks up to latestDelivered for that topic before closing the per-topic consumer. Supporting changes - MessageV5.topicOverride / withTopicOverride for parent-topic display. - MessageIdV5 gets parentTopic + multiTopicVector fields, plus a 5-arg full constructor used by the multi-topic stream pump. - New package-private QueueConsumerImpl interface so AsyncQueueConsumerV5 is shared between ScalableQueueConsumer and MultiTopicQueueConsumer. - ScalableQueueConsumer / ScalableStreamConsumer expose createAsyncImpl variants returning the concrete impl type — used by the multi-topic wrappers that hold typed references to per-topic consumers. Tests - V5MultiTopicQueueConsumerTest: receives from all topics in namespace; picks up topic created after subscribe (Diff path); filters by property so only matching topics attach. - V5MultiTopicStreamConsumerTest: same three flows plus cumulativeAckCoversEveryTopicSeenSoFar — drains both topics, calls acknowledgeCumulative on the last message, re-subscribes, asserts no redelivery (proves the position vector reaches every topic). All v5 single-topic regression tests still pass.
1 parent d0e557f commit fc25e74

15 files changed

Lines changed: 1860 additions & 127 deletions

File tree

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api.v5;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertTrue;
23+
import java.time.Duration;
24+
import java.util.HashSet;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.UUID;
28+
import lombok.Cleanup;
29+
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
30+
import org.apache.pulsar.client.api.v5.schema.Schema;
31+
import org.testng.annotations.Test;
32+
33+
/**
34+
* End-to-end tests for {@link QueueConsumerBuilder#namespace}: a multi-topic
35+
* QueueConsumer follows the matching set live, multiplexes from every per-topic
36+
* consumer into one user-visible queue, and routes individual acks back to the
37+
* right topic for redelivery purposes.
38+
*/
39+
public class V5MultiTopicQueueConsumerTest extends V5ClientBaseTest {
40+
41+
private String topicName(String suffix) {
42+
return "topic://" + getNamespace() + "/" + suffix + "-"
43+
+ UUID.randomUUID().toString().substring(0, 8);
44+
}
45+
46+
@Test
47+
public void receivesFromAllTopicsInNamespace() throws Exception {
48+
String topicA = topicName("a");
49+
String topicB = topicName("b");
50+
admin.scalableTopics().createScalableTopic(topicA, 1);
51+
admin.scalableTopics().createScalableTopic(topicB, 1);
52+
53+
@Cleanup
54+
Producer<String> pa = v5Client.newProducer(Schema.string()).topic(topicA).create();
55+
@Cleanup
56+
Producer<String> pb = v5Client.newProducer(Schema.string()).topic(topicB).create();
57+
58+
@Cleanup
59+
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
60+
.namespace(getNamespace())
61+
.subscriptionName("multi-q")
62+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
63+
.subscribe();
64+
65+
// Send to both topics; the multi-topic consumer must receive both sets.
66+
Set<String> expected = new HashSet<>();
67+
for (int i = 0; i < 5; i++) {
68+
String va = "a-" + i;
69+
String vb = "b-" + i;
70+
pa.newMessage().value(va).send();
71+
pb.newMessage().value(vb).send();
72+
expected.add(va);
73+
expected.add(vb);
74+
}
75+
76+
Set<String> received = new HashSet<>();
77+
long deadline = System.currentTimeMillis() + 30_000L;
78+
while (received.size() < expected.size() && System.currentTimeMillis() < deadline) {
79+
Message<String> msg = consumer.receive(Duration.ofSeconds(1));
80+
if (msg != null) {
81+
received.add(msg.value());
82+
consumer.acknowledge(msg.id());
83+
}
84+
}
85+
assertEquals(received, expected, "should receive every message produced to either topic");
86+
}
87+
88+
@Test
89+
public void picksUpTopicCreatedAfterSubscribe() throws Exception {
90+
// Fresh namespace, no topics yet — initial snapshot is empty. Earliest so the
91+
// race between "topic created" and "per-topic consumer attached via Diff" can't
92+
// drop messages produced in that window.
93+
@Cleanup
94+
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
95+
.namespace(getNamespace())
96+
.subscriptionName("multi-q-late")
97+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
98+
.subscribe();
99+
100+
// Create a topic AFTER subscribe; the watcher's Diff event must trigger the
101+
// consumer to attach.
102+
String lateTopic = topicName("late");
103+
admin.scalableTopics().createScalableTopic(lateTopic, 1);
104+
105+
@Cleanup
106+
Producer<String> producer = v5Client.newProducer(Schema.string())
107+
.topic(lateTopic).create();
108+
producer.newMessage().value("late-message").send();
109+
110+
Message<String> msg = consumer.receive(Duration.ofSeconds(15));
111+
assertTrue(msg != null, "expected to receive message from late-added topic");
112+
assertEquals(msg.value(), "late-message");
113+
assertEquals(msg.topic(), lateTopic, "topic() should surface the parent scalable topic");
114+
consumer.acknowledge(msg.id());
115+
}
116+
117+
@Test
118+
public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception {
119+
String aliceTopic = topicName("alice");
120+
String bobTopic = topicName("bob");
121+
admin.scalableTopics().createScalableTopic(aliceTopic, 1, Map.of("owner", "alice"));
122+
admin.scalableTopics().createScalableTopic(bobTopic, 1, Map.of("owner", "bob"));
123+
124+
@Cleanup
125+
Producer<String> pa = v5Client.newProducer(Schema.string()).topic(aliceTopic).create();
126+
@Cleanup
127+
Producer<String> pb = v5Client.newProducer(Schema.string()).topic(bobTopic).create();
128+
129+
@Cleanup
130+
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
131+
.namespace(getNamespace(), Map.of("owner", "alice"))
132+
.subscriptionName("multi-q-filter")
133+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
134+
.subscribe();
135+
136+
pa.newMessage().value("alice-msg").send();
137+
pb.newMessage().value("bob-msg").send();
138+
139+
// Only alice's message reaches this consumer.
140+
Message<String> got = consumer.receive(Duration.ofSeconds(10));
141+
assertTrue(got != null, "expected one message");
142+
assertEquals(got.value(), "alice-msg");
143+
consumer.acknowledge(got.id());
144+
145+
// Confirm bob's message never arrives within a generous window.
146+
Message<String> empty = consumer.receive(Duration.ofSeconds(2));
147+
assertTrue(empty == null, "bob's message must be filtered out, got " + empty);
148+
}
149+
}
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api.v5;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertNotNull;
23+
import static org.testng.Assert.assertTrue;
24+
import java.time.Duration;
25+
import java.util.HashSet;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import java.util.UUID;
29+
import lombok.Cleanup;
30+
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
31+
import org.apache.pulsar.client.api.v5.schema.Schema;
32+
import org.testng.annotations.Test;
33+
34+
/**
35+
* End-to-end tests for {@link StreamConsumerBuilder#namespace}: a multi-topic
36+
* StreamConsumer follows the matching set live, multiplexes from every per-topic
37+
* consumer into one user-visible queue, and supports cumulative ack across
38+
* topics via per-message position vectors.
39+
*/
40+
public class V5MultiTopicStreamConsumerTest extends V5ClientBaseTest {
41+
42+
private String topicName(String suffix) {
43+
return "topic://" + getNamespace() + "/" + suffix + "-"
44+
+ UUID.randomUUID().toString().substring(0, 8);
45+
}
46+
47+
@Test
48+
public void receivesFromAllTopicsInNamespace() throws Exception {
49+
String topicA = topicName("a");
50+
String topicB = topicName("b");
51+
admin.scalableTopics().createScalableTopic(topicA, 1);
52+
admin.scalableTopics().createScalableTopic(topicB, 1);
53+
54+
@Cleanup
55+
Producer<String> pa = v5Client.newProducer(Schema.string()).topic(topicA).create();
56+
@Cleanup
57+
Producer<String> pb = v5Client.newProducer(Schema.string()).topic(topicB).create();
58+
59+
@Cleanup
60+
StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
61+
.namespace(getNamespace())
62+
.subscriptionName("multi-stream")
63+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
64+
.subscribe();
65+
66+
Set<String> expected = new HashSet<>();
67+
for (int i = 0; i < 5; i++) {
68+
String va = "a-" + i;
69+
String vb = "b-" + i;
70+
pa.newMessage().value(va).send();
71+
pb.newMessage().value(vb).send();
72+
expected.add(va);
73+
expected.add(vb);
74+
}
75+
76+
Set<String> received = new HashSet<>();
77+
MessageId last = null;
78+
long deadline = System.currentTimeMillis() + 30_000L;
79+
while (received.size() < expected.size() && System.currentTimeMillis() < deadline) {
80+
Message<String> msg = consumer.receive(Duration.ofSeconds(1));
81+
if (msg != null) {
82+
received.add(msg.value());
83+
last = msg.id();
84+
}
85+
}
86+
assertEquals(received, expected, "should receive every message produced to either topic");
87+
// Cumulative ack must succeed across both per-topic consumers — exercises the
88+
// multi-topic position vector embedded in the message id.
89+
assertNotNull(last);
90+
consumer.acknowledgeCumulative(last);
91+
}
92+
93+
@Test
94+
public void picksUpTopicCreatedAfterSubscribe() throws Exception {
95+
@Cleanup
96+
StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
97+
.namespace(getNamespace())
98+
.subscriptionName("multi-stream-late")
99+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
100+
.subscribe();
101+
102+
String lateTopic = topicName("late");
103+
admin.scalableTopics().createScalableTopic(lateTopic, 1);
104+
105+
@Cleanup
106+
Producer<String> producer = v5Client.newProducer(Schema.string())
107+
.topic(lateTopic).create();
108+
producer.newMessage().value("late-message").send();
109+
110+
Message<String> msg = consumer.receive(Duration.ofSeconds(15));
111+
assertTrue(msg != null, "expected to receive message from late-added topic");
112+
assertEquals(msg.value(), "late-message");
113+
assertEquals(msg.topic(), lateTopic, "topic() should surface the parent scalable topic");
114+
consumer.acknowledgeCumulative(msg.id());
115+
}
116+
117+
@Test
118+
public void cumulativeAckCoversEveryTopicSeenSoFar() throws Exception {
119+
// Two topics, interleaved producers. After we cumulatively ack the LAST message,
120+
// closing and re-subscribing must NOT redeliver any of the previous messages —
121+
// that would only happen if the per-topic ack didn't fire for the topic that's
122+
// not the message's own.
123+
String topicA = topicName("a");
124+
String topicB = topicName("b");
125+
admin.scalableTopics().createScalableTopic(topicA, 1);
126+
admin.scalableTopics().createScalableTopic(topicB, 1);
127+
128+
@Cleanup
129+
Producer<String> pa = v5Client.newProducer(Schema.string()).topic(topicA).create();
130+
@Cleanup
131+
Producer<String> pb = v5Client.newProducer(Schema.string()).topic(topicB).create();
132+
133+
StreamConsumer<String> first = v5Client.newStreamConsumer(Schema.string())
134+
.namespace(getNamespace())
135+
.subscriptionName("multi-stream-cumulative")
136+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
137+
.subscribe();
138+
139+
int n = 5;
140+
for (int i = 0; i < n; i++) {
141+
pa.newMessage().value("a-" + i).send();
142+
pb.newMessage().value("b-" + i).send();
143+
}
144+
145+
// Drain everything, remember the last message id.
146+
Set<String> drained = new HashSet<>();
147+
MessageId last = null;
148+
long deadline = System.currentTimeMillis() + 20_000L;
149+
while (drained.size() < 2 * n && System.currentTimeMillis() < deadline) {
150+
Message<String> msg = first.receive(Duration.ofSeconds(1));
151+
if (msg != null) {
152+
drained.add(msg.value());
153+
last = msg.id();
154+
}
155+
}
156+
assertEquals(drained.size(), 2 * n, "first consumer should drain every message");
157+
assertNotNull(last);
158+
// Cumulative ack — should fan out to BOTH topics' per-segment acks.
159+
first.acknowledgeCumulative(last);
160+
// Block briefly so the async ack flushes through to the broker before we close.
161+
Thread.sleep(500);
162+
first.close();
163+
164+
// Re-subscribe with the same name. If the cumulative ack covered both topics,
165+
// there's nothing to re-deliver. If it only acked the message's OWN topic, the
166+
// other topic would re-deliver from the start.
167+
@Cleanup
168+
StreamConsumer<String> second = v5Client.newStreamConsumer(Schema.string())
169+
.namespace(getNamespace())
170+
.subscriptionName("multi-stream-cumulative")
171+
.subscribe();
172+
173+
Message<String> stale = second.receive(Duration.ofSeconds(2));
174+
assertTrue(stale == null,
175+
"cumulative ack should have covered every topic; got redelivery: "
176+
+ (stale != null ? stale.value() : ""));
177+
}
178+
179+
@Test
180+
public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception {
181+
String aliceTopic = topicName("alice");
182+
String bobTopic = topicName("bob");
183+
admin.scalableTopics().createScalableTopic(aliceTopic, 1, Map.of("owner", "alice"));
184+
admin.scalableTopics().createScalableTopic(bobTopic, 1, Map.of("owner", "bob"));
185+
186+
@Cleanup
187+
Producer<String> pa = v5Client.newProducer(Schema.string()).topic(aliceTopic).create();
188+
@Cleanup
189+
Producer<String> pb = v5Client.newProducer(Schema.string()).topic(bobTopic).create();
190+
191+
@Cleanup
192+
StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
193+
.namespace(getNamespace(), Map.of("owner", "alice"))
194+
.subscriptionName("multi-stream-filter")
195+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
196+
.subscribe();
197+
198+
pa.newMessage().value("alice-msg").send();
199+
pb.newMessage().value("bob-msg").send();
200+
201+
Message<String> got = consumer.receive(Duration.ofSeconds(10));
202+
assertTrue(got != null, "expected one message");
203+
assertEquals(got.value(), "alice-msg");
204+
205+
Message<String> empty = consumer.receive(Duration.ofSeconds(2));
206+
assertTrue(empty == null, "bob's message must be filtered out, got " + empty);
207+
}
208+
}

0 commit comments

Comments
 (0)