Skip to content

Commit 125fbf9

Browse files
committed
glide-core: Phase 2 client-side caching - server-assisted invalidation via CLIENT TRACKING
Signed-off-by: affonsov <67347924+affonsov@users.noreply.github.com>
1 parent 74a6c9a commit 125fbf9

10 files changed

Lines changed: 331 additions & 4 deletions

File tree

glide-core/redis-rs/redis/src/aio/mod.rs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,37 @@ where
225225
}
226226
}
227227

228+
if connection_info.server_assisted_cache {
229+
if connection_info.protocol == ProtocolVersion::RESP2 {
230+
return Err(RedisError::from((
231+
ErrorKind::InvalidClientConfig,
232+
"server_assisted_cache requires RESP3 protocol",
233+
)));
234+
}
235+
match cmd("CLIENT")
236+
.arg("TRACKING")
237+
.arg("ON")
238+
.arg("BCAST")
239+
.query_async(con)
240+
.await
241+
{
242+
Ok(Value::Okay) => {}
243+
Err(e) => {
244+
return Err(RedisError::from((
245+
ErrorKind::ClientError,
246+
"Failed to enable server-assisted client tracking",
247+
e.to_string(),
248+
)));
249+
}
250+
_ => {
251+
return Err(RedisError::from((
252+
ErrorKind::ClientError,
253+
"Unexpected response from CLIENT TRACKING ON BCAST",
254+
)));
255+
}
256+
}
257+
}
258+
228259
if discover_az {
229260
update_az_from_info(con).await?;
230261
}

glide-core/redis-rs/redis/src/aio/multiplexed_connection.rs

Lines changed: 32 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@ pin_project! {
113113
disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
114114
is_stream_closed: Arc<AtomicBool>,
115115
response_sync_lost: bool,
116+
cache: Option<Arc<dyn GlideCache>>,
116117
}
117118

118119
impl<T> PinnedDrop for PipelineSink<T> {
@@ -140,6 +141,7 @@ where
140141
push_manager: Arc<ArcSwap<PushManager>>,
141142
disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
142143
is_stream_closed: Arc<AtomicBool>,
144+
cache: Option<Arc<dyn GlideCache>>,
143145
) -> Self
144146
where
145147
T: Sink<SinkItem, Error = RedisError> + Stream<Item = RedisResult<Value>> + 'static,
@@ -152,6 +154,7 @@ where
152154
disconnect_notifier,
153155
is_stream_closed,
154156
response_sync_lost: false,
157+
cache,
155158
}
156159
}
157160

