Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
40 changes: 3 additions & 37 deletions python/lib/sift_client/_internal/util/hdf5.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,56 +3,22 @@
from pathlib import Path

import h5py
import numpy as np

from sift_client.sift_types.channel import ChannelDataType
from sift_client._internal.util.numpy_types import numpy_to_sift_type
from sift_client.sift_types.data_import import Hdf5DataColumn, Hdf5ImportConfig, TimeFormat

# Common HDF5 attribute names used to detect channel metadata.
# Each list holds candidate attribute names in priority order: `_detect_attr`
# returns the first candidate whose value on the dataset is truthy, so earlier
# spellings win over later ones.
_NAME_ATTRS = ["Name", "name", "Title", "title", "Sensor", "sensor", "Channel", "channel"]
# Candidate attributes that may carry a channel's units.
_UNIT_ATTRS = ["Unit", "unit", "Units", "units"]
# Candidate attributes that may carry a channel's description.
_DESCRIPTION_ATTRS = ["Description", "description"]

# Lookup table from numpy scalar type to the Sift channel data type.
# `_numpy_to_sift_type` keys this table by `dtype.type`, so only the exact
# scalar types listed here are accepted; anything else raises ValueError there.
# 8- and 16-bit integers share the 32-bit Sift integer types.
_NUMPY_TO_SIFT: dict[type, ChannelDataType] = {
np.bool_: ChannelDataType.BOOL,
np.int8: ChannelDataType.INT_32,
np.int16: ChannelDataType.INT_32,
np.int32: ChannelDataType.INT_32,
np.int64: ChannelDataType.INT_64,
np.uint8: ChannelDataType.UINT_32,
np.uint16: ChannelDataType.UINT_32,
np.uint32: ChannelDataType.UINT_32,
np.uint64: ChannelDataType.UINT_64,
np.float32: ChannelDataType.FLOAT,
np.float64: ChannelDataType.DOUBLE,
# NOTE(review): datetime64 maps to INT_64 - presumably an integer timestamp;
# confirm the expected unit/epoch with the importer.
np.datetime64: ChannelDataType.INT_64,
# NOTE(review): complex types map to the float type of matching precision -
# presumably applied per real/imaginary component; confirm against callers.
np.complex64: ChannelDataType.FLOAT,
np.complex128: ChannelDataType.DOUBLE,
np.str_: ChannelDataType.STRING,
# HDF5/TDMS fixed-length strings are stored as np.bytes_; use STRING, not
# BYTES (np.void below handles truly opaque binary data).
np.bytes_: ChannelDataType.STRING,
# Numpy uses object dtype for variable-length strings; TDMS/HDF5 files
# cannot produce non-string object arrays.
np.object_: ChannelDataType.STRING,
np.void: ChannelDataType.BYTES,
}


def _detect_attr(dataset: h5py.Dataset, candidates: list[str], default: str = "") -> str:
    """Return the first non-empty HDF5 attribute among *candidates* as text.

    Args:
        dataset: Object exposing its HDF5 attributes through a mapping-like
            ``attrs`` member.
        candidates: Attribute names to try, in priority order.
        default: Value returned when no candidate holds a truthy value.

    Returns:
        The first truthy attribute value converted to ``str``, or *default*
        when no candidate matches. Byte strings are decoded as UTF-8 instead
        of being rendered as ``"b'...'"``.
    """
    for attr in candidates:
        # Single lookup per candidate; stop at the first usable value so later
        # attributes are never inspected once a match is found.
        value = dataset.attrs.get(attr)
        if not value:
            continue
        if isinstance(value, bytes):
            # Fixed-length HDF5 strings are read back as bytes (np.bytes_ is a
            # bytes subclass); str() on them would yield "b'...'".
            return value.decode("utf-8", errors="replace")
        return str(value)
    return default


def _numpy_to_sift_type(dtype: np.dtype) -> ChannelDataType:
    """Translate a numpy dtype into the matching Sift ChannelDataType.

    Args:
        dtype: The numpy dtype to translate; its scalar type (``dtype.type``)
            is the lookup key.

    Returns:
        The ChannelDataType registered for the dtype's scalar type.

    Raises:
        ValueError: If the scalar type has no entry in the lookup table.
    """
    mapped = _NUMPY_TO_SIFT.get(dtype.type)
    if mapped is not None:
        return mapped
    raise ValueError(f"Unsupported numpy dtype: {dtype}")


