Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion data-prepper-plugins/common/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ dependencies {
api project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8'

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this available in our settings.gradle file? If not, we should move it there. If so, use that version.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi, 3.2.2 is the current release

implementation libs.commons.io
implementation 'software.amazon.awssdk:s3'
implementation 'software.amazon.awssdk:acm'
Expand Down Expand Up @@ -37,4 +38,4 @@ jacocoTestCoverageVerification {
}
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.dataprepper.common.utils;
import com.google.common.annotations.VisibleForTesting;

import org.opensearch.dataprepper.model.event.Event;

import com.github.benmanes.caffeine.cache.RemovalCause;
import com.github.benmanes.caffeine.cache.RemovalListener;
import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.stats.CacheStats;
import com.github.benmanes.caffeine.cache.Caffeine;
import com.github.benmanes.caffeine.cache.Weigher;

import java.lang.instrument.Instrumentation;
import java.util.concurrent.TimeUnit;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

public class CaffeineCache<K, V> {

public static final int OVERHEAD_PER_CACHE_ENTRY = 32;

class ByteArrayValueWeigher implements Weigher<K, V> {

@Override
public int weigh(K key, V value) {
return value.toString().length() + key.toString().length();
}
}

private final Cache<K, V> cache;
private final long maxBytes;
private final int maxEntries;
private AtomicInteger curEntries;
private AtomicLong curBytes;

public CaffeineCache(int maxEntries, long maxBytes, int ttlSeconds, boolean lruMode) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there another option/mode that Caffeine Cache automatically evict an entry after its first access? This would be useful for the Merge use case where each key only needs to be accessed once and can be removed immediately after to optimize memory usage.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most likely the cache will detect this and automatically tune itself, as its goal is to maximize the hit ratio. Therefore it wants to evict unused entries aggressively. It will adaptively size a new arrival region (admission window) with a long-term popular region (main), using a frequency filter to guard the promotion.

The workload below has a blockchain trace chained with an analytical loop. The mining is LRU-biased since multiple workers pick up the event, validate it, and its unused thereafter. The analytics is MRU-biased as it scans over rows so it protects the useful contents from being forced out. The stress test switches between traces to show the cache adapts to the changing workload and no tuning parameters are needed. The hope is that it works out-of-the-box, but of course measuring the actual behavior is always good regardless.

image

this.maxBytes = maxBytes;
this.maxEntries = maxEntries;
this.curEntries = new AtomicInteger(0);
this.curBytes = new AtomicLong(0);
Caffeine<K, V> caffeine;
if (lruMode) {
caffeine = (Caffeine<K, V>) Caffeine.newBuilder()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These builders seem far too restrictive. Similar to my comment above, we should have a builder and then map those fields to Caffeine versions.

We don't need to support all the fields now. But, the code should be written in such a way that it is trivial to add them. Probably having the builder in a separate file would help.

.maximumSize(maxEntries);
} else {
caffeine = (Caffeine<K, V>) Caffeine.newBuilder()
.maximumWeight(maxBytes)
.weigher(new ByteArrayValueWeigher());
}
this.cache = caffeine
.expireAfterAccess(ttlSeconds, TimeUnit.SECONDS)
.removalListener((K k, V v, RemovalCause cause) -> {
int valueLength = (v instanceof String) ? ((String)v).length() :
((Event)v).toJsonString().length();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not a string, then you assume it's an Event. This would throw a cast class exception and be quite unclear.

Maybe this code should also go into a different class to allow for different types.

curBytes.addAndGet(-getEntrySize(k, v));
curEntries.decrementAndGet();
})
.recordStats()
.build();
}

long getEntrySize(K key, V value) {
long size = OVERHEAD_PER_CACHE_ENTRY;
if (key instanceof String) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is strongly related to the code above and duplicated. Let's consolidate it.

size += (long)((String)key).length();
} else {
size += 8L;
}
size += (value instanceof String) ? ((String)value).length() :
((Event)value).toJsonString().length();
return size;
}

public void put(K key, V value) {
if (!(value instanceof Event) && !(value instanceof String)) {
throw new RuntimeException("Currently only Event/String type values are supported");
}
if (!(key instanceof String) && !(key instanceof Integer) && !(key instanceof Long) && !(key instanceof List)) {
throw new RuntimeException("Currently only String/Integer/Long type keys are supported");
}
cache.put(key, value);
curEntries.incrementAndGet();
curBytes.addAndGet(getEntrySize(key, value));
// run cleanUp to create space by removing any expired entries
if (curEntries.get() > maxEntries/2 || curBytes.get() > maxBytes/2) {
cache.cleanUp();
}
}

public V get(K key) {
return cache.getIfPresent(key);
}

public boolean containsKey(K key) {
return cache.getIfPresent(key) != null;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cache.asMap().containsKey(key)

}

public void remove(K key) {
V value = cache.getIfPresent(key);
if (value == null) {
return;
}
cache.invalidate(key);
cache.cleanUp();
}

public CacheStats getStats() {
return cache.stats();
}

public void clear() {
cache.invalidateAll();
cache.cleanUp();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically not needed fwiw

}

@VisibleForTesting
int getNumEntries() {
return curEntries.get();
}

@VisibleForTesting
long getSize() {
return curBytes.get();
}
}

Loading
Loading