Skip to content

Commit 2ea428b

Browse files
authored
Handle correctly subscription messages (TraceMachina#2201)
1 parent c7109f6 commit 2ea428b

File tree

2 files changed

+64
-8
lines changed

2 files changed

+64
-8
lines changed

nativelink-store/src/redis_store.rs

Lines changed: 16 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,7 @@ where
337337
/// A manager for subscriptions to keys in Redis.
338338
subscription_manager: tokio::sync::OnceCell<Arc<RedisSubscriptionManager>>,
339339

340-
/// Channel for getting subscription messages. Only used by cluster mode where
341-
/// the sender is connected at construction time. For standard mode, this is
342-
/// None and created on demand in `subscription_manager()`.
340+
/// Channel for getting subscription messages
343341
subscriber_channel: Mutex<Option<UnboundedReceiver<PushInfo>>>,
344342

345343
/// Permits to limit inflight Redis requests. Technically only
@@ -1366,11 +1364,22 @@ impl RedisSubscriptionManager {
13661364
},
13671365
maybe_push_info = local_subscriber_channel.next() => {
13681366
if let Some(push_info) = maybe_push_info {
1369-
if push_info.data.len() != 1 {
1370-
error!(?push_info, "Expected exactly one message on subscriber_channel");
1367+
match push_info.kind {
1368+
redis::PushKind::PMessage => {},
1369+
redis::PushKind::PSubscribe => {
1370+
trace!(?push_info, "PSubscribe, ignore");
1371+
continue;
1372+
}
1373+
_ => {
1374+
warn!(?push_info, "Other push_info message, discarded");
1375+
continue;
1376+
},
1377+
}
1378+
if push_info.data.len() != 3 {
1379+
error!(?push_info, "Expected exactly 3 values on subscriber channel (pattern, channel, value)");
13711380
continue;
13721381
}
1373-
match push_info.data.first().unwrap() {
1382+
match push_info.data.last().unwrap() {
13741383
Value::SimpleString(s) => {
13751384
s.clone()
13761385
}
@@ -1388,6 +1397,7 @@ impl RedisSubscriptionManager {
13881397
}
13891398
}
13901399
};
1400+
trace!(key, "New subscription manager key");
13911401
let Some(subscribed_keys) = subscribed_keys_weak.upgrade() else {
13921402
warn!(
13931403
"It appears our parent has been dropped, exiting RedisSubscriptionManager spawn"

nativelink-store/tests/redis_store_test.rs

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,9 @@ use nativelink_util::store_trait::{
4040
StoreLike, TrueValue, UploadSizeInfo,
4141
};
4242
use pretty_assertions::assert_eq;
43-
use redis::{RedisError, Value};
43+
use redis::{PushInfo, RedisError, Value};
4444
use redis_test::{MockCmd, MockRedisConnection};
45-
use tokio::time::sleep;
45+
use tokio::time::{sleep, timeout};
4646
use tracing::{Instrument, info, info_span};
4747

4848
const VALID_HASH1: &str = "3031323334353637383961626364656630303030303030303030303030303030";
@@ -1338,3 +1338,49 @@ async fn no_items_from_none_subscription_channel() -> Result<(), Error> {
13381338

13391339
Ok(())
13401340
}
1341+
1342+
#[nativelink_test]
1343+
async fn send_messages_to_subscription_channel() -> Result<(), Error> {
1344+
let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
1345+
let subscription_manager = RedisSubscriptionManager::new(rx);
1346+
1347+
tx.send(PushInfo {
1348+
kind: redis::PushKind::PSubscribe,
1349+
data: vec![
1350+
// Pattern
1351+
Value::BulkString("scheduler_key_change".into()),
1352+
// Subscribe count
1353+
Value::Int(1),
1354+
],
1355+
})
1356+
.unwrap();
1357+
tx.send(PushInfo {
1358+
kind: redis::PushKind::PMessage,
1359+
data: vec![
1360+
// First is the pattern
1361+
Value::BulkString("scheduler_key_change".into()),
1362+
// Second is the matching channel. Which in this case is the same as the pattern.
1363+
Value::BulkString("scheduler_key_change".into()),
1364+
// And then the actual message
1365+
Value::BulkString("demo-key".into()),
1366+
],
1367+
})
1368+
.unwrap();
1369+
1370+
timeout(Duration::from_secs(5), async {
1371+
loop {
1372+
assert!(!logs_contain("ERROR"));
1373+
if logs_contain("New subscription manager key key=\"demo-key\"") {
1374+
break;
1375+
}
1376+
sleep(Duration::from_millis(100)).await;
1377+
}
1378+
})
1379+
.await
1380+
.unwrap();
1381+
1382+
// Because otherwise it gets dropped immediately, and we need it to live to do things
1383+
drop(subscription_manager);
1384+
1385+
Ok(())
1386+
}

0 commit comments

Comments
 (0)