Skip to content

Commit 8ff5fde

Browse files
committed
refactor parquet timecolumn detection
1 parent f049daa commit 8ff5fde

2 files changed

Lines changed: 44 additions & 20 deletions

File tree

python/lib/sift_client/resources/data_imports.py

Lines changed: 35 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,8 @@
2626
from sift_client.sift_types.run import Run
2727

2828
if TYPE_CHECKING:
29+
from collections.abc import Iterable
30+
2931
from sift_client.client import SiftClient
3032
from sift_client.sift_types.job import Job
3133

@@ -318,6 +320,24 @@ def _parse_csv_detect_response(proto) -> CsvImportConfig:
318320
return csv_config
319321

320322

323+
def _infer_time_column(columns: Iterable[tuple[str, ChannelDataType, str]]) -> str | None:
324+
"""Find a likely time column from a sequence of (name, data_type, path) tuples.
325+
326+
The backend only detects arrow timestamp types. This falls back to the first
327+
integer column whose name starts with "time".
328+
"""
329+
_integer_types = {
330+
ChannelDataType.INT_32,
331+
ChannelDataType.INT_64,
332+
ChannelDataType.UINT_32,
333+
ChannelDataType.UINT_64,
334+
}
335+
for name, data_type, path in columns:
336+
if data_type in _integer_types and name.lower().startswith("time"):
337+
return path
338+
return None
339+
340+
321341
def _parse_parquet_detect_response(
322342
proto, filename: str, footer_offset: int, footer_length: int
323343
) -> ParquetFlatDatasetImportConfig | ParquetSingleChannelPerRowImportConfig:
@@ -332,29 +352,27 @@ def _parse_parquet_detect_response(
332352
dc for dc in parquet_config.data_columns if dc.path != time_path
333353
]
334354
else:
335-
# The backend only detects arrow timestamp types. Fall back to
336-
# an integer column whose name starts with "time".
337-
_integer_types = {
338-
ChannelDataType.INT_32,
339-
ChannelDataType.INT_64,
340-
ChannelDataType.UINT_32,
341-
ChannelDataType.UINT_64,
342-
}
343-
match = None
344-
for dc in parquet_config.data_columns:
345-
if dc.data_type in _integer_types and dc.name.lower().startswith("time"):
346-
match = dc
347-
break
348-
if match is not None:
349-
parquet_config.time_column = ParquetTimeColumn(path=match.path)
355+
inferred = _infer_time_column(
356+
(dc.name, dc.data_type, dc.path) for dc in parquet_config.data_columns
357+
)
358+
if inferred is not None:
359+
parquet_config.time_column = ParquetTimeColumn(path=inferred)
350360
parquet_config.data_columns = [
351-
c for c in parquet_config.data_columns if c.path != match.path
361+
c for c in parquet_config.data_columns if c.path != inferred
352362
]
353363
return parquet_config
354364
elif proto.HasField("single_channel_per_row"):
355-
return ParquetSingleChannelPerRowImportConfig._from_proto(
365+
parquet_config = ParquetSingleChannelPerRowImportConfig._from_proto(
356366
proto, footer_offset=footer_offset, footer_length=footer_length
357367
)
368+
if not parquet_config.time_column.path:
369+
inferred = _infer_time_column(
370+
(col.column_config.name, ChannelDataType(col.column_config.data_type), col.path)
371+
for col in proto.single_channel_per_row.columns
372+
)
373+
if inferred is not None:
374+
parquet_config.time_column = ParquetTimeColumn(path=inferred)
375+
return parquet_config
358376
raise ValueError(f"Unsupported parquet layout in DetectConfig response for '{filename}'.")
359377

360378

python/lib/sift_client/sift_types/data_import.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,9 @@ class ParquetSingleChannelPerRowImportConfig(ImportConfigBase):
432432
"""Configuration for importing a Parquet file where each row represents
433433
a single channel's data point.
434434
435-
Exactly one of ``single_channel`` or ``multi_channel`` must be set.
435+
Exactly one of ``single_channel`` or ``multi_channel`` must be set before
436+
importing. When returned by ``detect_config()``, neither field is populated
437+
and must be filled in before passing the config to ``import_from_path()``.
436438
437439
Attributes:
438440
time_column: Time column configuration.
@@ -454,15 +456,19 @@ class ParquetSingleChannelPerRowImportConfig(ImportConfigBase):
454456

455457
@model_validator(mode="after")
456458
def _check_channel_config(self) -> ParquetSingleChannelPerRowImportConfig:
457-
if self.single_channel is None and self.multi_channel is None:
458-
raise ValueError("Exactly one of 'single_channel' or 'multi_channel' must be set.")
459459
if self.single_channel is not None and self.multi_channel is not None:
460460
raise ValueError(
461461
"Exactly one of 'single_channel' or 'multi_channel' must be set, not both."
462462
)
463463
return self
464464

465465
def _to_proto(self) -> ParquetConfigProto:
466+
if self.single_channel is None and self.multi_channel is None:
467+
raise ValueError(
468+
"Either 'single_channel' or 'multi_channel' must be set before importing. "
469+
"If this config was returned by detect_config(), set one of these fields "
470+
"to specify the channel layout."
471+
)
466472
scpr = ParquetSingleChannelPerRowConfigProto(
467473
time_column=self.time_column._to_proto(),
468474
)

0 commit comments

Comments
 (0)