Skip to content

Commit f662459

Browse files
authored
[feat] PIP-468: Multi-broker shared cluster + V5 cross-broker tests (#25633)
1 parent b4df3c6 commit f662459

7 files changed

Lines changed: 1016 additions & 35 deletions

File tree

pulsar-broker/build.gradle.kts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ dependencies {
132132
testImplementation(libs.jetty.ee8.proxy)
133133
testImplementation(libs.jetty.websocket.jetty.client)
134134
testImplementation(libs.opentelemetry.sdk.testing)
135+
testImplementation(libs.oxia.testcontainers)
135136
testRuntimeOnly(libs.avro.protobuf) {
136137
exclude(group = "com.google.protobuf")
137138
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/DagWatchSession.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,16 @@ private ScalableTopicDAG buildDagProto(ScalableTopicLayoutResponse response) {
169169
}
170170
}
171171

172+
// Propagate the controller-broker URL so V5 clients can connect to the right broker
173+
// for scalable-topic subscribe. Without this the client falls back to its configured
174+
// service URL, which on a multi-broker cluster is rarely the controller leader.
175+
if (response.controllerBrokerUrl() != null) {
176+
dag.setControllerBrokerUrl(response.controllerBrokerUrl());
177+
}
178+
if (response.controllerBrokerUrlTls() != null) {
179+
dag.setControllerBrokerUrlTls(response.controllerBrokerUrlTls());
180+
}
181+
172182
return dag;
173183
}
174184

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/scalable/ScalableTopicController.java

Lines changed: 9 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,8 @@
2121
import io.github.merlimat.slog.Logger;
2222
import java.time.Duration;
2323
import java.util.LinkedHashMap;
24-
import java.util.LinkedHashSet;
2524
import java.util.Map;
2625
import java.util.Optional;
27-
import java.util.Set;
2826
import java.util.concurrent.CompletableFuture;
2927
import java.util.concurrent.ConcurrentHashMap;
3028
import lombok.Getter;
@@ -233,9 +231,11 @@ public CompletableFuture<SegmentLayout> splitSegment(long segmentId) {
233231
SegmentInfo parent = currentLayout.getAllSegments().get(segmentId);
234232
String parentTopicName = toSegmentPersistentName(parent);
235233

236-
// Step 1: Discover subscriptions on the parent segment, then create child
237-
// segment topics with those subscriptions (routed to owning brokers via admin API)
238-
return discoverSubscriptions(parentTopicName)
234+
// Step 1: Read the scalable topic's subscriptions from metadata (the single source
235+
// of truth — segment topics may live on different brokers, but the subscription set
236+
// is tracked here), then create child segment topics with those subscriptions
237+
// already provisioned (the create call routes to each segment's owning broker).
238+
return resources.listSubscriptionsAsync(topicName)
239239
.thenCompose(parentSubs -> {
240240
var subList = new java.util.ArrayList<>(parentSubs);
241241
return createSegmentTopic(child1, subList)
@@ -277,13 +277,10 @@ public CompletableFuture<SegmentLayout> mergeSegments(long segmentId1, long segm
277277
String parent1Topic = toSegmentPersistentName(parent1);
278278
String parent2Topic = toSegmentPersistentName(parent2);
279279

280-
// Step 1: Discover subscriptions from both parents (union), then create merged segment
281-
return discoverSubscriptions(parent1Topic)
282-
.thenCombine(discoverSubscriptions(parent2Topic), (subs1, subs2) -> {
283-
Set<String> allSubs = new LinkedHashSet<>(subs1);
284-
allSubs.addAll(subs2);
285-
return allSubs;
286-
})
280+
// Step 1: Read the scalable topic's subscriptions from metadata (single source of
281+
// truth, see splitSegment), then create the merged segment topic with those
282+
// subscriptions provisioned.
283+
return resources.listSubscriptionsAsync(topicName)
287284
.thenCompose(parentSubs -> createSegmentTopic(merged, new java.util.ArrayList<>(parentSubs)))
288285

289286
// Step 2: Terminate both parent segment topics
@@ -582,29 +579,6 @@ private CompletableFuture<Void> createSegmentTopic(SegmentInfo segment, java.uti
582579
}
583580
}
584581

585-
/**
586-
* Discover all subscription names on a segment topic. Works whether the topic is
587-
* on this broker or a remote one by using the admin client.
588-
*/
589-
private CompletableFuture<Set<String>> discoverSubscriptions(String segmentTopicName) {
590-
// Try local first (avoids RPC if the segment is on this broker)
591-
return brokerService.getTopicIfExists(segmentTopicName)
592-
.thenCompose(optTopic -> {
593-
if (optTopic.isPresent()) {
594-
return CompletableFuture.completedFuture(
595-
new LinkedHashSet<>(optTopic.get().getSubscriptions().keySet()));
596-
}
597-
// Topic is on a remote broker — use admin client
598-
try {
599-
return brokerService.getPulsar().getAdminClient()
600-
.topics().getSubscriptionsAsync(segmentTopicName)
601-
.thenApply(LinkedHashSet::new);
602-
} catch (PulsarServerException e) {
603-
return CompletableFuture.failedFuture(e);
604-
}
605-
});
606-
}
607-
608582
private CompletableFuture<Void> notifySubscriptions(SegmentLayout layout) {
609583
CompletableFuture<?>[] futures = subscriptions.values().stream()
610584
.map(coordinator -> coordinator.onLayoutChange(layout))
Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.service;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Set;
24+
import java.util.UUID;
25+
import lombok.CustomLog;
26+
import org.apache.pulsar.broker.PulsarService;
27+
import org.apache.pulsar.client.admin.PulsarAdmin;
28+
import org.apache.pulsar.client.api.PulsarClient;
29+
import org.apache.pulsar.client.api.PulsarClientException;
30+
import org.testng.annotations.AfterMethod;
31+
import org.testng.annotations.BeforeClass;
32+
import org.testng.annotations.BeforeMethod;
33+
34+
/**
35+
* Base class for tests that need a shared multi-broker cluster across test classes.
36+
*
37+
* <p>Companion to {@link SharedPulsarBaseTest}. Use this when a test specifically depends on
38+
* behavior that only manifests across brokers — namespace ownership transfer, controller-leader
39+
* failover, segment placement on different brokers, V5 client reconnect to a different broker.
40+
* For everything else, prefer the single-broker {@link SharedPulsarBaseTest}: it's faster, has
41+
* fewer moving parts, and is sufficient for most coverage.
42+
*
43+
* <p>Each test method gets a fresh namespace under {@link SharedMultiBrokerPulsarCluster#TENANT_NAME}
44+
* (created in {@link #setupSharedMultiBrokerTest()} and force-deleted in
45+
* {@link #cleanupSharedMultiBrokerTest()}). The cluster itself is JVM-wide and reused across
46+
* every test class that extends this — see {@link SharedMultiBrokerPulsarCluster}.
47+
*
48+
* <p>Subclasses get:
49+
* <ul>
50+
* <li>{@link #admin} / {@link #pulsarClient} — handles aimed at broker 0; lookups against any
51+
* broker correctly redirect to topic owners, so most tests should just use these.</li>
52+
* <li>{@link #brokers} / {@link #admins} / {@link #clients} — full per-broker lists, in start
53+
* order, for tests that need to address a specific broker (e.g. asserting topic
54+
* ownership, killing a specific broker).</li>
55+
* <li>{@link #newTopicName()} — generates a unique topic in the test namespace.</li>
56+
* </ul>
57+
*/
58+
@CustomLog
59+
public abstract class SharedMultiBrokerPulsarBaseTest {
60+
61+
/** All brokers in the shared cluster, in start order. */
62+
protected List<PulsarService> brokers;
63+
/** Per-broker admin handles, aligned with {@link #brokers}. */
64+
protected List<PulsarAdmin> admins;
65+
/** Per-broker client handles, aligned with {@link #brokers}. */
66+
protected List<PulsarClient> clients;
67+
68+
/** Convenience: broker 0's admin. */
69+
protected PulsarAdmin admin;
70+
/** Convenience: broker 0's client. */
71+
protected PulsarClient pulsarClient;
72+
73+
private final List<String> namespaces = new ArrayList<>();
74+
75+
/** Returns the unique namespace assigned to the current test method. */
76+
protected String getNamespace() {
77+
return namespaces.get(0);
78+
}
79+
80+
/** Returns the broker service URL for broker {@code index}. */
81+
protected String getBrokerServiceUrl(int index) {
82+
return brokers.get(index).getBrokerServiceUrl();
83+
}
84+
85+
/** Returns the web service URL for broker {@code index}. */
86+
protected String getWebServiceUrl(int index) {
87+
return brokers.get(index).getWebServiceAddress();
88+
}
89+
90+
/**
91+
* Creates a new {@link PulsarClient} connected to broker {@code index}. Callers are
92+
* responsible for closing the returned client.
93+
*/
94+
protected PulsarClient newPulsarClient(int index) throws PulsarClientException {
95+
return PulsarClient.builder().serviceUrl(brokers.get(index).getBrokerServiceUrl()).build();
96+
}
97+
98+
/**
99+
* Initializes (lazily) the shared cluster singleton and wires the per-class fields. Called
100+
* once per test class.
101+
*/
102+
@BeforeClass(alwaysRun = true)
103+
public void setupSharedMultiBrokerCluster() throws Exception {
104+
SharedMultiBrokerPulsarCluster cluster = SharedMultiBrokerPulsarCluster.get();
105+
brokers = cluster.getBrokers();
106+
admins = cluster.getAdmins();
107+
clients = cluster.getClients();
108+
admin = cluster.getAdmin();
109+
pulsarClient = cluster.getClient();
110+
}
111+
112+
/** Creates a unique namespace for the current test method. */
113+
@BeforeMethod(alwaysRun = true)
114+
public void setupSharedMultiBrokerTest() throws Exception {
115+
createNewNamespace();
116+
}
117+
118+
/** Force-deletes all namespaces created during the test method. */
119+
@AfterMethod(alwaysRun = true)
120+
public void cleanupSharedMultiBrokerTest() throws Exception {
121+
for (String ns : namespaces) {
122+
try {
123+
admin.namespaces().deleteNamespace(ns, true);
124+
log.info().attr("testNamespace", ns).log("Deleted test namespace");
125+
} catch (Exception e) {
126+
log.warn().attr("deleteNamespace", ns).exceptionMessage(e).log("Failed to delete namespace");
127+
}
128+
}
129+
namespaces.clear();
130+
}
131+
132+
/** Creates a new namespace under the shared tenant and registers it for cleanup. */
133+
protected String createNewNamespace() throws Exception {
134+
String nsName = "test-" + UUID.randomUUID().toString().substring(0, 8);
135+
String ns = SharedMultiBrokerPulsarCluster.TENANT_NAME + "/" + nsName;
136+
admin.namespaces().createNamespace(ns, Set.of(SharedMultiBrokerPulsarCluster.CLUSTER_NAME));
137+
namespaces.add(ns);
138+
log.info().attr("testNamespace", ns).log("Created test namespace");
139+
return ns;
140+
}
141+
142+
/** Generates a unique persistent topic name within the current test namespace. */
143+
protected String newTopicName() {
144+
return "persistent://" + getNamespace() + "/topic-" + UUID.randomUUID().toString().substring(0, 8);
145+
}
146+
}

0 commit comments

Comments
 (0)