Skip to content

Commit f3880d5

Browse files
bugfix: don't schedule indexing tasks on retiring indexers (#6427)
* Track ingester status in control-plane indexer pool
* filter out retiring indexers when scheduling indexing tasks
* make fix
* Unit test to update the indexer_pool when changing indexer readiness status
* Unit tests for select_available_indexers_for_scheduling + renaming
* Fix merge conflicts with commit 710aa11
1 parent eb93859 commit f3880d5

5 files changed

Lines changed: 141 additions & 5 deletions

File tree

quickwit/quickwit-control-plane/src/control_plane.rs

Lines changed: 5 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1395,6 +1395,7 @@ mod tests {
13951395
client: indexer,
13961396
indexing_tasks: Vec::new(),
13971397
indexing_capacity: CpuCapacity::from_cpu_millis(1_000),
1398+
ingester_status: IngesterStatus::Ready,
13981399
};
13991400
indexer_pool.insert(self_node_id.clone(), indexer_info);
14001401

@@ -1824,6 +1825,7 @@ mod tests {
18241825
client,
18251826
indexing_tasks: Vec::new(),
18261827
indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
1828+
ingester_status: IngesterStatus::Ready,
18271829
};
18281830
indexer_pool.insert(indexer_node_info.node_id.clone(), indexer_node_info);
18291831
let ingester_pool = IngesterPool::default();
@@ -1973,6 +1975,7 @@ mod tests {
19731975
client,
19741976
indexing_tasks: Vec::new(),
19751977
indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
1978+
ingester_status: IngesterStatus::Ready,
19761979
};
19771980
indexer_pool.insert(indexer_node_info.node_id.clone(), indexer_node_info);
19781981
let ingester_pool = IngesterPool::default();
@@ -2051,6 +2054,7 @@ mod tests {
20512054
client,
20522055
indexing_tasks: Vec::new(),
20532056
indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
2057+
ingester_status: IngesterStatus::Ready,
20542058
};
20552059
indexer_pool.insert(indexer_node_info.node_id.clone(), indexer_node_info);
20562060
let ingester_pool = IngesterPool::default();
@@ -2777,6 +2781,7 @@ mod tests {
27772781
client: indexer,
27782782
indexing_tasks: Vec::new(),
27792783
indexing_capacity: CpuCapacity::from_cpu_millis(1_000),
2784+
ingester_status: IngesterStatus::Ready,
27802785
};
27812786
indexer_pool.insert(ingester_id.clone(), indexer_info);
27822787

quickwit/quickwit-control-plane/src/indexing_scheduler/mod.rs

Lines changed: 94 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -30,6 +30,7 @@ use quickwit_proto::indexing::{
3030
ApplyIndexingPlanRequest, CpuCapacity, IndexingService, IndexingTask, PIPELINE_FULL_CAPACITY,
3131
PIPELINE_THROUGHPUT,
3232
};
33+
use quickwit_proto::ingest::ingester::IngesterStatus;
3334
use quickwit_proto::types::NodeId;
3435
use scheduling::{SourceToSchedule, SourceToScheduleType};
3536
use serde::Serialize;
@@ -301,7 +302,7 @@ impl IndexingScheduler {
301302

302303
let sources = get_sources_to_schedule(model);
303304

304-
let indexers: Vec<IndexerNodeInfo> = self.get_indexers_from_indexer_pool();
305+
let indexers: Vec<IndexerNodeInfo> = self.select_available_indexers_for_scheduling();
305306

306307
let indexer_id_to_cpu_capacities: FnvHashMap<String, CpuCapacity> = indexers
307308
.iter()
@@ -366,7 +367,7 @@ impl IndexingScheduler {
366367
{
367368
return;
368369
}
369-
let indexers: Vec<IndexerNodeInfo> = self.get_indexers_from_indexer_pool();
370+
let indexers: Vec<IndexerNodeInfo> = self.select_available_indexers_for_scheduling();
370371
let running_indexing_tasks_by_node_id: FnvHashMap<String, Vec<IndexingTask>> = indexers
371372
.iter()
372373
.map(|indexer| (indexer.node_id.to_string(), indexer.indexing_tasks.clone()))
@@ -386,8 +387,23 @@ impl IndexingScheduler {
386387
}
387388
}
388389

389-
fn get_indexers_from_indexer_pool(&self) -> Vec<IndexerNodeInfo> {
390-
self.indexer_pool.values()
390+
fn select_available_indexers_for_scheduling(&self) -> Vec<IndexerNodeInfo> {
391+
let (ready, not_ready): (Vec<IndexerNodeInfo>, Vec<IndexerNodeInfo>) = self
392+
.indexer_pool
393+
.values()
394+
.into_iter()
395+
.partition(|indexer| indexer.ingester_status == IngesterStatus::Ready);
396+
397+
if ready.is_empty() {
398+
// Allow scheduling on retiring indexers to drain shards
399+
// and avoid decommission timeouts (e.g. single-node cluster).
400+
warn!(
401+
"no ready indexer available, falling back to retiring indexers for shard draining"
402+
);
403+
not_ready
404+
} else {
405+
ready
406+
}
391407
}
392408

393409
fn apply_physical_indexing_plan(
@@ -1090,9 +1106,82 @@ mod tests {
10901106
}
10911107

10921108
use quickwit_config::SourceInputFormat;
1093-
use quickwit_proto::indexing::mcpu;
1109+
use quickwit_proto::indexing::{CpuCapacity, IndexingServiceClient, MockIndexingService, mcpu};
10941110
use quickwit_proto::ingest::{Shard, ShardState};
10951111

1112+
fn mock_indexer_node_info(node_id: &str, status: IngesterStatus) -> IndexerNodeInfo {
1113+
let mock_indexer = MockIndexingService::new();
1114+
let client = IndexingServiceClient::from_mock(mock_indexer);
1115+
IndexerNodeInfo {
1116+
node_id: NodeId::from_str(node_id),
1117+
generation_id: 0,
1118+
client,
1119+
indexing_tasks: Vec::new(),
1120+
indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
1121+
ingester_status: status,
1122+
}
1123+
}
1124+
1125+
#[test]
1126+
fn test_select_available_indexers_returns_only_ready_when_available() {
1127+
let indexer_pool = IndexerPool::default();
1128+
let ready_indexer = mock_indexer_node_info("indexer-ready-1", IngesterStatus::Ready);
1129+
let ready_indexer_2 = mock_indexer_node_info("indexer-ready-2", IngesterStatus::Ready);
1130+
let retiring_indexer = mock_indexer_node_info("indexer-retiring", IngesterStatus::Retiring);
1131+
indexer_pool.insert(ready_indexer.node_id.clone(), ready_indexer);
1132+
indexer_pool.insert(ready_indexer_2.node_id.clone(), ready_indexer_2);
1133+
indexer_pool.insert(retiring_indexer.node_id.clone(), retiring_indexer);
1134+
1135+
let scheduler = IndexingScheduler::new(
1136+
"test-cluster".to_string(),
1137+
NodeId::from_str("control-plane"),
1138+
indexer_pool,
1139+
);
1140+
let selected = scheduler.select_available_indexers_for_scheduling();
1141+
1142+
assert_eq!(selected.len(), 2);
1143+
assert!(
1144+
selected
1145+
.iter()
1146+
.all(|i| i.ingester_status == IngesterStatus::Ready)
1147+
);
1148+
}
1149+
1150+
#[test]
1151+
fn test_select_available_indexers_falls_back_to_retiring_when_no_ready() {
1152+
let indexer_pool = IndexerPool::default();
1153+
let retiring_1 = mock_indexer_node_info("indexer-retiring-1", IngesterStatus::Retiring);
1154+
let retiring_2 = mock_indexer_node_info("indexer-retiring-2", IngesterStatus::Retiring);
1155+
indexer_pool.insert(retiring_1.node_id.clone(), retiring_1);
1156+
indexer_pool.insert(retiring_2.node_id.clone(), retiring_2);
1157+
1158+
let scheduler = IndexingScheduler::new(
1159+
"test-cluster".to_string(),
1160+
NodeId::from_str("control-plane"),
1161+
indexer_pool,
1162+
);
1163+
let selected = scheduler.select_available_indexers_for_scheduling();
1164+
1165+
assert_eq!(selected.len(), 2);
1166+
assert!(
1167+
selected
1168+
.iter()
1169+
.all(|i| i.ingester_status == IngesterStatus::Retiring)
1170+
);
1171+
}
1172+
1173+
#[test]
1174+
fn test_select_available_indexers_returns_empty_when_pool_is_empty() {
1175+
let indexer_pool = IndexerPool::default();
1176+
let scheduler = IndexingScheduler::new(
1177+
"test-cluster".to_string(),
1178+
NodeId::from_str("control-plane"),
1179+
indexer_pool,
1180+
);
1181+
let selected = scheduler.select_available_indexers_for_scheduling();
1182+
assert!(selected.is_empty());
1183+
}
1184+
10961185
fn kafka_source_params_for_test() -> SourceParams {
10971186
SourceParams::Kafka(KafkaSourceParams {
10981187
topic: "topic".to_string(),

quickwit/quickwit-control-plane/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -21,6 +21,7 @@ pub(crate) mod model;
2121

2222
use quickwit_common::tower::Pool;
2323
use quickwit_proto::indexing::{CpuCapacity, IndexingServiceClient, IndexingTask};
24+
use quickwit_proto::ingest::ingester::IngesterStatus;
2425
use quickwit_proto::types::NodeId;
2526

2627
/// Indexer-node specific information stored in the pool of available indexer nodes
@@ -31,6 +32,7 @@ pub struct IndexerNodeInfo {
3132
pub client: IndexingServiceClient,
3233
pub indexing_tasks: Vec<IndexingTask>,
3334
pub indexing_capacity: CpuCapacity,
35+
pub ingester_status: IngesterStatus,
3436
}
3537

3638
pub type IndexerPool = Pool<NodeId, IndexerNodeInfo>;

quickwit/quickwit-control-plane/src/tests.rs

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -85,6 +85,7 @@ pub fn test_indexer_change_stream(
8585
client,
8686
indexing_tasks,
8787
indexing_capacity: CpuCapacity::from_cpu_millis(4_000),
88+
ingester_status: node.ingester_status,
8889
},
8990
);
9091
Some(change)

quickwit/quickwit-serve/src/lib.rs

Lines changed: 39 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -1266,6 +1266,17 @@ fn setup_indexer_pool(
12661266
);
12671267
Some(change)
12681268
}
1269+
ClusterChange::Update { previous, updated }
1270+
if updated.is_indexer()
1271+
&& previous.ingester_status != updated.ingester_status =>
1272+
{
1273+
let change = build_indexer_insert_change(
1274+
&updated,
1275+
indexing_service_clone_opt,
1276+
grpc_max_message_size,
1277+
);
1278+
Some(change)
1279+
}
12691280
ClusterChange::Remove(node) if node.is_indexer() => {
12701281
let change = build_indexer_remove_change(&node);
12711282
Some(change)
@@ -1300,6 +1311,7 @@ fn build_indexer_insert_change(
13001311
client,
13011312
indexing_tasks: node.indexing_tasks.to_vec(),
13021313
indexing_capacity: node.indexing_cpu_capacity,
1314+
ingester_status: node.ingester_status,
13031315
},
13041316
)
13051317
}
@@ -1654,6 +1666,33 @@ mod tests {
16541666

16551667
assert_eq!(indexer_pool.len(), 1);
16561668

1669+
// changing the ingester status of an indexer node refreshes the indexer pool
1670+
let updated_indexer_node = ClusterNode::for_test(
1671+
"test-indexer-node",
1672+
1,
1673+
true,
1674+
&["indexer"],
1675+
&[],
1676+
IngesterStatus::Retiring,
1677+
)
1678+
.await;
1679+
cluster_change_stream_tx
1680+
.send(ClusterChange::Update {
1681+
previous: new_indexer_node.clone(),
1682+
updated: updated_indexer_node.clone(),
1683+
})
1684+
.unwrap();
1685+
tokio::time::sleep(Duration::from_millis(1)).await;
1686+
1687+
assert_eq!(indexer_pool.len(), 1);
1688+
assert_eq!(
1689+
indexer_pool
1690+
.get(&NodeId::from_str("test-indexer-node"))
1691+
.expect("indexer node should be in the pool")
1692+
.ingester_status,
1693+
IngesterStatus::Retiring
1694+
);
1695+
16571696
// removing an indexer node refreshes the indexer pool
16581697
cluster_change_stream_tx
16591698
.send(ClusterChange::Remove(new_indexer_node))

0 commit comments

Comments
 (0)