Skip to content
Merged

Dev #25

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 45 additions & 24 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,23 @@ group = 'io.ringbroker'
version = '0.1.0-SNAPSHOT'

/* ---------- JVM ---------- */
java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
withJavadocJar()
withSourcesJar()
}

application {
// for Gradle 7.1+ use:
mainClass.set("io.ringbroker.BrokerMain")
}
java {
toolchain {
languageVersion = JavaLanguageVersion.of(21)
}
withJavadocJar()
withSourcesJar()
}

// Ensure UTF-8 source encoding (avoids Windows-1252 build failures).
tasks.withType(JavaCompile).configureEach {
options.encoding = 'UTF-8'
}

application {
// for Gradle 7.1+ use:
mainClass.set("io.ringbroker.BrokerMain")
}

/* ---------- Repos & Versions ---------- */
repositories { mavenCentral() }
Expand Down Expand Up @@ -112,16 +117,32 @@ jar {
}

/* ---------- JMH Configuration ---------- */
jmh {
includes = ['.*Benchmark.*'] // Include classes with "Benchmark" in their name
resultFormat = 'JSON' // Output format for results
resultsFile = project.file("${project.buildDir}/reports/jmh/results.json")
timeOnIteration = '1s' // Time per iteration
jmh {
includes = ['.*Benchmark.*'] // Include classes with "Benchmark" in their name
resultFormat = 'JSON' // Output format for results
resultsFile = project.file("${project.buildDir}/reports/jmh/results.json")
timeOnIteration = '1s' // Time per iteration
warmupIterations = 2 // Number of warmup iterations
iterations = 5 // Number of measurement iterations
fork = 2 // Number of forks
failOnError = true // Fail build on errors during benchmarking
forceGC = true // Force GC between iterations
jvmArgsAppend = ['--enable-preview'] // Add any JVM args needed for your project
}

iterations = 5 // Number of measurement iterations
fork = 2 // Number of forks
failOnError = true // Fail build on errors during benchmarking
forceGC = true // Force GC between iterations
jvmArgsAppend = ['--enable-preview'] // Add any JVM args needed for your project

// Allow quick overrides from the command line (e.g. -PjmhInclude=Foo -PjmhIterations=1).
if (project.hasProperty('jmhInclude')) {
includes = [project.property('jmhInclude')]
}
if (project.hasProperty('jmhIterations')) {
iterations = Integer.parseInt(project.property('jmhIterations') as String)
}
if (project.hasProperty('jmhWarmupIterations')) {
warmupIterations = Integer.parseInt(project.property('jmhWarmupIterations') as String)
}
if (project.hasProperty('jmhFork')) {
fork = Integer.parseInt(project.property('jmhFork') as String)
}
if (project.hasProperty('jmhIgnoreLock') && project.property('jmhIgnoreLock').toString().toBoolean()) {
jvmArgsAppend += ['-Djmh.ignoreLock=true']
}
}
90 changes: 30 additions & 60 deletions src/jmh/java/io/ringbroker/benchmark/RawTcpClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,21 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

/**
* A raw-TCP client using Protobuf varint32 framing to communicate with the RingBroker server.
*/
public class RawTcpClient implements AutoCloseable {

private static final IoHandlerFactory SHARED_FACTORY = NioIoHandler.newFactory();
private static final int FLUSH_BATCH_SIZE = 1;

private final Channel channel;
private final EventLoopGroup group;
private final LongAdder nextCorr = new LongAdder();

private final AtomicLong nextCorr = new AtomicLong(1); // 0 reserved for server-push (subscribe)
private final ConcurrentMap<Long, CompletableFuture<BrokerApi.Envelope>> inflight = new ConcurrentHashMap<>();
private final ClientHandler handler = new ClientHandler(inflight);

private int writeCounter = 0;

public RawTcpClient(final String host, final int port) throws InterruptedException {
Expand All @@ -46,18 +45,10 @@ public RawTcpClient(final String host, final int port) throws InterruptedExcepti
@Override
protected void initChannel(final Channel ch) {
final ChannelPipeline p = ch.pipeline();

// Inbound: split by varint32 length prefix
p.addLast(new ProtobufVarint32FrameDecoder());
// Inbound: decode bytes into Envelope messages
p.addLast(new ProtobufDecoder(BrokerApi.Envelope.getDefaultInstance()));

// Outbound: prepend varint32 length prefix
p.addLast(new ProtobufVarint32LengthFieldPrepender());
// Outbound: serialize Envelope to bytes
p.addLast(new ProtobufEncoder());

// Business logic handler
p.addLast(handler);
}
});
Expand All @@ -73,8 +64,7 @@ private void maybeFlush() {
}

private CompletableFuture<BrokerApi.Envelope> sendEnv(BrokerApi.Envelope env) {
final long id = nextCorr.longValue();
nextCorr.increment();
final long id = nextCorr.getAndIncrement();
env = env.toBuilder().setCorrelationId(id).build();

final CompletableFuture<BrokerApi.Envelope> fut = new CompletableFuture<>();
Expand All @@ -85,41 +75,32 @@ private CompletableFuture<BrokerApi.Envelope> sendEnv(BrokerApi.Envelope env) {
return fut;
}

/**
* 1) Publish one message
*/
public CompletableFuture<Void> publishAsync(final BrokerApi.Message msg) {
final BrokerApi.Envelope env = BrokerApi.Envelope.newBuilder()
.setPublish(msg)
.build();

return sendEnv(env).thenCompose(reply -> {
final var ack = reply.getPublishReply();
if (ack.getSuccess()) return CompletableFuture.completedFuture(null);
else return CompletableFuture.failedFuture(
new RuntimeException("publish failed: " + ack.getError())
);
return ack.getSuccess()
? CompletableFuture.completedFuture(null)
: CompletableFuture.failedFuture(new RuntimeException("publish failed: " + ack.getError()));
});
}

/**
* 2) Publish a batch of messages
*/
public CompletableFuture<Void> publishBatchAsync(final List<BrokerApi.Message> msgs) {
final BrokerApi.Envelope env = BrokerApi.Envelope.newBuilder()
.setBatch(BrokerApi.BatchMessage.newBuilder().addAllMessages(msgs))
.build();

return sendEnv(env).thenCompose(reply -> {
final var ack = reply.getPublishReply();
if (ack.getSuccess()) return CompletableFuture.completedFuture(null);
else return CompletableFuture.failedFuture(
new RuntimeException("batch failed: " + ack.getError())
);
return ack.getSuccess()
? CompletableFuture.completedFuture(null)
: CompletableFuture.failedFuture(new RuntimeException("batch failed: " + ack.getError()));
});
}

/**
* 3) Fetch up to maxMsgs from (topic,partition,offset)
*/
public CompletableFuture<List<BrokerApi.MessageEvent>> fetchAsync(
final String topic, final int partition, final long offset, final int maxMsgs
) {
Expand All @@ -130,13 +111,10 @@ public CompletableFuture<List<BrokerApi.MessageEvent>> fetchAsync(
.setOffset(offset)
.setMaxMessages(maxMsgs)
).build();
return sendEnv(env)
.thenApply(r -> r.getFetchReply().getMessagesList());

return sendEnv(env).thenApply(r -> r.getFetchReply().getMessagesList());
}

/**
* 4) Commit an offset
*/
public CompletableFuture<Void> commitAsync(
final String topic, final String group, final int partition, final long offset
) {
Expand All @@ -147,12 +125,10 @@ public CompletableFuture<Void> commitAsync(
.setPartition(partition)
.setOffset(offset)
).build();

return sendEnv(env).thenApply(r -> null);
}

/**
* 5) Get committed offset
*/
public CompletableFuture<Long> fetchCommittedAsync(
final String topic, final String group, final int partition
) {
Expand All @@ -162,29 +138,24 @@ public CompletableFuture<Long> fetchCommittedAsync(
.setGroup(group)
.setPartition(partition)
).build();
return sendEnv(env)
.thenApply(r -> r.getCommittedReply().getOffset());

return sendEnv(env).thenApply(r -> r.getCommittedReply().getOffset());
}

/**
* 6) Subscribe: set callback, then send subscribe request
*/
public void subscribe(
final String topic, final String group,
final BiConsumer<Long, byte[]> messageHandler
) {
handler.setSubscribeHandler(messageHandler);
final BrokerApi.Envelope env = BrokerApi.Envelope.newBuilder()
.setCorrelationId(0)
.setSubscribe(BrokerApi.SubscribeRequest.newBuilder()
.setTopic(topic)
.setGroup(group)
).build();
channel.writeAndFlush(env);
}

/**
* Force any buffered writes out
*/
public void finishAndFlush() {
if (writeCounter > 0) {
channel.flush();
Expand All @@ -199,41 +170,40 @@ public void close() {
group.shutdownGracefully();
}

// ---------------------------------
// Internal handler for inbound Envelopes
// ---------------------------------
@ChannelHandler.Sharable
private static class ClientHandler
extends SimpleChannelInboundHandler<BrokerApi.Envelope> {

private static class ClientHandler extends SimpleChannelInboundHandler<BrokerApi.Envelope> {
private final ConcurrentMap<Long, CompletableFuture<BrokerApi.Envelope>> inflight;
private volatile BiConsumer<Long, byte[]> subscribeHandler = (seq, b) -> {
};
private volatile BiConsumer<Long, byte[]> subscribeHandler = (seq, b) -> { };

ClientHandler(final ConcurrentMap<Long, CompletableFuture<BrokerApi.Envelope>> map) {
this.inflight = map;
}

void setSubscribeHandler(final BiConsumer<Long, byte[]> h) {
this.subscribeHandler = h;
this.subscribeHandler = (h == null) ? (seq, b) -> { } : h;
}

@Override
protected void channelRead0(final ChannelHandlerContext ctx, final BrokerApi.Envelope env) {
if (env.hasMessageEvent()) {
final var ev = env.getMessageEvent();
subscribeHandler.accept(ev.getOffset(), ev.getPayload().toByteArray());
return;
}

final long id = env.getCorrelationId();
if (id == 0) return;

final CompletableFuture<BrokerApi.Envelope> f = inflight.remove(id);
if (f != null) {
f.complete(env);
}
if (f != null) f.complete(env);
}

@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
for (final var e : inflight.entrySet()) {
e.getValue().completeExceptionally(cause);
}
inflight.clear();
ctx.close();
}
}
Expand Down
Loading
Loading