Skip to content

CaffeineCache wrapper package#6003

Closed
kkondaka wants to merge 4 commits into
opensearch-project:mainfrom
kkondaka:common-cache
Closed

CaffeineCache wrapper package#6003
kkondaka wants to merge 4 commits into
opensearch-project:mainfrom
kkondaka:common-cache

Conversation

@kkondaka

Copy link
Copy Markdown
Collaborator

Description

Add common cache library using CaffieneCache supporting LRU mode and TinuLFU mode and with metrics keeping track of the size of the cache

Issues Resolved

Resolves #[Issue number to be closed when this PR is merged]

Check List

  • [ X] New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • [ X] Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
}
cache.invalidate(key);
curEntries--;
curBytes += getEntrySize(key, value);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be subtracting the bytes?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My mistake. Will fix it.

@graytaylor0

Copy link
Copy Markdown
Member

Build failing from checkstyle

Error: eckstyle] [ERROR] /home/runner/work/data-prepper/data-prepper/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/utils/CaffeineCache.java:12:8: Unused import - com.github.benmanes.caffeine.cache.RemovalListener. [UnusedImports]
Error: eckstyle] [ERROR] /home/runner/work/data-prepper/data-prepper/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/utils/CaffeineCache.java:18:8: Unused import - java.lang.instrument.Instrumentation. [UnusedImports]
Error: eckstyle] [ERROR] /home/runner/work/data-prepper/data-prepper/data-prepper-plugins/common/src/main/java/org/opensearch/dataprepper/common/utils/CaffeineCache.java:20:8: Unused import - java.util.Arrays. [UnusedImports]

api project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8'

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this available in our settings.gradle file? If not, we should move it there. If so, use that version.

private long curBytes;
private boolean strictLRU;

public CaffeineCache(int maxEntries, long maxBytes, int ttlSeconds, boolean strictLRU) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably avoid exposing the constructor and use a builder pattern for this, similar to Caffeine's builder pattern.

this.maxEntries = maxEntries;
Caffeine<K, V> caffeine;
if (strictLRU) {
caffeine = (Caffeine<K, V>) Caffeine.newBuilder()

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These builders seem far too restrictive. Similar to my comment above, we should have a builder and then map those fields to Caffeine versions.

We don't need to support all the fields now. But, the code should be written in such a way that it is trivial to add them. Probably having the builder in a separate file would help.

.expireAfterAccess(ttlSeconds, TimeUnit.SECONDS)
.removalListener((K k, V v, RemovalCause cause) -> {
int valueLength = (v instanceof String) ? ((String)v).length() :
((Event)v).toJsonString().length();

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If it's not a string, then you assume it's an Event. This would throw a cast class exception and be quite unclear.

Maybe this code should also go into a different class to allow for different types.


long getEntrySize(K key, V value) {
long size = OVERHEAD_PER_CACHE_ENTRY;
if (key instanceof String) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code is strongly related to the code above and duplicated. Let's consolidate it.

}

public boolean containsKey(K key) {
return cache.getIfPresent(key) != null;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cache.asMap().containsKey(key)

this.strictLRU = strictLRU;
this.maxEntries = maxEntries;
Caffeine<K, V> caffeine;
if (strictLRU) {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Caffeine isn't an LRU cache, nor strict in its decision making, so this flag is confusingly named. Perhaps the confusion is that weighted entries consume more space and likely makes it evict a candidate earlier in a unit test with a tiny maximum. The cache internally uses adaptive regions to tune itself based on the observed workload.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ben-manes Thank you very much for your review. So, there is no way to get LRU behavior where the new key is always inserted by throwing out an existing entry ( preferably LRU, but other may be ok, too)?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A new entry may go in but there are lots of reasons why it might be evicted promptly. A tiny cache for unit tests might see aggressive removal, but a larger cache with activity will be more balanced. The behavior isn’t deterministic though, it’s optimized for hit rates and injects randomness for denial of service protection

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @ben-manes. Is there any way to get more deterministic behavior? We definitely want the new element to be inserted by evicting an existing element on some criteria. It is OK what gets evicted is not deterministic.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@ben-manes Any reason why memcache like LRU behavior is not included in this?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cache uses two regions, admission and main. It always keeps admission large enough to keep 1 unit of capacity (an entry). When you use weighted entries, the single unit is not enough as your cache entry takes many. In a tiny cache for testing then it might be rejected. In a production sizing then there will be more space. memcached uses three regions and would be given far more space. It’s only that unit tests can be confusing and it’s likely okay

Comment on lines +103 to +109
V value = cache.getIfPresent(key);
if (value == null) {
return;
}
cache.invalidate(key);
curEntries--;
curBytes += getEntrySize(key, value);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this cache called by multiple threads? If so then its racy and you could use cache.asMap().computeIfPresent along with atomics.


@VisibleForTesting
int getNumEntries() {
return curEntries;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cache.asMap().size()?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not using cache directly here because cache may not reflect the current state unless cleanup() is called, is my understanding incorrect?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don’t think there is a difference since you are relying on the cache to decrement this on cleanup in your removal listener, e.g. after expiration.

Comment on lines +88 to +91
// run cleanUp to create space by removing any expired entries
if (curEntries > maxEntries/2 || curBytes > maxBytes/2) {
cache.cleanUp();
}

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would run automatically after a put, but in a non-blocking manner. I'll usually set Caffeine.executor(Runnable::run) in unit tests to disable async so it runs on the caller.

You could also use Caffeine.scheduler(Scheduler.systemScheduler()) if you wanted prompt removal after expiration


@VisibleForTesting
long getSize() {
return curBytes;

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fwiw, you can get the weightedSize() from cache.policy().eviction() if isWeighted().

api project(':data-prepper-api')
implementation 'com.fasterxml.jackson.core:jackson-databind'
implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml'
implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8'

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fyi, 3.2.2 is the current release


public void clear() {
cache.invalidateAll();
cache.cleanUp();

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

technically not needed fwiw

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
…g them

Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
private AtomicInteger curEntries;
private AtomicLong curBytes;

public CaffeineCache(int maxEntries, long maxBytes, int ttlSeconds, boolean lruMode) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there another option/mode that Caffeine Cache automatically evict an entry after its first access? This would be useful for the Merge use case where each key only needs to be accessed once and can be removed immediately after to optimize memory usage.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most likely the cache will detect this and automatically tune itself, as its goal is to maximize the hit ratio. Therefore it wants to evict unused entries aggressively. It will adaptively size a new arrival region (admission window) with a long-term popular region (main), using a frequency filter to guard the promotion.

The workload below has a blockchain trace chained with an analytical loop. The mining is LRU-biased since multiple workers pick up the event, validate it, and its unused thereafter. The analytics is MRU-biased as it scans over rows so it protects the useful contents from being forced out. The stress test switches between traces to show the cache adapts to the changing workload and no tuning parameters are needed. The hope is that it works out-of-the-box, but of course measuring the actual behavior is always good regardless.

image

@kkondaka

Copy link
Copy Markdown
Collaborator Author

Abandoning this PR for now. Will directly use CaffeineCache in aws-lambda processor without claiming LRU behavior

@kkondaka kkondaka closed this Oct 1, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants