Skip to content

Commit e20f06c

Browse files
authored
Fix index reincarnation routing bug (#6217)
* Fix index reincarnation bug
* Use chitchat TTL instead of manual deletion
* fix tests
* PR comments
* fix vec issue
1 parent d94d37a commit e20f06c

9 files changed

Lines changed: 279 additions & 31 deletions

File tree

quickwit/Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-ingest/src/ingest_v2/broadcast/capacity_score.rs

Lines changed: 52 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,20 @@ impl BroadcastIngesterCapacityScoreTask {
125125
current_sources.insert(source_uid);
126126
}
127127

128+
// When a source disappears (e.g. index deleted), broadcast open_shard_count=0 with a TTL
129+
// so Chitchat auto-cleans the key. This is the only way routers learn to clear stale
130+
// routing entries — Chitchat key removal doesn't reliably fire subscribers.
128131
for removed_source in previous_sources.difference(&current_sources) {
129132
let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, removed_source);
130-
self.cluster.remove_self_key(&key).await;
133+
let capacity = IngesterCapacityScore {
134+
capacity_score,
135+
open_shard_count: 0,
136+
};
137+
let value = serde_json::to_string(&capacity)
138+
.expect("`IngesterCapacityScore` should be JSON serializable");
139+
self.cluster
140+
.set_self_key_value_delete_after_ttl(key, value)
141+
.await;
131142
}
132143

133144
current_sources
@@ -264,4 +275,44 @@ mod tests {
264275
assert_eq!(deserialized.capacity_score, 6);
265276
assert_eq!(deserialized.open_shard_count, 1);
266277
}
278+
279+
#[tokio::test]
280+
async fn test_removed_source_broadcasts_zero_with_ttl() {
281+
let transport = ChannelTransport::default();
282+
let cluster = create_cluster_for_test(Vec::new(), &["indexer"], &transport, true)
283+
.await
284+
.unwrap();
285+
286+
let (_temp_dir, state) = IngesterState::for_test(cluster.clone()).await;
287+
let task = BroadcastIngesterCapacityScoreTask {
288+
cluster: cluster.clone(),
289+
weak_state: state.weak(),
290+
};
291+
292+
let index_uid = IndexUid::for_test("test-index", 0);
293+
let source_uid = SourceUid {
294+
index_uid: index_uid.clone(),
295+
source_id: SourceId::from("test-source"),
296+
};
297+
let key = make_key(INGESTER_CAPACITY_SCORE_PREFIX, &source_uid);
298+
299+
// Cycle 1: source is alive.
300+
let open_counts: Vec<(IndexUid, SourceId, usize)> =
301+
vec![(index_uid.clone(), "test-source".into(), 3)];
302+
let current = task
303+
.broadcast_capacity(7, &open_counts, &BTreeSet::new())
304+
.await;
305+
306+
let value = cluster.get_self_key_value(&key).await.unwrap();
307+
let parsed: IngesterCapacityScore = serde_json::from_str(&value).unwrap();
308+
assert_eq!(parsed.open_shard_count, 3);
309+
310+
// Cycle 2: the source disappears. The task broadcasts open_shard_count=0 with a TTL, so the key still exists.
311+
let _current = task.broadcast_capacity(7, &vec![], &current).await;
312+
313+
let value = cluster.get_self_key_value(&key).await.unwrap();
314+
let parsed: IngesterCapacityScore = serde_json::from_str(&value).unwrap();
315+
assert_eq!(parsed.capacity_score, 7);
316+
assert_eq!(parsed.open_shard_count, 0);
317+
}
267318
}

quickwit/quickwit-ingest/src/ingest_v2/broadcast/mod.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ use std::time::Duration;
2020

2121
use quickwit_proto::types::SourceUid;
2222

23-
pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration = if cfg!(test) {
24-
Duration::from_millis(50)
25-
} else {
26-
Duration::from_secs(5)
27-
};
23+
pub(in crate::ingest_v2) const BROADCAST_INTERVAL_PERIOD: Duration =
24+
if cfg!(any(test, feature = "testsuite")) {
25+
Duration::from_millis(50)
26+
} else {
27+
Duration::from_secs(5)
28+
};
2829

2930
pub use capacity_score::{
3031
BroadcastIngesterCapacityScoreTask, IngesterCapacityScoreUpdate,

quickwit/quickwit-ingest/src/ingest_v2/routing_table.rs

Lines changed: 107 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15+
use std::cmp::Ordering;
1516
use std::collections::{HashMap, HashSet};
1617

1718
use itertools::Itertools;
@@ -29,17 +30,16 @@ use crate::IngesterPool;
2930
pub(super) struct IngesterNode {
3031
pub node_id: NodeId,
3132
pub index_uid: IndexUid,
32-
#[allow(unused)]
33-
pub source_id: SourceId,
3433
/// Score from 0-10. Higher means more available capacity.
3534
pub capacity_score: usize,
3635
/// Number of open shards on this node for this (index, source) pair. Tiebreaker for power of
3736
/// two choices comparison - we favor a node with more open shards.
3837
pub open_shard_count: usize,
3938
}
4039

41-
#[derive(Debug, Default)]
40+
#[derive(Debug)]
4241
pub(super) struct RoutingEntry {
42+
pub index_uid: IndexUid,
4343
pub nodes: HashMap<NodeId, IngesterNode>,
4444
/// Whether this entry has been seeded from a control plane response. During a rolling
4545
/// deployment, Chitchat broadcasts from already-upgraded nodes may populate the table
@@ -48,6 +48,16 @@ pub(super) struct RoutingEntry {
4848
seeded_from_cp: bool,
4949
}
5050

51+
impl RoutingEntry {
52+
fn new(index_uid: IndexUid) -> Self {
53+
Self {
54+
index_uid,
55+
nodes: HashMap::new(),
56+
seeded_from_cp: false,
57+
}
58+
}
59+
}
60+
5161
/// Given a slice of candidates, picks the better of two random choices.
5262
/// Higher capacity_score wins; tiebreak on more open_shard_count (more landing spots).
5363
fn power_of_two_choices<'a>(candidates: &[&'a IngesterNode]) -> &'a IngesterNode {
@@ -210,13 +220,26 @@ impl RoutingTable {
210220
capacity_score: usize,
211221
open_shard_count: usize,
212222
) {
213-
let key = (index_uid.index_id.to_string(), source_id.clone());
214-
215-
let entry = self.table.entry(key).or_default();
223+
let key = (index_uid.index_id.to_string(), source_id);
224+
225+
let entry = self
226+
.table
227+
.entry(key)
228+
.or_insert_with(|| RoutingEntry::new(index_uid.clone()));
229+
match entry.index_uid.cmp(&index_uid) {
230+
// If we receive an update for a new incarnation of the index, then we clear the entry.
231+
Ordering::Less => {
232+
entry.index_uid = index_uid.clone();
233+
entry.nodes.clear();
234+
entry.seeded_from_cp = false;
235+
}
236+
// If we receive an update for a previous incarnation of the index, then we ignore it.
237+
Ordering::Greater => return,
238+
Ordering::Equal => {}
239+
}
216240
let ingester_node = IngesterNode {
217241
node_id: node_id.clone(),
218242
index_uid,
219-
source_id,
220243
capacity_score,
221244
open_shard_count,
222245
};
@@ -233,6 +256,22 @@ impl RoutingTable {
233256
source_id: SourceId,
234257
shards: Vec<Shard>,
235258
) {
259+
let key = (index_uid.index_id.to_string(), source_id);
260+
let entry = self
261+
.table
262+
.entry(key)
263+
.or_insert_with(|| RoutingEntry::new(index_uid.clone()));
264+
match entry.index_uid.cmp(&index_uid) {
265+
// If we receive an update for a new incarnation of the index, then we clear the entry.
266+
Ordering::Less => {
267+
entry.index_uid = index_uid.clone();
268+
entry.nodes.clear();
269+
}
270+
// If we receive an update for a previous incarnation of the index, then we ignore it.
271+
Ordering::Greater => return,
272+
Ordering::Equal => {}
273+
}
274+
236275
let per_leader_count: HashMap<NodeId, usize> = shards
237276
.iter()
238277
.map(|shard| {
@@ -243,9 +282,6 @@ impl RoutingTable {
243282
.into_grouping_map()
244283
.sum();
245284

246-
let key = (index_uid.index_id.to_string(), source_id.clone());
247-
let entry = self.table.entry(key).or_default();
248-
249285
for (node_id, open_shard_count) in per_leader_count {
250286
entry
251287
.nodes
@@ -254,7 +290,6 @@ impl RoutingTable {
254290
.or_insert_with(|| IngesterNode {
255291
node_id,
256292
index_uid: index_uid.clone(),
257-
source_id: source_id.clone(),
258293
capacity_score: 5,
259294
open_shard_count,
260295
});
@@ -381,7 +416,7 @@ mod tests {
381416
// Node with capacity_score=0 is not eligible.
382417
table.apply_capacity_update(
383418
"node-2".into(),
384-
IndexUid::for_test("test-index", 0),
419+
index_uid.clone(),
385420
"test-source".into(),
386421
0,
387422
2,
@@ -511,21 +546,18 @@ mod tests {
511546
let high = IngesterNode {
512547
node_id: "high".into(),
513548
index_uid: IndexUid::for_test("idx", 0),
514-
source_id: "src".into(),
515549
capacity_score: 9,
516550
open_shard_count: 2,
517551
};
518552
let mid = IngesterNode {
519553
node_id: "mid".into(),
520554
index_uid: IndexUid::for_test("idx", 0),
521-
source_id: "src".into(),
522555
capacity_score: 5,
523556
open_shard_count: 2,
524557
};
525558
let low = IngesterNode {
526559
node_id: "low".into(),
527560
index_uid: IndexUid::for_test("idx", 0),
528-
source_id: "src".into(),
529561
capacity_score: 1,
530562
open_shard_count: 2,
531563
};
@@ -621,4 +653,64 @@ mod tests {
621653
"az_unaware"
622654
);
623655
}
656+
657+
#[test]
658+
fn test_incarnation_check_clears_stale_nodes() {
659+
let mut table = RoutingTable::default();
660+
let key = ("test-index".to_string(), "test-source".to_string());
661+
662+
// Populate with incarnation 0: two nodes.
663+
table.apply_capacity_update(
664+
"node-1".into(),
665+
IndexUid::for_test("test-index", 0),
666+
"test-source".into(),
667+
8,
668+
3,
669+
);
670+
table.apply_capacity_update(
671+
"node-2".into(),
672+
IndexUid::for_test("test-index", 0),
673+
"test-source".into(),
674+
6,
675+
2,
676+
);
677+
let entry = table.table.get(&key).unwrap();
678+
assert_eq!(entry.nodes.len(), 2);
679+
assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 0));
680+
681+
// Capacity update with incarnation 1 clears stale nodes.
682+
table.apply_capacity_update(
683+
"node-3".into(),
684+
IndexUid::for_test("test-index", 1),
685+
"test-source".into(),
686+
5,
687+
1,
688+
);
689+
let entry = table.table.get(&key).unwrap();
690+
assert_eq!(entry.nodes.len(), 1);
691+
assert!(entry.nodes.contains_key("node-3"));
692+
assert!(!entry.nodes.contains_key("node-1"));
693+
assert!(!entry.nodes.contains_key("node-2"));
694+
assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 1));
695+
696+
// merge_from_shards with incarnation 2 clears stale nodes.
697+
let shards = vec![Shard {
698+
index_uid: Some(IndexUid::for_test("test-index", 2)),
699+
source_id: "test-source".to_string(),
700+
shard_id: Some(ShardId::from(1u64)),
701+
shard_state: ShardState::Open as i32,
702+
leader_id: "node-4".to_string(),
703+
..Default::default()
704+
}];
705+
table.merge_from_shards(
706+
IndexUid::for_test("test-index", 2),
707+
"test-source".into(),
708+
shards,
709+
);
710+
let entry = table.table.get(&key).unwrap();
711+
assert_eq!(entry.nodes.len(), 1);
712+
assert!(entry.nodes.contains_key("node-4"));
713+
assert!(!entry.nodes.contains_key("node-3"));
714+
assert_eq!(entry.index_uid, IndexUid::for_test("test-index", 2));
715+
}
624716
}

quickwit/quickwit-integration-tests/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ quickwit-cli = { workspace = true }
4040
quickwit-common = { workspace = true, features = ["testsuite"] }
4141
quickwit-config = { workspace = true, features = ["testsuite"] }
4242
quickwit-indexing = { workspace = true, features = ["testsuite"] }
43+
quickwit-ingest = { workspace = true, features = ["testsuite"] }
4344
quickwit-metastore = { workspace = true, features = ["testsuite"] }
4445
quickwit-opentelemetry = { workspace = true, features = ["testsuite"] }
4546
quickwit-proto = { workspace = true, features = ["testsuite"] }

0 commit comments

Comments
 (0)