Skip to content

Commit 329ec11

Browse files
authored
python(fix): infer parquet time column for int64/uint64 timestamps (#582)
1 parent e73894a commit 329ec11

2 files changed

Lines changed: 195 additions & 3 deletions

File tree

python/lib/sift_client/_tests/resources/test_data_imports.py

Lines changed: 153 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,31 @@
11
"""Unit tests for data import config models and helpers."""
22

3+
from __future__ import annotations
4+
35
from datetime import datetime, timezone
6+
from typing import TYPE_CHECKING, cast
47

58
import pytest
9+
from sift.common.type.v1.channel_config_pb2 import ChannelConfig as ChannelConfigProto
10+
from sift.data_imports.v2.data_imports_pb2 import (
11+
ParquetColumn,
12+
ParquetConfig,
13+
ParquetFlatDatasetConfig,
14+
ParquetSingleChannelPerRowConfig,
15+
)
16+
from sift.data_imports.v2.data_imports_pb2 import (
17+
ParquetDataColumn as ParquetDataColumnProto,
18+
)
19+
from sift.data_imports.v2.data_imports_pb2 import (
20+
ParquetTimeColumn as ParquetTimeColumnProto,
21+
)
622

723
from sift_client.resources import DataImportAPI, DataImportAPIAsync
8-
from sift_client.resources.data_imports import _resolve_data_type_key
24+
from sift_client.resources.data_imports import (
25+
_infer_time_column,
26+
_parse_parquet_detect_response,
27+
_resolve_data_type_key,
28+
)
929
from sift_client.sift_types.channel import ChannelDataType
1030
from sift_client.sift_types.data_import import (
1131
CsvDataColumn,
@@ -22,6 +42,11 @@
2242
TimeFormat,
2343
)
2444

45+
if TYPE_CHECKING:
46+
from sift.common.type.v1.channel_data_type_pb2 import (
47+
ChannelDataType as ChannelDataTypeProto,
48+
)
49+
2550

2651
@pytest.mark.integration
2752
def test_client_binding(sift_client):
@@ -362,3 +387,130 @@ def test_explicit_data_type_overrides_extension(self):
362387
def test_unknown_extension_raises(self):
363388
with pytest.raises(ValueError, match="Unsupported file extension"):
364389
_resolve_data_type_key(".xyz", None)
390+
391+
392+
class TestInferTimeColumn:
393+
def test_picks_canonical_skips_other_columns(self):
394+
path = _infer_time_column(
395+
[
396+
("delta_time", ChannelDataType.INT_64, "delta_time"),
397+
("voltage", ChannelDataType.DOUBLE, "voltage"),
398+
("timestamp", ChannelDataType.INT_64, "timestamp"),
399+
]
400+
)
401+
assert path == "timestamp"
402+
403+
def test_accepts_uint64(self):
404+
path = _infer_time_column([("time", ChannelDataType.UINT_64, "time")])
405+
assert path == "time"
406+
407+
def test_case_insensitive(self):
408+
path = _infer_time_column([("TimeStamp", ChannelDataType.INT_64, "TimeStamp")])
409+
assert path == "TimeStamp"
410+
411+
def test_multiple_candidates_sorted_alphabetically(self):
412+
path = _infer_time_column(
413+
[
414+
("timestamp", ChannelDataType.INT_64, "timestamp"),
415+
("time", ChannelDataType.INT_64, "time"),
416+
("ts", ChannelDataType.INT_64, "ts"),
417+
]
418+
)
419+
assert path == "time"
420+
421+
def test_returns_none_when_no_canonical_int_column(self):
422+
path = _infer_time_column(
423+
[
424+
("timestamp", ChannelDataType.DOUBLE, "timestamp"),
425+
("event_time", ChannelDataType.INT_64, "event_time"),
426+
]
427+
)
428+
assert path is None
429+
430+
431+
def _make_flat_dataset_response(
432+
time_path: str, data_columns: list[tuple[str, int]]
433+
) -> ParquetConfig:
434+
return ParquetConfig(
435+
flat_dataset=ParquetFlatDatasetConfig(
436+
time_column=ParquetTimeColumnProto(path=time_path),
437+
data_columns=[
438+
ParquetDataColumnProto(
439+
path=path,
440+
channel_config=ChannelConfigProto(
441+
name=path,
442+
data_type=cast("ChannelDataTypeProto.ValueType", data_type),
443+
),
444+
)
445+
for path, data_type in data_columns
446+
],
447+
)
448+
)
449+
450+
451+
def _make_scpr_response(time_path: str, columns: list[tuple[str, int]]) -> ParquetConfig:
452+
return ParquetConfig(
453+
single_channel_per_row=ParquetSingleChannelPerRowConfig(
454+
time_column=ParquetTimeColumnProto(path=time_path),
455+
columns=[
456+
ParquetColumn(
457+
path=path,
458+
column_config=ChannelConfigProto(
459+
name=path,
460+
data_type=cast("ChannelDataTypeProto.ValueType", data_type),
461+
),
462+
)
463+
for path, data_type in columns
464+
],
465+
)
466+
)
467+
468+
469+
class TestParseParquetDetectResponseTimeFallback:
470+
def test_flat_dataset_infers_int64_time_column(self):
471+
proto = _make_flat_dataset_response(
472+
time_path="",
473+
data_columns=[
474+
("voltage", ChannelDataType.DOUBLE.value),
475+
("timestamp", ChannelDataType.INT_64.value),
476+
("status", ChannelDataType.INT_32.value),
477+
],
478+
)
479+
config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0)
480+
assert isinstance(config, ParquetFlatDatasetImportConfig)
481+
assert config.time_column.path == "timestamp"
482+
assert [dc.path for dc in config.data_columns] == ["voltage", "status"]
483+
484+
def test_flat_dataset_keeps_server_time_column_when_set(self):
485+
proto = _make_flat_dataset_response(
486+
time_path="server_ts",
487+
data_columns=[
488+
("server_ts", ChannelDataType.INT_64.value),
489+
("timestamp", ChannelDataType.INT_64.value),
490+
("voltage", ChannelDataType.DOUBLE.value),
491+
],
492+
)
493+
config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0)
494+
assert config.time_column.path == "server_ts"
495+
assert [dc.path for dc in config.data_columns] == ["timestamp", "voltage"]
496+
497+
def test_flat_dataset_no_int64_match_leaves_time_empty(self):
498+
proto = _make_flat_dataset_response(
499+
time_path="",
500+
data_columns=[("voltage", ChannelDataType.DOUBLE.value)],
501+
)
502+
config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0)
503+
assert config.time_column.path == ""
504+
assert [dc.path for dc in config.data_columns] == ["voltage"]
505+
506+
def test_scpr_infers_int64_time_column(self):
507+
proto = _make_scpr_response(
508+
time_path="",
509+
columns=[
510+
("voltage", ChannelDataType.DOUBLE.value),
511+
("timestamp", ChannelDataType.INT_64.value),
512+
],
513+
)
514+
config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0)
515+
assert isinstance(config, ParquetSingleChannelPerRowImportConfig)
516+
assert config.time_column.path == "timestamp"

