Skip to content

Commit 6df2fd3

Browse files
committed
add hdf5 client-side detect_config
1 parent b70b6e3 commit 6df2fd3

3 files changed

Lines changed: 190 additions & 0 deletions

File tree

Lines changed: 174 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,174 @@
1+
from __future__ import annotations
2+
3+
from pathlib import Path
4+
5+
import h5py
6+
import numpy as np
7+
8+
from sift_client.sift_types.channel import ChannelDataType
9+
from sift_client.sift_types.data_import import Hdf5DataColumn, Hdf5ImportConfig, TimeFormat
10+
11+
# HDF5 attribute keys probed, in listed order, when looking up a dataset's
# channel name, unit, and description.
_NAME_ATTRS = [
    "Name",
    "name",
    "Title",
    "title",
    "Sensor",
    "sensor",
    "Channel",
    "channel",
]
_UNIT_ATTRS = [
    "Unit",
    "unit",
    "Units",
    "units",
]
_DESCRIPTION_ATTRS = [
    "Description",
    "description",
]
15+
16+
# Lookup table from numpy scalar type (``dtype.type``) to the Sift channel
# data type used for the detected column.
_NUMPY_TO_SIFT: dict[type, ChannelDataType] = {
    # Booleans.
    np.bool_: ChannelDataType.BOOL,
    # Signed integers: 8/16-bit widths share the 32-bit channel type.
    np.int8: ChannelDataType.INT_32,
    np.int16: ChannelDataType.INT_32,
    np.int32: ChannelDataType.INT_32,
    np.int64: ChannelDataType.INT_64,
    # Unsigned integers: 8/16-bit widths share the 32-bit channel type.
    np.uint8: ChannelDataType.UINT_32,
    np.uint16: ChannelDataType.UINT_32,
    np.uint32: ChannelDataType.UINT_32,
    np.uint64: ChannelDataType.UINT_64,
    # Floating point.
    np.float32: ChannelDataType.FLOAT,
    np.float64: ChannelDataType.DOUBLE,
    # datetime64 is mapped to a 64-bit integer channel.
    np.datetime64: ChannelDataType.INT_64,
    # Complex types are mapped to the float type of matching component width.
    np.complex64: ChannelDataType.FLOAT,
    np.complex128: ChannelDataType.DOUBLE,
    # Text-like and generic object types.
    np.str_: ChannelDataType.STRING,
    np.bytes_: ChannelDataType.STRING,
    np.object_: ChannelDataType.STRING,
    # Raw / structured (void) records.
    np.void: ChannelDataType.BYTES,
}
36+
37+
38+
def _detect_attr(dataset: h5py.Dataset, candidates: list[str], default: str = "") -> str:
    """Return the first present attribute among *candidates* as a string.

    Args:
        dataset: Object whose ``attrs`` mapping is probed.
        candidates: Attribute names to try, in priority order.
        default: Value returned when none of the candidates is set.

    Returns:
        The first attribute value that is not ``None``. ``str`` values are
        returned unchanged, ``bytes`` values are decoded as UTF-8 (undecodable
        bytes become U+FFFD), and any other value is passed through ``str()``.
        If no candidate is set, *default* is returned.
    """
    for attr in candidates:
        val = dataset.attrs.get(attr)
        if val is None:
            continue
        if isinstance(val, str):
            return val
        if isinstance(val, bytes):
            # Byte-string attributes are not guaranteed to be valid UTF-8
            # (e.g. a Latin-1 degree sign in a unit). A strict decode would
            # raise UnicodeDecodeError and abort detection for the whole file,
            # so substitute the replacement character instead.
            return val.decode("utf-8", errors="replace")
        return str(val)
    return default
51+
52+
53+
def _numpy_to_sift_type(dtype: np.dtype) -> ChannelDataType:
    """Translate a numpy dtype into the matching Sift ``ChannelDataType``.

    The lookup is keyed on ``dtype.type`` in ``_NUMPY_TO_SIFT``.

    Raises:
        ValueError: If ``dtype.type`` has no entry in ``_NUMPY_TO_SIFT``.
    """
    scalar_type = dtype.type
    if scalar_type not in _NUMPY_TO_SIFT:
        raise ValueError(f"Unsupported numpy dtype: {dtype}")
    return _NUMPY_TO_SIFT[scalar_type]
