Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions conf/defaults.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ topology.transfer.batch.size: 1 # can be no larger than half of `topology.

topology.executor.receive.buffer.size: 32768 # size of recv queue for spouts & bolts. Will be internally rounded up to next power of 2 (if not already a power of 2)
topology.producer.batch.size: 1 # can be no larger than half of `topology.executor.receive.buffer.size`
topology.producer.batch.dynamic: false # when true, the producer batch size adapts between 1 and topology.producer.batch.size to reduce latency under light load. No effect unless topology.producer.batch.size > 1.

topology.batch.flush.interval.millis: 1 # Flush tuples are disabled if this is set to 0 or if (topology.producer.batch.size=1 and topology.transfer.batch.size=1).
topology.spout.recvq.skips: 3 # Check recvQ once every N invocations of Spout's nextTuple() [when ACKs disabled]
Expand Down
20 changes: 19 additions & 1 deletion docs/Performance.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,22 @@ the **Worker Transfer Queue**. The Worker Transfer Thread is responsible for dra
worker process over the network. This setting controls the batch size for writes into the Worker Transfer Queue. This impacts the communication
between worker processes.

- `topology.producer.batch.dynamic` (default `false`) : When enabled, the receive-queue producer batch size is no longer fixed: it adapts at
runtime between 1 and `topology.producer.batch.size` so that the topology gets low latency under light traffic and high throughput under heavy
traffic, without having to pick a single compromise value. It has no effect unless `topology.producer.batch.size` is greater than 1. It applies to
the producer (receive-queue) batch only; the Worker Transfer Queue batch (`topology.transfer.batch.size`) remains fixed.

The effective batch size is adjusted using an AIMD (additive-increase / multiplicative-decrease) policy driven solely by *why* a batch is
flushed, so it requires no extra metrics:

- a batch flushed because it filled up to the current effective size is read as heavy load, and the effective size is increased by 1 (up to the
configured maximum);
- a partially-filled batch flushed by a *flush tuple* (see [Flush Tuple Frequency](#3-flush-tuple-frequency)) is read as light load, and the
effective size is halved (down to a minimum of 1).

Under sustained heavy traffic the effective size climbs to `topology.producer.batch.size` and behaves like a fixed batch; under light traffic it
shrinks toward 1 so that each message is published immediately instead of waiting for the batch to fill or for the next flush tuple.

#### Guidance

**For Low latency:** Set batch size to 1. This basically disables batching. This is likely to reduce peak sustainable throughput under heavy traffic, but
Expand All @@ -57,7 +73,9 @@ Beyond a certain point the throughput is likely to get worse.

**Varying throughput:** Topologies often experience fluctuating amounts of incoming traffic over the day. Other topos may experience higher traffic in some
paths and lower throughput in other paths simultaneously. If latency is not a concern, a small bach size (e.g. 10) and in conjunction with the right flush
frequency may provide a reasonable compromise for such scenarios. For meeting stricter latency SLAs, consider setting it to 1.
frequency may provide a reasonable compromise for such scenarios. For meeting stricter latency SLAs, consider setting it to 1. Alternatively, set
`topology.producer.batch.size` to the throughput-optimal value and enable `topology.producer.batch.dynamic`: the batch will automatically shrink toward 1
during quiet periods (for latency) and grow back toward the configured size during bursts (for throughput), instead of committing to one compromise value.


## 3. Flush Tuple Frequency
Expand Down
7 changes: 7 additions & 0 deletions storm-client/src/jvm/org/apache/storm/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -751,6 +751,13 @@ public class Config extends HashMap<String, Object> {
@IsPositiveNumber
@NotNull
public static final String TOPOLOGY_PRODUCER_BATCH_SIZE = "topology.producer.batch.size";
/**
* When enabled, the producer batch size adapts at runtime between 1 and {@link #TOPOLOGY_PRODUCER_BATCH_SIZE} using AIMD:
* it shrinks toward 1 to reduce latency under light load and grows back toward the configured size to preserve throughput
* under heavy load. Has no effect unless {@link #TOPOLOGY_PRODUCER_BATCH_SIZE} is greater than 1.
*/
@IsBoolean
public static final String TOPOLOGY_PRODUCER_BATCH_DYNAMIC = "topology.producer.batch.dynamic";
/**
* If number of items in task's overflowQ exceeds this, new messages coming from other workers to this task will be dropped This
* prevents OutOfMemoryException that can occur in rare scenarios in the presence of BackPressure. This affects only inter-worker
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,7 @@ private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyC
Set<List<Long>> executors, Map<Integer, String> taskToComponent) {
Integer recvQueueSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE));
Integer recvBatchSize = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_SIZE));
boolean dynamicBatch = ObjectReader.getBoolean(topologyConf.get(Config.TOPOLOGY_PRODUCER_BATCH_DYNAMIC), false);
Integer overflowLimit = ObjectReader.getInt(topologyConf.get(Config.TOPOLOGY_EXECUTOR_OVERFLOW_LIMIT));

if (recvBatchSize > recvQueueSize / 2) {
Expand All @@ -746,7 +747,7 @@ private Map<List<Long>, JCQueue> mkReceiveQueueMap(Map<String, Object> topologyC
}
receiveQueueMap.put(executor, new JCQueue("receive-queue" + executor.toString(), "receive-queue",
recvQueueSize, overflowLimit, recvBatchSize, backPressureWaitStrategy,
this.getTopologyId(), compId, taskIds, this.getPort(), metricRegistry));
this.getTopologyId(), compId, taskIds, this.getPort(), metricRegistry, dynamicBatch));

}
return receiveQueueMap;
Expand Down
88 changes: 80 additions & 8 deletions storm-client/src/jvm/org/apache/storm/utils/JCQueue.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,26 @@ public class JCQueue implements Closeable {
private final MpscUnboundedArrayQueue<Object> overflowQ;
private final int overflowLimit; // ensures... overflowCount <= overflowLimit. if set to 0, disables overflow limiting.
private final int producerBatchSz;
private final boolean dynamicBatch;
private final DirectInserter directInserter = new DirectInserter(this);
private final ThreadLocal<BatchInserter> thdLocalBatcher = new ThreadLocal<BatchInserter>(); // ensure 1 instance per producer thd.
// ensure 1 instance per producer thd.
private final ThreadLocal<DynamicBatchInserter> thdLocalDynamicBatcher = new ThreadLocal<DynamicBatchInserter>();
private final IWaitStrategy backPressureWaitStrategy;
private final String queueName;

public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz,
IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List<Integer> taskIds,
int port, StormMetricRegistry metricRegistry) {
this(queueName, metricNamePrefix, size, overflowLimit, producerBatchSz, backPressureWaitStrategy, topologyId, componentId,
taskIds, port, metricRegistry, false);
}

public JCQueue(String queueName, String metricNamePrefix, int size, int overflowLimit, int producerBatchSz,
IWaitStrategy backPressureWaitStrategy, String topologyId, String componentId, List<Integer> taskIds,
int port, StormMetricRegistry metricRegistry, boolean dynamicBatch) {
this.queueName = queueName;
this.dynamicBatch = dynamicBatch;
this.overflowLimit = overflowLimit;
this.recvQueue = new MpscArrayQueue<>(size);
this.overflowQ = new MpscUnboundedArrayQueue<>(size);
Expand Down Expand Up @@ -160,11 +171,20 @@ public Object get() {
private Inserter getInserter() {
Inserter inserter;
if (producerBatchSz > 1) {
inserter = thdLocalBatcher.get();
if (inserter == null) {
BatchInserter b = new BatchInserter(this, producerBatchSz);
if (dynamicBatch) {
DynamicBatchInserter d = thdLocalDynamicBatcher.get();
if (d == null) {
d = new DynamicBatchInserter(this, producerBatchSz);
thdLocalDynamicBatcher.set(d);
}
inserter = d;
} else {
BatchInserter b = thdLocalBatcher.get();
if (b == null) {
b = new BatchInserter(this, producerBatchSz);
thdLocalBatcher.set(b);
}
inserter = b;
thdLocalBatcher.set(b);
}
} else {
inserter = directInserter;
Expand Down Expand Up @@ -325,22 +345,36 @@ public boolean tryFlush() {
/* Not thread safe. Have one instance per producer thread or synchronize externally */
private static class BatchInserter implements Inserter {
private final int batchSz;
private JCQueue queue;
private ArrayList<Object> currentBatch;
private final JCQueue queue;
private final ArrayList<Object> currentBatch;

BatchInserter(JCQueue queue, int batchSz) {
this.queue = queue;
this.batchSz = batchSz;
this.currentBatch = new ArrayList<>(batchSz + 1);
}

/**
* Number of buffered elements that triggers a flush. Constant here; subclasses may vary it at runtime.
*/
int batchSize() {
return batchSz;
}

/**
* Hook invoked after every non-empty flush, passing whether the batch had reached {@link #batchSize()} when the flush started.
* No-op here; subclasses may use it to adapt {@link #batchSize()}.
*/
void afterFlush(boolean wasFull) {
}

/**
* Blocking call - retires till element is successfully added.
*/
@Override
public void publish(Object obj) throws InterruptedException {
currentBatch.add(obj);
if (currentBatch.size() >= batchSz) {
if (currentBatch.size() >= batchSize()) {
flush();
}
}
Expand All @@ -350,7 +384,7 @@ public void publish(Object obj) throws InterruptedException {
*/
@Override
public boolean tryPublish(Object obj) {
if (currentBatch.size() >= batchSz) {
if (currentBatch.size() >= batchSize()) {
if (!tryFlush()) {
return false;
}
Expand All @@ -368,6 +402,7 @@ public void flush() throws InterruptedException {
if (currentBatch.isEmpty()) {
return;
}
boolean wasFull = currentBatch.size() >= batchSize();
int publishCount = queue.tryPublishInternal(currentBatch);
int retryCount = 0;
while (publishCount == 0) { // retry till at least 1 element is drained
Expand All @@ -385,6 +420,7 @@ public void flush() throws InterruptedException {
publishCount = queue.tryPublishInternal(currentBatch);
}
currentBatch.subList(0, publishCount).clear();
afterFlush(wasFull);
}

/**
Expand All @@ -396,16 +432,52 @@ public boolean tryFlush() {
if (currentBatch.isEmpty()) {
return true;
}
boolean wasFull = currentBatch.size() >= batchSize();
int publishCount = queue.tryPublishInternal(currentBatch);
if (publishCount == 0) {
for (JCQueueMetrics jcQueueMetric : queue.jcqMetrics) {
jcQueueMetric.notifyInsertFailure();
}
afterFlush(wasFull);
return false;
} else {
currentBatch.subList(0, publishCount).clear();
afterFlush(wasFull);
return true;
}
}
} // class BatchInserter

/**
* A {@link BatchInserter} that adapts its batch size between 1 and a configured maximum using AIMD, to favor low latency under
* light load while preserving throughput under heavy load. It reuses the parent's publish/flush logic and only customizes the
* flush threshold ({@link #batchSize()}) and the post-flush adaptation ({@link #afterFlush(boolean)}): a flush of a full batch
* is read as heavy load and additively grows the effective size; a flush of a partially-filled batch (e.g. driven by the
* flush-tuple timer) is read as light load and multiplicatively shrinks it toward 1. Not thread safe. Have one instance per
* producer thread or synchronize externally.
*/
static class DynamicBatchInserter extends BatchInserter {
private final int maxBatchSz;
private int effectiveBatchSz;

DynamicBatchInserter(JCQueue queue, int maxBatchSz) {
super(queue, maxBatchSz); // sizes the buffer to the max; the flush threshold comes from batchSize()
this.maxBatchSz = maxBatchSz;
this.effectiveBatchSz = 1; // start small to favor latency; grows under sustained load
}

@Override
int batchSize() {
return effectiveBatchSz;
}

@Override
void afterFlush(boolean wasFull) {
if (wasFull) {
effectiveBatchSz = Math.min(maxBatchSz, effectiveBatchSz + 1); // additive increase
} else {
effectiveBatchSz = Math.max(1, effectiveBatchSz >> 1); // multiplicative decrease
}
}
}
}
123 changes: 123 additions & 0 deletions storm-client/test/jvm/org/apache/storm/utils/JCQueueTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,124 @@ public void flush() {
});
}

@Test
public void testInOrderDynamicBatch() {
Assertions.assertTimeoutPreemptively(Duration.ofSeconds(10), () -> {
final AtomicBoolean allInOrder = new AtomicBoolean(true);

JCQueue queue = createQueue("dynamicBatch", 10, 1024, true);
Runnable producer = new IncProducer(queue, 1024 * 1024, 100);
Runnable consumer = new ConsumerThd(queue, new JCQueue.Consumer() {
long _expected = 0;

@Override
public void accept(Object obj) {
if (_expected != ((Number) obj).longValue()) {
allInOrder.set(false);
System.out.println("Expected " + _expected + " but got " + obj);
}
_expected++;
}

@Override
public void flush() {
}
});

run(producer, consumer, queue, 1000, 1);
assertTrue(allInOrder.get(), "Messages delivered out of order");
});
}

@Test
public void testDynamicBatchStartsAtOne() {
JCQueue queue = createQueue("dynStart", 1024);
JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 8);
assertEquals(1, inserter.batchSize());
}

@Test
public void testDynamicBatchGrowsOnFullFlush() throws Exception {
JCQueue queue = createQueue("dynGrow", 1024);
JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 8);

inserter.publish(1L); // batch hits effective(1) -> full flush -> grow to 2
assertEquals(2, inserter.batchSize());

inserter.publish(2L); // batch size 1 < 2, no flush
inserter.publish(3L); // batch size 2 == 2 -> full flush -> grow to 3
assertEquals(3, inserter.batchSize());
}

@Test
public void testDynamicBatchCapsAtMax() throws Exception {
JCQueue queue = createQueue("dynCap", 1024);
JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 4);
for (int i = 0; i < 40; i++) {
inserter.publish((long) i);
assertTrue(inserter.batchSize() <= 4, "effective batch exceeded max");
}
assertEquals(4, inserter.batchSize());
}

@Test
public void testDynamicBatchShrinksOnPartialFlush() throws Exception {
JCQueue queue = createQueue("dynShrink", 1024);
JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 8);
growEffectiveTo(inserter, 4);

inserter.publish(100L); // partial batch (size 1 < 4)
inserter.flush(); // timer-style flush of a partial batch -> shrink (4 -> 2)
assertEquals(2, inserter.batchSize());
}

@Test
public void testDynamicBatchEmptyFlushIsNoOp() throws Exception {
JCQueue queue = createQueue("dynEmpty", 1024);
JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 8);
growEffectiveTo(inserter, 4); // leaves the batch empty after the growing flush

inserter.flush(); // empty batch -> no adaptation
assertEquals(4, inserter.batchSize());
}

@Test
public void testDynamicBatchNeverDropsBelowOne() throws Exception {
JCQueue queue = createQueue("dynFloor", 1024);
JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 8);
growEffectiveTo(inserter, 2);

inserter.publish(1L); // partial (size 1 < 2)
inserter.flush(); // shrink 2 -> 1
assertEquals(1, inserter.batchSize());
}

@Test
public void testDynamicBatchGrowsNotShrinksUnderBackpressure() throws Exception {
JCQueue queue = createQueue("dynBp", 8);
JCQueue.DynamicBatchInserter inserter = new JCQueue.DynamicBatchInserter(queue, 4);

inserter.publish(1L); // eff 1 -> 2, recvQueue now has 1 element
assertEquals(2, inserter.batchSize());

while (queue.tryPublishDirect(99L)) { // fill recvQueue to capacity
}

assertTrue(inserter.tryPublish(2L)); // batch size 1
assertTrue(inserter.tryPublish(3L)); // batch size 2 (== effective)
assertFalse(inserter.tryPublish(4L)); // full batch, queue full -> tryFlush fails
// A full batch that could not be published must be read as heavy load (grow), not light load (shrink).
assertEquals(3, inserter.batchSize());
}

/** Drive the inserter with full flushes until the effective batch size reaches the target. */
private void growEffectiveTo(JCQueue.DynamicBatchInserter inserter, int target) throws InterruptedException {
long val = 0;
while (inserter.batchSize() < target) {
inserter.publish(val++);
}
}

private void run(Runnable producer, Runnable consumer, JCQueue queue)
throws InterruptedException {
run(producer, consumer, queue, 20, PRODUCER_NUM);
Expand Down Expand Up @@ -159,6 +277,11 @@ private JCQueue createQueue(String name, int batchSize, int queueSize) {
return new JCQueue(name, name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000, new StormMetricRegistry());
}

private JCQueue createQueue(String name, int batchSize, int queueSize, boolean dynamicBatch) {
return new JCQueue(name, name, queueSize, 0, batchSize, waitStrategy, "test", "test", Collections.singletonList(1000), 1000,
new StormMetricRegistry(), dynamicBatch);
}

private static class IncProducer implements Runnable {

private final JCQueue queue;
Expand Down
Loading