Skip to content

Commit 85937d8

Browse files
thetumbledfengwenzhi
andauthored
[fix][broker] Unthrottle producers immediately when publish rate limiting is disabled (#25502)
Co-authored-by: fengwenzhi <fengwenzhi.max@bigo.sg>
1 parent 840fc34 commit 85937d8

2 files changed

Lines changed: 142 additions & 2 deletions

File tree

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/PublishRateLimiterImpl.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,12 @@ public class PublishRateLimiterImpl implements PublishRateLimiter {
4343

4444
private final AtomicInteger throttledProducersCount = new AtomicInteger(0);
4545
private final AtomicBoolean processingQueuedProducers = new AtomicBoolean(false);
46+
47+
/**
48+
* Executor used for the last {@link #scheduleUnthrottling} from this limiter (set when throttling starts).
49+
* Used to schedule an immediate follow-up run after publish-rate limits change.
50+
*/
51+
private volatile ScheduledExecutorService lastUnthrottleExecutor;
4652
private final Consumer<Producer> throttleAction;
4753
private final Consumer<Producer> unthrottleAction;
4854

@@ -88,6 +94,7 @@ private void scheduleDecrementThrottleCount(Producer producer) {
8894
// this is to avoid scheduling unthrottling multiple times for concurrent producers
8995
if (throttledProducersCount.incrementAndGet() == 1) {
9096
ScheduledExecutorService executor = producer.getCnx().getBrokerService().executor().next();
97+
lastUnthrottleExecutor = executor;
9198
scheduleUnthrottling(executor, calculateThrottlingDurationNanos());
9299
}
93100
}
@@ -167,12 +174,18 @@ public void update(Policies policies, String clusterName) {
167174
update(maxPublishRate);
168175
}
169176

177+
private void scheduleImmediateUnthrottling() {
178+
ScheduledExecutorService executor = lastUnthrottleExecutor;
179+
if (executor != null) {
180+
scheduleUnthrottling(executor, 0L);
181+
}
182+
}
183+
170184
public void update(PublishRate maxPublishRate) {
171185
if (maxPublishRate != null) {
172186
updateTokenBuckets(maxPublishRate.publishThrottlingRateInMsg, maxPublishRate.publishThrottlingRateInByte);
173187
} else {
174-
tokenBucketOnMessage = null;
175-
tokenBucketOnByte = null;
188+
updateTokenBuckets(0L, 0L);
176189
}
177190
}
178191

@@ -189,6 +202,9 @@ protected void updateTokenBuckets(long publishThrottlingRateInMsg, long publishT
189202
} else {
190203
tokenBucketOnByte = null;
191204
}
205+
// After any bucket rebuild, wake unthrottling:
206+
// old scheduled delay may be invalid and cause unnecessary wait time for producers to be unthrottled.
207+
scheduleImmediateUnthrottling();
192208
}
193209

194210
@VisibleForTesting

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PublishRateLimiterTest.java

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,19 +19,26 @@
1919

2020
package org.apache.pulsar.broker.service;
2121

22+
import static org.mockito.ArgumentMatchers.any;
23+
import static org.mockito.ArgumentMatchers.anyLong;
2224
import static org.mockito.Mockito.doAnswer;
2325
import static org.mockito.Mockito.mock;
2426
import static org.mockito.Mockito.when;
2527
import static org.testng.Assert.assertEquals;
28+
import static org.testng.Assert.assertNotNull;
29+
import static org.testng.Assert.assertTrue;
2630
import io.netty.channel.ChannelHandlerContext;
2731
import io.netty.channel.DefaultEventLoop;
2832
import io.netty.channel.EventLoop;
2933
import io.netty.channel.EventLoopGroup;
3034
import io.netty.util.concurrent.DefaultThreadFactory;
35+
import io.netty.util.concurrent.ScheduledFuture;
3136
import java.util.HashMap;
3237
import java.util.concurrent.CompletableFuture;
3338
import java.util.concurrent.TimeUnit;
39+
import java.util.concurrent.atomic.AtomicInteger;
3440
import java.util.concurrent.atomic.AtomicLong;
41+
import org.apache.pulsar.broker.qos.AsyncTokenBucket;
3542
import org.apache.pulsar.common.policies.data.Policies;
3643
import org.apache.pulsar.common.policies.data.PublishRate;
3744
import org.testng.annotations.AfterMethod;
@@ -149,4 +156,121 @@ public void testPublishRateLimiterImplUpdate() throws Exception {
149156
});
150157
future.get(5, TimeUnit.SECONDS);
151158
}
159+
160+
/**
161+
* When the token bucket is deeply depleted, the first scheduled unthrottle uses a long delay. Disabling limits
162+
* must schedule an immediate unthrottle (delay 0) so producers are not stuck until that delay elapses.
163+
*/
164+
@Test
165+
public void shouldUnthrottleImmediatelyAfterDisablingLimitsDespiteLongPendingDelay() {
166+
AtomicLong manualClock = new AtomicLong(TimeUnit.SECONDS.toNanos(100));
167+
AtomicInteger unthrottleCalls = new AtomicInteger();
168+
169+
PublishRateLimiterImpl limiter = new PublishRateLimiterImpl(
170+
manualClock::get,
171+
p -> { },
172+
p -> unthrottleCalls.incrementAndGet());
173+
174+
EventLoop scheduler = mock(EventLoop.class);
175+
AtomicInteger longDelaySchedules = new AtomicInteger();
176+
doAnswer(invocation -> {
177+
Runnable task = invocation.getArgument(0);
178+
long delay = invocation.getArgument(1);
179+
TimeUnit unit = invocation.getArgument(2);
180+
long delayNanos = unit.toNanos(delay);
181+
if (delayNanos == 0L) {
182+
task.run();
183+
} else {
184+
longDelaySchedules.incrementAndGet();
185+
}
186+
@SuppressWarnings("unchecked")
187+
ScheduledFuture<?> scheduled = mock(ScheduledFuture.class);
188+
return scheduled;
189+
}).when(scheduler).schedule(any(Runnable.class), anyLong(), any());
190+
191+
Producer p = mock(Producer.class);
192+
ServerCnx cnx = mock(ServerCnx.class);
193+
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
194+
doAnswer(a -> ctx).when(cnx).ctx();
195+
doAnswer(a -> cnx).when(p).getCnx();
196+
when(p.getCnx()).thenReturn(cnx);
197+
doAnswer(a -> {
198+
((Runnable) a.getArgument(0)).run();
199+
return null;
200+
}).when(cnx).execute(any(Runnable.class));
201+
202+
BrokerService brokerService = mock(BrokerService.class);
203+
when(cnx.getBrokerService()).thenReturn(brokerService);
204+
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
205+
when(brokerService.executor()).thenReturn(eventLoopGroup);
206+
when(eventLoopGroup.next()).thenReturn(scheduler);
207+
208+
limiter.update(new PublishRate(1, 0));
209+
manualClock.addAndGet(TimeUnit.SECONDS.toNanos(1));
210+
211+
limiter.handlePublishThrottling(p, 100_000, 0L);
212+
assertEquals(unthrottleCalls.get(), 0);
213+
assertTrue(longDelaySchedules.get() >= 1,
214+
"Expected a long-delay unthrottle to be scheduled while the bucket is deeply depleted");
215+
216+
limiter.update(new PublishRate(0, 0));
217+
assertEquals(unthrottleCalls.get(), 1);
218+
}
219+
220+
/**
221+
* Relaxing only the byte limit still invalidates a previously scheduled long unthrottle delay; an immediate
222+
* unthrottle pass must run after buckets are rebuilt.
223+
*/
224+
@Test
225+
public void shouldUnthrottleImmediatelyAfterRaisingByteLimitDespiteLongPendingDelay() {
226+
AtomicLong manualClock = new AtomicLong(TimeUnit.SECONDS.toNanos(100));
227+
AtomicInteger unthrottleCalls = new AtomicInteger();
228+
229+
PublishRateLimiterImpl limiter = new PublishRateLimiterImpl(
230+
manualClock::get,
231+
p -> { },
232+
p -> unthrottleCalls.incrementAndGet());
233+
234+
EventLoop scheduler = mock(EventLoop.class);
235+
doAnswer(invocation -> {
236+
Runnable task = invocation.getArgument(0);
237+
long delay = invocation.getArgument(1);
238+
TimeUnit unit = invocation.getArgument(2);
239+
if (unit.toNanos(delay) == 0L) {
240+
task.run();
241+
}
242+
@SuppressWarnings("unchecked")
243+
ScheduledFuture<?> scheduled = mock(ScheduledFuture.class);
244+
return scheduled;
245+
}).when(scheduler).schedule(any(Runnable.class), anyLong(), any());
246+
247+
Producer p = mock(Producer.class);
248+
ServerCnx cnx = mock(ServerCnx.class);
249+
ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
250+
doAnswer(a -> ctx).when(cnx).ctx();
251+
doAnswer(a -> cnx).when(p).getCnx();
252+
when(p.getCnx()).thenReturn(cnx);
253+
doAnswer(a -> {
254+
((Runnable) a.getArgument(0)).run();
255+
return null;
256+
}).when(cnx).execute(any(Runnable.class));
257+
258+
BrokerService brokerService = mock(BrokerService.class);
259+
when(cnx.getBrokerService()).thenReturn(brokerService);
260+
EventLoopGroup eventLoopGroup = mock(EventLoopGroup.class);
261+
when(brokerService.executor()).thenReturn(eventLoopGroup);
262+
when(eventLoopGroup.next()).thenReturn(scheduler);
263+
264+
limiter.update(new PublishRate(0, 1));
265+
manualClock.addAndGet(TimeUnit.SECONDS.toNanos(1));
266+
267+
limiter.handlePublishThrottling(p, 0, 100_000L);
268+
assertEquals(unthrottleCalls.get(), 0);
269+
270+
limiter.update(new PublishRate(0, 1_000_000));
271+
assertEquals(unthrottleCalls.get(), 1);
272+
273+
AsyncTokenBucket byteBucket = limiter.getTokenBucketOnByte();
274+
assertNotNull(byteBucket);
275+
}
152276
}

0 commit comments

Comments
 (0)