Skip to content

Commit cd910c2

Browse files
authored
[feat] PIP-468: Auto-reconnect scalable consumer session on disconnect (#25623)
1 parent 145b3a5 commit cd910c2

2 files changed

Lines changed: 374 additions & 14 deletions

File tree

Lines changed: 259 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,259 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.client.api.v5;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import static org.testng.Assert.assertNotNull;
23+
import java.lang.reflect.Field;
24+
import java.lang.reflect.Method;
25+
import java.time.Duration;
26+
import java.util.HashSet;
27+
import java.util.Set;
28+
import lombok.Cleanup;
29+
import org.apache.pulsar.client.api.v5.config.SubscriptionInitialPosition;
30+
import org.apache.pulsar.client.api.v5.schema.Schema;
31+
import org.testng.annotations.Test;
32+
33+
/**
34+
* Coverage for automatic reconnect on the scalable consumer's controller session.
35+
*
36+
* <p>A V5 StreamConsumer keeps a long-lived subscription against the topic
37+
* controller — that's how it learns about assignment changes (split / merge /
38+
* peer join / peer leave). When the underlying connection drops mid-life
39+
* (broker bounce, network blip, container restart) the per-segment v4 consumers
40+
* have their own reconnect logic, but the controller session needs its own.
41+
*
42+
* <p>These tests force-close the controller channel and assert:
43+
* <ul>
44+
* <li>the consumer continues to deliver messages without the application
45+
* having to do anything; and</li>
46+
* <li>within the controller's grace window the assignment is preserved, so a
47+
* second consumer that didn't drop sees no reshuffling.</li>
48+
* </ul>
49+
*/
50+
public class V5StreamConsumerAutoReconnectTest extends V5ClientBaseTest {
51+
52+
/**
53+
* Force-close the controller channel underneath a single stream consumer and
54+
* verify it continues to receive messages produced after the disconnect. The
55+
* v5 layer must reattach to the controller and resume receiving without any
56+
* application-visible interruption.
57+
*/
58+
@Test
59+
public void testStreamConsumerSurvivesConnectionDrop() throws Exception {
60+
String topic = newScalableTopic(2);
61+
String subscription = "auto-reconnect-sub";
62+
63+
@Cleanup
64+
Producer<String> producer = v5Client.newProducer(Schema.string())
65+
.topic(topic)
66+
.create();
67+
68+
@Cleanup
69+
StreamConsumer<String> consumer = v5Client.newStreamConsumer(Schema.string())
70+
.topic(topic)
71+
.subscriptionName(subscription)
72+
.consumerName("solo")
73+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
74+
.subscribe();
75+
76+
// Pre-disconnect batch: prove the consumer is healthy before we sever.
77+
int firstN = 20;
78+
Set<String> firstSent = new HashSet<>();
79+
for (int i = 0; i < firstN; i++) {
80+
String v = "first-" + i;
81+
producer.newMessage().key("k-" + i).value(v).send();
82+
firstSent.add(v);
83+
}
84+
Set<String> firstReceived = drain(consumer, firstN);
85+
assertEquals(firstReceived, firstSent, "first batch must arrive in full before disconnect");
86+
87+
// Sever the controller channel. The cnx layer fires connectionClosed() on
88+
// the scalable session, which kicks off the reconnect path with backoff.
89+
forceCloseControllerChannel(consumer);
90+
91+
// Post-disconnect batch: produced after the channel close. The consumer
92+
// must auto-reconnect to the controller and resume serving its segments.
93+
int secondN = 20;
94+
Set<String> secondSent = new HashSet<>();
95+
for (int i = 0; i < secondN; i++) {
96+
String v = "second-" + i;
97+
producer.newMessage().key("k-" + i).value(v).send();
98+
secondSent.add(v);
99+
}
100+
Set<String> secondReceived = drain(consumer, secondN);
101+
assertEquals(secondReceived, secondSent,
102+
"consumer must resume receiving after auto-reconnect");
103+
}
104+
105+
/**
106+
* Two stream consumers share the topic, then alice's controller channel is
107+
* forcibly closed. Within the grace window (2s in the test cluster) alice's
108+
* auto-reconnect should re-attach to the existing session and the same
109+
* assignment, so bob's per-key share is unchanged across batches.
110+
*/
111+
@Test
112+
public void testReconnectWithinGracePreservesAssignment() throws Exception {
113+
String topic = newScalableTopic(4);
114+
String subscription = "auto-reconnect-grace-sub";
115+
116+
@Cleanup
117+
Producer<String> producer = v5Client.newProducer(Schema.string())
118+
.topic(topic)
119+
.create();
120+
121+
@Cleanup
122+
StreamConsumer<String> alice = v5Client.newStreamConsumer(Schema.string())
123+
.topic(topic)
124+
.subscriptionName(subscription)
125+
.consumerName("alice")
126+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
127+
.subscribe();
128+
@Cleanup
129+
StreamConsumer<String> bob = v5Client.newStreamConsumer(Schema.string())
130+
.topic(topic)
131+
.subscriptionName(subscription)
132+
.consumerName("bob")
133+
.subscriptionInitialPosition(SubscriptionInitialPosition.EARLIEST)
134+
.subscribe();
135+
136+
// Baseline batch: capture each consumer's per-key share before the drop.
137+
int batchN = 60;
138+
Set<String> firstSent = new HashSet<>();
139+
for (int i = 0; i < batchN; i++) {
140+
String v = "first-" + i;
141+
producer.newMessage().key("k-" + i).value(v).send();
142+
firstSent.add(v);
143+
}
144+
Set<String> aliceFirst = new HashSet<>();
145+
Set<String> bobFirst = new HashSet<>();
146+
Set<String> received1 = new HashSet<>();
147+
drainBoth(alice, bob, batchN, received1, aliceFirst, bobFirst);
148+
assertEquals(received1, firstSent, "first batch must be delivered exactly once");
149+
150+
// Sever alice's controller channel. The reconnect must land well within
151+
// the 2s grace window — the controller still has alice's session and
152+
// re-attaches the new connection to the existing assignment.
153+
forceCloseControllerChannel(alice);
154+
155+
int secondN = 60;
156+
Set<String> secondSent = new HashSet<>();
157+
for (int i = 0; i < secondN; i++) {
158+
String v = "second-" + i;
159+
producer.newMessage().key("k-" + i).value(v).send();
160+
secondSent.add(v);
161+
}
162+
Set<String> aliceSecond = new HashSet<>();
163+
Set<String> bobSecond = new HashSet<>();
164+
Set<String> received2 = new HashSet<>();
165+
drainBoth(alice, bob, secondN, received2, aliceSecond, bobSecond);
166+
assertEquals(received2, secondSent, "second batch must be delivered exactly once");
167+
168+
// Same key set, same routing → each consumer's per-key share must be
169+
// identical across batches (modulo the prefix). That holds only if alice
170+
// re-attached to her existing session — i.e. the reconnect landed within
171+
// grace and no rebalance happened.
172+
assertEquals(stripPrefix(aliceSecond, "second-"), stripPrefix(aliceFirst, "first-"),
173+
"alice must keep her segments after auto-reconnect within grace");
174+
assertEquals(stripPrefix(bobSecond, "second-"), stripPrefix(bobFirst, "first-"),
175+
"bob must be unaffected by alice's reconnect");
176+
}
177+
178+
// --- Helpers ---
179+
180+
/**
181+
* Drain at most {@code expected} messages from {@code consumer}, capping at a
182+
* generous deadline so a regression doesn't hang the suite.
183+
*/
184+
private Set<String> drain(StreamConsumer<String> consumer, int expected) throws Exception {
185+
Set<String> received = new HashSet<>();
186+
MessageId last = null;
187+
long deadline = System.currentTimeMillis() + 30_000L;
188+
while (received.size() < expected && System.currentTimeMillis() < deadline) {
189+
Message<String> msg = consumer.receive(Duration.ofSeconds(1));
190+
if (msg != null) {
191+
received.add(msg.value());
192+
last = msg.id();
193+
}
194+
}
195+
if (last != null) {
196+
consumer.acknowledgeCumulative(last);
197+
}
198+
return received;
199+
}
200+
201+
/**
202+
* Drain a known number of messages across two consumers, populating each
203+
* consumer's per-message set. Stops once {@code allReceived} reaches
204+
* {@code expected} or the deadline fires.
205+
*/
206+
private void drainBoth(StreamConsumer<String> a, StreamConsumer<String> b,
207+
int expected, Set<String> allReceived,
208+
Set<String> aGot, Set<String> bGot) throws Exception {
209+
long deadline = System.currentTimeMillis() + 30_000L;
210+
MessageId aLast = null;
211+
MessageId bLast = null;
212+
while (allReceived.size() < expected && System.currentTimeMillis() < deadline) {
213+
Message<String> ma = a.receive(Duration.ofMillis(200));
214+
if (ma != null) {
215+
allReceived.add(ma.value());
216+
aGot.add(ma.value());
217+
aLast = ma.id();
218+
}
219+
Message<String> mb = b.receive(Duration.ofMillis(200));
220+
if (mb != null) {
221+
allReceived.add(mb.value());
222+
bGot.add(mb.value());
223+
bLast = mb.id();
224+
}
225+
}
226+
if (aLast != null) {
227+
a.acknowledgeCumulative(aLast);
228+
}
229+
if (bLast != null) {
230+
b.acknowledgeCumulative(bLast);
231+
}
232+
}
233+
234+
private static Set<String> stripPrefix(Set<String> values, String prefix) {
235+
Set<String> out = new HashSet<>(values.size());
236+
for (String v : values) {
237+
if (v.startsWith(prefix)) {
238+
out.add(v.substring(prefix.length()));
239+
}
240+
}
241+
return out;
242+
}
243+
244+
/**
245+
* Force-close the controller channel underneath a stream consumer. Reaches
246+
* into the v5 internals via reflection because the {@code session} field on
247+
* {@code ScalableStreamConsumer} and the test hook on
248+
* {@code ScalableConsumerClient} are package-private.
249+
*/
250+
private static void forceCloseControllerChannel(StreamConsumer<?> consumer) throws Exception {
251+
Field sessionField = consumer.getClass().getDeclaredField("session");
252+
sessionField.setAccessible(true);
253+
Object session = sessionField.get(consumer);
254+
assertNotNull(session, "expected session on stream consumer");
255+
Method m = session.getClass().getDeclaredMethod("forceCloseConnectionForTesting");
256+
m.setAccessible(true);
257+
m.invoke(session);
258+
}
259+
}

0 commit comments

Comments
 (0)