Skip to content

Commit 565294b

Browse files
rust(bug): Fix recursive/nested hdf5 imports (#579)
Co-authored-by: Brandon Shippy <brandon.shippy@siftstack.com>
1 parent 065ddf1 commit 565294b

6 files changed

Lines changed: 343 additions & 55 deletions

File tree

rust/crates/sift_cli/src/cli/mod.rs

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -450,11 +450,26 @@ pub struct ImportHdf5Args {
450450
pub relative_start_time: Option<String>,
451451

452452
/// (two-d / compound) Index of the time column or field. Defaults to 0.
453-
/// Mutually exclusive with --time-field.
454-
#[arg(long, conflicts_with = "time_field")]
453+
/// Mutually exclusive with --time-field and --time-name.
454+
#[arg(
455+
long,
456+
conflicts_with_all = ["time_field", "time_name"],
457+
help_heading = "Two-d schema options",
458+
)]
455459
pub time_index: Option<u64>,
456460

457-
/// (compound) Name of the time field. Mutually exclusive with --time-index.
458-
#[arg(long)]
461+
/// (compound) Name of the time field. Mutually exclusive with --time-index
462+
/// and --time-name.
463+
#[arg(
464+
long,
465+
conflicts_with = "time_name",
466+
help_heading = "Compound schema options"
467+
)]
459468
pub time_field: Option<String>,
469+
470+
/// (one-d) Leaf name of the time dataset when it doesn't match the default
471+
/// auto-detected names (time, timestamp, timestamps, ts). Mutually exclusive
472+
/// with --time-index and --time-field.
473+
#[arg(long, help_heading = "One-d schema options")]
474+
pub time_name: Option<String>,
460475
}

rust/crates/sift_cli/src/cmd/import/hdf5/detect_hdf5_schema.rs

Lines changed: 164 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,55 @@
1+
use std::collections::HashMap;
12
use std::path::Path;
23

34
use anyhow::{Context as AnyhowContext, Result, anyhow};
45
use hdf5::types::{FloatSize, IntSize, TypeDescriptor, VarLenAscii, VarLenUnicode};
5-
use hdf5::{Dataset, File};
6+
use hdf5::{Dataset, File, Group};
67
use sift_rs::{
7-
common::r#type::v1::{ChannelConfig, ChannelDataType},
8+
common::r#type::v1::{ChannelConfig, ChannelDataType, ChannelEnumType},
89
data_imports::v2::Hdf5DataConfig,
910
};
1011

1112
use crate::cli::hdf5::Hdf5Schema;
13+
use crate::cmd::import::utils::group_path_to_channel_name;
14+
use crate::util::tty::Output;
1215

16+
const ROOT_PATH: &str = "/";
1317
const TIME_NAMES: &[&str] = &["time", "timestamp", "timestamps", "ts"];
18+
const VALUE_NAMES: &[&str] = &["value", "values"];
1419