59+
60+
61+
def detect_hdf5_config(file_path: str | Path) -> Hdf5ImportConfig:
    """Build an ``Hdf5ImportConfig`` by walking every dataset in an HDF5 file.

    Each visited dataset yields zero or more ``Hdf5DataColumn`` entries:

    * A dataset named ``time`` at the root, and any dataset whose leaf name is
      ``timestamps``, is treated as a time source and produces no column.
    * A compound dataset with more than one field produces one column per
      field after the first; the first field is recorded as the time field.
    * A two-dimensional dataset with exactly two columns produces one column
      that reads time from index 0 and values from index 1 of that dataset.
    * Any other dataset produces one column whose time dataset is a sibling
      ``timestamps`` entry if present, otherwise the root ``time`` entry if
      present, otherwise an empty string.

    Channel names come from the dataset's name attributes, falling back to the
    dataset path (or the parent group path for nested datasets named
    ``values``). When a name is already taken, the dataset path is appended.

    Args:
        file_path: Location of the HDF5 file to inspect.

    Returns:
        A config with an empty ``asset_name``, the
        ``ABSOLUTE_UNIX_NANOSECONDS`` time format, and the detected columns.

    Raises:
        ValueError: If a dataset's dtype has no entry in ``_NUMPY_TO_SIFT``.
    """
    detected: list[Hdf5DataColumn] = []
    taken_names: set[str] = set()

    with h5py.File(Path(file_path), "r") as h5file:
        root_time_present = "time" in h5file

        def _add_column(
            node: h5py.Dataset,
            fallback_name: str,
            collision_suffix: str,
            value_dtype: np.dtype,
            **placement: object,
        ) -> None:
            # Resolve the channel name, disambiguate it if already used, then
            # record the column together with its unit/description attributes.
            label = _detect_attr(node, _NAME_ATTRS, fallback_name)
            if label in taken_names:
                label = f"{label}.{collision_suffix}"
            detected.append(
                Hdf5DataColumn(
                    name=label,
                    data_type=_numpy_to_sift_type(value_dtype),
                    units=_detect_attr(node, _UNIT_ATTRS),
                    description=_detect_attr(node, _DESCRIPTION_ATTRS),
                    **placement,
                )
            )
            taken_names.add(label)

        def _visit(dataset_name: str, obj: object) -> None:
            if not isinstance(obj, h5py.Dataset):
                return

            parent_path, slash, leaf = dataset_name.rpartition("/")

            # Time sources (root "time", any "timestamps") are not value channels.
            is_root_time = dataset_name == "time" and obj.parent == h5file
            if is_root_time or leaf == "timestamps":
                return

            field_names = obj.dtype.names or ()

            if len(field_names) > 1:
                # Compound type: field 0 is time, every later field is a value channel.
                for idx, value_field in enumerate(field_names[1:], start=1):
                    _add_column(
                        obj,
                        dataset_name,
                        f"{dataset_name}.{idx}",
                        obj.dtype[idx],
                        time_dataset=dataset_name,
                        value_dataset=dataset_name,
                        time_index=0,
                        value_index=0,
                        time_field=field_names[0],
                        value_field=value_field,
                    )
                return

            if obj.ndim == 2 and obj.shape[1] == 2:
                # N x 2 layout: column 0 is time, column 1 is the value.
                _add_column(
                    obj,
                    dataset_name,
                    dataset_name,
                    obj.dtype,
                    time_dataset=dataset_name,
                    value_dataset=dataset_name,
                    time_index=0,
                    value_index=1,
                )
                return

            # Plain dataset: prefer a sibling "timestamps", then the root "time".
            if "timestamps" in obj.parent:
                time_source = f"{parent_path}/timestamps" if parent_path else "timestamps"
            elif root_time_present:
                time_source = "time"
            else:
                time_source = ""

            # Nested datasets named "values" are labelled by their group path.
            fallback = parent_path if (leaf == "values" and slash) else dataset_name

            _add_column(
                obj,
                fallback,
                dataset_name,
                obj.dtype,
                time_dataset=time_source,
                value_dataset=dataset_name,
                time_index=0,
                value_index=0,
            )

        h5file.visititems(_visit)

    return Hdf5ImportConfig(
        asset_name="",
        time_format=TimeFormat.ABSOLUTE_UNIX_NANOSECONDS,
        data=detected,
    )

python/lib/sift_client/resources/data_imports.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from sift_client._internal.low_level_wrappers.data_imports import DataImportsLowLevelClient
77
from sift_client._internal.util.executor import run_sync_function
88
from sift_client._internal.util.file import extract_parquet_footer, upload_file
9+
from sift_client._internal.util.hdf5 import detect_hdf5_config
910
from sift_client.resources._base import ResourceBase
1011
from sift_client.sift_types.asset import Asset
1112
from sift_client.sift_types.channel import ChannelDataType
@@ -243,6 +244,9 @@ async def detect_config(
243244

244245
data_type_key = _resolve_data_type_key(path.suffix.lower(), data_type)
245246

247+
if data_type_key == DataTypeKey.HDF5:
248+
return await run_sync_function(lambda: detect_hdf5_config(path))
249+
246250
is_parquet = data_type_key in (
247251
DataTypeKey.PARQUET_FLATDATASET,
248252
DataTypeKey.PARQUET_SINGLE_CHANNEL_PER_ROW,

python/lib/sift_client/sift_types/data_import.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -593,6 +593,18 @@ class Hdf5ImportConfig(ImportConfigBase):
593593
time_format: TimeFormat
594594
relative_start_time: datetime | None = None
595595

596+
def __getitem__(self, name: str) -> Hdf5DataColumn:
    """Return the first data column whose ``name`` equals *name*.

    Example::

        config["temperature"].data_type = ChannelDataType.FLOAT

    Raises:
        KeyError: If no column in ``self.data`` has that name.
    """
    found = next((column for column in self.data if column.name == name), None)
    if found is None:
        raise KeyError(f"No data column named '{name}'")
    return found
607+
596608
@model_validator(mode="after")
597609
def _check_relative_start_time(self) -> Hdf5ImportConfig:
598610
if self.time_format.name.startswith("RELATIVE_") and self.relative_start_time is None:

0 commit comments

Comments
 (0)