diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index ecf5f707642..2b19ad497e2 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -35,7 +35,7 @@ use quickwit_proto::ingest::router::{ IngestFailureReason, IngestRequestV2, IngestResponseV2, IngestRouterService, }; use quickwit_proto::ingest::{CommitTypeV2, IngestV2Error, IngestV2Result, RateLimitingCause}; -use quickwit_proto::types::{NodeId, SubrequestId}; +use quickwit_proto::types::{IndexUid, NodeId, SourceId, SubrequestId}; use serde_json::{Value as JsonValue, json}; use tokio::sync::{Mutex, Semaphore}; use tokio::time::error::Elapsed; @@ -286,6 +286,7 @@ impl IngestRouter { match persist_result { Ok(persist_response) => { let leader_id = NodeId::from_str(&persist_response.leader_id); + let mut no_shards_entries: Vec<(IndexUid, SourceId)> = Vec::new(); for persist_success in persist_response.successes { workbench.record_persist_success(persist_success); @@ -295,9 +296,18 @@ impl IngestRouter { match persist_failure.reason() { PersistFailureReason::NoShardsAvailable => { - // For non-critical failures, we don't mark the nodes unavailable; - // a routing update is piggybacked on PersistResponses, so shard - // counts and capacity scores will be fresh on the next try. + // The ingester reported no shard for this (index_uid, + // source_id). The piggybacked routing update only covers + // sources the ingester still holds, so a removed source + // (e.g. after an index was deleted and recreated) would + // leave this node's entry stale and trap retries on a + // dead shard. Mark the node as having zero open shards + // for this entry; when no nodes remain, the next attempt + // re-queries the control plane. 
+ no_shards_entries.push(( + persist_failure.index_uid().clone(), + persist_failure.source_id.clone(), + )); } PersistFailureReason::NodeUnavailable | PersistFailureReason::WalFull @@ -308,22 +318,35 @@ impl IngestRouter { } } - if let Some(routing_update) = persist_response.routing_update { + if !no_shards_entries.is_empty() || persist_response.routing_update.is_some() { // Since we just talked to the node, we take advantage and use the - // opportunity to get a fresh routing update. + // opportunity to get a fresh routing update. Both the zero-out and the + // piggybacked update run under the same lock so the rate-limited + // subcase of NoShardsAvailable — where the shard still exists — is + // immediately restored by the routing update that follows (the ingester + // only returns routing_update=None on the NodeUnavailable fast path). let mut state_guard = self.state.lock().await; - for shard_update in routing_update.source_shard_updates { - state_guard.routing_table.apply_capacity_update( - leader_id.clone(), - shard_update.index_uid().clone(), - shard_update.source_id, - routing_update.capacity_score as usize, - shard_update.open_shard_count as usize, - ); + + for (index_uid, source_id) in &no_shards_entries { + state_guard + .routing_table + .mark_node_no_shards(&leader_id, index_uid, source_id); } - drop(state_guard); - workbench.closed_shards.extend(routing_update.closed_shards); + if let Some(routing_update) = persist_response.routing_update { + for shard_update in routing_update.source_shard_updates { + state_guard.routing_table.apply_capacity_update( + leader_id.clone(), + shard_update.index_uid().clone(), + shard_update.source_id, + routing_update.capacity_score as usize, + shard_update.open_shard_count as usize, + ); + } + drop(state_guard); + + workbench.closed_shards.extend(routing_update.closed_shards); + } } } Err(persist_error) => { @@ -1860,4 +1883,190 @@ mod tests { .unwrap(); assert_eq!(node.node_id, NodeId::from_str("test-ingester-0")); } + + 
#[tokio::test] + async fn test_no_shards_available_clears_stale_routing_entry() { + // Regression test for https://github.com/quickwit-oss/quickwit/issues/6324. + // + // When an index is deleted and recreated, the ingester closes its old shards and + // stops advertising them. A persist request issued against the stale routing + // entry fails with NoShardsAvailable, and the piggybacked routing update no + // longer covers the removed source — so before the fix the router kept picking + // the dead entry forever and ingests returned 503 until Chitchat caught up. + let ingester_pool = IngesterPool::default(); + let router = IngestRouter::new( + "test-router".into(), + ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()), + ingester_pool.clone(), + 1, + EventBroker::default(), + Some("test-az".to_string()), + ); + let stale_index_uid = IndexUid::for_test("test-index", 0); + { + let mut state_guard = router.state.lock().await; + state_guard.routing_table.merge_from_shards( + stale_index_uid.clone(), + "test-source".to_string(), + vec![Shard { + index_uid: Some(stale_index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-0".to_string(), + ..Default::default() + }], + ); + } + ingester_pool.insert( + "test-ingester-0".into(), + IngesterPoolEntry::mocked_ingester(), + ); + + // Sanity-check: the stale entry is currently considered valid for routing. 
+ { + let state_guard = router.state.lock().await; + let mut unavailable_leaders: HashSet<NodeId> = HashSet::new(); + assert!(state_guard.routing_table.has_any_routing_candidate( + "test-index", + "test-source", + &ingester_pool, + &mut unavailable_leaders, + )); + } + + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + index_id: "test-index".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 2); + + let persist_futures = FuturesUnordered::new(); + let stale_index_uid_clone = stale_index_uid.clone(); + persist_futures.push(async move { + let summary = PersistRequestSummary { + leader_id: "test-ingester-0".into(), + subrequest_ids: vec![0], + }; + let result = Ok::<_, IngestV2Error>(PersistResponse { + leader_id: "test-ingester-0".to_string(), + successes: Vec::new(), + failures: vec![PersistFailure { + subrequest_id: 0, + index_uid: Some(stale_index_uid_clone), + source_id: "test-source".to_string(), + reason: PersistFailureReason::NoShardsAvailable as i32, + }], + // The ingester has no shard for this source anymore, so the piggybacked + // routing update omits it entirely. + routing_update: Some(RoutingUpdate { + capacity_score: 6, + source_shard_updates: Vec::new(), + ..Default::default() + }), + }); + (summary, result) + }); + router + .process_persist_results(&mut workbench, persist_futures) + .await; + + // The stale routing entry must no longer look routable — otherwise retries would + // keep hammering the dead shard and surface as a 503. 
+ let state_guard = router.state.lock().await; + let mut unavailable_leaders: HashSet<NodeId> = HashSet::new(); + assert!(!state_guard.routing_table.has_any_routing_candidate( + "test-index", + "test-source", + &ingester_pool, + &mut unavailable_leaders, + )); + } + + #[tokio::test] + async fn test_no_shards_available_preserves_entry_when_routing_update_refreshes_it() { + // When a shard is merely rate-limited the ingester still returns NoShardsAvailable + // but includes a fresh routing update saying the shard is open. The update must + // take precedence so the node remains routable for the next retry. + let ingester_pool = IngesterPool::default(); + let router = IngestRouter::new( + "test-router".into(), + ControlPlaneServiceClient::from_mock(MockControlPlaneService::new()), + ingester_pool.clone(), + 1, + EventBroker::default(), + Some("test-az".to_string()), + ); + let index_uid = IndexUid::for_test("test-index", 0); + { + let mut state_guard = router.state.lock().await; + state_guard.routing_table.merge_from_shards( + index_uid.clone(), + "test-source".to_string(), + vec![Shard { + index_uid: Some(index_uid.clone()), + source_id: "test-source".to_string(), + shard_id: Some(ShardId::from(1)), + shard_state: ShardState::Open as i32, + leader_id: "test-ingester-0".to_string(), + ..Default::default() + }], + ); + } + ingester_pool.insert( + "test-ingester-0".into(), + IngesterPoolEntry::mocked_ingester(), + ); + + let ingest_subrequests = vec![IngestSubrequest { + subrequest_id: 0, + index_id: "test-index".to_string(), + source_id: "test-source".to_string(), + ..Default::default() + }]; + let mut workbench = IngestWorkbench::new(ingest_subrequests, 2); + + let persist_futures = FuturesUnordered::new(); + let index_uid_clone = index_uid.clone(); + persist_futures.push(async move { + let summary = PersistRequestSummary { + leader_id: "test-ingester-0".into(), + subrequest_ids: vec![0], + }; + let result = Ok::<_, IngestV2Error>(PersistResponse { + leader_id: 
"test-ingester-0".to_string(), + successes: Vec::new(), + failures: vec![PersistFailure { + subrequest_id: 0, + index_uid: Some(index_uid_clone.clone()), + source_id: "test-source".to_string(), + reason: PersistFailureReason::NoShardsAvailable as i32, + }], + routing_update: Some(RoutingUpdate { + capacity_score: 6, + source_shard_updates: vec![SourceShardUpdate { + index_uid: Some(index_uid_clone), + source_id: "test-source".to_string(), + open_shard_count: 1, + }], + ..Default::default() + }), + }); + (summary, result) + }); + router + .process_persist_results(&mut workbench, persist_futures) + .await; + + let state_guard = router.state.lock().await; + let mut unavailable_leaders: HashSet<NodeId> = HashSet::new(); + assert!(state_guard.routing_table.has_any_routing_candidate( + "test-index", + "test-source", + &ingester_pool, + &mut unavailable_leaders, + )); + } } diff --git a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs index 973de0e582a..0aed8b33663 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs @@ -273,6 +273,36 @@ impl RoutingTable { entry.nodes.insert(node_id, ingester_node); } + /// Zeros out the open shard count for `node_id` on the (index, source) entry while preserving + /// its capacity score. Called when a persist response reports that the ingester no longer + /// holds a shard for this (index_uid, source_id), so the entry stops being picked until a + /// fresh routing update or control-plane response repopulates it. + /// + /// Mirrors the incarnation handling of [`Self::apply_capacity_update`]: a stale signal + /// (entry newer than `index_uid`) is ignored, and a signal for a newer incarnation advances + /// the entry and clears stale nodes so the next attempt re-queries the control plane. 
+ pub fn mark_node_no_shards(&mut self, node_id: &NodeId, index_uid: &IndexUid, source_id: &str) { + let key = (index_uid.index_id.to_string(), source_id.to_string()); + let Some(entry) = self.table.get_mut(&key) else { + return; + }; + match entry.index_uid.cmp(index_uid) { + // The entry is stale relative to the signal: advance it, drop stale nodes, and force + // a control-plane re-seed on the next attempt. + Ordering::Less => { + entry.index_uid = index_uid.clone(); + entry.nodes.clear(); + entry.seeded_from_cp = false; + } + // The signal is stale relative to the entry: leave the fresher entry alone. + Ordering::Greater => return, + Ordering::Equal => {} + } + if let Some(node) = entry.nodes.get_mut(node_id) { + node.open_shard_count = 0; + } + } + /// Merges routing updates from a GetOrCreateOpenShards control plane response into the /// table. For existing nodes, updates their open shard count, including if the count is 0, from /// the CP response while preserving capacity scores if they already exist. @@ -856,4 +886,73 @@ mod tests { assert!(!entry.nodes.contains_key("node-3")); assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 2)); } + + #[test] + fn test_mark_node_no_shards() { + let mut table = RoutingTable::default(); + let index_uid = IndexUid::for_test("test-index", 1); + let key = ("test-index".to_string(), "test-source".to_string()); + + // Missing entry: no-op, no panic, nothing inserted. + table.mark_node_no_shards(&"node-1".into(), &index_uid, "test-source"); + assert!(table.table.get(&key).is_none()); + + // Seed an entry with two nodes carrying real capacity scores. + table.apply_capacity_update( + "node-1".into(), + index_uid.clone(), + "test-source".into(), + 8, + 3, + ); + table.apply_capacity_update( + "node-2".into(), + index_uid.clone(), + "test-source".into(), + 6, + 2, + ); + + // Missing node within the entry: no-op. 
+ table.mark_node_no_shards(&"unknown".into(), &index_uid, "test-source"); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.nodes.get("node-1").unwrap().open_shard_count, 3); + assert_eq!(entry.nodes.get("node-2").unwrap().open_shard_count, 2); + + // Matching incarnation: zero only the open shard count, capacity score is preserved. + table.mark_node_no_shards(&"node-1".into(), &index_uid, "test-source"); + let entry = table.table.get(&key).unwrap(); + let node_1 = entry.nodes.get("node-1").unwrap(); + assert_eq!(node_1.open_shard_count, 0); + assert_eq!(node_1.capacity_score, 8); + // Sibling node untouched. + let node_2 = entry.nodes.get("node-2").unwrap(); + assert_eq!(node_2.open_shard_count, 2); + assert_eq!(node_2.capacity_score, 6); + + // Older incarnation argument: no-op (must not roll the entry back). + let stale_index_uid = IndexUid::for_test("test-index", 0); + table.mark_node_no_shards(&"node-2".into(), &stale_index_uid, "test-source"); + assert_eq!( + table + .table + .get(&key) + .unwrap() + .nodes + .get("node-2") + .unwrap() + .open_shard_count, + 2 + ); + + // Newer incarnation argument: advance the entry, drop stale nodes, and force a CP + // re-seed (mirrors apply_capacity_update's Less arm). No node is inserted — the next + // CP query is responsible for repopulating the entry. + let newer_index_uid = IndexUid::for_test("test-index", 2); + table.mark_node_no_shards(&"node-2".into(), &newer_index_uid, "test-source"); + let entry = table.table.get(&key).unwrap(); + assert_eq!(entry.index_uid, newer_index_uid); + assert!(entry.nodes.is_empty()); + assert!(!entry.seeded_from_cp); + } }