Skip to content

Improve RemotePeerForwarder Performance#6394

Draft
KarstenSchnitter wants to merge 4 commits into
opensearch-project:mainfrom
KarstenSchnitter:peerforwarder-performance
Draft

Improve RemotePeerForwarder Performance#6394
KarstenSchnitter wants to merge 4 commits into
opensearch-project:mainfrom
KarstenSchnitter:peerforwarder-performance

Conversation

@KarstenSchnitter

@KarstenSchnitter KarstenSchnitter commented Jan 7, 2026

Copy link
Copy Markdown
Collaborator

Description

I am looking into improving the performance of the RemotePeerForwarder. I introduced a JMH benchmark to measure improvements. This lead to a first set of changes to improve throughput especially on smaller batch size. The next change removes synchronization during forwarding of the requests. Finally, I improved the receiving of requests by introducing a heuristic to abandon the batchDelay when enough data is expected in the queue. Major performance gains achieved by this PR:

  • caching of local IPs reduces latencies on forwarding of small batches
  • asynchronous forwarding removes synchronization between different peers
  • reading from a (expectedly) full buffer is done with minimum delay to avoid capping throughput by batchDelay

Issues Resolved

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

@KarstenSchnitter KarstenSchnitter force-pushed the peerforwarder-performance branch from 1b233b4 to 072dbd6 Compare January 7, 2026 08:41
Provides an JMH benchmark for the RemotePeerForwarder to enable and verify
improvements to the implementation.

Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>
* Cache addresses to avoid repeated lookups.
* Presize HashMap to avoid rehashing during record grouping
* Check queue size on flushes first and avoid creation of Duration objects
* Use ArrayList for better cache locality.

These changes were measure with the JMH benchmark to lead to significant
improvements. They were especially high on small record numbers with small
batches. But even on 5000 records cases the provided 30-60% high throughput.

Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>
@KarstenSchnitter KarstenSchnitter force-pushed the peerforwarder-performance branch from 072dbd6 to 8d37a39 Compare January 7, 2026 09:19
The RemotePeerForwarder used to wait for all network requests to finish in each
`forwardRecords()` call. This new implementation removes the synchronization
and handles the responses asynchronously. This improves performance in the
improved benchmarks by a factor of 5-10.

The benchmark was changed to include different values of network latency. This
allows to determine the impact of the asynchronous forwarding.

Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>
When there is enough data available the default 100ms batchDelay leads to a hard-cap of
50 ops/s for receiving records. The new implementation introduces a heuristic, that guesses
whether there is data available based on the last read. When it expects data it tries for a
synchronous read without delay, which uses 5ms within the PeerForwarderReceiveBuffer.
This allows more operations per second in this scenario without limitations by the configured
or default batchDelay.

The criterium to decide for using the fast reads is that at least half the forwarding batch size
was received in the last attempt. It was determined, that there is little benefit in changing this
limit compared to having it at all. Therefore, this number was hard-coded.

Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>

@dlvenable dlvenable left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @KarstenSchnitter for these improvements! I'm excited to see the results of this. A few high-level questions:

  • Do you have benchmark results you can share from before the changes and after?
  • Have you tested this with real Data Prepper nodes? If so, what are your findings.

Also, we should probably have some unit tests for these behaviors.

