Improve RemotePeerForwarder Performance#6394
Conversation
1b233b4 to
072dbd6
Compare
Provides an JMH benchmark for the RemotePeerForwarder to enable and verify improvements to the implementation. Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>
* Cache addresses to avoid repeated lookups. * Presize HashMap to avoid rehashing during record grouping * Check queue size on flushes first and avoid creation of Duration objects * Use ArrayList for better cache locality. These changes were measure with the JMH benchmark to lead to significant improvements. They were especially high on small record numbers with small batches. But even on 5000 records cases the provided 30-60% high throughput. Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>
072dbd6 to
8d37a39
Compare
The RemotePeerForwarder used to wait for all network requests to finish in each `forwardRecords()` call. This new implementation removes the synchronization and handles the responses asynchronously. This improves performance in the improved benchmarks by a factor of 5-10. The benchmark was changed to include different values of network latency. This allows to determine the impact of the asynchronous forwarding. Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>
When there is enough data available the default 100ms batchDelay leads to a hard-cap of 50 ops/s for receiving records. The new implementation introduces a heuristic, that guesses whether there is data available based on the last read. When it expects data it tries for a synchronous read without delay, which uses 5ms within the PeerForwarderReceiveBuffer. This allows more operations per second in this scenario without limitations by the configured or default batchDelay. The criterium to decide for using the fast reads is that at least half the forwarding batch size was received in the last attempt. It was determined, that there is little benefit in changing this limit compared to having it at all. Therefore, this number was hard-coded. Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>
dlvenable
left a comment
There was a problem hiding this comment.
Thanks @KarstenSchnitter for these improvements! I'm excited to see the results of this. A few high-level questions:
- Do you have benchmark results you can share from before the changes and after?
- Have you tested this with real Data Prepper nodes? If so, what are your findings.
Also, we should probably have some unit tests for these behaviors.
| try { | ||
| return NetworkInterface.getByInetAddress(inetAddress) != null; | ||
| } catch (final SocketException e) { | ||
| inetAddress = InetAddress.getByName(addr); |
There was a problem hiding this comment.
I take it that this call uses Java's DNS cache. Why do you think caching it helps so much?
There was a problem hiding this comment.
This made a tremendous improvement with the JMH tests. As I understand, this code is to check, whether the own IP address is used. Caching it help especially with small batch sizes, where this code gets called more often.
| // Heuristic: expect substantial batches only if we're receiving more than 50% of forwarding batch size. | ||
| // - When true: use non-blocking reads (0ms) to maximize throughput during high-load periods | ||
| // - When false: use configured batchDelay to accumulate records and prevent fragmentation | ||
| final int minBatchSizeForNonBlocking = forwardingBatchSize / 2; |
There was a problem hiding this comment.
Should we make this percentage configurable in data-prepper-config.yaml?
There was a problem hiding this comment.
I thought about this. The reasoning here is, that it is much more important to have this heuristic at all then where the actual limit is. It is a little hard to explain and tune this value if it were exposed. But there is nothing going against exposing it.
| final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = peerForwarderReceiveBuffer.read(batchDelay); | ||
| // Adaptive timeout: use non-blocking (0ms) or configured delay based on expected batch size | ||
| final int timeout = expectSubstantialBatch ? 0 : batchDelay; | ||
| final Map.Entry<Collection<Record<Event>>, CheckpointState> readResult = peerForwarderReceiveBuffer.read(timeout); |
There was a problem hiding this comment.
If the buffer is already full, does it not immediately return? What case do you think is making this take a long time when there are a lots of requests?
There was a problem hiding this comment.
This is about returning early when the buffer is not entirely full but full enough. The receive buffer is always alternating with forwarding data. Waiting for the buffer full or timeout creates a hard limit on the throughput by the inverse batchDelay the batchSize and the number of workers.
| peerForwarderClient.serializeRecordsAndSendHttpRequest(currentBatch, destinationIp, pluginId, pipelineName); | ||
|
|
||
| // Process response asynchronously without blocking | ||
| responseFuture |
There was a problem hiding this comment.
Do you think there is any issue with this possibly making too many requests? Is there something in Armeria or CompletableFuture that will limit the total requests?
There was a problem hiding this comment.
The number of requests is limited by the client thread count in the peer forwarder client. This config allows to control how many requests are made. I think there is no need for extra synchronisation here.
|
@dlvenable: I can provide the JMH results before and after the code changes. Would that help to understand the impact. They would be depending on my machine but you could see the relative improvements. |
|
I ran the JMH benchmarks introduced with the first commit in this PR and compared with the benchmark results with the currently last commit. The result are down below. Here is a performance comparison:
The large improvements for small record numbers come from the first commit in particular the cached DNS lookups. The second and third commit improve the actual forwarding loop in multi-node setups and skip the read delay on full pipelines (high throughput). Changes below 15% are statistically insignificant. Original: Improved: @dlvenable let me know your take on this. |
Description
I am looking into improving the performance of the
RemotePeerForwarder. I introduced a JMH benchmark to measure improvements. This lead to a first set of changes to improve throughput especially on smaller batch size. The next change removes synchronization during forwarding of the requests. Finally, I improved the receiving of requests by introducing a heuristic to abandon the batchDelay when enough data is expected in the queue. Major performance gains achieved by this PR:batchDelayIssues Resolved
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.