15-
pub(super) fn is_time_dataset_name(name: &str) -> bool {
16-
let trimmed = name.trim_start_matches('/').to_ascii_lowercase();
17-
TIME_NAMES.iter().any(|n| *n == trimmed)
20+
pub fn basename(path: &str) -> &str {
21+
path.rsplit('/').next().unwrap_or(path)
22+
}
23+
24+
pub fn parent_path(path: &str) -> &str {
25+
match path.rfind('/') {
26+
Some(0) => ROOT_PATH,
27+
Some(idx) => &path[..idx],
28+
None => ROOT_PATH,
29+
}
30+
}
31+
32+
pub fn is_time_dataset_name(name: &str) -> bool {
33+
let leaf = basename(name).to_ascii_lowercase();
34+
TIME_NAMES.iter().any(|n| *n == leaf)
35+
}
36+
37+
fn is_value_leaf(name: &str) -> bool {
38+
let leaf = basename(name).to_ascii_lowercase();
39+
VALUE_NAMES.iter().any(|n| *n == leaf)
40+
}
41+
42+
fn collect_datasets_recursive(group: &Group) -> Result<Vec<Dataset>> {
43+
let mut datasets = group
44+
.datasets()
45+
.with_context(|| format!("failed to enumerate datasets in {}", group.name()))?;
46+
let subgroups = group
47+
.groups()
48+
.with_context(|| format!("failed to enumerate groups in {}", group.name()))?;
49+
for sub in &subgroups {
50+
datasets.extend(collect_datasets_recursive(sub)?);
51+
}
52+
Ok(datasets)
1853
}
1954

2055
fn get_string_attr(ds: &Dataset, name: &str) -> Option<String> {
@@ -28,12 +63,10 @@ fn get_string_attr(ds: &Dataset, name: &str) -> Option<String> {
2863
None
2964
}
3065

31-
/// Supported HDF5 channel types. Anything outside this set is rejected with a
32-
/// client-side error so users get clear feedback before upload.
33-
pub(super) const SUPPORTED_TYPES_BLURB: &str =
34-
"bool, int8/16/32/64, uint8/16/32/64, float32, float64";
66+
pub const SUPPORTED_TYPES_BLURB: &str =
67+
"bool, int8/16/32/64, uint8/16/32/64, float32, float64, string, enum";
3568

36-
pub(super) fn hdf5_to_sift_data_type(ty: &TypeDescriptor) -> Option<ChannelDataType> {
69+
pub fn hdf5_to_sift_data_type(ty: &TypeDescriptor) -> Option<ChannelDataType> {
3770
match ty {
3871
TypeDescriptor::Boolean => Some(ChannelDataType::Bool),
3972
TypeDescriptor::Integer(IntSize::U1)
@@ -46,31 +79,57 @@ pub(super) fn hdf5_to_sift_data_type(ty: &TypeDescriptor) -> Option<ChannelDataT
4679
TypeDescriptor::Unsigned(IntSize::U8) => Some(ChannelDataType::Uint64),
4780
TypeDescriptor::Float(FloatSize::U4) => Some(ChannelDataType::Float),
4881
TypeDescriptor::Float(FloatSize::U8) => Some(ChannelDataType::Double),
82+
TypeDescriptor::VarLenUnicode
83+
| TypeDescriptor::VarLenAscii
84+
| TypeDescriptor::FixedAscii(_)
85+
| TypeDescriptor::FixedUnicode(_) => Some(ChannelDataType::String),
86+
TypeDescriptor::Enum(_) => Some(ChannelDataType::Enum),
4987
_ => None,
5088
}
5189
}
5290

53-
pub(super) fn detect_config(
91+
pub fn enum_types_for(ty: &TypeDescriptor) -> Result<Vec<ChannelEnumType>> {
92+
let TypeDescriptor::Enum(enum_type) = ty else {
93+
return Ok(Vec::new());
94+
};
95+
enum_type
96+
.members
97+
.iter()
98+
.map(|member| {
99+
Ok(ChannelEnumType {
100+
name: member.name.clone(),
101+
key: u32::try_from(member.value).with_context(|| {
102+
format!(
103+
"enum member '{}' value {} doesn't fit in u32",
104+
member.name, member.value
105+
)
106+
})?,
107+
is_signed: enum_type.signed,
108+
})
109+
})
110+
.collect()
111+
}
112+
113+
pub fn detect_config(
54114
path: &Path,
55115
schema: Hdf5Schema,
56116
time_index: u64,
57117
time_field: Option<&str>,
118+
time_name: Option<&str>,
58119
) -> Result<(Vec<Hdf5DataConfig>, Vec<ChannelConfig>)> {
59120
let file = File::open(path).map_err(|e| anyhow!("failed to open hdf5 file: {e}"))?;
60-
let datasets = file
61-
.datasets()
62-
.map_err(|e| anyhow!("failed to enumerate datasets: {e}"))?;
121+
let datasets = collect_datasets_recursive(&file)?;
63122

64123
let result = match schema {
65-
Hdf5Schema::OneD => detect_one_d(&datasets),
124+
Hdf5Schema::OneD => detect_one_d(&datasets, time_name),
66125
Hdf5Schema::TwoD => detect_two_d(&datasets, time_index),
67126
Hdf5Schema::Compound => detect_compound(&datasets, time_index, time_field),
68127
};
69128

70129
match result {
71-
Ok((data, _)) if data.is_empty() => {
72-
Err(no_match_error(&datasets, schema, time_index, time_field))
73-
}
130+
Ok((data, _)) if data.is_empty() => Err(no_match_error(
131+
&datasets, schema, time_index, time_field, time_name,
132+
)),
74133
Ok(other) => Ok(other),
75134
Err(e) => Err(e),
76135
}
@@ -81,6 +140,7 @@ fn no_match_error(
81140
selected: Hdf5Schema,
82141
time_index: u64,
83142
time_field: Option<&str>,
143+
time_name: Option<&str>,
84144
) -> anyhow::Error {
85145
let alternatives: &[(Hdf5Schema, &str)] = &[
86146
(Hdf5Schema::OneD, "one-d"),
@@ -93,7 +153,7 @@ fn no_match_error(
93153
.filter(|(s, _)| *s != selected)
94154
.filter_map(|(s, name)| {
95155
let probe = match s {
96-
Hdf5Schema::OneD => detect_one_d(datasets),
156+
Hdf5Schema::OneD => detect_one_d(datasets, time_name),
97157
Hdf5Schema::TwoD => detect_two_d(datasets, time_index),
98158
Hdf5Schema::Compound => detect_compound(datasets, time_index, time_field),
99159
};
@@ -124,53 +184,91 @@ fn no_match_error(
124184
}
125185
}
126186

127-
fn detect_one_d(datasets: &[Dataset]) -> Result<(Vec<Hdf5DataConfig>, Vec<ChannelConfig>)> {
128-
let time_dataset = datasets
129-
.iter()
130-
.find(|d| is_time_dataset_name(&d.name()))
131-
.map(|d| d.name())
132-
.ok_or_else(|| {
133-
anyhow!("no time dataset found — expected one of {TIME_NAMES:?} (case-insensitive)")
134-
})?;
187+
fn detect_one_d(
188+
datasets: &[Dataset],
189+
time_name: Option<&str>,
190+
) -> Result<(Vec<Hdf5DataConfig>, Vec<ChannelConfig>)> {
191+
let mut group_time: HashMap<String, String> = HashMap::new();
192+
for ds in datasets {
193+
let name = ds.name();
194+
let matches = match time_name {
195+
Some(want) => basename(&name) == want,
196+
None => is_time_dataset_name(&name),
197+
};
198+
if !matches || ds.ndim() != 1 {
199+
continue;
200+
}
201+
group_time
202+
.entry(parent_path(&name).to_owned())
203+
.or_insert(name);
204+
}
205+
206+
if group_time.is_empty() {
207+
return Err(match time_name {
208+
Some(want) => anyhow!(
209+
"no time dataset found with name '{want}'. \
210+
Verify --time-name matches a leaf dataset name in the file."
211+
),
212+
None => anyhow!(
213+
"no time dataset found — expected one of {TIME_NAMES:?} (case-insensitive) \
214+
at the root or within any group. \
215+
If your file uses a custom name, pass it via --time-name."
216+
),
217+
});
218+
}
135219

136220
let mut data_configs = Vec::new();
137221
let mut channel_configs = Vec::new();
138222

139223
for ds in datasets {
140224
let name = ds.name();
141-
if name == time_dataset {
225+
if is_time_dataset_name(&name) || ds.ndim() != 1 {
142226
continue;
143227
}
144-
if ds.ndim() != 1 {
228+
let Some(time_dataset) = nearest_time_dataset(&group_time, &name) else {
145229
continue;
146-
}
147-
let dtype = ds
148-
.dtype()
149-
.map_err(|e| anyhow!("failed to read dtype for {name}: {e}"))?
150-
.to_descriptor()
151-
.map_err(|e| anyhow!("failed to describe dtype for {name}: {e}"))?;
230+
};
231+
232+
let dtype = match ds.dtype().and_then(|t| t.to_descriptor()) {
233+
Ok(d) => d,
234+
Err(e) => {
235+
Output::new()
236+
.line(format!(
237+
"skipping {name}: cannot describe HDF5 dtype ({e}). \
238+
Supported types: {SUPPORTED_TYPES_BLURB}."
239+
))
240+
.eprint();
241+
continue;
242+
}
243+
};
152244
let Some(channel_type) = hdf5_to_sift_data_type(&dtype) else {
153-
return Err(anyhow!(
154-
"unsupported HDF5 type for dataset {name}: {dtype:?}. \
155-
Supported types: {SUPPORTED_TYPES_BLURB}."
156-
));
245+
Output::new()
246+
.line(format!(
247+
"skipping {name}: unsupported HDF5 type {dtype:?}. \
248+
Supported types: {SUPPORTED_TYPES_BLURB}."
249+
))
250+
.eprint();
251+
continue;
157252
};
158253

159254
let units = get_string_attr(ds, "units").unwrap_or_default();
160255
let description = get_string_attr(ds, "long_name")
161256
.or_else(|| get_string_attr(ds, "description"))
162257
.unwrap_or_default();
163258

259+
let channel_name = one_d_channel_name(&name);
260+
164261
let channel_config = ChannelConfig {
165-
name: name.trim_start_matches('/').to_string(),
262+
name: channel_name,
166263
data_type: channel_type as i32,
167264
units,
168265
description,
266+
enum_types: enum_types_for(&dtype)?,
169267
..Default::default()
170268
};
171269

172270
data_configs.push(Hdf5DataConfig {
173-
time_dataset: time_dataset.clone(),
271+
time_dataset,
174272
time_index: 0,
175273
value_dataset: name.clone(),
176274
value_index: 0,
@@ -184,6 +282,27 @@ fn detect_one_d(datasets: &[Dataset]) -> Result<(Vec<Hdf5DataConfig>, Vec<Channe
184282
Ok((data_configs, channel_configs))
185283
}
186284

285+
fn nearest_time_dataset(group_time: &HashMap<String, String>, value_path: &str) -> Option<String> {
286+
let mut current = parent_path(value_path);
287+
while current != ROOT_PATH {
288+
if let Some(t) = group_time.get(current) {
289+
return Some(t.clone());
290+
}
291+
current = parent_path(current);
292+
}
293+
group_time.get(ROOT_PATH).cloned()
294+
}
295+
296+
fn one_d_channel_name(value_path: &str) -> String {
297+
if is_value_leaf(value_path) {
298+
let parent = parent_path(value_path);
299+
if parent != ROOT_PATH {
300+
return group_path_to_channel_name(parent);
301+
}
302+
}
303+
group_path_to_channel_name(value_path)
304+
}
305+
187306
fn detect_two_d(
188307
datasets: &[Dataset],
189308
time_index: u64,
@@ -220,10 +339,11 @@ fn detect_two_d(
220339
if col == time_index {
221340
continue;
222341
}
223-
let channel_name = format!("{}.{col}", name.trim_start_matches('/'));
342+
let channel_name = format!("{}.{col}", group_path_to_channel_name(&name));
224343
let channel_config = ChannelConfig {
225344
name: channel_name,
226345
data_type: channel_type as i32,
346+
enum_types: enum_types_for(&dtype)?,
227347
..Default::default()
228348
};
229349

@@ -295,10 +415,11 @@ fn detect_compound(
295415
field.ty
296416
));
297417
};
298-
let channel_name = format!("{}.{}", name.trim_start_matches('/'), field.name);
418+
let channel_name = format!("{}.{}", group_path_to_channel_name(&name), field.name);
299419
let channel_config = ChannelConfig {
300420
name: channel_name,
301421
data_type: channel_type as i32,
422+
enum_types: enum_types_for(&field.ty)?,
302423
..Default::default()
303424
};
304425

rust/crates/sift_cli/src/cmd/import/hdf5/import.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ pub async fn run(ctx: Context, args: ImportHdf5Args) -> Result<ExitCode> {
4444
args.schema,
4545
args.time_index.unwrap_or(0),
4646
args.time_field.as_deref(),
47+
args.time_name.as_deref(),
4748
) {
4849
Ok((_, channel_configs)) => {
4950
let refs: Vec<&ChannelConfig> = channel_configs.iter().collect();
@@ -65,6 +66,7 @@ pub async fn run(ctx: Context, args: ImportHdf5Args) -> Result<ExitCode> {
6566
args.schema,
6667
args.time_index.unwrap_or(0),
6768
args.time_field.as_deref(),
69+
args.time_name.as_deref(),
6870
)
6971
.context("failed to parse hdf5 file")?;
7072
hdf5_config.data = data_configs;

rust/crates/sift_cli/src/cmd/import/hdf5/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
pub mod detect_hdf5_schema;
1+
mod detect_hdf5_schema;
22
pub mod import;
33

44
#[cfg(test)]

0 commit comments

Comments
 (0)