-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathRingBrokerBenchmark.java
More file actions
409 lines (352 loc) · 16 KB
/
RingBrokerBenchmark.java
File metadata and controls
409 lines (352 loc) · 16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
package io.ringbroker.benchmark;
import io.ringbroker.broker.ingress.ClusteredIngress;
import io.ringbroker.broker.role.BrokerRole;
import io.ringbroker.cluster.client.RemoteBrokerClient;
import io.ringbroker.cluster.membership.member.Member;
import io.ringbroker.cluster.membership.replicator.AdaptiveReplicator;
import io.ringbroker.cluster.membership.resolver.ReplicaSetResolver;
import io.ringbroker.cluster.metadata.BroadcastingLogMetadataStore;
import io.ringbroker.cluster.metadata.JournaledLogMetadataStore;
import io.ringbroker.cluster.metadata.LogMetadataStore;
import io.ringbroker.cluster.partitioner.impl.RoundRobinPartitioner;
import io.ringbroker.core.wait.AdaptiveSpin;
import io.ringbroker.core.wait.Blocking;
import io.ringbroker.core.wait.BusySpin;
import io.ringbroker.core.wait.WaitStrategy;
import io.ringbroker.offset.InMemoryOffsetStore;
import io.ringbroker.proto.test.EventsProto;
import io.ringbroker.registry.TopicRegistry;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.infra.Blackhole;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
@State(Scope.Benchmark)
public class RingBrokerBenchmark {
private static final int TOTAL_PARTITIONS = 16;
private static final int RING_SIZE = 1 << 20;
private static final long SEG_BYTES = 256L << 20;
private static final int BATCH_SIZE = 12_000;
private static final String TOPIC = "orders/created";
private static final Path DATA = Paths.get("data-jmh-cluster");
// Window size for the "committed" benchmark: controls in-flight pressure.
private static final int INFLIGHT_WINDOW = 4096;
// Precomputed keys length: power-of-two for cheap masking.
private static final int KEY_POOL_SIZE = 1 << 17; // 131072
// Replication timeout tuned for heavy in-proc load to avoid spurious quorum drops.
private static final long REPL_TIMEOUT_MS = 60_000L;
@Param({"100000"})
private int totalMessages;
@Param({"adaptive-spin", "blocking", "busy-spin"})
private String waitStrategy;
/**
* Profile:
* - fast : single-node, ingestion role (no fsync/quorum)
* - quorum-lite : 3-node, quorum=2, ingestion role (replication without fsync)
* - quorum : 3-node, quorum=2, persistence role (durable path)
* - frontdoor : 1 ingestion node forwarding to 2 persistence nodes (quorum=2, durable)
*/
//@Param({"fast"})
private String profile = "frontdoor";
private byte[] payload;
private byte[][] keys;
private static final org.slf4j.Logger log = org.slf4j.LoggerFactory.getLogger(RingBrokerBenchmark.class);
// Cluster state
private Map<Integer, ClusteredIngress> clusterIngresses;
private Map<Integer, InMemoryOffsetStore> clusterOffsetStores;
private ReplicaSetResolver clusterResolver;
// Fast handle to the leader to avoid map lookups in the hot loops.
private ClusteredIngress leader;
/**
 * Prepares one benchmark trial: resets the on-disk data directory, precomputes the
 * payload and key pool, builds the in-process cluster for the configured profile and
 * caches the leader ingress used by the benchmark methods.
 *
 * <p>If any step fails, {@link #tearDown()} is invoked so that nodes created so far are
 * released. A failure raised by that cleanup is attached to the original exception as a
 * suppressed exception instead of being discarded.
 *
 * <p>Note: the throughput benchmarks are annotated with
 * {@code @OperationsPerInvocation(100000)}; running with a different
 * {@code totalMessages} value skews the reported per-operation numbers.
 *
 * @throws IllegalArgumentException if {@code waitStrategy} or the profile name is unknown
 * @throws IllegalStateException    if no ingress was built for the profile's leader id
 * @throws Exception                any other failure while preparing the cluster
 */
@Setup(Level.Trial)
public void setup() throws Exception {
    try {
        wipeDir(DATA);
        Files.createDirectories(DATA);

        final TopicRegistry registry = new TopicRegistry.Builder()
                .topic(TOPIC, EventsProto.OrderCreated.getDescriptor())
                .build();

        final WaitStrategy ws = switch (this.waitStrategy) {
            case "adaptive-spin" -> new AdaptiveSpin();
            case "blocking" -> new Blocking();
            case "busy-spin" -> new BusySpin();
            default -> throw new IllegalArgumentException("Unknown wait strategy: " + this.waitStrategy);
        };

        payload = EventsProto.OrderCreated.newBuilder()
                .setOrderId("ord-1")
                .setCustomer("bob")
                .build()
                .toByteArray();

        // Precompute keys once to remove allocation + UTF8 encoding from the benchmark.
        keys = new byte[KEY_POOL_SIZE][];
        for (int i = 0; i < KEY_POOL_SIZE; i++) {
            keys[i] = ("qkey-" + i).getBytes(StandardCharsets.UTF_8);
        }

        final Profile prof = profileConfig();
        buildCluster(registry, ws, prof);

        leader = clusterIngresses.get(prof.leaderId());
        if (leader == null) {
            // Fail here with a clear message rather than with an NPE inside a measured loop.
            throw new IllegalStateException(
                    "No ingress built for leader id " + prof.leaderId() + " (profile " + profile + ")");
        }
    } catch (Exception e) {
        try {
            tearDown();
        } catch (Exception cleanupFailure) {
            // Keep the root cause primary, but do not lose the cleanup problem.
            e.addSuppressed(cleanupFailure);
        }
        throw e;
    }
}
/**
 * Releases everything created for the trial: shuts down each ingress and closes each
 * offset store.
 *
 * <p>Cleanup is best-effort. A failure on one node is logged (with the node id) and the
 * remaining nodes are still processed. Maps that were never assigned and {@code null}
 * entries are skipped, so this is also usable after a partially failed setup.
 */
@TearDown(Level.Trial)
public void tearDown() {
    if (clusterIngresses != null) {
        for (final Map.Entry<Integer, ClusteredIngress> entry : clusterIngresses.entrySet()) {
            final ClusteredIngress ci = entry.getValue();
            if (ci == null) {
                continue;
            }
            try {
                ci.shutdown();
            } catch (Exception ex) {
                log.warn("Failed to shut down ingress for node {}", entry.getKey(), ex);
            }
        }
    }

    if (clusterOffsetStores != null) {
        for (final Map.Entry<Integer, InMemoryOffsetStore> entry : clusterOffsetStores.entrySet()) {
            final InMemoryOffsetStore os = entry.getValue();
            if (os == null) {
                continue;
            }
            try {
                os.close();
            } catch (Exception ex) {
                log.warn("Failed to close offset store for node {}", entry.getKey(), ex);
            }
        }
    }

    log.info("=== Cluster benchmark complete ===");
}
/**
 * Fire-and-forget throughput benchmark.
 *
 * <p>Publishes {@code totalMessages} messages through the leader without waiting on the
 * returned futures, so the figure reflects how fast messages are accepted/enqueued
 * (including backpressure), not necessarily durability or quorum commit.
 *
 * <p>With totalMessages=100000 and OperationsPerInvocation=100000, JMH reports msgs/sec directly.
 *
 * @param blackhole JMH sink; receives the message count once the loop has finished
 */
@Benchmark
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@Warmup(iterations = 1, time = 5)
@Measurement(iterations = 3, time = 10)
@OperationsPerInvocation(100000)
public void quorumPublish(final Blackhole blackhole) {
    // Copy fields into locals once so the loop body works on stack variables.
    final ClusteredIngress ingress = leader;
    final byte[] body = payload;
    final byte[][] keyPool = keys;
    final int indexMask = keyPool.length - 1;

    int sent = 0;
    while (sent < totalMessages) {
        // Masking cycles through the precomputed key pool.
        ingress.publish(TOPIC, keyPool[sent & indexMask], body);
        sent++;
    }

    blackhole.consume(totalMessages);
}
/**
 * Committed throughput benchmark using a bounded in-flight window.
 *
 * <p>Publishes up to {@code INFLIGHT_WINDOW} messages, then joins every future of that
 * window before starting the next one, until {@code totalMessages} have been published.
 * Use this when completion of each publish matters rather than mere acceptance.
 *
 * <p>Still reports msgs/sec directly.
 *
 * @param blackhole JMH sink; receives the number of messages published
 */
@Benchmark
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Fork(value = 1)
@Warmup(iterations = 1, time = 5)
@Measurement(iterations = 3, time = 10)
@OperationsPerInvocation(100000)
public void quorumPublishCommittedWindowed(final Blackhole blackhole) {
    final ClusteredIngress ingress = leader;
    final byte[] body = payload;
    final byte[][] keyPool = keys;
    final int indexMask = keyPool.length - 1;

    @SuppressWarnings("unchecked")
    final CompletableFuture<Void>[] window = (CompletableFuture<Void>[]) new CompletableFuture[INFLIGHT_WINDOW];

    int published = 0;
    while (published < totalMessages) {
        // Phase 1: launch publishes until the window is full or we run out of messages.
        int filled = 0;
        while (filled < INFLIGHT_WINDOW && published < totalMessages) {
            window[filled] = ingress.publish(TOPIC, keyPool[published & indexMask], body);
            filled++;
            published++;
        }

        // Phase 2: drain the window in order; joining one by one avoids an allOf() per window.
        for (int slot = 0; slot < filled; slot++) {
            window[slot].join();
            window[slot] = null; // drop the reference so completed futures can be collected
        }
    }

    blackhole.consume(published);
}
/**
 * Per-message latency probe: each invocation publishes exactly one message and blocks
 * until its future completes. Intended for reading percentiles (p50/p99) from the
 * sample-time output.
 *
 * <p>This is already one message per operation, so do NOT scale the result by 100k.
 *
 * @param blackhole JMH sink; receives the key index that was used
 */
@Benchmark
@BenchmarkMode(Mode.SampleTime)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Fork(value = 1)
@Warmup(iterations = 1, time = 5)
@Measurement(iterations = 3, time = 10)
public void quorumPublishSingleMessageLatency(final Blackhole blackhole) {
    // Vary the key between invocations to avoid pathological caching / same-partition artifacts.
    final long ticks = System.nanoTime();
    final int slot = (int) (ticks & (keys.length - 1));

    final CompletableFuture<Void> ack = leader.publish(TOPIC, keys[slot], payload);
    ack.join();

    blackhole.consume(slot);
}
/**
 * Cluster layout selected by the benchmark profile.
 *
 * @param persistenceNodes number of persistence members to create (node ids start at 0)
 * @param ingestionNodes   number of additional nodes created after the persistence members
 * @param ackQuorum        acknowledgement quorum handed to the replicator (clamped when the cluster is built)
 * @param persistenceRole  role given to the persistence members
 * @param frontdoor        when true, nodes whose id is at or beyond the persistence count run with the INGESTION role
 * @param leaderId         id of the node whose ingress the benchmark methods publish through
 */
private record Profile(int persistenceNodes,
int ingestionNodes,
int ackQuorum,
BrokerRole persistenceRole,
boolean frontdoor,
int leaderId) {}
/**
 * Maps the configured profile name (matched case-insensitively) to its cluster layout.
 *
 * @return the layout for {@code fast}, {@code quorum-lite}, {@code quorum} or {@code frontdoor}
 * @throws IllegalArgumentException if the profile name is none of the above
 */
private Profile profileConfig() {
    final String name = profile;
    final Profile layout;

    if ("fast".equalsIgnoreCase(name)) {
        layout = new Profile(1, 0, 1, BrokerRole.INGESTION, false, 0);
    } else if ("quorum-lite".equalsIgnoreCase(name)) {
        layout = new Profile(3, 0, 2, BrokerRole.INGESTION, false, 0);
    } else if ("quorum".equalsIgnoreCase(name)) {
        layout = new Profile(3, 0, 2, BrokerRole.PERSISTENCE, false, 0);
    } else if ("frontdoor".equalsIgnoreCase(name)) {
        // persistence nodes take ids 0 and 1; the ingestion node is id 2 and acts as leader
        layout = new Profile(2, 1, 2, BrokerRole.PERSISTENCE, true, 2);
    } else {
        throw new IllegalArgumentException("Unknown profile: " + name + " (expected fast|quorum-lite|quorum|frontdoor)");
    }

    return layout;
}
/**
 * Creates every node of the in-process cluster and wires the nodes to each other.
 *
 * <p>Persistence members get ids {@code 0..persistenceNodes-1}; any ingestion nodes follow.
 * For each node this creates its directory tree under {@code DATA}, an offset store, a
 * metadata store, a replicator and the {@link ClusteredIngress} itself, then registers
 * in-process clients for the other nodes. Results are stored in {@code clusterIngresses},
 * {@code clusterOffsetStores} and {@code clusterResolver}.
 *
 * @param registry topic registry shared by all nodes
 * @param ws       wait strategy passed to every ingress
 * @param cfg      cluster layout; node counts and quorum are clamped to sane minimums
 * @throws IOException if a directory or store cannot be created
 */
private void buildCluster(final TopicRegistry registry,
                          final WaitStrategy ws,
                          final Profile cfg) throws IOException {
    // Clamp layout values: at least one persistence node, quorum within [1, persistence nodes].
    final int storageCount = Math.max(1, cfg.persistenceNodes());
    final int frontCount = Math.max(0, cfg.ingestionNodes());
    final int nodeCount = storageCount + frontCount;
    final int quorum = Math.min(storageCount, Math.max(1, cfg.ackQuorum()));

    final List<Member> storageMembers = new ArrayList<>(storageCount);
    for (int id = 0; id < storageCount; id++) {
        final InetSocketAddress address = new InetSocketAddress("localhost", 9000 + id);
        storageMembers.add(new Member(id, cfg.persistenceRole(), address, System.currentTimeMillis(), 1));
    }

    clusterResolver = new ReplicaSetResolver(storageCount, () -> storageMembers);
    clusterIngresses = new HashMap<>(nodeCount);
    clusterOffsetStores = new HashMap<>(nodeCount);

    // One holder per node id, filled in as each ingress is created; clients resolve through these.
    final Map<Integer, AtomicReference<ClusteredIngress>> targets = new HashMap<>();
    for (int id = 0; id < nodeCount; id++) {
        targets.put(id, new AtomicReference<>());
    }

    for (int nodeId = 0; nodeId < nodeCount; nodeId++) {
        // On-disk layout: node-N/partition-P plus node-N/offsets.
        final Path home = DATA.resolve("node-" + nodeId);
        Files.createDirectories(home);
        for (int part = 0; part < TOTAL_PARTITIONS; part++) {
            Files.createDirectories(home.resolve("partition-" + part));
        }
        final Path offsetsHome = home.resolve("offsets");
        Files.createDirectories(offsetsHome);

        final InMemoryOffsetStore offsets = new InMemoryOffsetStore(offsetsHome);
        clusterOffsetStores.put(nodeId, offsets);

        // Shared, initially empty peer map; populated after the ingress exists.
        final Map<Integer, RemoteBrokerClient> peers = new ConcurrentHashMap<>();

        final JournaledLogMetadataStore journal = new JournaledLogMetadataStore(home.resolve("metadata"));
        final LogMetadataStore metadata = new BroadcastingLogMetadataStore(journal, peers, nodeId, targets::keySet);
        final AdaptiveReplicator replicator = new AdaptiveReplicator(quorum, peers, REPL_TIMEOUT_MS);

        // In front-door layouts, nodes past the persistence range run as ingestion nodes.
        final boolean frontdoorNode = cfg.frontdoor() && nodeId >= storageCount;
        final BrokerRole role = frontdoorNode ? BrokerRole.INGESTION : cfg.persistenceRole();

        final ClusteredIngress ingress = ClusteredIngress.create(
                registry,
                new RoundRobinPartitioner(),
                TOTAL_PARTITIONS,
                nodeId,
                storageCount,
                peers,
                home,
                RING_SIZE,
                ws,
                SEG_BYTES,
                BATCH_SIZE,
                false,
                offsets,
                role,
                clusterResolver,
                replicator,
                metadata
        );

        clusterIngresses.put(nodeId, ingress);
        targets.get(nodeId).set(ingress);
        peers.putAll(buildInProcClients(targets, nodeId));
    }
}
/**
 * Builds one in-process client per node other than {@code selfId}.
 *
 * @param targets holders for every node's ingress, keyed by node id
 * @param selfId  id of the node the clients are built for; it gets no client to itself
 * @return a new map from peer node id to a client wrapping that peer's holder
 */
private static Map<Integer, RemoteBrokerClient> buildInProcClients(
        final Map<Integer, AtomicReference<ClusteredIngress>> targets,
        final int selfId
) {
    final Map<Integer, RemoteBrokerClient> peers = new HashMap<>();
    targets.forEach((peerId, holder) -> {
        if (peerId != selfId) {
            peers.put(peerId, new InProcClient(holder));
        }
    });
    return peers;
}
/**
 * {@link RemoteBrokerClient} that hands envelopes straight to a {@link ClusteredIngress}
 * living in the same JVM. The ingress is read from a holder on every call, so a client
 * can be created before its target node exists.
 */
private static final class InProcClient implements RemoteBrokerClient {
    private final AtomicReference<ClusteredIngress> target;