try {
return NetworkInterface.getByInetAddress(inetAddress) != null;
} catch (final SocketException e) {
inetAddress = InetAddress.getByName(addr);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I take it that this call uses Java's DNS cache. Why do you think caching it helps so much?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This made a tremendous improvement with the JMH tests. As I understand, this code is to check, whether the own IP address is used. Caching it help especially with small batch sizes, where this code gets called more often.

// Heuristic: expect substantial batches only if we're receiving more than 50% of forwarding batch size.
// - When true: use non-blocking reads (0ms) to maximize throughput during high-load periods
// - When false: use configured batchDelay to accumulate records and prevent fragmentation
final int minBatchSizeForNonBlocking = forwardingBatchSize / 2;

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we make this percentage configurable in data-prepper-config.yaml?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought about this. The reasoning here is, that it is much more important to have this heuristic at all then where the actual limit is. It is a little hard to explain and tune this value if it were exposed. But there is nothing going against exposing it.

final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = peerForwarderReceiveBuffer.read(batchDelay);
// Adaptive timeout: use non-blocking (0ms) or configured delay based on expected batch size
final int timeout = expectSubstantialBatch ? 0 : batchDelay;
final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = peerForwarderReceiveBuffer.read(timeout);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the buffer is already full, does it not immediately return? What case do you think is making this take a long time when there are a lots of requests?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is about returning early when the buffer is not entirely full but full enough. The receive buffer is always alternating with forwarding data. Waiting for the buffer full or timeout creates a hard limit on the throughput by the inverse batchDelay the batchSize and the number of workers.

peerForwarderClient.serializeRecordsAndSendHttpRequest(currentBatch, destinationIp, pluginId, pipelineName);

// Process response asynchronously without blocking
responseFuture

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think there is any issue with this possibly making too many requests? Is there something in Armeria or CompletableFuture that will limit the total requests?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The number of requests is limited by the client thread count in the peer forwarder client. This config allows to control how many requests are made. I think there is no need for extra synchronisation here.

@KarstenSchnitter

Copy link
Copy Markdown
Collaborator Author

@dlvenable: I can provide the JMH results before and after the code changes. Would that help to understand the impact. They would be depending on my machine but you could see the relative improvements.

@KarstenSchnitter

Copy link
Copy Markdown
Collaborator Author

I ran the JMH benchmarks introduced with the first commit in this PR and compared with the benchmark results with the currently last commit. The result are down below. Here is a performance comparison:

nodes records baseline (ops/s) improved (ops/s) delta
1 100 12,012 ± 2,397 10,619 ± 1,732 -12%
1 1,000 1,610 ± 81 1,790 ± 47 +11%
1 5,000 359 ± 29 307 ± 14 -14%
1 50,000 28.3 ± 1.3 26.9 ± 1.2 -5%
2 100 1,977 ± 105 10,642 ± 2,720 +438%
2 1,000 914 ± 30 1,444 ± 85 +58%
2 5,000 270 ± 7 264 ± 31 -2%
2 50,000 16.3 ± 1.7 21.7 ± 1.0 +33%
4 100 736 ± 34 9,224 ± 1,747 +1153%
4 1,000 499 ± 126 1,237 ± 145 +148%
4 5,000 206 ± 21 260 ± 15 +26%
4 50,000 15.2 ± 0.9 21.4 ± 1.1 +41%

The large improvements for small record numbers come from the first commit in particular the cached DNS lookups. The second and third commit improve the actual forwarding loop in multi-node setups and skip the read delay on full pipelines (high throughput). Changes below 15% are statistically insignificant.

Original:

Benchmark                                             (nodeCount)  (recordCount)   Mode  Cnt      Score      Error  Units
RemotePeerForwarderBenchmark.benchmarkForwardRecords            1            100  thrpt    5  12011,864 ± 2397,057  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords            1           1000  thrpt    5   1610,144 ±   81,295  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords            1           5000  thrpt    5    358,806 ±   29,385  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords            1          50000  thrpt    5     28,311 ±    1,268  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords            2            100  thrpt    5   1977,318 ±  105,486  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords            2           1000  thrpt    5    914,486 ±   30,344  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords            2           5000  thrpt    5    269,807 ±    7,452  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords            2          50000  thrpt    5     16,301 ±    1,672  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords            4            100  thrpt    5    735,829 ±   33,802  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords            4           1000  thrpt    5    499,197 ±  125,630  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords            4           5000  thrpt    5    205,993 ±   20,602  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords            4          50000  thrpt    5     15,214 ±    0,923  ops/s

Improved:

Benchmark                                             (networkLatencyMs)  (nodeCount)  (recordCount)   Mode  Cnt      Score      Error  Units
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            1            100  thrpt    5  10619,076 ± 1731,733  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            1           1000  thrpt    5   1789,886 ±   47,027  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            1           5000  thrpt    5    307,253 ±   13,556  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            1          50000  thrpt    5     26,943 ±    1,184  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            2            100  thrpt    5  10641,924 ± 2720,015  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            2           1000  thrpt    5   1443,829 ±   85,013  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            2           5000  thrpt    5    264,101 ±   31,455  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            2          50000  thrpt    5     21,738 ±    0,998  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            4            100  thrpt    5   9224,068 ± 1747,447  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            4           1000  thrpt    5   1236,758 ±  144,839  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            4           5000  thrpt    5    260,311 ±   14,853  ops/s
RemotePeerForwarderBenchmark.benchmarkForwardRecords                   0            4          50000  thrpt    5     21,399 ±    1,136  ops/s

@dlvenable let me know your take on this.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants