Skip to content

Commit 4fc2476

Browse files
committed
fix(server): warn and skip non-numeric filenames in consumer offset directories instead of panicking
1 parent f5350d9 commit 4fc2476

3 files changed

Lines changed: 167 additions & 10 deletions

File tree

core/server/src/streaming/partitions/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,5 +30,7 @@ pub mod ops;
3030
mod ops_tests;
3131
pub mod segments;
3232
pub mod storage;
33+
#[cfg(test)]
34+
mod storage_tests;
3335

3436
pub const COMPONENT: &str = "STREAMING_PARTITIONS";

core/server/src/streaming/partitions/storage.rs

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ use compio::{
2828
use err_trail::ErrContext;
2929
use iggy_common::{ConsumerKind, IggyError};
3030
use std::{io::Read, path::Path, sync::atomic::AtomicU64};
31-
use tracing::{error, trace};
31+
use tracing::{error, trace, warn};
3232

3333
pub async fn create_partition_file_hierarchy(
3434
stream_id: usize,
@@ -181,9 +181,16 @@ pub fn load_consumer_offsets(path: &str) -> Result<Vec<ConsumerOffset>, IggyErro
181181
}
182182

183183
let name = dir_entry.file_name().into_string().unwrap();
184-
let consumer_id = name.parse::<u32>().unwrap_or_else(|_| {
185-
panic!("Invalid consumer ID file with name: '{}'.", name);
186-
});
184+
let consumer_id = match name.parse::<u32>() {
185+
Ok(id) => id,
186+
Err(_) => {
187+
warn!(
188+
"Unexpected non-numeric consumer offset file: '{}', skipping.",
189+
name
190+
);
191+
continue;
192+
}
193+
};
187194

188195
let path = dir_entry.path();
189196
let path = path.to_str();
@@ -244,12 +251,16 @@ pub fn load_consumer_group_offsets(
244251

245252
let name = dir_entry.file_name().into_string().unwrap();
246253

247-
let consumer_group_id = name.parse::<u32>().unwrap_or_else(|_| {
248-
panic!(
249-
"Invalid consumer group ID in consumer group file with name: '{}'.",
250-
name
251-
);
252-
});
254+
let consumer_group_id = match name.parse::<u32>() {
255+
Ok(id) => id,
256+
Err(_) => {
257+
warn!(
258+
"Unexpected non-numeric consumer group offset file: '{}', skipping.",
259+
name
260+
);
261+
continue;
262+
}
263+
};
253264
let consumer_group_id = ConsumerGroupId(consumer_group_id as usize);
254265

255266
let path = dir_entry.path();
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
// Licensed to the Apache Software Foundation (ASF) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The ASF licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
use crate::streaming::partitions::storage::{load_consumer_group_offsets, load_consumer_offsets};
19+
use iggy_common::{ConsumerKind, IggyError};
20+
use std::path::Path;
21+
use std::sync::atomic::Ordering;
22+
23+
fn write_offset_file(dir: &Path, name: &str, offset: u64) {
24+
std::fs::write(dir.join(name), offset.to_le_bytes()).unwrap();
25+
}
26+
27+
#[test]
28+
fn load_consumer_offsets_valid_files() {
29+
let dir = tempfile::tempdir().unwrap();
30+
write_offset_file(dir.path(), "1", 100);
31+
write_offset_file(dir.path(), "2", 200);
32+
write_offset_file(dir.path(), "3", 300);
33+
34+
let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
35+
36+
assert_eq!(offsets.len(), 3);
37+
assert_eq!(offsets[0].consumer_id, 1);
38+
assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 100);
39+
assert_eq!(offsets[0].kind, ConsumerKind::Consumer);
40+
assert_eq!(offsets[1].consumer_id, 2);
41+
assert_eq!(offsets[1].offset.load(Ordering::Relaxed), 200);
42+
assert_eq!(offsets[2].consumer_id, 3);
43+
assert_eq!(offsets[2].offset.load(Ordering::Relaxed), 300);
44+
}
45+
46+
#[test]
47+
fn load_consumer_offsets_skips_non_numeric_files() {
48+
let dir = tempfile::tempdir().unwrap();
49+
write_offset_file(dir.path(), ".DS_Store", 0);
50+
write_offset_file(dir.path(), "backup.bak", 0);
51+
write_offset_file(dir.path(), "1", 42);
52+
53+
let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
54+
55+
assert_eq!(offsets.len(), 1);
56+
assert_eq!(offsets[0].consumer_id, 1);
57+
assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 42);
58+
}
59+
60+
#[test]
61+
fn load_consumer_offsets_empty_dir() {
62+
let dir = tempfile::tempdir().unwrap();
63+
64+
let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
65+
66+
assert!(offsets.is_empty());
67+
}
68+
69+
#[test]
70+
fn load_consumer_offsets_skips_directories() {
71+
let dir = tempfile::tempdir().unwrap();
72+
std::fs::create_dir(dir.path().join("123")).unwrap();
73+
write_offset_file(dir.path(), "1", 77);
74+
75+
let offsets = load_consumer_offsets(dir.path().to_str().unwrap()).unwrap();
76+
77+
assert_eq!(offsets.len(), 1);
78+
assert_eq!(offsets[0].consumer_id, 1);
79+
assert_eq!(offsets[0].offset.load(Ordering::Relaxed), 77);
80+
}
81+
82+
#[test]
83+
fn load_consumer_offsets_nonexistent_dir() {
84+
let result = load_consumer_offsets("/tmp/nonexistent_iggy_test_dir_12345");
85+
86+
assert!(result.is_err());
87+
assert!(matches!(
88+
result.unwrap_err(),
89+
IggyError::CannotReadConsumerOffsets(_)
90+
));
91+
}
92+
93+
#[test]
94+
fn load_consumer_group_offsets_valid_files() {
95+
let dir = tempfile::tempdir().unwrap();
96+
write_offset_file(dir.path(), "1", 500);
97+
write_offset_file(dir.path(), "2", 600);
98+
99+
let offsets = load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap();
100+
101+
assert_eq!(offsets.len(), 2);
102+
for (group_id, offset) in &offsets {
103+
assert_eq!(offset.kind, ConsumerKind::ConsumerGroup);
104+
assert_eq!(offset.consumer_id, group_id.0 as u32);
105+
}
106+
let ids: Vec<u32> = offsets.iter().map(|(_, co)| co.consumer_id).collect();
107+
assert!(ids.contains(&1));
108+
assert!(ids.contains(&2));
109+
}
110+
111+
#[test]
112+
fn load_consumer_group_offsets_skips_non_numeric_files() {
113+
let dir = tempfile::tempdir().unwrap();
114+
write_offset_file(dir.path(), ".DS_Store", 0);
115+
write_offset_file(dir.path(), "notes.txt", 0);
116+
write_offset_file(dir.path(), "5", 999);
117+
118+
let offsets = load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap();
119+
120+
assert_eq!(offsets.len(), 1);
121+
assert_eq!(offsets[0].0.0, 5);
122+
assert_eq!(offsets[0].1.consumer_id, 5);
123+
assert_eq!(offsets[0].1.offset.load(Ordering::Relaxed), 999);
124+
}
125+
126+
#[test]
127+
fn load_consumer_group_offsets_empty_dir() {
128+
let dir = tempfile::tempdir().unwrap();
129+
130+
let offsets = load_consumer_group_offsets(dir.path().to_str().unwrap()).unwrap();
131+
132+
assert!(offsets.is_empty());
133+
}
134+
135+
#[test]
136+
fn load_consumer_group_offsets_nonexistent_dir() {
137+
let result = load_consumer_group_offsets("/tmp/nonexistent_iggy_test_dir_12345");
138+
139+
assert!(result.is_err());
140+
assert!(matches!(
141+
result.unwrap_err(),
142+
IggyError::CannotReadConsumerOffsets(_)
143+
));
144+
}

0 commit comments

Comments
 (0)