@@ -194,6 +197,26 @@ where
194197
if let Ok(res) = &result {
195198
if let Value::Push { kind, data: _data } = res {
196199
self_.push_manager.load().try_send_raw(res);
200+
if kind == &PushKind::Invalidate {
201+
if let Some(cache) = self_.cache {
202+
match _data.first() {
203+
Some(Value::Array(keys)) => {
204+
for key in keys {
205+
if let Value::BulkString(k) = key {
206+
cache.invalidate(k);
207+
} else if let Value::VerbatimString { text, .. } = key {
208+
cache.invalidate(text.as_bytes());
209+
}
210+
}
211+
}
212+
Some(Value::Nil) => {
213+
cache.flush_all();
214+
}
215+
None => { /* malformed push, ignore */ }
216+
_ => {}
217+
}
218+
}
219+
}
197220
if !kind.has_reply() {
198221
return;
199222
}
@@ -503,6 +526,7 @@ where
503526
fn new<T>(
504527
sink_stream: T,
505528
disconnect_notifier: Option<Box<dyn DisconnectNotifier>>,
529+
cache: Option<Arc<dyn GlideCache>>,
506530
) -> (Self, impl Future<Output = ()>)
507531
where
508532
T: Sink<SinkItem, Error = RedisError> + Stream<Item = RedisResult<Value>> + 'static,
@@ -521,6 +545,7 @@ where
521545
push_manager.clone(),
522546
disconnect_notifier,
523547
is_stream_closed.clone(),
548+
cache,
524549
);
525550
let f = stream::poll_fn(move |cx| receiver.poll_recv(cx))
526551
.map(Ok)
@@ -702,8 +727,11 @@ impl MultiplexedConnection {
702727
let codec = ValueCodec::default()
703728
.framed(stream)
704729
.and_then(|msg| async move { msg });
705-
let (mut pipeline, driver) =
706-
Pipeline::new(codec, glide_connection_options.disconnect_notifier);
730+
let (mut pipeline, driver) = Pipeline::new(
731+
codec,
732+
glide_connection_options.disconnect_notifier,
733+
connection_info.redis.cache.clone(),
734+
);
707735
let driver = Box::pin(driver);
708736
let pm = PushManager::new(
709737
glide_connection_options.push_sender,
@@ -1160,7 +1188,7 @@ mod tests {
11601188
};
11611189

11621190
// Create pipeline but don't drive it, the channel will fill and send() will block
1163-
let (mut pipeline, driver) = Pipeline::new(stalling_sink, None);
1191+
let (mut pipeline, driver) = Pipeline::new(stalling_sink, None, None);
11641192
std::mem::forget(driver);
11651193

11661194
// Fill the 50-slot pipeline channel
@@ -1299,7 +1327,7 @@ mod tests {
12991327
waker: None,
13001328
};
13011329

1302-
let (pipeline, driver) = Pipeline::new(stream, None);
1330+
let (pipeline, driver) = Pipeline::new(stream, None, None);
13031331
let driver_handle = tokio::spawn(driver);
13041332

13051333
// Send first command — this should go through fine

glide-core/redis-rs/redis/src/cache/glide_cache.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -456,6 +456,9 @@ pub trait GlideCache: Send + Sync + Debug {
456456
/// * `key` - The key to invalidate
457457
fn invalidate(&self, key: &[u8]);
458458

459+
/// Removes all entries from the cache
460+
fn flush_all(&self);
461+
459462
// ==================== Metrics ====================
460463

461464
/// Returns current cache metrics (hits, misses, etc.)
@@ -637,6 +640,20 @@ impl<S: EvictionStrategy + 'static> GlideCache for GlideCacheImpl<S> {
637640
}
638641
}
639642

643+
fn flush_all(&self) {
644+
let mut store = self.store.write().unwrap();
645+
while let Some(entry) = store.evict_one() {
646+
self.core.uncharge(entry.size);
647+
}
648+
if let Some(stats) = self.core.stats() {
649+
stats.record_invalidation();
650+
}
651+
debug!(
652+
"cache_flush_all - [{}] Flushed all entries",
653+
store.policy_name()
654+
);
655+
}
656+
640657
fn entry_count(&self) -> u64 {
641658
self.store.read().unwrap().len() as u64
642659
}

glide-core/redis-rs/redis/src/cluster.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1028,6 +1028,7 @@ pub(crate) fn get_connection_info(
10281028
protocol: cluster_params.protocol,
10291029
db: cluster_params.database_id,
10301030
cache: cluster_params.cache,
1031+
server_assisted_cache: cluster_params.server_assisted_cache,
10311032
},
10321033
})
10331034
}

glide-core/redis-rs/redis/src/cluster_client.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ struct BuilderParams {
5050
database_id: i64,
5151
tcp_nodelay: bool,
5252
cache: Option<Arc<dyn GlideCache>>,
53+
server_assisted_cache: bool,
5354
address_resolver: Option<Arc<dyn AddressResolver>>,
5455
}
5556

@@ -155,6 +156,7 @@ pub struct ClusterParams {
155156
pub(crate) database_id: i64,
156157
pub(crate) tcp_nodelay: bool,
157158
pub(crate) cache: Option<Arc<dyn GlideCache>>,
159+
pub(crate) server_assisted_cache: bool,
158160
/// Optional callback for resolving addresses before connection.
159161
pub(crate) address_resolver: Option<Arc<dyn AddressResolver>>,
160162
}
@@ -190,6 +192,7 @@ impl ClusterParams {
190192
database_id: value.database_id,
191193
tcp_nodelay: value.tcp_nodelay,
192194
cache: value.cache,
195+
server_assisted_cache: value.server_assisted_cache,
193196
address_resolver: value.address_resolver,
194197
})
195198
}
@@ -222,6 +225,7 @@ impl ClusterParams {
222225
database_id: 0,
223226
tcp_nodelay: false,
224227
cache: None,
228+
server_assisted_cache: false,
225229
address_resolver: None,
226230
}
227231
}
@@ -593,6 +597,12 @@ impl ClusterClientBuilder {
593597
self
594598
}
595599

600+
/// Sets whether server-assisted client-side caching (CLIENT TRACKING) is enabled.
601+
pub fn server_assisted_cache(mut self, enabled: bool) -> ClusterClientBuilder {
602+
self.builder_params.server_assisted_cache = enabled;
603+
self
604+
}
605+
596606
/// Use `build()`.
597607
#[deprecated(since = "0.22.0", note = "Use build()")]
598608
pub fn open(self) -> RedisResult<ClusterClient> {

glide-core/redis-rs/redis/src/connection.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,8 @@ pub struct RedisConnectionInfo {
237237
pub lib_name: Option<String>,
238238
/// Optionally a cache used for client-side caching
239239
pub cache: Option<Arc<dyn GlideCache>>,
240+
/// Whether to enable server-assisted client tracking (CLIENT TRACKING ON BCAST)
241+
pub server_assisted_cache: bool,
240242
}
241243

242244
impl FromStr for ConnectionInfo {
@@ -396,6 +398,7 @@ fn url_to_tcp_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
396398
client_name: None,
397399
lib_name: None,
398400
cache: None,
401+
server_assisted_cache: false,
399402
},
400403
})
401404
}
@@ -430,6 +433,7 @@ fn url_to_unix_connection_info(url: url::Url) -> RedisResult<ConnectionInfo> {
430433
client_name: None,
431434
lib_name: None,
432435
cache: None,
436+
server_assisted_cache: false,
433437
},
434438
})
435439
}
@@ -1878,6 +1882,7 @@ mod tests {
18781882
client_name: None,
18791883
lib_name: None,
18801884
cache: None,
1885+
server_assisted_cache: false,
18811886
},
18821887
},
18831888
),

glide-core/src/client/mod.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,12 @@ pub async fn get_valkey_connection_info(
203203
)
204204
});
205205

206+
let server_assisted_cache = connection_request
207+
.client_side_cache
208+
.as_ref()
209+
.map(|c| c.server_assisted)
210+
.unwrap_or(false);
211+
206212
match &connection_request.authentication_info {
207213
Some(info) => {
208214
// If we have IAM configuration and a token manager, use the IAM token as password
@@ -222,6 +228,7 @@ pub async fn get_valkey_connection_info(
222228
client_name,
223229
lib_name,
224230
cache,
231+
server_assisted_cache,
225232
}
226233
} else {
227234
// Regular password-based authentication
@@ -233,6 +240,7 @@ pub async fn get_valkey_connection_info(
233240
client_name,
234241
lib_name,
235242
cache,
243+
server_assisted_cache,
236244
}
237245
}
238246
}
@@ -242,6 +250,7 @@ pub async fn get_valkey_connection_info(
242250
client_name,
243251
lib_name,
244252
cache,
253+
server_assisted_cache,
245254
..Default::default()
246255
},
247256
}
@@ -1797,6 +1806,7 @@ async fn create_cluster_client(
17971806
builder = builder.use_protocol(request.protocol.unwrap_or_default());
17981807
builder = builder.database_id(valkey_connection_info.db);
17991808
builder = builder.cache(valkey_connection_info.cache);
1809+
builder = builder.server_assisted_cache(valkey_connection_info.server_assisted_cache);
18001810
if let Some(client_name) = valkey_connection_info.client_name {
18011811
builder = builder.client_name(client_name);
18021812
}

glide-core/src/client/types.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub struct ClientSideCache {
7272
pub entry_ttl_ms: u64,
7373
pub eviction_policy: Option<EvictionPolicy>,
7474
pub enable_metrics: bool,
75+
pub server_assisted: bool,
7576
}
7677

7778
/// Authentication information for connecting to Redis/Valkey servers
@@ -367,6 +368,7 @@ impl From<protobuf::ConnectionRequest> for ConnectionRequest {
367368
protobuf::EvictionPolicy::LFU => EvictionPolicy::Lfu,
368369
}),
369370
enable_metrics: proto_cache.enable_metrics,
371+
server_assisted: proto_cache.server_assisted,
370372
});
371373

372374
// Convert protobuf compression config to internal compression config

glide-core/src/protobuf/connection_request.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,7 @@ message ClientSideCache {
9797
uint64 entry_ttl_ms = 3; // 0 = no expiration
9898
optional EvictionPolicy eviction_policy = 4;
9999
bool enable_metrics = 5;
100+
bool server_assisted = 6;
100101
}
101102

102103
enum EvictionPolicy {

0 commit comments

Comments
 (0)