Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
52 commits
Select commit Hold shift + click to select a range
11838ae
python(feat): add data import api
wei-qlu Mar 26, 2026
7224b79
add detect config data types
wei-qlu Mar 27, 2026
d27b070
added relative time validation, refactored the import process
wei-qlu Apr 1, 2026
77dbf86
added progress spinner for polling
wei-qlu Apr 1, 2026
bd5e9f8
missing run defaults to filename
wei-qlu Apr 1, 2026
41f9c08
added relative time format validation in the model
wei-qlu Apr 2, 2026
e76d2d2
updated post request to include file name for downstream file attachment
wei-qlu Apr 2, 2026
4cb5ebd
added parquet data type, refactor to use util
wei-qlu Apr 2, 2026
01f5831
remove upload_from_url
wei-qlu Apr 3, 2026
269a8b5
converted imports to using jobs
wei-qlu Apr 3, 2026
54014ea
fix sync/async behavior when polling directly
wei-qlu Apr 6, 2026
fe50604
add parquet import support
wei-qlu Apr 7, 2026
f0c186b
unfrozen config model refactor
wei-qlu Apr 7, 2026
593f3dc
mypy fix
wei-qlu Apr 7, 2026
8725f59
small refactor moving configs to sift_types
wei-qlu Apr 7, 2026
fd925eb
add a helper function to get a specific data_column
wei-qlu Apr 7, 2026
85e87be
add unit tests
wei-qlu Apr 7, 2026
d669cf2
added client side validation for detect_config
wei-qlu Apr 7, 2026
bac1b52
add validation tests
wei-qlu Apr 7, 2026
09f48e9
update asset_name handlign
wei-qlu Apr 7, 2026
3b1e28f
update sync stubs
wei-qlu Apr 8, 2026
1ae1e95
add ch10, hdf5, and tdms configs
wei-qlu Apr 8, 2026
3ee74a4
mypy fix
wei-qlu Apr 8, 2026
ffdb06f
additional file format tests
wei-qlu Apr 8, 2026
b0559b0
added documentation for csv json metadata
wei-qlu Apr 8, 2026
d66d8e0
updated docs and split import and polling
wei-qlu Apr 8, 2026
e4ae07d
add upload_file polling and refactor global show_progress to a util
wei-qlu Apr 8, 2026
107eaa2
error handling from missing job_id from upload
wei-qlu Apr 8, 2026
9fe594c
refactor to use run/asset objects
wei-qlu Apr 8, 2026
d3377d1
refactor file format configs into private helpers, updated error message
wei-qlu Apr 8, 2026
07007f9
refactored to apply inheritance on time and config classes
wei-qlu Apr 9, 2026
fca8331
updated documentation around detect_config
wei-qlu Apr 9, 2026
deb3590
updated unit tests
wei-qlu Apr 9, 2026
7aa2da4
updated documentation regarding json metadata
wei-qlu Apr 9, 2026
f59336d
updated get_column to getitem
wei-qlu Apr 9, 2026
0328d27
updated detect_config error messages
wei-qlu Apr 9, 2026
930c556
move data column validation from detect_config to _to_proto
wei-qlu Apr 9, 2026
c610c88
updated error types to be more accurate
wei-qlu Apr 9, 2026
66d567f
add import_data method to Run for importing files into existing runs
wei-qlu Apr 9, 2026
462a36d
add get_run to data import API and get_import_run to Job for resolvin…
wei-qlu Apr 9, 2026
f049daa
add model validation for parquet single/multi channel
wei-qlu Apr 9, 2026
8ff5fde
refactor parquet timecolumn detection
wei-qlu Apr 10, 2026
6771278
mypy fix
wei-qlu Apr 10, 2026
261e091
simplify _resolve_data_type_key logic
wei-qlu Apr 11, 2026
59af2a8
autofill the run's asset during import
wei-qlu Apr 11, 2026
f999e97
updated docstrings and fixed run import to infer asset object
wei-qlu Apr 11, 2026
e074a4c
refactor show_progress helper to the base class
wei-qlu Apr 11, 2026
78703e5
add client binding test for data imports
wei-qlu Apr 11, 2026
d736d1e
add a base class for data columns shared by csv, parquet, and hdf5
wei-qlu Apr 11, 2026
3cc3607
update the upload_file progress bar to be more detailed
wei-qlu Apr 11, 2026
8b65390
removed ch10 references, no more support
wei-qlu Apr 13, 2026
542b5fa
sync stubs update
wei-qlu Apr 13, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
104 changes: 104 additions & 0 deletions python/lib/sift_client/_internal/low_level_wrappers/data_imports.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
from __future__ import annotations

from typing import TYPE_CHECKING, cast

from sift.data_imports.v2.data_imports_pb2 import (
CreateDataImportFromUploadRequest,
CreateDataImportFromUploadResponse,
DetectConfigRequest,
DetectConfigResponse,
GetDataImportRequest,
GetDataImportResponse,
)
from sift.data_imports.v2.data_imports_pb2_grpc import DataImportServiceStub

from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client.sift_types.data_import import (
CsvImportConfig,
Hdf5ImportConfig,
ImportConfig,
ParquetFlatDatasetImportConfig,
ParquetSingleChannelPerRowImportConfig,
TdmsImportConfig,
)
from sift_client.transport import WithGrpcClient

if TYPE_CHECKING:
from sift.data_imports.v2.data_imports_pb2 import DataTypeKey

from sift_client.transport.grpc_transport import GrpcClient


def _set_config_on_request(
request: CreateDataImportFromUploadRequest,
config: ImportConfig,
) -> None:
"""Set the appropriate config field on a proto request based on the config type."""
if isinstance(config, CsvImportConfig):
request.csv_config.CopyFrom(config._to_proto())
elif isinstance(
config, (ParquetFlatDatasetImportConfig, ParquetSingleChannelPerRowImportConfig)
):
request.parquet_config.CopyFrom(config._to_proto())
elif isinstance(config, TdmsImportConfig):
request.tdms_config.CopyFrom(config._to_proto())
elif isinstance(config, Hdf5ImportConfig):
request.hdf5_config.CopyFrom(config._to_proto())
else:
raise TypeError(f"Unsupported import config type: {type(config).__name__}")


class DataImportsLowLevelClient(LowLevelClientBase, WithGrpcClient):
"""Low-level client for the DataImportService.

This class provides a thin wrapper around the autogenerated bindings for the DataImportsAPI.
"""

def __init__(self, grpc_client: GrpcClient):
WithGrpcClient.__init__(self, grpc_client=grpc_client)

async def create_from_upload(self, config: ImportConfig) -> tuple[str, str]:
"""Create a data import and get back a presigned upload URL.

Args:
config: The import configuration.

Returns:
A tuple of (data_import_id, upload_url).
"""
request = CreateDataImportFromUploadRequest()
_set_config_on_request(request, config)
response = await self._grpc_client.get_stub(
DataImportServiceStub
).CreateDataImportFromUpload(request)
response = cast("CreateDataImportFromUploadResponse", response)
return response.data_import_id, response.upload_url

async def get(self, data_import_id: str) -> GetDataImportResponse:
"""Get a data import by ID.

Args:
data_import_id: The ID of the data import.

Returns:
The GetDataImportResponse proto.
"""
request = GetDataImportRequest(data_import_id=data_import_id)
response = await self._grpc_client.get_stub(DataImportServiceStub).GetDataImport(request)
return cast("GetDataImportResponse", response)

async def detect_config(
self, data: bytes, data_type_key: DataTypeKey.ValueType
) -> DetectConfigResponse:
"""Call the DetectConfig RPC to auto-detect import configuration.

Args:
data: A sample of the file content (e.g. the first 64 KiB).
data_type_key: The file type hint.

Returns:
The raw DetectConfigResponse proto.
"""
request = DetectConfigRequest(data=data, type=data_type_key)
response = await self._grpc_client.get_stub(DataImportServiceStub).DetectConfig(request)
return cast("DetectConfigResponse", response)
99 changes: 99 additions & 0 deletions python/lib/sift_client/_internal/util/file.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from __future__ import annotations

import os
import struct
import warnings
import zipfile
from typing import TYPE_CHECKING

from alive_progress import alive_bar # type: ignore[import-untyped]

import sift_client as _sift_client_module
from sift_client.errors import SiftWarning

if TYPE_CHECKING:
Expand All @@ -14,6 +17,78 @@
from sift_client.transport.rest_transport import RestClient


class _ProgressReader:
"""Wraps a file object to report read progress to an alive_bar callback."""

def __init__(self, file_object, progress_bar):
self._file_object = file_object
self._progress_bar = progress_bar

def read(self, size=-1):
chunk = self._file_object.read(size)
if chunk:
self._progress_bar(len(chunk))
return chunk

def __getattr__(self, name):
return getattr(self._file_object, name)


def resolve_show_progress(*, is_sync: bool) -> bool:
"""Resolve the show_progress setting from the global config.

Returns the global ``sift_client.config.show_progress`` value when set,
otherwise defaults to ``is_sync``.
"""
global_setting = _sift_client_module.config.show_progress
if global_setting is not None:
return global_setting
return is_sync


def upload_file(
signed_url: str,
file_path: Path,
*,
rest_client: RestClient,
show_progress: bool = False,
) -> dict:
"""Upload a file to a presigned URL.

Args:
signed_url: The presigned URL to upload to.
file_path: Path to the file to upload.
rest_client: The SDK rest client to use for the upload.
show_progress: If True, display a progress spinner during upload.

Returns:
The parsed JSON response from the server.

Raises:
ValueError: If the upload request fails.
"""
file_size = file_path.stat().st_size

with alive_bar(
file_size,
title=f"Upload [{file_path.name}]",
spinner="dots_waves",
spinner_length=7,
unit="B",
scale="SI",
disable=not show_progress,
) as bar:
with open(file_path, "rb") as file:
wrapped = _ProgressReader(file, bar)
response = rest_client.post(
signed_url,
Comment thread
alexluck-sift marked this conversation as resolved.
data=wrapped,
headers={"Content-Disposition": f'attachment; filename="{file_path.name}"'},
)
response.raise_for_status()
return response.json()


def download_file(
signed_url: str,
output_path: Path,
Expand Down Expand Up @@ -82,3 +157,27 @@ def extract_zip(zip_path: Path, output_dir: Path, *, delete_zip: bool = True) ->
except OSError:
warnings.warn(f"Failed to delete zip file '{zip_path}'", SiftWarning, stacklevel=2)
return [output_dir / name for name in names if not name.endswith("/")]


def extract_parquet_footer(path: Path) -> tuple[bytes, int]:
"""Extract the Parquet footer bytes and compute the footer offset.

Args:
path: Path to the Parquet file.

Returns:
A tuple of (footer_bytes, footer_offset).

Raises:
ValueError: If the file is not a valid Parquet file.
"""
with open(path, "rb") as f:
f.seek(-8, 2)
footer_tail = f.read(8)
footer_len = struct.unpack("<I", footer_tail[:4])[0]
magic = footer_tail[4:]
if magic != b"PAR1":
raise ValueError(f"Invalid Parquet file: missing magic bytes in {path}")
f.seek(-(footer_len + 8), 2)
footer_bytes = f.read(footer_len)
return footer_bytes, os.path.getsize(path) - len(footer_bytes) - 8
Loading
Loading