Skip to content

Commit af1b6e1

Browse files
[improve][broker] PIP-192 Added namespace unload scheduler (#19477)
1 parent 950ff44 commit af1b6e1

4 files changed

Lines changed: 362 additions & 1 deletion

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImpl.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@
4949
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
5050
import org.apache.pulsar.broker.loadbalance.extensions.reporter.BrokerLoadDataReporter;
5151
import org.apache.pulsar.broker.loadbalance.extensions.reporter.TopBundleLoadDataReporter;
52+
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.LoadManagerScheduler;
53+
import org.apache.pulsar.broker.loadbalance.extensions.scheduler.UnloadScheduler;
5254
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStore;
5355
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreException;
5456
import org.apache.pulsar.broker.loadbalance.extensions.store.LoadDataStoreFactory;
@@ -86,6 +88,8 @@ public class ExtensibleLoadManagerImpl implements ExtensibleLoadManager {
8688
private LoadDataStore<BrokerLoadData> brokerLoadDataStore;
8789
private LoadDataStore<TopBundlesLoadData> topBundlesLoadDataStore;
8890

91+
private LoadManagerScheduler unloadScheduler;
92+
8993
@Getter
9094
private LoadManagerContext context;
9195

@@ -194,7 +198,9 @@ public void start() throws PulsarServerException {
194198
interval,
195199
interval, TimeUnit.MILLISECONDS);
196200

197-
// TODO: Start unload scheduler and bundle split scheduler
201+
// TODO: Start bundle split scheduler.
202+
this.unloadScheduler = new UnloadScheduler(pulsar.getLoadManagerExecutor(), context, serviceUnitStateChannel);
203+
this.unloadScheduler.start();
198204
this.started = true;
199205
}
200206

@@ -319,6 +325,7 @@ public void close() throws PulsarServerException {
319325

320326
this.brokerLoadDataStore.close();
321327
this.topBundlesLoadDataStore.close();
328+
this.unloadScheduler.close();
322329
} catch (IOException ex) {
323330
throw new PulsarServerException(ex);
324331
} finally {
Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.loadbalance.extensions.scheduler;
20+
21+
import com.google.common.annotations.VisibleForTesting;
22+
import java.util.ArrayList;
23+
import java.util.HashMap;
24+
import java.util.List;
25+
import java.util.Map;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.ScheduledExecutorService;
28+
import java.util.concurrent.ScheduledFuture;
29+
import java.util.concurrent.TimeUnit;
30+
import lombok.extern.slf4j.Slf4j;
31+
import org.apache.pulsar.broker.ServiceConfiguration;
32+
import org.apache.pulsar.broker.loadbalance.extensions.LoadManagerContext;
33+
import org.apache.pulsar.broker.loadbalance.extensions.channel.ServiceUnitStateChannel;
34+
import org.apache.pulsar.broker.loadbalance.extensions.models.UnloadDecision;
35+
import org.apache.pulsar.common.util.FutureUtil;
36+
import org.apache.pulsar.common.util.Reflections;
37+
38+
@Slf4j
39+
public class UnloadScheduler implements LoadManagerScheduler {
40+
41+
private final NamespaceUnloadStrategy namespaceUnloadStrategy;
42+
43+
private final ScheduledExecutorService loadManagerExecutor;
44+
45+
private final LoadManagerContext context;
46+
47+
private final ServiceUnitStateChannel channel;
48+
49+
private final ServiceConfiguration conf;
50+
51+
private volatile ScheduledFuture<?> task;
52+
53+
private final Map<String, Long> recentlyUnloadedBundles;
54+
55+
private final Map<String, Long> recentlyUnloadedBrokers;
56+
57+
private volatile CompletableFuture<Void> currentRunningFuture = null;
58+
59+
public UnloadScheduler(ScheduledExecutorService loadManagerExecutor,
60+
LoadManagerContext context,
61+
ServiceUnitStateChannel channel) {
62+
this(loadManagerExecutor, context, channel, createNamespaceUnloadStrategy(context.brokerConfiguration()));
63+
}
64+
65+
@VisibleForTesting
66+
protected UnloadScheduler(ScheduledExecutorService loadManagerExecutor,
67+
LoadManagerContext context,
68+
ServiceUnitStateChannel channel,
69+
NamespaceUnloadStrategy strategy) {
70+
this.namespaceUnloadStrategy = strategy;
71+
this.recentlyUnloadedBundles = new HashMap<>();
72+
this.recentlyUnloadedBrokers = new HashMap<>();
73+
this.loadManagerExecutor = loadManagerExecutor;
74+
this.context = context;
75+
this.conf = context.brokerConfiguration();
76+
this.channel = channel;
77+
}
78+
79+
@Override
80+
public synchronized void execute() {
81+
boolean debugMode = conf.isLoadBalancerDebugModeEnabled() || log.isDebugEnabled();
82+
if (debugMode) {
83+
log.info("Load balancer enabled: {}, Shedding enabled: {}.",
84+
conf.isLoadBalancerEnabled(), conf.isLoadBalancerSheddingEnabled());
85+
}
86+
if (!isLoadBalancerSheddingEnabled()) {
87+
if (debugMode) {
88+
log.info("The load balancer or load balancer shedding already disabled. Skipping.");
89+
}
90+
return;
91+
}
92+
if (currentRunningFuture != null && !currentRunningFuture.isDone()) {
93+
if (debugMode) {
94+
log.info("Auto namespace unload is running. Skipping.");
95+
}
96+
return;
97+
}
98+
// Remove bundles who have been unloaded for longer than the grace period from the recently unloaded map.
99+
final long timeout = System.currentTimeMillis()
100+
- TimeUnit.MINUTES.toMillis(conf.getLoadBalancerSheddingGracePeriodMinutes());
101+
recentlyUnloadedBundles.keySet().removeIf(e -> recentlyUnloadedBundles.get(e) < timeout);
102+
103+
this.currentRunningFuture = channel.isChannelOwnerAsync().thenCompose(isChannelOwner -> {
104+
if (!isChannelOwner) {
105+
if (debugMode) {
106+
log.info("Current broker is not channel owner. Skipping.");
107+
}
108+
return CompletableFuture.completedFuture(null);
109+
}
110+
return context.brokerRegistry().getAvailableBrokersAsync().thenCompose(availableBrokers -> {
111+
if (debugMode) {
112+
log.info("Available brokers: {}", availableBrokers);
113+
}
114+
if (availableBrokers.size() <= 1) {
115+
log.info("Only 1 broker available: no load shedding will be performed. Skipping.");
116+
return CompletableFuture.completedFuture(null);
117+
}
118+
final UnloadDecision unloadDecision = namespaceUnloadStrategy
119+
.findBundlesForUnloading(context, recentlyUnloadedBundles, recentlyUnloadedBrokers);
120+
if (debugMode) {
121+
log.info("[{}] Unload decision result: {}",
122+
namespaceUnloadStrategy.getClass().getSimpleName(), unloadDecision.toString());
123+
}
124+
if (unloadDecision.getUnloads().isEmpty()) {
125+
if (debugMode) {
126+
log.info("[{}] Unload decision unloads is empty. Skipping.",
127+
namespaceUnloadStrategy.getClass().getSimpleName());
128+
}
129+
return CompletableFuture.completedFuture(null);
130+
}
131+
List<CompletableFuture<Void>> futures = new ArrayList<>();
132+
unloadDecision.getUnloads().forEach((broker, unload) -> {
133+
log.info("[{}] Unloading bundle: {}", namespaceUnloadStrategy.getClass().getSimpleName(), unload);
134+
futures.add(channel.publishUnloadEventAsync(unload).thenAccept(__ -> {
135+
recentlyUnloadedBundles.put(unload.serviceUnit(), System.currentTimeMillis());
136+
recentlyUnloadedBrokers.put(unload.sourceBroker(), System.currentTimeMillis());
137+
}));
138+
});
139+
return FutureUtil.waitForAll(futures).exceptionally(ex -> {
140+
log.error("[{}] Namespace unload has exception.",
141+
namespaceUnloadStrategy.getClass().getSimpleName(), ex);
142+
return null;
143+
});
144+
});
145+
});
146+
}
147+
148+
@Override
149+
public void start() {
150+
long loadSheddingInterval = TimeUnit.MINUTES
151+
.toMillis(conf.getLoadBalancerSheddingIntervalMinutes());
152+
this.task = loadManagerExecutor.scheduleAtFixedRate(
153+
this::execute, loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS);
154+
}
155+
156+
@Override
157+
public void close() {
158+
if (this.task != null) {
159+
this.task.cancel(false);
160+
}
161+
this.recentlyUnloadedBundles.clear();
162+
this.recentlyUnloadedBrokers.clear();
163+
}
164+
165+
private static NamespaceUnloadStrategy createNamespaceUnloadStrategy(ServiceConfiguration conf) {
166+
try {
167+
return Reflections.createInstance(conf.getLoadBalancerLoadSheddingStrategy(), NamespaceUnloadStrategy.class,
168+
Thread.currentThread().getContextClassLoader());
169+
} catch (Exception e) {
170+
log.error("Error when trying to create namespace unload strategy: {}",
171+
conf.getLoadBalancerLoadPlacementStrategy(), e);
172+
}
173+
log.error("create namespace unload strategy failed. using TransferShedder instead.");
174+
return new TransferShedder();
175+
}
176+
177+
private boolean isLoadBalancerSheddingEnabled() {
178+
return conf.isLoadBalancerEnabled() && conf.isLoadBalancerSheddingEnabled();
179+
}
180+
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/ExtensibleLoadManagerImplTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
100100
import org.apache.pulsar.policies.data.loadbalancer.ResourceUsage;
101101
import org.apache.pulsar.policies.data.loadbalancer.SystemResourceUsage;
102+
import org.testng.annotations.AfterClass;
102103
import org.testng.annotations.BeforeClass;
103104
import org.testng.annotations.BeforeMethod;
104105
import org.testng.annotations.Test;
@@ -125,6 +126,7 @@ public class ExtensibleLoadManagerImplTest extends MockedPulsarServiceBaseTest {
125126
private ServiceUnitStateChannelImpl channel2;
126127

127128
@BeforeClass
129+
@Override
128130
public void setup() throws Exception {
129131
conf.setAllowAutoTopicCreation(true);
130132
conf.setLoadManagerClassName(ExtensibleLoadManagerImpl.class.getName());
@@ -186,6 +188,7 @@ protected void createNamespaceIfNotExists(PulsarResources resources,
186188
}
187189

188190
@Override
191+
@AfterClass
189192
protected void cleanup() throws Exception {
190193
pulsar1 = null;
191194
pulsar2.close();

0 commit comments

Comments
 (0)