Skip to content

Commit 1b233b4

Browse files
Add small changes for better Performance
* Cache addresses to avoid repeated lookups. * Presize HashMap to avoid rehashing during record grouping * Check queue size on flushes first and avoid creation of Duration objects * Use ArrayList for better cache locality. These changes were measure with the JMH benchmark to lead to significant improvements. They were especially high on small record numbers with small batches. But even on 5000 records cases the provided 30-60% high throughput. Signed-off-by: Karsten Schnitter <k.schnitter@sap.com>
1 parent eb21783 commit 1b233b4

3 files changed

Lines changed: 32 additions & 20 deletions

File tree

data-prepper-core/src/jmh/java/org/opensearch/dataprepper/core/peerforwarder/RemotePeerForwarderBenchmark.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ public class RemotePeerForwarderBenchmark {
5252
private static final int BATCH_DELAY = 100;
5353
private static final int FAILED_FORWARDING_REQUEST_LOCAL_WRITE_TIMEOUT = 100;
5454
private static final int FORWARDING_BATCH_SIZE = BATCH_SIZE;
55-
private static final int FORWARDING_BATCH_QUEUE_DEPTH = 3;
55+
private static final int FORWARDING_BATCH_QUEUE_DEPTH = 25;
5656
private static final Duration FORWARDING_BATCH_TIMEOUT = Duration.ofMillis(800);
5757
private static final int PIPELINE_WORKER_THREADS = 8;
5858
private static final int HASH_RING_VIRTUAL_NODES = 128;

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/HashRing.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ public class HashRing implements Consumer<List<Endpoint>> {
3838
private final PeerListProvider peerListProvider;
3939

4040
private TreeMap<BigInteger, String> hashServerMap = new TreeMap<>();
41+
private int peerCount = 0;
4142

4243
public HashRing(final PeerListProvider peerListProvider, final int numVirtualNodes) {
4344
Objects.requireNonNull(peerListProvider);
@@ -77,6 +78,10 @@ public Optional<String> getServerIp(final List<String> identificationKeyValues)
7778
}
7879
}
7980

81+
public int getPeerCount() {
82+
return peerCount;
83+
}
84+
8085
@Override
8186
public void accept(final List<Endpoint> endpoints) {
8287
buildHashServerMap();
@@ -92,6 +97,7 @@ private void buildHashServerMap() {
9297
}
9398

9499
this.hashServerMap = newHashValueMap;
100+
this.peerCount = endpoints.size();
95101
}
96102

97103
private void addServerIpToHashMap(final String serverIp, final Map<BigInteger, String> targetMap) {

data-prepper-core/src/main/java/org/opensearch/dataprepper/core/peerforwarder/RemotePeerForwarder.java

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class RemotePeerForwarder implements PeerForwarder {
5656
private final Set<String> identificationKeys;
5757
final ConcurrentHashMap<String, LinkedBlockingQueue<Record<Event>>> peerBatchingQueueMap;
5858
private final ConcurrentHashMap<String, Long> peerBatchingLastFlushTimeMap;
59+
private final ConcurrentHashMap<String, Boolean> localAddressCache;
5960

6061
private final Counter recordsActuallyProcessedLocallyCounter;
6162
private final Counter recordsToBeProcessedLocallyCounter;
@@ -99,6 +100,7 @@ class RemotePeerForwarder implements PeerForwarder {
99100
this.pipelineWorkerThreads = pipelineWorkerThreads;
100101
peerBatchingQueueMap = new ConcurrentHashMap<>();
101102
peerBatchingLastFlushTimeMap = new ConcurrentHashMap<>();
103+
localAddressCache = new ConcurrentHashMap<>();
102104

103105
recordsActuallyProcessedLocallyCounter = pluginMetrics.counter(RECORDS_ACTUALLY_PROCESSED_LOCALLY);
104106
recordsToBeProcessedLocallyCounter = pluginMetrics.counter(RECORDS_TO_BE_PROCESSED_LOCALLY);
@@ -149,13 +151,13 @@ private Map<String, List<Record<Event>>> groupRecordsBasedOnIdentificationKeys(
149151
final Collection<Record<Event>> records,
150152
final Set<String> identificationKeys
151153
) {
152-
final Map<String, List<Record<Event>>> groupedRecords = new HashMap<>();
154+
final Map<String, List<Record<Event>>> groupedRecords = new HashMap<>(hashRing.getPeerCount());
153155

154156
// group records based on IP address calculated by HashRing
155157
for (final Record<Event> record : records) {
156158
final Event event = record.getData();
157159

158-
final List<String> identificationKeyValues = new LinkedList<>();
160+
final List<String> identificationKeyValues = new ArrayList<>(identificationKeys.size());
159161
int numMissingIdentificationKeys = 0;
160162
for (final String identificationKey : identificationKeys) {
161163
final Object identificationKeyValue = event.get(identificationKey, Object.class);
@@ -178,21 +180,23 @@ private Map<String, List<Record<Event>>> groupRecordsBasedOnIdentificationKeys(
178180
}
179181

180182
private boolean isAddressDefinedLocally(final String address) {
181-
final InetAddress inetAddress;
182-
try {
183-
inetAddress = InetAddress.getByName(address);
184-
} catch (final UnknownHostException e) {
185-
return false;
186-
}
187-
if (inetAddress.isAnyLocalAddress() || inetAddress.isLoopbackAddress()) {
188-
return true;
189-
} else {
183+
return localAddressCache.computeIfAbsent(address, addr -> {
184+
final InetAddress inetAddress;
190185
try {
191-
return NetworkInterface.getByInetAddress(inetAddress) != null;
192-
} catch (final SocketException e) {
186+
inetAddress = InetAddress.getByName(addr);
187+
} catch (final UnknownHostException e) {
193188
return false;
194189
}
195-
}
190+
if (inetAddress.isAnyLocalAddress() || inetAddress.isLoopbackAddress()) {
191+
return true;
192+
} else {
193+
try {
194+
return NetworkInterface.getByInetAddress(inetAddress) != null;
195+
} catch (final SocketException e) {
196+
return false;
197+
}
198+
}
199+
});
196200
}
197201

198202
private List<Record<Event>> batchRecordsForForwarding(final String destinationIp, final List<Record<Event>> records) {
@@ -294,12 +298,14 @@ private List<Record<Event>> getRecordsToForward(final String destinationIp) {
294298
}
295299

296300
private boolean shouldFlushBatch(final String destinationIp) {
297-
final long currentTime = System.currentTimeMillis();
298-
final long millisSinceLastFlush = currentTime - peerBatchingLastFlushTimeMap.getOrDefault(destinationIp, System.currentTimeMillis());
299-
final Duration durationSinceLastFlush = Duration.of(millisSinceLastFlush, ChronoUnit.MILLIS);
301+
final LinkedBlockingQueue<Record<Event>> queue = peerBatchingQueueMap.get(destinationIp);
302+
if (queue.size() >= forwardingBatchSize) {
303+
return true;
304+
}
300305

301-
final boolean shouldFlushDueToTimeout = durationSinceLastFlush.compareTo(forwardingBatchTimeout) >= 0;
302-
return shouldFlushDueToTimeout || peerBatchingQueueMap.get(destinationIp).size() >= forwardingBatchSize;
306+
final long currentTime = System.currentTimeMillis();
307+
final long millisSinceLastFlush = currentTime - peerBatchingLastFlushTimeMap.getOrDefault(destinationIp, currentTime);
308+
return millisSinceLastFlush >= forwardingBatchTimeout.toMillis();
303309
}
304310

305311
void processFailedRequestsLocally(final AggregatedHttpResponse httpResponse, final Collection<Record<Event>> records) {

0 commit comments

Comments
 (0)