@@ -135,7 +135,7 @@ impl Chitchat {
135135 let scheduled_for_deletion: HashSet < _ > =
136136 self . scheduled_for_deletion_nodes ( ) . collect ( ) ;
137137 let self_digest = self . compute_digest ( & scheduled_for_deletion) ;
138- let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1 - digest . serialized_len ( ) ;
138+ let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1 - self_digest . serialized_len ( ) ;
139139 let delta = self . cluster_state . compute_partial_delta_respecting_mtu (
140140 & digest,
141141 delta_mtu,
@@ -474,6 +474,8 @@ mod tests {
474474 use std:: sync:: atomic:: { AtomicUsize , Ordering } ;
475475 use std:: time:: Duration ;
476476
477+ use rand:: Rng ;
478+ use rand:: distr:: Alphanumeric ;
477479 use tokio:: sync:: Mutex ;
478480 use tokio:: time;
479481 use tokio_stream:: StreamExt ;
@@ -1339,4 +1341,66 @@ mod tests {
13391341 ) ;
13401342 assert ! ( node. cluster_state. node_state( & chitchat_id) . is_none( ) ) ;
13411343 }
1344+
1345+ // There was a bug in process_message:
1346+ // When node_states is large and the node receives a SYN with an empty digest,
1347+ // the MTU delta was incorrectly computed based on the received empty digest,
1348+ // whereas it should have used self_digest instead.
1349+ //
1350+ #[ tokio:: test]
1351+ async fn test_process_syn ( ) {
1352+ // Prepare node
1353+ let config = ChitchatConfig :: for_test ( 10_006 ) ;
1354+
1355+ fn id ( i : usize ) -> ChitchatId {
1356+ ChitchatId {
1357+ node_id : "a" . to_string ( ) . repeat ( 1000 ) ,
1358+ generation_id : i as u64 ,
1359+ gossip_advertise_addr : SocketAddr :: from ( ( [ 127 , 0 , 0 , 1 ] , 10000u16 + i as u16 ) ) ,
1360+ }
1361+ }
1362+
1363+ fn random_string ( len : usize ) -> String {
1364+ rand:: rng ( )
1365+ . sample_iter ( & Alphanumeric )
1366+ . take ( len)
1367+ . map ( char:: from)
1368+ . collect ( )
1369+ }
1370+
1371+ let ( _seed_addrs_rx, seed_addrs_tx) = watch:: channel ( Default :: default ( ) ) ;
1372+
1373+ let mut node = Chitchat :: with_chitchat_id_and_seeds ( config, seed_addrs_tx, Vec :: new ( ) ) ;
1374+
1375+ // Add node states that form the digest with a serialized size close to the maximum MTU.
1376+
1377+ let mut digest = Digest :: default ( ) ;
1378+ let mut delta = Delta :: default ( ) ;
1379+ for i in 0 ..55 {
1380+ digest. add_node ( id ( i) , Heartbeat ( 1 ) , 0 , 0 ) ;
1381+ delta. add_node ( id ( i) , 0 , 0 ) ;
1382+ delta. add_kv ( & id ( i) , "key" , & random_string ( 1000 ) , 1 , false ) ;
1383+ }
1384+ node. report_heartbeats_in_digest ( & digest) ;
1385+ node. process_delta ( delta) ;
1386+
1387+ // Process a SYN message with an empty foreign digest
1388+
1389+ let ack = node
1390+ . process_message ( ChitchatMessage :: Syn {
1391+ cluster_id : node. config . cluster_id . clone ( ) ,
1392+ digest : Digest :: default ( ) ,
1393+ } )
1394+ . unwrap ( ) ;
1395+
1396+ // Verify that the serialized reply fits within the max MTU.
1397+
1398+ let mut buf = Vec :: new ( ) ;
1399+ ack. serialize ( & mut buf) ;
1400+ assert ! ( buf. len( ) < MAX_UDP_DATAGRAM_PAYLOAD_SIZE ) ;
1401+ let ChitchatMessage :: SynAck { delta, .. } = ack else {
1402+ panic ! ( "Expected SynAck, got {:?}" , ack) ;
1403+ } ;
1404+ assert_eq ! ( delta. node_deltas. len( ) , 4 ) ;
1405+ }
13421406}
0 commit comments