
Commit f050b10

feat(cubestore): add stale-while-revalidate timeout for SQL query cache (cube-js#10822)

* feat(cubestore): add stale-while-revalidate timeout for SQL query cache

Add CUBESTORE_QUERY_CACHE_STALE_WHILE_REVALIDATE env config param that enables stale-while-revalidate behavior for CubeStore's SQL result cache.

When enabled (set to a number of seconds, e.g. 30), the cache maintains a secondary lookup keyed only by SQL query text + inline tables (ignoring partition/chunk IDs). If a query arrives and the exact cache key (which includes current partition/chunk snapshot) misses, but a stale entry exists and was created within the configured timeout, the stale result is served immediately while a background task refreshes the cache with fresh data.

This is disabled by default (0 = disabled). The feature reduces query latency during compaction or partition changes when the underlying data hasn't materially changed.

Changes:
- config/mod.rs: Add query_cache_stale_while_revalidate_secs to ConfigObj trait and ConfigObjImpl, parsed from CUBESTORE_QUERY_CACHE_STALE_WHILE_REVALIDATE
- sql/cache.rs: Add stale_cache (Moka cache keyed by SqlQueueCacheKey), stale-while-revalidate lookup in get(), background refresh via tokio::spawn
- app_metrics.rs: Add DATA_QUERIES_CACHE_STALE_HIT counter metric
- Unit test for stale-while-revalidate behavior

Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>

* refactor(cubestore): use queue cache for background stale-while-revalidate refresh

The background refresh spawned by stale-while-revalidate now integrates with the queue cache dedup mechanism:
- Extract try_register_background() which atomically checks if a query is already in flight in the queue cache and registers a new entry if not.
- On stale hit, try_register_background() is called first. If another execution is already in flight (from a previous stale hit or a concurrent foreground request), we skip spawning entirely.
- The background spawn registers a watch::Sender in the queue cache, so any concurrent foreground get() for the same SQL that doesn't hit the stale cache will join as a waiter instead of starting a duplicate execution.
- Queue cache entry cleanup happens lazily: sender is dropped when the background task completes, closing the channel. The next get() for that key detects the closed channel via has_changed().is_err() and pops it.

Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>

* refactor(cubestore): reuse queue cache flow for background stale-while-revalidate

Extract get_inner() with is_background_refresh flag from get(). The background spawn now calls get_inner(true) which skips the stale cache check but reuses the entire queue cache dedup flow, with no separate implementation for the background path.
- get() is a thin wrapper: delegates to get_inner(false)
- get_inner(is_background_refresh=true) skips stale check, goes straight to queue cache logic for natural dedup
- get() now takes self: &Arc<Self> so the spawn can clone the Arc
- get_inner() takes self: Arc<Self> and returns Pin<Box<dyn Future>> to satisfy Send + 'static bounds for tokio::spawn
- Removed try_register_background() and all duplicated cache update logic from previous approach

Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>

* fix(cubestore): fix stale-while-revalidate test to simulate partition change

The test now invalidates the exact result cache before the second get() call to simulate a partition/chunk change that would make the exact cache key miss while the stale entry (keyed by SQL only) still matches.

Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>

* fix(cubestore): address review feedback on stale-while-revalidate cache

- Add weigher to stale_cache so max_capacity is in bytes, not entry count
- Fix clippy: use Duration::from_secs as method reference instead of closure
- Add test: background refresh failure preserves stale entry for next request
- Add test: stale timeout expiry falls through to fresh execution

Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>

* feat(cubestore): add size/weight metrics for stale cache

Add cs.sql.query.data.cache.stale.size and cs.sql.query.data.cache.stale.weight gauge metrics mirroring the existing result cache metrics. Reported alongside result cache metrics on cache mutations and zeroed on drop.

Co-authored-by: Pavel Tiunov <pavel.tiunov@gmail.com>

---------

Co-authored-by: Cursor Agent <cursoragent@cursor.com>
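
The two-level lookup described in the first commit can be sketched roughly as follows. This is a simplified, std-only illustration and not the code in sql/cache.rs: the names SwrCache and Lookup are hypothetical, a plain u64 stands in for the partition/chunk snapshot, results are plain strings, and the "young enough" check uses an inclusive comparison as an assumption. The real implementation uses Moka caches and spawns the refresh with tokio; here the caller is only told that a refresh is due.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

/// Outcome of a lookup in this sketch (hypothetical type, not from the commit).
#[derive(Debug, PartialEq)]
pub enum Lookup {
    /// The exact key (SQL text + snapshot) matched.
    Fresh(String),
    /// Only the SQL-only key matched and the entry is within the timeout;
    /// the caller should serve it and schedule a background refresh.
    Stale(String),
    /// Nothing usable was found; the caller must execute the query.
    Miss,
}

/// Hypothetical two-level cache: an exact map plus a secondary SQL-only map.
pub struct SwrCache {
    /// Keyed by (SQL text, snapshot id); the u64 is a stand-in for partition/chunk IDs.
    exact: HashMap<(String, u64), String>,
    /// Keyed by SQL text only; stores the result and its creation time.
    stale: HashMap<String, (String, Instant)>,
    /// None means stale-while-revalidate is disabled.
    stale_timeout: Option<Duration>,
}

impl SwrCache {
    pub fn new(stale_timeout: Option<Duration>) -> Self {
        SwrCache {
            exact: HashMap::new(),
            stale: HashMap::new(),
            stale_timeout,
        }
    }

    /// Stores a result under the exact key and, if enabled, under the SQL-only key.
    pub fn insert(&mut self, sql: &str, snapshot: u64, result: &str, now: Instant) {
        self.exact
            .insert((sql.to_string(), snapshot), result.to_string());
        if self.stale_timeout.is_some() {
            self.stale
                .insert(sql.to_string(), (result.to_string(), now));
        }
    }

    /// Tries the exact key first, then falls back to a sufficiently young stale entry.
    pub fn lookup(&self, sql: &str, snapshot: u64, now: Instant) -> Lookup {
        if let Some(result) = self.exact.get(&(sql.to_string(), snapshot)) {
            return Lookup::Fresh(result.clone());
        }
        if let (Some(timeout), Some((result, created))) =
            (self.stale_timeout, self.stale.get(sql))
        {
            // Assumption: an entry exactly as old as the timeout still counts.
            if now.saturating_duration_since(*created) <= timeout {
                return Lookup::Stale(result.clone());
            }
        }
        Lookup::Miss
    }
}
```

With a 30 second timeout, a lookup under a new snapshot id ten seconds after the insert yields Lookup::Stale, and the same lookup after the timeout has passed yields Lookup::Miss, which corresponds to the "stale timeout expiry falls through to fresh execution" test mentioned above.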
1 parent 9bdafd8 commit f050b10

5 files changed

Lines changed: 439 additions & 66 deletions


rust/cubestore/cubestore/src/app_metrics.rs

Lines changed: 6 additions & 0 deletions
@@ -13,10 +13,16 @@ pub static WORKER_POOL_ERROR: Counter = metrics::counter("cs.worker_pool.errors"
 /// Incoming SQL queries that do data reads.
 pub static DATA_QUERIES: Counter = metrics::counter("cs.sql.query.data");
 pub static DATA_QUERIES_CACHE_HIT: Counter = metrics::counter("cs.sql.query.data.cache.hit");
+pub static DATA_QUERIES_CACHE_STALE_HIT: Counter =
+    metrics::counter("cs.sql.query.data.cache.stale_hit");
 // Approximate number of entries in this cache.
 pub static DATA_QUERIES_CACHE_SIZE: Gauge = metrics::gauge("cs.sql.query.data.cache.size");
 // Approximate total weighted size of entries in this cache.
 pub static DATA_QUERIES_CACHE_WEIGHT: Gauge = metrics::gauge("cs.sql.query.data.cache.weight");
+pub static DATA_QUERIES_STALE_CACHE_SIZE: Gauge =
+    metrics::gauge("cs.sql.query.data.cache.stale.size");
+pub static DATA_QUERIES_STALE_CACHE_WEIGHT: Gauge =
+    metrics::gauge("cs.sql.query.data.cache.stale.weight");
 pub static DATA_QUERY_TIME_MS: Histogram = metrics::histogram("cs.sql.query.data.ms");
 pub static DATA_QUERY_LOGICAL_PLAN_TOTAL_CREATION_TIME_US: Histogram =
     metrics::histogram("cs.sql.query.data.planning.logical_plan.total_creation.us");

rust/cubestore/cubestore/src/config/mod.rs

Lines changed: 17 additions & 0 deletions
@@ -510,6 +510,8 @@ pub trait ConfigObj: DIService {
 
     fn query_cache_time_to_idle_secs(&self) -> Option<u64>;
 
+    fn query_cache_stale_while_revalidate_secs(&self) -> Option<u64>;
+
     fn metadata_cache_max_capacity_bytes(&self) -> u64;
 
     fn metadata_cache_time_to_idle_secs(&self) -> u64;
@@ -643,6 +645,7 @@ pub struct ConfigObjImpl {
     pub query_cache_max_capacity_bytes: u64,
     pub query_queue_cache_max_capacity: u64,
     pub query_cache_time_to_idle_secs: Option<u64>,
+    pub query_cache_stale_while_revalidate_secs: Option<u64>,
     pub metadata_cache_max_capacity_bytes: u64,
     pub metadata_cache_time_to_idle_secs: u64,
     pub stream_replay_check_interval_secs: u64,
@@ -956,6 +959,9 @@ impl ConfigObj for ConfigObjImpl {
     fn query_cache_time_to_idle_secs(&self) -> Option<u64> {
         self.query_cache_time_to_idle_secs
     }
+    fn query_cache_stale_while_revalidate_secs(&self) -> Option<u64> {
+        self.query_cache_stale_while_revalidate_secs
+    }
 
     fn metadata_cache_max_capacity_bytes(&self) -> u64 {
         self.metadata_cache_max_capacity_bytes
@@ -1233,6 +1239,8 @@ impl Config {
 
     pub fn default() -> Config {
         let query_timeout = env_parse("CUBESTORE_QUERY_TIMEOUT", 120);
+        let query_cache_stale_while_revalidate_secs: u64 =
+            env_parse("CUBESTORE_QUERY_CACHE_STALE_WHILE_REVALIDATE", 0);
         let query_cache_time_to_idle_secs = env_parse(
             "CUBESTORE_QUERY_CACHE_TIME_TO_IDLE",
             // 1 hour
@@ -1528,6 +1536,13 @@ impl Config {
                 } else {
                     Some(query_cache_time_to_idle_secs)
                 },
+                query_cache_stale_while_revalidate_secs: if query_cache_stale_while_revalidate_secs
+                    == 0
+                {
+                    None
+                } else {
+                    Some(query_cache_stale_while_revalidate_secs)
+                },
                 metadata_cache_max_capacity_bytes: env_parse(
                     "CUBESTORE_METADATA_CACHE_MAX_CAPACITY_BYTES",
                     0,
@@ -1740,6 +1755,7 @@ impl Config {
                 query_cache_max_capacity_bytes: 512 << 20,
                 query_queue_cache_max_capacity: 10000,
                 query_cache_time_to_idle_secs: Some(600),
+                query_cache_stale_while_revalidate_secs: None,
                 metadata_cache_max_capacity_bytes: 0,
                 metadata_cache_time_to_idle_secs: 1_000,
                 meta_store_log_upload_interval: 30,
@@ -2393,6 +2409,7 @@ impl Config {
                 self.config_obj.query_cache_max_capacity_bytes(),
                 self.config_obj.query_cache_time_to_idle_secs(),
                 self.config_obj.query_queue_cache_max_capacity(),
+                self.config_obj.query_cache_stale_while_revalidate_secs(),
             ));
 
         let query_cache_to_move = query_cache.clone();
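
The "0 = disabled" convention in the config hunks above can be illustrated in isolation. The sketch below is a hypothetical helper, not part of the commit: stale_timeout_from_raw is an invented name, it takes the raw string instead of reading the environment so that it stays easy to test, and it treats an unparsable value as 0, which is an assumption and may differ from how env_parse reacts to bad input. It also shows Duration::from_secs passed as a function reference, as mentioned in the review-feedback commit.

```rust
use std::time::Duration;

/// Hypothetical helper: turns the raw value of
/// CUBESTORE_QUERY_CACHE_STALE_WHILE_REVALIDATE into an optional timeout.
/// A missing value or 0 means the feature is disabled (None).
/// Assumption: an unparsable value is treated like 0 in this sketch.
pub fn stale_timeout_from_raw(raw: Option<&str>) -> Option<Duration> {
    let secs: u64 = raw
        .and_then(|s| s.trim().parse().ok())
        .unwrap_or(0);
    // Keep only non-zero values, then convert seconds into a Duration.
    Some(secs)
        .filter(|s| *s != 0)
        .map(Duration::from_secs)
}
```

A caller could pass std::env::var("CUBESTORE_QUERY_CACHE_STALE_WHILE_REVALIDATE").ok().as_deref() as the argument; a value of "30" then yields a 30 second timeout, while "0" or an unset variable yields None.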

rust/cubestore/cubestore/src/queryplanner/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -1063,7 +1063,7 @@ pub mod tests {
             Arc::new(test_utils::MetaStoreMock {}),
             Arc::new(test_utils::CacheStoreMock {}),
             &vec![],
-            Arc::new(SqlResultCache::new(1 << 20, None, 10000)),
+            Arc::new(SqlResultCache::new(1 << 20, None, 10000, None)),
             Arc::new(SessionContext::new().state()),
         )
     }
