Skip to content

Commit 4a10fe9

Browse files
authored
[refactor][broker] Decouple delayed delivery trackers from dispatcher (#25384)
1 parent 8d9d497 commit 4a10fe9

10 files changed

Lines changed: 643 additions & 447 deletions

File tree

microbench/src/main/java/org/apache/bookkeeper/mledger/impl/MockManagedCursor.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.bookkeeper.mledger.Position;
3737
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats;
3838

39-
class MockManagedCursor implements ManagedCursor {
39+
public class MockManagedCursor implements ManagedCursor {
4040
ActiveManagedCursorContainer container;
4141
Position markDeletePosition;
4242
Position readPosition;
@@ -58,7 +58,8 @@ class MockManagedCursor implements ManagedCursor {
5858
this.durable = durable;
5959
}
6060

61-
static MockManagedCursor createCursor(ActiveManagedCursorContainer container, String name, Position position) {
61+
public static MockManagedCursor createCursor(ActiveManagedCursorContainer container, String name,
62+
Position position) {
6263
return new MockManagedCursor(container, name, position, position, false, true);
6364
}
6465

Lines changed: 321 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,321 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.delayed.bucket;
20+
21+
import io.netty.util.HashedWheelTimer;
22+
import io.netty.util.Timer;
23+
import io.netty.util.concurrent.DefaultThreadFactory;
24+
import java.time.Clock;
25+
import java.util.NavigableSet;
26+
import java.util.concurrent.ThreadLocalRandom;
27+
import java.util.concurrent.TimeUnit;
28+
import java.util.concurrent.atomic.AtomicLong;
29+
import org.apache.bookkeeper.mledger.Position;
30+
import org.apache.bookkeeper.mledger.PositionFactory;
31+
import org.apache.bookkeeper.mledger.impl.ActiveManagedCursorContainerImpl;
32+
import org.apache.bookkeeper.mledger.impl.MockManagedCursor;
33+
import org.apache.pulsar.broker.delayed.DelayedDeliveryTracker;
34+
import org.apache.pulsar.broker.delayed.NoopDelayedDeliveryContext;
35+
import org.openjdk.jmh.annotations.Benchmark;
36+
import org.openjdk.jmh.annotations.BenchmarkMode;
37+
import org.openjdk.jmh.annotations.Fork;
38+
import org.openjdk.jmh.annotations.Level;
39+
import org.openjdk.jmh.annotations.Measurement;
40+
import org.openjdk.jmh.annotations.Mode;
41+
import org.openjdk.jmh.annotations.OutputTimeUnit;
42+
import org.openjdk.jmh.annotations.Param;
43+
import org.openjdk.jmh.annotations.Scope;
44+
import org.openjdk.jmh.annotations.Setup;
45+
import org.openjdk.jmh.annotations.State;
46+
import org.openjdk.jmh.annotations.TearDown;
47+
import org.openjdk.jmh.annotations.Threads;
48+
import org.openjdk.jmh.annotations.Warmup;
49+
50+
/**
51+
* JMH benchmarks for {@link BucketDelayedDeliveryTracker}.
52+
*
53+
* <p>This benchmark measures tracker throughput under different read/write ratios
54+
* and initial message counts without implying a specific lock implementation.
55+
*
56+
* <p>Run with: mvn exec:java -Dexec.mainClass="org.openjdk.jmh.Main"
57+
* -Dexec.args="BucketDelayedDeliveryTrackerBenchmark"
58+
*/
59+
@BenchmarkMode(Mode.Throughput)
60+
@OutputTimeUnit(TimeUnit.SECONDS)
61+
@State(Scope.Benchmark)
62+
@Warmup(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
63+
@Measurement(time = 10, timeUnit = TimeUnit.SECONDS, iterations = 1)
64+
@Fork(1)
65+
public class BucketDelayedDeliveryTrackerBenchmark {
66+
67+
/**
68+
* Fixed delivery timestamp base that stays far beyond any benchmark trial duration,
69+
* so scheduled tasks will not start firing while throughput is being measured.
70+
*/
71+
private static final long FUTURE_DELIVERY_BASE_TIME_MILLIS = 4102444800000L; // 2100-01-01T00:00:00Z
72+
73+
@Param({"90_10", "80_20", "70_30", "50_50"})
74+
public String readWriteRatio;
75+
76+
@Param({"1000", "5000", "8000"})
77+
public int initialMessages;
78+
79+
private BucketDelayedDeliveryTracker tracker;
80+
private Timer timer;
81+
private MockBucketSnapshotStorage storage;
82+
private NoopDelayedDeliveryContext context;
83+
private AtomicLong messageIdGenerator;
84+
private int readPercentage;
85+
private long futureDeliveryBaseTimeMillis;
86+
/**
87+
* Maximum number of additional unique (ledgerId, entryId) positions to
88+
* introduce per trial on top of {@link #initialMessages}. This allows
89+
* controlling the memory footprint of the benchmark while still applying
90+
* sustained write pressure to the tracker.
91+
*
92+
* <p>Use {@code -p maxAdditionalUniqueMessages=...} on the JMH command line
93+
* to tune the load. The default value is conservative for local runs.</p>
94+
*/
95+
@Param({"1000000"})
96+
public long maxAdditionalUniqueMessages;
97+
/**
98+
* Upper bound on the absolute message id that will be used to derive
99+
* (ledgerId, entryId) positions during a single trial.
100+
*/
101+
private long maxUniqueMessageId;
102+
/**
103+
* In real Pulsar usage, {@link DelayedDeliveryTracker#addMessage(long, long, long)} is invoked
104+
* by a single dispatcher thread and messages arrive in order of (ledgerId, entryId).
105+
* <p>
106+
* To reflect this invariant in the benchmark, all write operations that end up calling
107+
* {@code tracker.addMessage(...)} are serialized via this mutex so that the tracker only
108+
* ever observes a single writer with monotonically increasing ids, even when JMH runs the
109+
* benchmark method with multiple threads.
110+
*/
111+
private final Object writeMutex = new Object();
112+
113+
@Setup(Level.Trial)
114+
public void setup() throws Exception {
115+
setupMockComponents();
116+
createTracker();
117+
String[] parts = readWriteRatio.split("_");
118+
readPercentage = Integer.parseInt(parts[0]);
119+
futureDeliveryBaseTimeMillis = FUTURE_DELIVERY_BASE_TIME_MILLIS;
120+
preloadMessages();
121+
messageIdGenerator = new AtomicLong(initialMessages + 1);
122+
// Allow a bounded number of additional unique messages per trial to avoid
123+
// unbounded memory growth while still stressing the indexing logic.
124+
maxUniqueMessageId = initialMessages + maxAdditionalUniqueMessages;
125+
}
126+
127+
@TearDown(Level.Trial)
128+
public void tearDown() throws Exception {
129+
if (tracker != null) {
130+
tracker.close();
131+
}
132+
if (timer != null) {
133+
timer.stop();
134+
}
135+
}
136+
137+
private void setupMockComponents() throws Exception {
138+
timer = new HashedWheelTimer(new DefaultThreadFactory("test-delayed-delivery"), 100, TimeUnit.MILLISECONDS);
139+
storage = new MockBucketSnapshotStorage();
140+
141+
ActiveManagedCursorContainerImpl container = new ActiveManagedCursorContainerImpl();
142+
MockManagedCursor cursor = MockManagedCursor.createCursor(container, "test-cursor",
143+
PositionFactory.create(0, 0));
144+
// Use the same "<topic> / <cursor>" naming pattern as real dispatchers,
145+
// so that Bucket.asyncSaveBucketSnapshot can correctly derive topicName.
146+
String dispatcherName = "persistent://public/default/jmh-topic / " + cursor.getName();
147+
context = new NoopDelayedDeliveryContext(dispatcherName, cursor);
148+
}
149+
150+
private void createTracker() throws Exception {
151+
tracker = new BucketDelayedDeliveryTracker(
152+
context, timer, 1000, Clock.systemUTC(), true, storage,
153+
20, 1000, 100, 50
154+
);
155+
}
156+
157+
private void preloadMessages() {
158+
// Preload messages to create realistic test conditions while keeping
159+
// delivery timestamps far beyond the benchmark trial duration so the
160+
// tracker's timer does not start firing during measurement.
161+
long baseTime = futureDeliveryBaseTimeMillis;
162+
for (int i = 1; i <= initialMessages; i++) {
163+
tracker.addMessage(i, i, baseTime + i * 1000L);
164+
}
165+
}
166+
167+
// =============================================================================
168+
// READ-WRITE RATIO BENCHMARKS
169+
// =============================================================================
170+
171+
@Benchmark
172+
public boolean benchmarkMixedOperations() {
173+
if (ThreadLocalRandom.current().nextInt(100) < readPercentage) {
174+
// Read operations
175+
return performReadOperation();
176+
} else {
177+
// Write operations
178+
return performWriteOperation();
179+
}
180+
}
181+
182+
/**
183+
* Serialize calls to {@link BucketDelayedDeliveryTracker#addMessage(long, long, long)} and
184+
* ensure (ledgerId, entryId) are generated in a strictly increasing sequence, matching the
185+
* real dispatcher single-threaded behaviour.
186+
*/
187+
private boolean addMessageSequential(long deliverAt, int entryIdModulo) {
188+
synchronized (writeMutex) {
189+
long id = messageIdGenerator.getAndIncrement();
190+
// Limit the number of distinct positions that are introduced into the tracker
191+
// to keep memory usage bounded. Once the upper bound is reached, we re-use
192+
// the last position id so that subsequent calls behave like updates to
193+
// existing messages and are short-circuited by containsMessage checks.
194+
long boundedId = Math.min(id, maxUniqueMessageId);
195+
long ledgerId = boundedId;
196+
long entryId = boundedId % entryIdModulo;
197+
return tracker.addMessage(ledgerId, entryId, deliverAt);
198+
}
199+
}
200+
201+
private boolean performReadOperation() {
202+
int operation = ThreadLocalRandom.current().nextInt(3);
203+
switch (operation) {
204+
case 0:
205+
// containsMessage
206+
long ledgerId = ThreadLocalRandom.current().nextLong(1, initialMessages + 100);
207+
long entryId = ThreadLocalRandom.current().nextLong(1, 1000);
208+
return tracker.containsMessage(ledgerId, entryId);
209+
case 1:
210+
// nextDeliveryTime
211+
try {
212+
tracker.nextDeliveryTime();
213+
return true;
214+
} catch (Exception e) {
215+
return false;
216+
}
217+
case 2:
218+
// getNumberOfDelayedMessages
219+
long count = tracker.getNumberOfDelayedMessages();
220+
return count >= 0;
221+
default:
222+
return false;
223+
}
224+
}
225+
226+
private boolean performWriteOperation() {
227+
long deliverAt = futureDeliveryBaseTimeMillis + ThreadLocalRandom.current().nextLong(5000, 30000);
228+
return addMessageSequential(deliverAt, 1000);
229+
}
230+
231+
// =============================================================================
232+
// SPECIFIC OPERATION BENCHMARKS
233+
// =============================================================================
234+
235+
@Benchmark
236+
@Threads(8)
237+
public boolean benchmarkConcurrentContainsMessage() {
238+
long ledgerId = ThreadLocalRandom.current().nextLong(1, initialMessages + 100);
239+
long entryId = ThreadLocalRandom.current().nextLong(1, 1000);
240+
return tracker.containsMessage(ledgerId, entryId);
241+
}
242+
243+
@Benchmark
244+
@Threads(4)
245+
public boolean benchmarkConcurrentAddMessage() {
246+
long deliverAt = futureDeliveryBaseTimeMillis + ThreadLocalRandom.current().nextLong(10000, 60000);
247+
return addMessageSequential(deliverAt, 1000);
248+
}
249+
250+
@Benchmark
251+
@Threads(2)
252+
public NavigableSet<Position> benchmarkConcurrentGetScheduledMessages() {
253+
// Create some messages ready for delivery
254+
long currentTime = System.currentTimeMillis();
255+
for (int i = 0; i < 5; i++) {
256+
addMessageSequential(currentTime - 1000, 100);
257+
}
258+
return tracker.getScheduledMessages(10);
259+
}
260+
261+
@Benchmark
262+
@Threads(16)
263+
public long benchmarkConcurrentNextDeliveryTime() {
264+
try {
265+
return tracker.nextDeliveryTime();
266+
} catch (Exception e) {
267+
return -1;
268+
}
269+
}
270+
271+
@Benchmark
272+
@Threads(1)
273+
public long benchmarkGetNumberOfDelayedMessages() {
274+
return tracker.getNumberOfDelayedMessages();
275+
}
276+
277+
// =============================================================================
278+
// HIGH CONTENTION SCENARIOS
279+
// =============================================================================
280+
281+
@Benchmark
282+
@Threads(32)
283+
public boolean benchmarkHighContentionMixedOperations() {
284+
return benchmarkMixedOperations();
285+
}
286+
287+
@Benchmark
288+
@Threads(16)
289+
public boolean benchmarkContentionReads() {
290+
return performReadOperation();
291+
}
292+
293+
@Benchmark
294+
@Threads(8)
295+
public boolean benchmarkContentionWrites() {
296+
return performWriteOperation();
297+
}
298+
299+
// =============================================================================
300+
// THROUGHPUT BENCHMARKS
301+
// =============================================================================
302+
303+
@Benchmark
304+
@Threads(1)
305+
public boolean benchmarkSingleThreadedThroughput() {
306+
return benchmarkMixedOperations();
307+
}
308+
309+
@Benchmark
310+
@Threads(4)
311+
public boolean benchmarkMediumConcurrencyThroughput() {
312+
return benchmarkMixedOperations();
313+
}
314+
315+
@Benchmark
316+
@Threads(8)
317+
public boolean benchmarkHighConcurrencyThroughput() {
318+
return benchmarkMixedOperations();
319+
}
320+
321+
}

0 commit comments

Comments
 (0)