python/lib/sift_client/resources/data_imports.py

Lines changed: 42 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,17 +8,21 @@
88
from sift_client._internal.util.file import extract_parquet_footer, upload_file
99
from sift_client.resources._base import ResourceBase
1010
from sift_client.sift_types.asset import Asset
11+
from sift_client.sift_types.channel import ChannelDataType
1112
from sift_client.sift_types.data_import import (
1213
EXTENSION_TO_DATA_TYPE_KEY,
1314
CsvImportConfig,
1415
DataTypeKey,
1516
ImportConfig,
1617
ParquetFlatDatasetImportConfig,
1718
ParquetSingleChannelPerRowImportConfig,
19+
ParquetTimeColumn,
1820
)
1921
from sift_client.sift_types.run import Run
2022

2123
if TYPE_CHECKING:
24+
from collections.abc import Iterable
25+
2226
from sift_client.client import SiftClient
2327
from sift_client.sift_types.job import Job
2428

@@ -320,6 +324,28 @@ def _parse_csv_detect_response(proto) -> CsvImportConfig:
320324
return csv_config
321325

322326

327+
_TIME_COLUMN_NAMES: frozenset[str] = frozenset({"ts", "timestamp", "time"})
328+
_TIME_COLUMN_TYPES: frozenset[ChannelDataType] = frozenset(
329+
{ChannelDataType.INT_64, ChannelDataType.UINT_64}
330+
)
331+
332+
333+
def _infer_time_column(
334+
columns: Iterable[tuple[str, ChannelDataType, str]],
335+
) -> str | None:
336+
"""Pick a likely time column when the server couldn't identify one.
337+
338+
Returns the path of an INT64 or UINT64 column whose name
339+
(case-insensitive) matches one of ``ts``, ``timestamp``, or ``time``.
340+
Returns None otherwise.
341+
"""
342+
data_columns = sorted(columns, key=lambda c: c[0].lower())
343+
for name, data_type, path in data_columns:
344+
if data_type in _TIME_COLUMN_TYPES and name.lower() in _TIME_COLUMN_NAMES:
345+
return path
346+
return None
347+
348+
323349
def _parse_parquet_detect_response(
324350
proto, filename: str, footer_offset: int, footer_length: int
325351
) -> ParquetFlatDatasetImportConfig | ParquetSingleChannelPerRowImportConfig:
@@ -328,16 +354,30 @@ def _parse_parquet_detect_response(
328354
parquet_config = ParquetFlatDatasetImportConfig._from_proto(
329355
proto, footer_offset=footer_offset, footer_length=footer_length
330356
)
331-
time_path = parquet_config.time_column.path
357+
time_path: str | None = parquet_config.time_column.path
358+
if not time_path:
359+
time_path = _infer_time_column(
360+
(dc.name, dc.data_type, dc.path) for dc in parquet_config.data_columns
361+
)
362+
if time_path:
363+
parquet_config.time_column = ParquetTimeColumn(path=time_path)
332364
if time_path:
333365
parquet_config.data_columns = [
334366
dc for dc in parquet_config.data_columns if dc.path != time_path
335367
]
336368
return parquet_config
337369
elif proto.HasField("single_channel_per_row"):
338-
return ParquetSingleChannelPerRowImportConfig._from_proto(
370+
scpr_config = ParquetSingleChannelPerRowImportConfig._from_proto(
339371
proto, footer_offset=footer_offset, footer_length=footer_length
340372
)
373+
if not scpr_config.time_column.path:
374+
time_path = _infer_time_column(
375+
(col.column_config.name, ChannelDataType(col.column_config.data_type), col.path)
376+
for col in proto.single_channel_per_row.columns
377+
)
378+
if time_path is not None:
379+
scpr_config.time_column = ParquetTimeColumn(path=time_path)
380+
return scpr_config
341381
raise ValueError(f"Unsupported parquet layout in DetectConfig response for '{filename}'.")
342382

343383

0 commit comments

Comments
 (0)