CaffeineCache wrapper package#6003
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
| } | ||
| cache.invalidate(key); | ||
| curEntries--; | ||
| curBytes += getEntrySize(key, value); |
Shouldn't this be subtracting the bytes?
My mistake. Will fix it.
Build failing from checkstyle.
| api project(':data-prepper-api') | ||
| implementation 'com.fasterxml.jackson.core:jackson-databind' | ||
| implementation 'com.fasterxml.jackson.dataformat:jackson-dataformat-yaml' | ||
| implementation 'com.github.ben-manes.caffeine:caffeine:3.1.8' |
Is this available in our settings.gradle file? If not, we should move it there. If so, use that version.
| private long curBytes; | ||
| private boolean strictLRU; | ||
|
|
||
| public CaffeineCache(int maxEntries, long maxBytes, int ttlSeconds, boolean strictLRU) { |
We should probably avoid exposing the constructor and use a builder pattern for this, similar to Caffeine's builder pattern.
| this.maxEntries = maxEntries; | ||
| Caffeine<K, V> caffeine; | ||
| if (strictLRU) { | ||
| caffeine = (Caffeine<K, V>) Caffeine.newBuilder() |
These builders seem far too restrictive. Similar to my comment above, we should have a builder and then map those fields to Caffeine versions.
We don't need to support all the fields now. But, the code should be written in such a way that it is trivial to add them. Probably having the builder in a separate file would help.
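A minimal sketch of what such a builder could look like, kept in its own file. The names `CacheSettings`, `maxEntries`, `maxBytes`, and `ttl`, as well as the defaults, are assumptions for illustration and are not taken from this PR. The wrapper would map each field onto the matching Caffeine builder call in one place.

```java
import java.time.Duration;

// Hypothetical settings holder; the class name, fields, and defaults are assumptions.
final class CacheSettings {
    final long maxEntries;
    final long maxBytes;
    final Duration ttl;

    private CacheSettings(Builder b) {
        this.maxEntries = b.maxEntries;
        this.maxBytes = b.maxBytes;
        this.ttl = b.ttl;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        // Illustrative defaults only.
        private long maxEntries = 1_000;
        private long maxBytes = Long.MAX_VALUE;
        private Duration ttl = Duration.ofMinutes(5);

        Builder maxEntries(long maxEntries) {
            if (maxEntries <= 0) {
                throw new IllegalArgumentException("maxEntries must be positive");
            }
            this.maxEntries = maxEntries;
            return this;
        }

        Builder maxBytes(long maxBytes) {
            if (maxBytes <= 0) {
                throw new IllegalArgumentException("maxBytes must be positive");
            }
            this.maxBytes = maxBytes;
            return this;
        }

        Builder ttl(Duration ttl) {
            if (ttl == null || ttl.isZero() || ttl.isNegative()) {
                throw new IllegalArgumentException("ttl must be positive");
            }
            this.ttl = ttl;
            return this;
        }

        // A real wrapper would translate these fields into Caffeine builder
        // calls here, so adding a new option touches only this class.
        CacheSettings build() {
            return new CacheSettings(this);
        }
    }
}
```

Usage would read `CacheSettings.builder().maxEntries(10).ttl(Duration.ofSeconds(30)).build()`, and supporting another option later means adding one field and one setter.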
| .expireAfterAccess(ttlSeconds, TimeUnit.SECONDS) | ||
| .removalListener((K k, V v, RemovalCause cause) -> { | ||
| int valueLength = (v instanceof String) ? ((String)v).length() : | ||
| ((Event)v).toJsonString().length(); |
If it's not a String, the code assumes it's an Event. For any other type this would throw a ClassCastException, and the cause would be quite unclear.
Maybe this code should also go into a different class to allow for different types.
|
|
||
| long getEntrySize(K key, V value) { | ||
| long size = OVERHEAD_PER_CACHE_ENTRY; | ||
| if (key instanceof String) { |
This code is strongly related to the code above and duplicated. Let's consolidate it.
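One possible way to consolidate the size calculation and avoid the unchecked cast is to move it into a small class that is told how to measure each type. This is only a sketch: `EntrySizer` and `forStrings` are hypothetical names, and sizes are counted in characters via `length()`, as the code in this PR does, rather than in encoded bytes.

```java
import java.util.function.ToLongFunction;

// Hypothetical helper; the name and shape are assumptions, not code from this PR.
final class EntrySizer<K, V> {
    private final long overheadPerEntry;
    private final ToLongFunction<K> keySize;
    private final ToLongFunction<V> valueSize;

    EntrySizer(long overheadPerEntry,
               ToLongFunction<K> keySize,
               ToLongFunction<V> valueSize) {
        this.overheadPerEntry = overheadPerEntry;
        this.keySize = keySize;
        this.valueSize = valueSize;
    }

    // Single place that computes an entry's size; both the removal listener
    // and the put/remove paths would call this instead of duplicating logic.
    long sizeOf(K key, V value) {
        return overheadPerEntry + keySize.applyAsLong(key) + valueSize.applyAsLong(value);
    }

    // Convenience factory for String keys and values, measured in characters.
    static EntrySizer<String, String> forStrings(long overheadPerEntry) {
        return new EntrySizer<>(overheadPerEntry, String::length, String::length);
    }
}
```

Because the measuring functions are typed, a value that is neither a String nor an Event fails at compile time when the sizer is constructed instead of at run time inside the listener.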
| } | ||
|
|
||
| public boolean containsKey(K key) { | ||
| return cache.getIfPresent(key) != null; |
| this.strictLRU = strictLRU; | ||
| this.maxEntries = maxEntries; | ||
| Caffeine<K, V> caffeine; | ||
| if (strictLRU) { |
Caffeine isn't an LRU cache, nor is it strict in its decision making, so this flag is confusingly named. Perhaps the confusion is that weighted entries consume more space, which likely makes the cache evict a candidate earlier in a unit test with a tiny maximum. The cache internally uses adaptive regions to tune itself based on the observed workload.
@ben-manes Thank you very much for your review. So, there is no way to get LRU behavior where the new key is always inserted by throwing out an existing entry (preferably the LRU one, but another may be OK, too)?
A new entry may go in, but there are lots of reasons why it might be evicted promptly. A tiny cache for unit tests might see aggressive removal, but a larger cache with activity will be more balanced. The behavior isn't deterministic though; it's optimized for hit rates and injects randomness for denial-of-service protection.
Thanks @ben-manes. Is there any way to get more deterministic behavior? We definitely want the new element to be inserted by evicting an existing element on some criteria. It is OK if what gets evicted is not deterministic.
@ben-manes Any reason why memcached-like LRU behavior is not included in this?
The cache uses two regions, admission and main. It always keeps admission large enough to hold 1 unit of capacity (an entry). When you use weighted entries, that single unit is not enough because your cache entry takes many. In a tiny cache for testing, the entry might be rejected; with production sizing there will be more space. memcached uses three regions and would be given far more space. It's only that unit tests can be confusing, and it's likely okay.
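For contrast with the behavior described above, the guarantee asked about in this thread (a new key is always admitted and the least recently used entry is dropped) is what the JDK's access-ordered `LinkedHashMap` provides. The sketch below is a different technique from Caffeine, not something Caffeine offers; `StrictLruMap` is a hypothetical name, it bounds by entry count only, and it is not thread-safe without external synchronization.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical strict LRU built on the JDK only; not part of this PR or of Caffeine.
final class StrictLruMap<K, V> extends LinkedHashMap<K, V> {
    private final int maxEntries;

    StrictLruMap(int maxEntries) {
        // accessOrder = true: iteration order runs from least to most recently accessed.
        super(16, 0.75f, true);
        this.maxEntries = maxEntries;
    }

    // Called by LinkedHashMap after each insertion; returning true drops the
    // eldest (least recently used) entry, so the new key is always admitted.
    @Override
    protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
    }
}
```

This trades away the hit-rate optimizations and concurrency of Caffeine for deterministic eviction, so it mainly serves to show what "strict LRU" would mean in a unit test.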
| V value = cache.getIfPresent(key); | ||
| if (value == null) { | ||
| return; | ||
| } | ||
| cache.invalidate(key); | ||
| curEntries--; | ||
| curBytes += getEntrySize(key, value); |
Is this cache called by multiple threads? If so, then it's racy, and you could use cache.asMap().computeIfPresent along with atomics.
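A rough sketch of that suggestion, using a plain `ConcurrentHashMap` as a stand-in for the `ConcurrentMap` view returned by `cache.asMap()` so the example runs on the JDK alone. `TrackedStore` and `entrySize` are hypothetical names, and the size function is a simplification. The removal and the counter updates happen inside one atomic per-key computation, and the byte count is subtracted on removal.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical wrapper; the map stands in for cache.asMap().
final class TrackedStore {
    private final ConcurrentMap<String, String> map = new ConcurrentHashMap<>();
    private final AtomicInteger curEntries = new AtomicInteger();
    private final AtomicLong curBytes = new AtomicLong();

    // Simplified size: characters in key plus value (an assumption).
    private static long entrySize(String key, String value) {
        return key.length() + value.length();
    }

    void put(String key, String value) {
        // compute runs atomically per key, so the counters stay in step with the map.
        map.compute(key, (k, old) -> {
            if (old == null) {
                curEntries.incrementAndGet();
            } else {
                curBytes.addAndGet(-entrySize(k, old));
            }
            curBytes.addAndGet(entrySize(k, value));
            return value;
        });
    }

    void remove(String key) {
        // Only runs when the key is present; returning null removes the mapping.
        map.computeIfPresent(key, (k, v) -> {
            curEntries.decrementAndGet();
            curBytes.addAndGet(-entrySize(k, v)); // subtract on removal
            return null;
        });
    }

    int entries() {
        return curEntries.get();
    }

    long bytes() {
        return curBytes.get();
    }
}
```

If a removal listener also adjusts the counters, as in this PR, the accounting should live in only one of the two places to avoid counting a removal twice.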
|
|
||
| @VisibleForTesting | ||
| int getNumEntries() { | ||
| return curEntries; |
I am not using the cache directly here because the cache may not reflect the current state unless cleanUp() is called. Is my understanding incorrect?
I don’t think there is a difference since you are relying on the cache to decrement this on cleanup in your removal listener, e.g. after expiration.
| // run cleanUp to create space by removing any expired entries | ||
| if (curEntries > maxEntries/2 || curBytes > maxBytes/2) { | ||
| cache.cleanUp(); | ||
| } |
This would run automatically after a put, but in a non-blocking manner. I'll usually set Caffeine.executor(Runnable::run) in unit tests to disable async so it runs on the caller.
You could also use Caffeine.scheduler(Scheduler.systemScheduler()) if you wanted prompt removal after expiration.
|
|
||
| @VisibleForTesting | ||
| long getSize() { | ||
| return curBytes; |
fwiw, you can get the weightedSize() from cache.policy().eviction() if isWeighted().
|
|
||
| public void clear() { | ||
| cache.invalidateAll(); | ||
| cache.cleanUp(); |
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
…g them Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
Signed-off-by: Krishna Kondaka <krishkdk@amazon.com>
| private AtomicInteger curEntries; | ||
| private AtomicLong curBytes; | ||
|
|
||
| public CaffeineCache(int maxEntries, long maxBytes, int ttlSeconds, boolean lruMode) { |
Is there another option or mode in which Caffeine automatically evicts an entry after its first access? This would be useful for the Merge use case, where each key only needs to be accessed once and can be removed immediately afterwards to optimize memory usage.
Most likely the cache will detect this and automatically tune itself, as its goal is to maximize the hit ratio. Therefore it wants to evict unused entries aggressively. It will adaptively size a new arrival region (admission window) with a long-term popular region (main), using a frequency filter to guard the promotion.
The workload below has a blockchain trace chained with an analytical loop. The mining is LRU-biased since multiple workers pick up the event, validate it, and it is unused thereafter. The analytics is MRU-biased as it scans over rows, so it protects the useful contents from being forced out. The stress test switches between traces to show that the cache adapts to the changing workload and that no tuning parameters are needed. The hope is that it works out of the box, but of course measuring the actual behavior is always good regardless.
Abandoning this PR for now. Will directly use CaffeineCache in aws-lambda processor without claiming LRU behavior.
Description
Add a common cache library using CaffeineCache that supports LRU mode and TinyLFU mode, with metrics that keep track of the size of the cache.
Issues Resolved
Resolves #[Issue number to be closed when this PR is merged]
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.