Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ public class Workload {
public int messageSize;

/**
* Message size distribution for variable-sized payloads.
* Keys are size ranges (e.g., "0-256", "256-1024", "1KB-4KB"),
* values are relative weights.
* Mutually exclusive with messageSize - if set, messageSize is ignored.
* Message size distribution for variable-sized payloads. Keys are size ranges (e.g., "0-256",
* "256-1024", "1KB-4KB"), values are relative weights. Mutually exclusive with messageSize - if
* set, messageSize is ignored.
*/
public Map<String, Integer> messageSizeDistribution;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,17 @@
*/
package io.openmessaging.benchmark.utils.payload;


import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
* Parses and represents a message size distribution from workload config.
* Creates one payload size per bucket and provides weights for runtime selection.
* Parses and represents a message size distribution from workload config. Creates one payload size
* per bucket and provides weights for runtime selection.
*
* <p>Example configuration:
*
* <pre>
* messageSizeDistribution:
* "0-256": 234
Expand All @@ -34,9 +36,7 @@ public class MessageSizeDistribution {
private final List<Bucket> buckets;
private final int totalWeight;

/**
* Represents a single size bucket with min/max range and weight.
*/
/** Represents a single size bucket with min/max range and weight. */
public static class Bucket {
public final int minSize;
public final int maxSize;
Expand Down Expand Up @@ -147,8 +147,8 @@ public List<Integer> getBucketSizes() {
}

/**
* Returns list of max sizes, one per bucket (for payload generation).
* Using max sizes ensures the system is tested with the largest messages in each bucket range.
* Returns list of max sizes, one per bucket (for payload generation). Using max sizes ensures the
* system is tested with the largest messages in each bucket range.
*
* @return list of max sizes per bucket
*/
Expand Down Expand Up @@ -217,4 +217,3 @@ public List<Bucket> getBuckets() {
return buckets;
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import static java.util.stream.Collectors.joining;

import com.beust.jcommander.internal.Maps;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
Expand All @@ -36,13 +39,16 @@
import org.slf4j.LoggerFactory;

public class DistributedWorkersEnsemble implements Worker {
private static final String KAFKA_BENCHMARK_DRIVER_CLASS =
"io.openmessaging.benchmark.driver.kafka.KafkaBenchmarkDriver";
private final Thread shutdownHook = new Thread(this::stopAll);
private final List<Worker> workers;
private final List<Worker> producerWorkers;
private final List<Worker> consumerWorkers;
private final Worker leader;

private int numberOfUsedProducerWorkers;
private volatile boolean stopLeaderLast;

public DistributedWorkersEnsemble(List<Worker> workers, boolean extraConsumerWorkers) {
Preconditions.checkArgument(workers.size() > 1);
Expand Down Expand Up @@ -75,6 +81,7 @@ static int getNumberOfProducerWorkers(List<Worker> workers, boolean extraConsume

@Override
public void initializeDriver(File configurationFile) throws IOException {
stopLeaderLast = shouldStopLeaderLast(configurationFile);
workers.parallelStream()
.forEach(
w -> {
Expand Down Expand Up @@ -165,7 +172,39 @@ public void adjustPublishRate(double publishRate) throws IOException {

@Override
public void stopAll() {
workers.parallelStream().forEach(Worker::stopAll);
if (!stopLeaderLast) {
workers.parallelStream().forEach(Worker::stopAll);
return;
}

RuntimeException stopError = null;

try {
// The leader owns topic lifecycle, so stop it after every other worker is done.
workers.parallelStream().filter(worker -> worker != leader).forEach(Worker::stopAll);
} catch (RuntimeException e) {
stopError = e;
}

try {
leader.stopAll();
} catch (RuntimeException e) {
if (stopError != null) {
stopError.addSuppressed(e);
} else {
stopError = e;
}
}

if (stopError != null) {
throw stopError;
}
}

static boolean shouldStopLeaderLast(File configurationFile) throws IOException {
JsonNode configuration = mapper.readTree(configurationFile);
return KAFKA_BENCHMARK_DRIVER_CLASS.equals(configuration.path("driverClass").asText())
&& configuration.path("deleteTopicsOnClose").asBoolean(false);
}

@Override
Expand Down Expand Up @@ -290,4 +329,5 @@ public void close() throws Exception {
}

private static final Logger log = LoggerFactory.getLogger(DistributedWorkersEnsemble.class);
private static final ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
}
Original file line number Diff line number Diff line change
Expand Up @@ -240,9 +240,7 @@ private void submitProducersToExecutor(
idx = r.nextInt(payloadCount);
}
messageProducer.sendMessage(
p,
Optional.ofNullable(keyDistributor.next()),
payloads.get(idx));
p, Optional.ofNullable(keyDistributor.next()), payloads.get(idx));
});
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ public class ProducerWorkAssignment {
public List<byte[]> payloadData;

/**
* Weights for weighted payload selection. If null, uniform random selection is used.
* Each weight corresponds to the payload at the same index in payloadData.
* Used for message size distribution feature.
* Weights for weighted payload selection. If null, uniform random selection is used. Each weight
* corresponds to the payload at the same index in payloadData. Used for message size distribution
* feature.
*/
public int[] payloadWeights;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,4 +219,3 @@ void weightedSelectionProducesCorrectDistribution() {
assertThat(ratio1).isBetween(0.49, 0.51);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
public class Config {
public short replicationFactor;

public boolean deleteTopicsOnClose = false;

public String topicConfig;

public String commonConfig;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,27 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.bookkeeper.stats.StatsLogger;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaBenchmarkDriver implements BenchmarkDriver {

Expand All @@ -52,6 +59,7 @@ public class KafkaBenchmarkDriver implements BenchmarkDriver {

private List<BenchmarkProducer> producers = Collections.synchronizedList(new ArrayList<>());
private List<BenchmarkConsumer> consumers = Collections.synchronizedList(new ArrayList<>());
private final Set<String> createdTopics = Collections.synchronizedSet(new HashSet<>());

private Properties topicProperties;
private Properties producerProperties;
Expand Down Expand Up @@ -111,7 +119,9 @@ public CompletableFuture<Void> createTopics(List<TopicInfo> topicInfos) {
Map<String, String> topicConfigs = new HashMap<>((Map) topicProperties);
KafkaTopicCreator topicCreator =
new KafkaTopicCreator(admin, topicConfigs, config.replicationFactor);
return topicCreator.create(topicInfos);
return topicCreator
.create(topicInfos)
.thenRun(() -> topicInfos.forEach(topicInfo -> createdTopics.add(topicInfo.getTopic())));
}

@Override
Expand Down Expand Up @@ -158,14 +168,47 @@ public void close() throws Exception {
for (BenchmarkConsumer consumer : consumers) {
consumer.close();
}
if (config != null && config.deleteTopicsOnClose) {
deleteCreatedTopics();
}
admin.close();
}

private void deleteCreatedTopics() {
if (createdTopics.isEmpty()) {
return;
}

final Set<String> topicsToDelete;
synchronized (createdTopics) {
topicsToDelete = new HashSet<>(createdTopics);
}

try {
DeleteTopicsResult deleteTopicsResult = admin.deleteTopics(topicsToDelete);
deleteTopicsResult.all().get();
log.info("Deleted {} benchmark topics", topicsToDelete.size());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.warn("Interrupted while deleting benchmark topics {}", topicsToDelete, e);
} catch (ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException) {
log.warn("Some benchmark topics were already deleted: {}", topicsToDelete);
} else {
log.warn("Failed deleting benchmark topics {}", topicsToDelete, e);
}
} catch (Exception e) {
log.warn("Failed deleting benchmark topics {}", topicsToDelete, e);
}
}

private static String applyZoneId(String clientId, String zoneId) {
return clientId.replace(ZONE_ID_TEMPLATE, zoneId);
}

private static final ObjectMapper mapper =
new ObjectMapper(new YAMLFactory())
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

private static final Logger log = LoggerFactory.getLogger(KafkaBenchmarkDriver.class);
}