    /**
     * @param target holder that yields the destination ingress once it has been set
     */
    InProcClient(final AtomicReference<ClusteredIngress> target) {
        this.target = target;
    }

    /** No-op: this path is not used here. */
    @Override
    public void sendMessage(final String topic, final byte[] key, final byte[] payload) {
        // not used here
    }

    /**
     * Dispatches the envelope to the matching handler on the target ingress.
     *
     * @param envelope request to deliver
     * @return the handler's future; a failed future if the target is not set yet or the
     *         envelope kind is not supported
     */
    @Override
    public CompletableFuture<io.ringbroker.api.BrokerApi.ReplicationAck> sendEnvelopeWithAck(
            final io.ringbroker.api.BrokerApi.Envelope envelope
    ) {
        final ClusteredIngress ingress = target.get();
        if (ingress == null) {
            return CompletableFuture.failedFuture(new IllegalStateException("target not set"));
        }

        switch (envelope.getKindCase()) {
            case PUBLISH:
                return publishLocally(ingress, envelope);
            case APPEND:
                return ingress.handleAppendAsync(envelope.getAppend());
            case APPEND_BATCH:
                return ingress.handleAppendBatchAsync(envelope.getAppendBatch());
            case SEAL:
                return ingress.handleSealAsync(envelope.getSeal());
            case OPEN_EPOCH:
                return ingress.handleOpenEpochAsync(envelope.getOpenEpoch());
            case METADATA_UPDATE:
                return ingress.handleMetadataUpdateAsync(envelope.getMetadataUpdate());
            default:
                return CompletableFuture.failedFuture(
                        new UnsupportedOperationException("Unsupported kind " + envelope.getKindCase())
                );
        }
    }