def detect_hdf5_config(file_path: str | Path) -> Hdf5ImportConfig:
"""Detect an HDF5 import config by inspecting the file's datasets.

Expand Down Expand Up @@ -88,7 +54,7 @@ def _visit(dataset_name: str, obj: object) -> None:
columns.append(
Hdf5DataColumn(
name=channel_name,
data_type=_numpy_to_sift_type(obj.dtype[value_index]),
data_type=numpy_to_sift_type(obj.dtype[value_index]),
units=_detect_attr(obj, _UNIT_ATTRS),
description=_detect_attr(obj, _DESCRIPTION_ATTRS),
time_dataset=dataset_name,
Expand All @@ -110,7 +76,7 @@ def _visit(dataset_name: str, obj: object) -> None:
columns.append(
Hdf5DataColumn(
name=channel_name,
data_type=_numpy_to_sift_type(obj.dtype),
data_type=numpy_to_sift_type(obj.dtype),
units=_detect_attr(obj, _UNIT_ATTRS),
description=_detect_attr(obj, _DESCRIPTION_ATTRS),
time_dataset="time" if has_root_time else "",
Expand Down
38 changes: 38 additions & 0 deletions python/lib/sift_client/_internal/util/numpy_types.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from __future__ import annotations

import numpy as np

from sift_client.sift_types.channel import ChannelDataType

# Shared lookup table from numpy scalar type to the Sift channel data type.
# `numpy_to_sift_type` keys this table by `dtype.type`, so only the exact
# scalar types listed here are accepted; anything else raises ValueError there.
# 8- and 16-bit integers share the 32-bit Sift integer types.
NUMPY_TO_SIFT_TYPE: dict[type, ChannelDataType] = {
np.bool_: ChannelDataType.BOOL,
np.int8: ChannelDataType.INT_32,
np.int16: ChannelDataType.INT_32,
np.int32: ChannelDataType.INT_32,
np.int64: ChannelDataType.INT_64,
np.uint8: ChannelDataType.UINT_32,
np.uint16: ChannelDataType.UINT_32,
np.uint32: ChannelDataType.UINT_32,
np.uint64: ChannelDataType.UINT_64,
np.float32: ChannelDataType.FLOAT,
np.float64: ChannelDataType.DOUBLE,
# NOTE(review): datetime64 maps to INT_64 - presumably an integer timestamp;
# confirm the expected unit/epoch with the importer.
np.datetime64: ChannelDataType.INT_64,
# Complex types map to the float type of matching precision.
# NOTE(review): presumably this is the type of each real/imaginary component
# rather than of the complex value itself - confirm against callers.
np.complex64: ChannelDataType.FLOAT,
np.complex128: ChannelDataType.DOUBLE,
np.str_: ChannelDataType.STRING,
# HDF5/TDMS fixed-length strings are stored as np.bytes_; use STRING, not
# BYTES (np.void below handles truly opaque binary data).
np.bytes_: ChannelDataType.STRING,
# Numpy uses object dtype for variable-length strings; TDMS/HDF5 files
# cannot produce non-string object arrays.
np.object_: ChannelDataType.STRING,
np.void: ChannelDataType.BYTES,
}


def numpy_to_sift_type(dtype: np.dtype) -> ChannelDataType:
    """Translate a numpy dtype into the matching Sift ChannelDataType.

    Args:
        dtype: The numpy dtype to translate; its scalar type (``dtype.type``)
            is the lookup key.

    Returns:
        The ChannelDataType registered for the dtype's scalar type.

    Raises:
        ValueError: If the scalar type has no entry in the lookup table.
    """
    mapped = NUMPY_TO_SIFT_TYPE.get(dtype.type)
    if mapped is not None:
        return mapped
    raise ValueError(f"Unsupported numpy dtype: {dtype}")
203 changes: 203 additions & 0 deletions python/lib/sift_client/_internal/util/tdms.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,203 @@
from __future__ import annotations

import json
from typing import TYPE_CHECKING

import numpy as np

if TYPE_CHECKING:
from pathlib import Path
from typing import BinaryIO
from nptdms import TdmsChannel, TdmsFile, TdmsGroup, types

from sift_client._internal.util.numpy_types import numpy_to_sift_type
from sift_client.sift_types.channel import ChannelDataType
from sift_client.sift_types.data_import import (
TdmsComplexComponent,
TdmsDataColumn,
TdmsFallbackMethod,
TdmsImportConfig,
)

# Common property names used to detect the units of a channel in TDMS files.
# Names are tried in list order by `detect_properties`; the first one with a
# truthy value wins.
COMMON_UNIT_PROPS = [
"unit_string",
"NI_UnitDescription",
]

# Common property names used to detect the description of a channel in TDMS files.
# The same list is used for both group-level and channel-level descriptions.
COMMON_DESCRIPTION_PROPS = ["description", "NI_Description", "Description"]


def detect_properties(
    obj: TdmsChannel | TdmsGroup, possible_props: list[str], default: str = ""
) -> str:
    """Return the first non-empty property value among *possible_props* as text.

    Args:
        obj: Object exposing a mapping-like ``properties`` member.
        possible_props: Property names to try, in priority order.
        default: Value returned when no listed property holds a truthy value.

    Returns:
        The first truthy property value as a ``str``, or *default* when none
        of the names match.
    """
    for prop in possible_props:
        value = obj.properties.get(prop)
        if value:
            # Honour the declared ``str`` return type: a non-string property
            # value (e.g. a number) would otherwise reach callers that call
            # string methods such as ``.strip()`` on the result.
            return value if isinstance(value, str) else str(value)
    return default


def create_description(group_description: str, channel_description: str) -> str:
    """Build one Sift description from a TDMS group and channel description.

    Each non-blank input (after trimming surrounding whitespace) contributes
    one labelled line, group first and channel second. Blank inputs are
    omitted, so the result is an empty string when both are blank.
    """
    group_text = group_description.strip()
    channel_text = channel_description.strip()

    entries = []
    if group_text:
        entries.append(f"Group: {group_text}")
    if channel_text:
        entries.append(f"Channel: {channel_text}")
    return "\n".join(entries)


def detect_enum_types(channel: TdmsChannel) -> dict[str, int] | None:
    """Check if the TDMS channel is embedded with enum configs.

    The channel's ``enum_config`` property, when present, is expected to be a
    JSON object whose keys are non-negative integer strings and whose values
    are the enum names.

    Args:
        channel: Channel whose ``properties`` may contain ``enum_config``.

    Returns:
        A name-to-key mapping, or None if no enum config is present (or the
        config is an empty object).

    Raises:
        ValueError: If the config is not decodable JSON, does not decode to a
            JSON object, or contains a key that is not a non-negative integer.
    """
    name = f"{channel.group_name}/{channel.name}"

    enum_config_data = channel.properties.get("enum_config")
    if not enum_config_data:
        return None
    try:
        enum_configs = json.loads(enum_config_data)
    except (TypeError, ValueError, RecursionError) as e:
        # TypeError: value is not str/bytes; ValueError: malformed JSON or bad
        # encoding; RecursionError: pathologically nested input.
        raise ValueError(f"Failed to decode JSON enum_configs for {name}: {e}") from e

    # Valid JSON that is not an object (list, string, number) has no key/name
    # pairs to read; report it as a config error instead of letting an
    # AttributeError escape from ``.items()``.
    if not isinstance(enum_configs, dict):
        raise ValueError(
            f"enum_config for {name} must be a JSON object, "
            f"got {type(enum_configs).__name__}"
        )

    enum_types: dict[str, int] = {}
    for enum_key, enum_name in enum_configs.items():
        try:
            key = int(enum_key)
        except ValueError as e:
            raise ValueError(f"{enum_key} is not a valid enum integer for ({name})") from e
        if key < 0:
            raise ValueError(f"{enum_key} is not a valid unsigned enum integer ({name})")
        enum_types[enum_name] = key

    return enum_types if enum_types else None


def is_waveform_time_channel(channel: TdmsChannel) -> bool:
    """Report whether the channel has both waveform timing properties.

    A channel counts as a waveform channel only when its properties contain
    ``wf_start_offset`` as well as ``wf_increment``.
    """
    props = channel.properties
    return all(key in props for key in ("wf_start_offset", "wf_increment"))


def find_time_channel(group: TdmsGroup) -> str | None:
    """Locate the group's dedicated time channel and return its name.

    Two checks are made, in this order:
    1. The group-level 'xchannel' property, accepted only when it names a
       channel that actually exists in the group.
    2. The first channel of the group, accepted only when its data type is
       the TDMS timestamp type.

    Returns None when neither check finds a time channel.

    https://www.ni.com/en/support/documentation/supplemental/12/writing-data-management-ready-tdms-files.html
    """
    members = group.channels()
    member_names = {member.name for member in members}

    # Check 1: the file author named the time channel explicitly.
    declared = group.properties.get("xchannel")
    if declared and declared in member_names:
        return declared

    # Check 2: fall back to a timestamp-typed channel in the first position.
    if not members:
        return None
    first = members[0]
    if first.data_type == types.TimeStamp:
        return first.name
    return None


def detect_tdms_config(
    file_path: str | Path | BinaryIO,
    asset_name: str = "",
    fallback_method: TdmsFallbackMethod = TdmsFallbackMethod.FAIL_ON_ERROR,
) -> TdmsImportConfig:
    """Build a TDMS import config by walking every group and channel of a file.

    Each channel that is not its group's time axis becomes one column, named
    ``<group>.<channel>``. A complex-valued channel becomes two columns,
    ``.real`` and ``.imag``. A channel is emitted only when its group has a
    time channel or the channel carries waveform timing properties.

    Args:
        file_path: Path to the TDMS file, or a binary file-like object.
        asset_name: The asset name to set on the config.
        fallback_method: How to handle channels with missing timing information.

    Returns:
        A TdmsImportConfig populated with detected channel configurations.

    Raises:
        ValueError: If a channel has no timing information and
            *fallback_method* is not ``IGNORE_ERROR``; also propagated from
            enum-config and dtype detection.
    """
    columns: list[TdmsDataColumn] = []

    with TdmsFile.open(file_path) as tdms_file:
        for group in tdms_file.groups():
            group_name = group.name
            time_axis = find_time_channel(group)
            group_text = detect_properties(group, COMMON_DESCRIPTION_PROPS)

            for channel in group.channels():
                source_name = channel.name

                # The channel serving as the time axis is not a data column.
                if source_name == time_axis:
                    continue

                # Sift channel names always take the form <group>.<channel>.
                qualified = f"{group_name}.{source_name}"

                units = detect_properties(channel, COMMON_UNIT_PROPS)
                channel_text = detect_properties(channel, COMMON_DESCRIPTION_PROPS)
                description = create_description(group_text, channel_text)
                enum_types = detect_enum_types(channel)

                outputs: list[tuple[str, ChannelDataType, TdmsComplexComponent | None]]
                if np.issubdtype(channel.dtype, np.complexfloating):
                    # A complex channel is split into separate .real and .imag
                    # columns that share one component type.
                    component_type = numpy_to_sift_type(channel.dtype)
                    outputs = [
                        (f"{qualified}.real", component_type, TdmsComplexComponent.REAL),
                        (f"{qualified}.imag", component_type, TdmsComplexComponent.IMAGINARY),
                    ]
                elif enum_types:
                    outputs = [(qualified, ChannelDataType.ENUM, None)]
                else:
                    outputs = [(qualified, numpy_to_sift_type(channel.dtype), None)]

                # A dedicated time channel takes priority over waveform
                # properties: some applications write invalid waveform
                # properties that are not meant to be used.
                has_timing = time_axis is not None or is_waveform_time_channel(channel)
                if not has_timing:
                    # Non time series data (e.g, binary blob, spectrum data, etc.)
                    if fallback_method == TdmsFallbackMethod.IGNORE_ERROR:
                        continue
                    raise ValueError(f"No timing information for {qualified}")

                for name, data_type, complex_component in outputs:
                    columns.append(
                        TdmsDataColumn(
                            group_name=group_name,
                            channel_name=source_name,
                            name=name,
                            data_type=data_type,
                            units=units,
                            description=description,
                            # None here means waveform timing is used instead.
                            time_channel_name=time_axis,
                            complex_component=complex_component,
                            enum_types=enum_types,
                        )
                    )

    return TdmsImportConfig(
        asset_name=asset_name,
        data=columns,
        fallback_method=fallback_method,
    )
Loading
Loading