Skip to content

Commit 7c7a7df

Browse files
authored
[feat] PIP-468: Multi-topic QueueConsumer / StreamConsumer (#25651)
1 parent d0e557f commit 7c7a7df

16 files changed

Lines changed: 1872 additions & 132 deletions

File tree

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api.v5;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertTrue;
23+
import java.time.Duration;
24+
import java.util.HashSet;
25+
import java.util.Map;
26+
import java.util.Set;
27+
import java.util.UUID;
28+
import lombok.Cleanup;
29+
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
30+
import org.apache.pulsar.client.api.v5.schema.Schema;
31+
import org.testng.annotations.Test;
32+
33+
/**
34+
* End-to-end tests for {@link QueueConsumerBuilder#namespace}: a multi-topic
35+
* QueueConsumer follows the matching set live, multiplexes from every per-topic
36+
* consumer into one user-visible queue, and routes individual acks back to the
37+
* right topic for redelivery purposes.
38+
*/
39+
public class V5MultiTopicQueueConsumerTest extends V5ClientBaseTest {
40+
41+
private String topicName(String suffix) {
42+
return "topic://" + getNamespace() + "/" + suffix + "-"
43+
+ UUID.randomUUID().toString().substring(0, 8);
44+
}
45+
46+
@Test
47+
public void receivesFromAllTopicsInNamespace() throws Exception {
48+
String topicA = topicName("a");
49+
String topicB = topicName("b");
50+
admin.scalableTopics().createScalableTopic(topicA, 1);
51+
admin.scalableTopics().createScalableTopic(topicB, 1);
52+
53+
@Cleanup
54+
Producer<String> pa = v5Client.newProducer(Schema.string()).topic(topicA).create();
55+
@Cleanup
56+
Producer<String> pb = v5Client.newProducer(Schema.string()).topic(topicB).create();
57+
58+
@Cleanup
59+
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
60+
.namespace(getNamespace())
61+
.subscriptionName("multi-q")
62+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
63+
.subscribe();
64+
65+
// Send to both topics; the multi-topic consumer must receive both sets.
66+
Set<String> expected = new HashSet<>();
67+
for (int i = 0; i < 5; i++) {
68+
String va = "a-" + i;
69+
String vb = "b-" + i;
70+
pa.newMessage().value(va).send();
71+
pb.newMessage().value(vb).send();
72+
expected.add(va);
73+
expected.add(vb);
74+
}
75+
76+
Set<String> received = new HashSet<>();
77+
long deadline = System.currentTimeMillis() + 30_000L;
78+
while (received.size() < expected.size() && System.currentTimeMillis() < deadline) {
79+
Message<String> msg = consumer.receive(Duration.ofSeconds(1));
80+
if (msg != null) {
81+
received.add(msg.value());
82+
consumer.acknowledge(msg.id());
83+
}
84+
}
85+
assertEquals(received, expected, "should receive every message produced to either topic");
86+
}
87+
88+
@Test
89+
public void picksUpTopicCreatedAfterSubscribe() throws Exception {
90+
// Fresh namespace, no topics yet — initial snapshot is empty. Earliest so the
91+
// race between "topic created" and "per-topic consumer attached via Diff" can't
92+
// drop messages produced in that window.
93+
@Cleanup
94+
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
95+
.namespace(getNamespace())
96+
.subscriptionName("multi-q-late")
97+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
98+
.subscribe();
99+
100+
// Create a topic AFTER subscribe; the watcher's Diff event must trigger the
101+
// consumer to attach.
102+
String lateTopic = topicName("late");
103+
admin.scalableTopics().createScalableTopic(lateTopic, 1);
104+
105+
@Cleanup
106+
Producer<String> producer = v5Client.newProducer(Schema.string())
107+
.topic(lateTopic).create();
108+
producer.newMessage().value("late-message").send();
109+
110+
Message<String> msg = consumer.receive(Duration.ofSeconds(15));
111+
assertTrue(msg != null, "expected to receive message from late-added topic");
112+
assertEquals(msg.value(), "late-message");
113+
assertEquals(msg.topic(), lateTopic, "topic() should surface the parent scalable topic");
114+
consumer.acknowledge(msg.id());
115+
}
116+
117+
@Test
118+
public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception {
119+
String aliceTopic = topicName("alice");
120+
String bobTopic = topicName("bob");
121+
admin.scalableTopics().createScalableTopic(aliceTopic, 1, Map.of("owner", "alice"));
122+
admin.scalableTopics().createScalableTopic(bobTopic, 1, Map.of("owner", "bob"));
123+
124+
@Cleanup
125+
Producer<String> pa = v5Client.newProducer(Schema.string()).topic(aliceTopic).create();
126+
@Cleanup
127+
Producer<String> pb = v5Client.newProducer(Schema.string()).topic(bobTopic).create();
128+
129+
@Cleanup
130+
QueueConsumer<String> consumer = v5Client.newQueueConsumer(Schema.string())
131+
.namespace(getNamespace(), Map.of("owner", "alice"))
132+
.subscriptionName("multi-q-filter")
133+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
134+
.subscribe();
135+
136+
pa.newMessage().value("alice-msg").send();
137+
pb.newMessage().value("bob-msg").send();
138+
139+
// Only alice's message reaches this consumer.
140+
Message<String> got = consumer.receive(Duration.ofSeconds(10));
141+
assertTrue(got != null, "expected one message");
142+
assertEquals(got.value(), "alice-msg");
143+
consumer.acknowledge(got.id());
144+
145+
// Confirm bob's message never arrives within a generous window.
146+
Message<String> empty = consumer.receive(Duration.ofSeconds(2));
147+
assertTrue(empty == null, "bob's message must be filtered out, got " + empty);
148+
}
149+
}
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api.v5;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertNotNull;
23+
import static org.testng.Assert.assertTrue;
24+
import java.time.Duration;
25+
import java.util.HashSet;
26+
import java.util.Map;
27+
import java.util.Set;
28+
import java.util.UUID;
29+
import lombok.Cleanup;
30+
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
31+
import org.apache.pulsar.client.api.v5.schema.Schema;
32+
import org.testng.annotations.Test;
33+
34+
/**
35+
* End-to-end tests for {@link StreamConsumerBuilder#namespace}: a multi-topic
36+
* StreamConsumer follows the matching set live, multiplexes from every per-topic
37+
* consumer into one user-visible queue, and supports cumulative ack across
38+
* topics via per-message position vectors.
39+
*/
40+
public class V5MultiTopicStreamConsumerTest extends V5ClientBaseTest {
41+
42+
private String topicName(String suffix) {
43+
return "topic://" + getNamespace() + "/" + suffix + "-"
44+
+ UUID.randomUUID().toString().substring(0, 8);
45+
}
46+
47+
@Test
48+
public void receivesFromAllTopicsInNamespace() throws Exception {
49+
String topicA = topicName("a");
50+
String topicB = topicName("b");
51+
admin.scalableTopics().createScalableTopic(topicA, 1);
52+
admin.scalableTopics().createScalableTopic(topicB, 1);
53+
54+
@Cleanup
55+
Producer<String> pa = v5Client.newProducer(Schema.string()).topic(topicA).create();
56+
@Cleanup
57+
Producer<String> pb = v5Client.newProducer(Schema.string()).topic(topicB).create();
58+
59+
@Cleanup
60+
StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
61+
.namespace(getNamespace())
62+
.subscriptionName("multi-stream")
63+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
64+
.subscribe();
65+
66+
Set<String> expected = new HashSet<>();
67+
for (int i = 0; i < 5; i++) {
68+
String va = "a-" + i;
69+
String vb = "b-" + i;
70+
pa.newMessage().value(va).send();
71+
pb.newMessage().value(vb).send();
72+
expected.add(va);
73+
expected.add(vb);
74+
}
75+
76+
Set<String> received = new HashSet<>();
77+
MessageId last = null;
78+
long deadline = System.currentTimeMillis() + 30_000L;
79+
while (received.size() < expected.size() && System.currentTimeMillis() < deadline) {
80+
Message<String> msg = consumer.receive(Duration.ofSeconds(1));
81+
if (msg != null) {
82+
received.add(msg.value());
83+
last = msg.id();
84+
}
85+
}
86+
assertEquals(received, expected, "should receive every message produced to either topic");
87+
// Cumulative ack must succeed across both per-topic consumers — exercises the
88+
// multi-topic position vector embedded in the message id.
89+
assertNotNull(last);
90+
consumer.acknowledgeCumulative(last);
91+
}
92+
93+
@Test
94+
public void picksUpTopicCreatedAfterSubscribe() throws Exception {
95+
@Cleanup
96+
StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
97+
.namespace(getNamespace())
98+
.subscriptionName("multi-stream-late")
99+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
100+
.subscribe();
101+
102+
String lateTopic = topicName("late");
103+
admin.scalableTopics().createScalableTopic(lateTopic, 1);
104+
105+
@Cleanup
106+
Producer<String> producer = v5Client.newProducer(Schema.string())
107+
.topic(lateTopic).create();
108+
producer.newMessage().value("late-message").send();
109+
110+
Message<String> msg = consumer.receive(Duration.ofSeconds(15));
111+
assertTrue(msg != null, "expected to receive message from late-added topic");
112+
assertEquals(msg.value(), "late-message");
113+
assertEquals(msg.topic(), lateTopic, "topic() should surface the parent scalable topic");
114+
consumer.acknowledgeCumulative(msg.id());
115+
}
116+
117+
@Test
118+
public void cumulativeAckCoversEveryTopicSeenSoFar() throws Exception {
119+
// Two topics, interleaved producers. After we cumulatively ack the LAST message,
120+
// closing and re-subscribing must NOT redeliver any of the previous messages —
121+
// that would only happen if the per-topic ack didn't fire for the topic that's
122+
// not the message's own.
123+
String topicA = topicName("a");
124+
String topicB = topicName("b");
125+
admin.scalableTopics().createScalableTopic(topicA, 1);
126+
admin.scalableTopics().createScalableTopic(topicB, 1);
127+
128+
@Cleanup
129+
Producer<String> pa = v5Client.newProducer(Schema.string()).topic(topicA).create();
130+
@Cleanup
131+
Producer<String> pb = v5Client.newProducer(Schema.string()).topic(topicB).create();
132+
133+
StreamConsumer<String> first = v5Client.newStreamConsumer(Schema.string())
134+
.namespace(getNamespace())
135+
.subscriptionName("multi-stream-cumulative")
136+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
137+
.subscribe();
138+
139+
int n = 5;
140+
for (int i = 0; i < n; i++) {
141+
pa.newMessage().value("a-" + i).send();
142+
pb.newMessage().value("b-" + i).send();
143+
}
144+
145+
// Drain everything, remember the last message id.
146+
Set<String> drained = new HashSet<>();
147+
MessageId last = null;
148+
long deadline = System.currentTimeMillis() + 20_000L;
149+
while (drained.size() < 2 * n && System.currentTimeMillis() < deadline) {
150+
Message<String> msg = first.receive(Duration.ofSeconds(1));
151+
if (msg != null) {
152+
drained.add(msg.value());
153+
last = msg.id();
154+
}
155+
}
156+
assertEquals(drained.size(), 2 * n, "first consumer should drain every message");
157+
assertNotNull(last);
158+
// Cumulative ack — should fan out to BOTH topics' per-segment acks.
159+
first.acknowledgeCumulative(last);
160+
// Block briefly so the async ack flushes through to the broker before we close.
161+
Thread.sleep(500);
162+
first.close();
163+
164+
// Re-subscribe with the same name. If the cumulative ack covered both topics,
165+
// there's nothing to re-deliver. If it only acked the message's OWN topic, the
166+
// other topic would re-deliver from the start.
167+
@Cleanup
168+
StreamConsumer<String> second = v5Client.newStreamConsumer(Schema.string())
169+
.namespace(getNamespace())
170+
.subscriptionName("multi-stream-cumulative")
171+
.subscribe();
172+
173+
Message<String> stale = second.receive(Duration.ofSeconds(2));
174+
assertTrue(stale == null,
175+
"cumulative ack should have covered every topic; got redelivery: "
176+
+ (stale != null ? stale.value() : ""));
177+
}
178+
179+
@Test
180+
public void filtersByPropertySoOnlyMatchingTopicsAttach() throws Exception {
181+
String aliceTopic = topicName("alice");
182+
String bobTopic = topicName("bob");
183+
admin.scalableTopics().createScalableTopic(aliceTopic, 1, Map.of("owner", "alice"));
184+
admin.scalableTopics().createScalableTopic(bobTopic, 1, Map.of("owner", "bob"));
185+
186+
@Cleanup
187+
Producer<String> pa = v5Client.newProducer(Schema.string()).topic(aliceTopic).create();
188+
@Cleanup
189+
Producer<String> pb = v5Client.newProducer(Schema.string()).topic(bobTopic).create();
190+
191+
@Cleanup
192+
StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
193+
.namespace(getNamespace(), Map.of("owner", "alice"))
194+
.subscriptionName("multi-stream-filter")
195+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
196+
.subscribe();
197+
198+
pa.newMessage().value("alice-msg").send();
199+
pb.newMessage().value("bob-msg").send();
200+
201+
Message<String> got = consumer.receive(Duration.ofSeconds(10));
202+
assertTrue(got != null, "expected one message");
203+
assertEquals(got.value(), "alice-msg");
204+
205+
Message<String> empty = consumer.receive(Duration.ofSeconds(2));
206+
assertTrue(empty == null, "bob's message must be filtered out, got " + empty);
207+
}
208+
}

0 commit comments

Comments
 (0)