    /** Publishes the envelope's message on the given ingress and maps completion to a SUCCESS ack. */
    private static CompletableFuture<io.ringbroker.api.BrokerApi.ReplicationAck> publishLocally(
            final ClusteredIngress ingress,
            final io.ringbroker.api.BrokerApi.Envelope envelope
    ) {
        final io.ringbroker.api.BrokerApi.Message message = envelope.getPublish();

        // An empty key on the wire is passed on as "no key".
        final byte[] key;
        if (message.getKey().isEmpty()) {
            key = null;
        } else {
            key = message.getKey().toByteArray();
        }

        return ingress.publish(
                        envelope.getCorrelationId(),
                        message.getTopic(),
                        key,
                        message.getRetries(),
                        message.getPayload().toByteArray()
                )
                .thenApply(done -> io.ringbroker.api.BrokerApi.ReplicationAck.newBuilder()
                        .setStatus(io.ringbroker.api.BrokerApi.ReplicationAck.Status.SUCCESS)
                        .build());
    }

    /**
     * Returns an already-completed future holding an empty reply; backfill is not used by
     * this benchmark.
     *
     * @param envelope ignored
     */
    @Override
    public CompletableFuture<io.ringbroker.api.BrokerApi.BackfillReply> sendBackfill(
            final io.ringbroker.api.BrokerApi.Envelope envelope
    ) {
        // not used by this benchmark
        return CompletableFuture.completedFuture(
                io.ringbroker.api.BrokerApi.BackfillReply.newBuilder().build()
        );
    }
}
/**
 * Deletes {@code root} and everything below it on a best-effort basis.
 *
 * <p>Does nothing when {@code root} does not exist. Entries are visited in reverse path
 * order so that directory contents are removed before the directory itself. A failure
 * to delete an individual entry is ignored and the sweep continues.
 *
 * @param root directory tree to remove
 * @throws IOException if the tree cannot be opened for walking
 */
private static void wipeDir(final Path root) throws IOException {
    if (!Files.exists(root)) {
        return;
    }

    try (Stream<Path> walk = Files.walk(root)) {
        final Iterator<Path> deepestFirst = walk.sorted(Comparator.reverseOrder()).iterator();
        while (deepestFirst.hasNext()) {
            final Path entry = deepestFirst.next();
            try {
                Files.deleteIfExists(entry);
            } catch (Exception ignored) {
                /* ignore */
            }
        }
    }
}
}