-
Notifications
You must be signed in to change notification settings - Fork 325
CaffeineCache wrapper package #6003
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -10,6 +10,7 @@ dependencies { | |
| api project(':data-prepper-api') | ||
| implementation 'com.fasterxml.jackson.core:jackson-databind' | ||
| implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' | ||
| implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8' | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fyi, 3.2.2 is the current release |
||
| implementation libs.commons.io | ||
| implementation 'software.amazon.awssdk:s3' | ||
| implementation 'software.amazon.awssdk:acm' | ||
|
|
@@ -37,4 +38,4 @@ jacocoTestCoverageVerification { | |
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,133 @@ | ||
| /* | ||
| * Copyright OpenSearch Contributors | ||
| * SPDX-License-Identifier: Apache-2.0 | ||
| */ | ||
|
|
||
| package org.opensearch.dataprepper.common.utils; | ||
| import com.google.common.annotations.VisibleForTesting; | ||
|
|
||
| import org.opensearch.dataprepper.model.event.Event; | ||
|
|
||
| import com.github.benmanes.caffeine.cache.RemovalCause; | ||
| import com.github.benmanes.caffeine.cache.RemovalListener; | ||
| import com.github.benmanes.caffeine.cache.Cache; | ||
| import com.github.benmanes.caffeine.cache.stats.CacheStats; | ||
| import com.github.benmanes.caffeine.cache.Caffeine; | ||
| import com.github.benmanes.caffeine.cache.Weigher; | ||
|
|
||
| import java.lang.instrument.Instrumentation; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.Arrays; | ||
| import java.util.List; | ||
| import java.util.concurrent.atomic.AtomicInteger; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
|
|
||
| public class CaffeineCache<K, V> { | ||
|
|
||
| public static final int OVERHEAD_PER_CACHE_ENTRY = 32; | ||
|
|
||
| class ByteArrayValueWeigher implements Weigher<K, V> { | ||
|
|
||
| @Override | ||
| public int weigh(K key, V value) { | ||
| return value.toString().length() + key.toString().length(); | ||
| } | ||
| } | ||
|
|
||
| private final Cache<K, V> cache; | ||
| private final long maxBytes; | ||
| private final int maxEntries; | ||
| private AtomicInteger curEntries; | ||
| private AtomicLong curBytes; | ||
|
|
||
| public CaffeineCache(int maxEntries, long maxBytes, int ttlSeconds, boolean lruMode) { | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there another option/mode that Caffeine Cache automatically evict an entry after its first access? This would be useful for the Merge use case where each key only needs to be accessed once and can be removed immediately after to optimize memory usage. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Most likely the cache will detect this and automatically tune itself, as its goal is to maximize the hit ratio. Therefore it wants to evict unused entries aggressively. It will adaptively size a new arrival region (admission window) with a long-term popular region (main), using a frequency filter to guard the promotion. The workload below has a blockchain trace chained with an analytical loop. The mining is LRU-biased since multiple workers pick up the event, validate it, and its unused thereafter. The analytics is MRU-biased as it scans over rows so it protects the useful contents from being forced out. The stress test switches between traces to show the cache adapts to the changing workload and no tuning parameters are needed. The hope is that it works out-of-the-box, but of course measuring the actual behavior is always good regardless.
|
||
| this.maxBytes = maxBytes; | ||
| this.maxEntries = maxEntries; | ||
| this.curEntries = new AtomicInteger(0); | ||
| this.curBytes = new AtomicLong(0); | ||
| Caffeine<K, V> caffeine; | ||
| if (lruMode) { | ||
| caffeine = (Caffeine<K, V>) Caffeine.newBuilder() | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. These builders seem far too restrictive. Similar to my comment above, we should have a builder and then map those fields to Caffeine versions. We don't need to support all the fields now. But, the code should be written in such a way that it is trivial to add them. Probably having the builder in a separate file would help. |
||
| .maximumSize(maxEntries); | ||
| } else { | ||
| caffeine = (Caffeine<K, V>) Caffeine.newBuilder() | ||
| .maximumWeight(maxBytes) | ||
| .weigher(new ByteArrayValueWeigher()); | ||
| } | ||
| this.cache = caffeine | ||
| .expireAfterAccess(ttlSeconds, TimeUnit.SECONDS) | ||
| .removalListener((K k, V v, RemovalCause cause) -> { | ||
| int valueLength = (v instanceof String) ? ((String)v).length() : | ||
| ((Event)v).toJsonString().length(); | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If it's not a string, then you assume it's an Maybe this code should also go into a different class to allow for different types. |
||
| curBytes.addAndGet(-getEntrySize(k, v)); | ||
| curEntries.decrementAndGet(); | ||
| }) | ||
| .recordStats() | ||
| .build(); | ||
| } | ||
|
|
||
| long getEntrySize(K key, V value) { | ||
| long size = OVERHEAD_PER_CACHE_ENTRY; | ||
| if (key instanceof String) { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This code is strongly related to the code above and duplicated. Let's consolidate it. |
||
| size += (long)((String)key).length(); | ||
| } else { | ||
| size += 8L; | ||
| } | ||
| size += (value instanceof String) ? ((String)value).length() : | ||
| ((Event)value).toJsonString().length(); | ||
| return size; | ||
| } | ||
|
|
||
| public void put(K key, V value) { | ||
| if (!(value instanceof Event) && !(value instanceof String)) { | ||
| throw new RuntimeException("Currently only Event/String type values are supported"); | ||
| } | ||
| if (!(key instanceof String) && !(key instanceof Integer) && !(key instanceof Long) && !(key instanceof List)) { | ||
| throw new RuntimeException("Currently only String/Integer/Long type keys are supported"); | ||
| } | ||
| cache.put(key, value); | ||
| curEntries.incrementAndGet(); | ||
| curBytes.addAndGet(getEntrySize(key, value)); | ||
| // run cleanUp to create space by removing any expired entries | ||
| if (curEntries.get() > maxEntries/2 || curBytes.get() > maxBytes/2) { | ||
| cache.cleanUp(); | ||
| } | ||
| } | ||
|
|
||
| public V get(K key) { | ||
| return cache.getIfPresent(key); | ||
| } | ||
|
|
||
| public boolean containsKey(K key) { | ||
| return cache.getIfPresent(key) != null; | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||
| } | ||
|
|
||
| public void remove(K key) { | ||
| V value = cache.getIfPresent(key); | ||
| if (value == null) { | ||
| return; | ||
| } | ||
| cache.invalidate(key); | ||
| cache.cleanUp(); | ||
| } | ||
|
|
||
| public CacheStats getStats() { | ||
| return cache.stats(); | ||
| } | ||
|
|
||
| public void clear() { | ||
| cache.invalidateAll(); | ||
| cache.cleanUp(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. technically not needed fwiw |
||
| } | ||
|
|
||
| @VisibleForTesting | ||
| int getNumEntries() { | ||
| return curEntries.get(); | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| long getSize() { | ||
| return curBytes.get(); | ||
| } | ||
| } | ||
|
|
||

There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this available in our settings.gradle file? If not, we should move it there. If so, use that version.