Skip to content

Commit 63cf726

Browse files
committed
fix(cache): make in memory cache global
1 parent 702987d commit 63cf726

File tree

3 files changed

+44
-26
lines changed

3 files changed

+44
-26
lines changed

engine/packages/cache/src/driver.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{
22
fmt::Debug,
3+
sync::OnceLock,
34
time::{Duration, Instant},
45
};
56

@@ -132,10 +133,10 @@ impl moka::Expiry<String, ExpiringValue> for ValueExpiry {
132133
}
133134
}
134135

136+
static CACHE: OnceLock<Cache<String, ExpiringValue>> = OnceLock::new();
137+
135138
/// In-memory cache driver implementation using the moka crate
136-
pub struct InMemoryDriver {
137-
cache: Cache<String, ExpiringValue>,
138-
}
139+
pub struct InMemoryDriver {}
139140

140141
impl Debug for InMemoryDriver {
141142
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
@@ -146,11 +147,20 @@ impl Debug for InMemoryDriver {
146147
impl InMemoryDriver {
147148
pub fn new(max_capacity: u64) -> Self {
148149
// Create a cache with ValueExpiry implementation for custom expiration times
149-
let cache = CacheBuilder::new(max_capacity)
150-
.expire_after(ValueExpiry)
151-
.build();
150+
CACHE.get_or_init(|| {
151+
CacheBuilder::new(max_capacity)
152+
.expire_after(ValueExpiry)
153+
.eviction_listener(|key, _value, cause| {
154+
tracing::debug!(?key, ?cause, "cache eviction");
155+
})
156+
.build()
157+
});
158+
159+
Self {}
160+
}
152161

153-
Self { cache }
162+
fn cache(&self) -> &Cache<String, ExpiringValue> {
163+
CACHE.get().expect("should be initialized")
154164
}
155165

156166
pub async fn get<'a>(
@@ -163,7 +173,7 @@ impl InMemoryDriver {
163173
// Async block for metrics
164174
async {
165175
for key in keys {
166-
result.push(self.cache.get(&**key).await.map(|x| x.value.clone()));
176+
result.push(self.cache().get(&**key).await.map(|x| x.value.clone()));
167177
}
168178
}
169179
.instrument(tracing::info_span!("get"))
@@ -193,7 +203,7 @@ impl InMemoryDriver {
193203
};
194204

195205
// Store in cache - expiry will be handled by ValueExpiry
196-
self.cache.insert(key.into(), entry).await;
206+
self.cache().insert(key.into(), entry).await;
197207
}
198208
}
199209
.instrument(tracing::info_span!("set"))
@@ -212,7 +222,7 @@ impl InMemoryDriver {
212222
async {
213223
for key in keys {
214224
// Use remove instead of invalidate to ensure it's actually removed
215-
self.cache.remove(&*key).await;
225+
self.cache().remove(&*key).await;
216226
}
217227
}
218228
.instrument(tracing::info_span!("delete"))

engine/packages/cache/src/inner.rs

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,20 @@
1-
use std::{fmt::Debug, sync::Arc};
1+
use std::{
2+
fmt::Debug,
3+
sync::{Arc, OnceLock},
4+
};
25

36
use tokio::sync::broadcast;
47

58
use super::*;
69
use crate::driver::{Driver, InMemoryDriver};
710

11+
static IN_FLIGHT: OnceLock<scc::HashMap<RawCacheKey, broadcast::Sender<()>>> = OnceLock::new();
12+
813
pub type Cache = Arc<CacheInner>;
914

1015
/// Utility type used to hold information relating to caching.
1116
pub struct CacheInner {
1217
pub(crate) driver: Driver,
13-
pub(crate) in_flight: scc::HashMap<RawCacheKey, broadcast::Sender<()>>,
1418
pub(crate) ups: Option<universalpubsub::PubSub>,
1519
}
1620

@@ -36,11 +40,12 @@ impl CacheInner {
3640
#[tracing::instrument(skip(ups))]
3741
pub fn new_in_memory(max_capacity: u64, ups: Option<universalpubsub::PubSub>) -> Cache {
3842
let driver = Driver::InMemory(InMemoryDriver::new(max_capacity));
39-
Arc::new(CacheInner {
40-
driver,
41-
in_flight: scc::HashMap::new(),
42-
ups,
43-
})
43+
44+
Arc::new(CacheInner { driver, ups })
45+
}
46+
47+
pub(crate) fn in_flight(&self) -> &scc::HashMap<RawCacheKey, broadcast::Sender<()>> {
48+
IN_FLIGHT.get_or_init(scc::HashMap::new)
4449
}
4550
}
4651

engine/packages/cache/src/req_config.rs

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ impl RequestConfig {
4747

4848
// MARK: Fetch
4949
impl RequestConfig {
50-
#[tracing::instrument(err, skip(keys, getter, encoder, decoder))]
50+
#[tracing::instrument(err, skip_all, fields(?base_key))]
5151
async fn fetch_all_convert<Key, Value, Getter, Fut, Encoder, Decoder>(
5252
self,
5353
base_key: impl ToString + Debug,
@@ -129,7 +129,7 @@ impl RequestConfig {
129129
// Determine which keys are currently being fetched and not
130130
for key in remaining_keys {
131131
let cache_key = self.cache.driver.process_key(&base_key, &key);
132-
match self.cache.in_flight.entry_async(cache_key).await {
132+
match self.cache.in_flight().entry_async(cache_key).await {
133133
scc::hash_map::Entry::Occupied(broadcast) => {
134134
waiting_keys.push((key, broadcast.subscribe()));
135135
}
@@ -189,7 +189,13 @@ impl RequestConfig {
189189
succeeded_keys.into_iter().unzip();
190190

191191
let (cached_values_res, ctx3_res) = tokio::join!(
192-
cache.driver.get(&base_key2, &succeeded_cache_keys),
192+
async {
193+
if succeeded_cache_keys.is_empty() {
194+
Ok(Vec::new())
195+
} else {
196+
cache.driver.get(&base_key2, &succeeded_cache_keys).await
197+
}
198+
},
193199
async {
194200
if failed_keys.is_empty() {
195201
Ok(ctx3)
@@ -276,7 +282,7 @@ impl RequestConfig {
276282
// Release leases
277283
for key in leased_keys {
278284
let cache_key = self.cache.driver.process_key(&base_key, &key);
279-
self.cache.in_flight.remove_async(&cache_key).await;
285+
self.cache.in_flight().remove_async(&cache_key).await;
280286
}
281287
}
282288

@@ -310,7 +316,7 @@ impl RequestConfig {
310316
}
311317
}
312318

313-
#[tracing::instrument(err, skip(keys))]
319+
#[tracing::instrument(err, skip_all, fields(?base_key))]
314320
pub async fn purge<Key>(
315321
self,
316322
base_key: impl AsRef<str> + Debug,
@@ -363,7 +369,7 @@ impl RequestConfig {
363369

364370
/// Purges keys from the local cache without publishing to NATS.
365371
/// This is used by the cache-purge service to avoid recursive publishing.
366-
#[tracing::instrument(err, skip(keys))]
372+
#[tracing::instrument(err, skip_all, fields(?base_key))]
367373
pub async fn purge_local(
368374
self,
369375
base_key: impl AsRef<str> + Debug,
@@ -398,7 +404,6 @@ impl RequestConfig {
398404

399405
// MARK: JSON fetch
400406
impl RequestConfig {
401-
#[tracing::instrument(err, skip(key, getter))]
402407
pub async fn fetch_one_json<Key, Value, Getter, Fut>(
403408
self,
404409
base_key: impl ToString + Debug,
@@ -428,7 +433,6 @@ impl RequestConfig {
428433
Ok(values.into_iter().next().map(|(_, v)| v))
429434
}
430435

431-
#[tracing::instrument(err, skip(keys, getter))]
432436
pub async fn fetch_all_json<Key, Value, Getter, Fut>(
433437
self,
434438
base_key: impl ToString + Debug,
@@ -447,7 +451,6 @@ impl RequestConfig {
447451
.map(|x| x.into_iter().map(|(_, v)| v).collect::<Vec<_>>())
448452
}
449453

450-
#[tracing::instrument(err, skip(keys, getter))]
451454
pub async fn fetch_all_json_with_keys<Key, Value, Getter, Fut>(
452455
self,
453456
base_key: impl ToString + Debug,

0 commit comments

Comments
 (0)