Skip to content

Commit 1b99d65

Browse files
committed
fix(exchange): validate inbound sender parallelism
Route inbound sender creation through the same channel-set guard used by pre-registration so mismatched exchange parallelism fails explicitly. Also add QueryCoordinator unit tests covering inbound channel set registration and mismatch detection.
1 parent 8eb25f8 commit 1b99d65

1 file changed

Lines changed: 73 additions & 5 deletions

File tree

src/query/service/src/servers/flight/v1/exchange/exchange_manager.rs

Lines changed: 73 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,11 +1115,7 @@ impl QueryCoordinator {
11151115
channel_id: &str,
11161116
num_threads: usize,
11171117
) -> Result<NetworkInboundSender> {
1118-
let channel_set = self
1119-
.inbound_channel_sets
1120-
.entry(channel_id.to_string())
1121-
.or_insert_with(|| Arc::new(NetworkInboundChannelSet::new(num_threads)))
1122-
.clone();
1118+
let channel_set = self.get_or_create_inbound_channel_set(channel_id, num_threads)?;
11231119

11241120
// TODO: get max_bytes_per_connection from query settings
11251121
let max_bytes_per_connection = 20 * 1024 * 1024; // 20MB
@@ -1458,3 +1454,75 @@ impl FragmentCoordinator {
14581454
Ok(())
14591455
}
14601456
}
1457+
1458+
#[cfg(test)]
1459+
mod tests {
1460+
use std::collections::HashMap;
1461+
1462+
use databend_common_exception::Result;
1463+
1464+
use super::QueryCoordinator;
1465+
1466+
#[test]
1467+
fn test_query_coordinator_register_inbound_channel_sets() -> Result<()> {
1468+
let mut coordinator = QueryCoordinator::create();
1469+
let channel_sizes = HashMap::from([
1470+
("exchange-a".to_string(), 2_usize),
1471+
("exchange-b".to_string(), 4_usize),
1472+
]);
1473+
1474+
coordinator.register_inbound_channel_sets(&channel_sizes)?;
1475+
1476+
assert_eq!(
1477+
coordinator
1478+
.get_or_create_inbound_channel_set("exchange-a", 2)?
1479+
.channels
1480+
.len(),
1481+
2
1482+
);
1483+
assert_eq!(
1484+
coordinator
1485+
.get_or_create_inbound_channel_set("exchange-b", 4)?
1486+
.channels
1487+
.len(),
1488+
4
1489+
);
1490+
1491+
Ok(())
1492+
}
1493+
1494+
#[test]
1495+
fn test_query_coordinator_detects_mismatched_inbound_parallelism() -> Result<()> {
1496+
let mut coordinator = QueryCoordinator::create();
1497+
coordinator.get_or_create_inbound_channel_set("exchange-a", 2)?;
1498+
1499+
let err = coordinator
1500+
.get_or_create_inbound_channel_set("exchange-a", 1)
1501+
.err()
1502+
.expect("mismatched inbound channel set should fail");
1503+
assert!(
1504+
err.message()
1505+
.contains("Mismatched inbound channel set parallelism")
1506+
);
1507+
1508+
Ok(())
1509+
}
1510+
1511+
#[test]
1512+
fn test_query_coordinator_detects_mismatched_inbound_sender_parallelism() -> Result<()> {
1513+
let mut coordinator = QueryCoordinator::create();
1514+
coordinator
1515+
.register_inbound_channel_sets(&HashMap::from([("exchange-a".to_string(), 2)]))?;
1516+
1517+
let err = coordinator
1518+
.create_inbound_sender("exchange-a", 1)
1519+
.err()
1520+
.expect("mismatched inbound sender should fail");
1521+
assert!(
1522+
err.message()
1523+
.contains("Mismatched inbound channel set parallelism")
1524+
);
1525+
1526+
Ok(())
1527+
}
1528+
}

0 commit comments

Comments
 (0)