Skip to content

Commit 7fa55ed

Browse files
authored
python(feat): add data import api to sift_client (#515)
1 parent 9bcddda commit 7fa55ed

14 files changed

Lines changed: 1832 additions & 15 deletions

File tree

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
from __future__ import annotations
2+
3+
from typing import TYPE_CHECKING, cast
4+
5+
from sift.data_imports.v2.data_imports_pb2 import (
6+
CreateDataImportFromUploadRequest,
7+
CreateDataImportFromUploadResponse,
8+
DetectConfigRequest,
9+
DetectConfigResponse,
10+
GetDataImportRequest,
11+
GetDataImportResponse,
12+
)
13+
from sift.data_imports.v2.data_imports_pb2_grpc import DataImportServiceStub
14+
15+
from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
16+
from sift_client.sift_types.data_import import (
17+
CsvImportConfig,
18+
Hdf5ImportConfig,
19+
ImportConfig,
20+
ParquetFlatDatasetImportConfig,
21+
ParquetSingleChannelPerRowImportConfig,
22+
TdmsImportConfig,
23+
)
24+
from sift_client.transport import WithGrpcClient
25+
26+
if TYPE_CHECKING:
27+
from sift.data_imports.v2.data_imports_pb2 import DataTypeKey
28+
29+
from sift_client.transport.grpc_transport import GrpcClient
30+
31+
32+
def _set_config_on_request(
    request: CreateDataImportFromUploadRequest,
    config: ImportConfig,
) -> None:
    """Copy ``config`` into the request field that matches its concrete type.

    The config is converted with ``config._to_proto()`` and copied into the
    corresponding sub-message of ``request`` in place.

    Args:
        request: The upload request to populate; mutated in place.
        config: The import configuration to attach to the request.

    Raises:
        TypeError: If ``config`` is not an instance of any supported config type.
    """
    # Ordered (config type(s), request field name) pairs; the first match wins.
    # Both Parquet config flavours share the single ``parquet_config`` field.
    field_for_config = (
        (CsvImportConfig, "csv_config"),
        (
            (ParquetFlatDatasetImportConfig, ParquetSingleChannelPerRowImportConfig),
            "parquet_config",
        ),
        (TdmsImportConfig, "tdms_config"),
        (Hdf5ImportConfig, "hdf5_config"),
    )
    for config_types, field_name in field_for_config:
        if isinstance(config, config_types):
            getattr(request, field_name).CopyFrom(config._to_proto())
            return
    raise TypeError(f"Unsupported import config type: {type(config).__name__}")
49+
50+
51+
class DataImportsLowLevelClient(LowLevelClientBase, WithGrpcClient):
    """Low-level client for the DataImportService.

    A thin async wrapper over the autogenerated ``DataImportServiceStub``
    bindings: each method builds the request proto, awaits the matching RPC on
    the stub obtained from the gRPC client, and returns the response (or
    selected fields of it).
    """

    def __init__(self, grpc_client: GrpcClient):
        """Initialize the wrapper with the gRPC client used for every call.

        Args:
            grpc_client: The gRPC client handed to ``WithGrpcClient``.
        """
        WithGrpcClient.__init__(self, grpc_client=grpc_client)

    async def create_from_upload(self, config: ImportConfig) -> tuple[str, str]:
        """Create a data import and return where its file should be uploaded.

        Args:
            config: The import configuration to attach to the request.

        Returns:
            A ``(data_import_id, upload_url)`` tuple taken from the response.

        Raises:
            TypeError: If ``config`` is of an unsupported import config type.
        """
        upload_request = CreateDataImportFromUploadRequest()
        _set_config_on_request(upload_request, config)
        stub = self._grpc_client.get_stub(DataImportServiceStub)
        raw_response = await stub.CreateDataImportFromUpload(upload_request)
        created = cast("CreateDataImportFromUploadResponse", raw_response)
        return created.data_import_id, created.upload_url

    async def get(self, data_import_id: str) -> GetDataImportResponse:
        """Fetch a data import by its ID.

        Args:
            data_import_id: The ID of the data import to look up.

        Returns:
            The ``GetDataImportResponse`` proto returned by the service.
        """
        lookup_request = GetDataImportRequest(data_import_id=data_import_id)
        stub = self._grpc_client.get_stub(DataImportServiceStub)
        raw_response = await stub.GetDataImport(lookup_request)
        return cast("GetDataImportResponse", raw_response)

    async def detect_config(
        self, data: bytes, data_type_key: DataTypeKey.ValueType
    ) -> DetectConfigResponse:
        """Call the DetectConfig RPC to auto-detect an import configuration.

        Args:
            data: A sample of the file content (e.g. the first 64 KiB).
            data_type_key: The file type hint sent as the request's ``type``.

        Returns:
            The raw ``DetectConfigResponse`` proto.
        """
        detect_request = DetectConfigRequest(data=data, type=data_type_key)
        stub = self._grpc_client.get_stub(DataImportServiceStub)
        raw_response = await stub.DetectConfig(detect_request)
        return cast("DetectConfigResponse", raw_response)

python/lib/sift_client/_internal/util/file.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,14 @@
11
from __future__ import annotations
22

3+
import os
4+
import struct
35
import warnings
46
import zipfile
57
from typing import TYPE_CHECKING
68

79
from alive_progress import alive_bar # type: ignore[import-untyped]
810

11+
import sift_client as _sift_client_module
912
from sift_client.errors import SiftWarning
1013

1114
if TYPE_CHECKING:
@@ -14,6 +17,78 @@
1417
from sift_client.transport.rest_transport import RestClient
1518

1619

20+
class _ProgressReader:
    """Wraps a file object to report read progress to an alive_bar callback.

    Each non-empty ``read`` forwards the length of the returned chunk to the
    progress callback. Every attribute not defined here is delegated to the
    wrapped file object, so the wrapper can stand in wherever the file is used.
    """

    def __init__(self, file_object, progress_bar):
        """Store the wrapped file and the progress callback.

        Args:
            file_object: Object exposing ``read(size)``; other attribute
                lookups are proxied to it.
            progress_bar: Callable invoked with ``len(chunk)`` after each
                non-empty read.
        """
        self._file_object = file_object
        self._progress_bar = progress_bar

    def read(self, size=-1):
        """Read from the wrapped file and report how much was read.

        Args:
            size: Passed straight through to the wrapped ``read``.

        Returns:
            Whatever the wrapped ``read`` returned; the callback is skipped
            for empty (falsy) chunks such as end-of-file.
        """
        chunk = self._file_object.read(size)
        if chunk:
            self._progress_bar(len(chunk))
        return chunk

    def __getattr__(self, name):
        """Delegate unknown attributes to the wrapped file object.

        Raises:
            AttributeError: If the wrapped object lacks ``name``, or if the
                wrapper itself has no file object attached yet.
        """
        # __getattr__ only runs after normal lookup fails. If `_file_object`
        # itself is missing (instance built without __init__, e.g. by
        # copy/pickle), delegating through it would re-enter this method
        # until RecursionError, so fail with a plain AttributeError instead.
        if name == "_file_object":
            raise AttributeError(name)
        return getattr(self._file_object, name)
35+
36+
37+
def resolve_show_progress(*, is_sync: bool) -> bool:
    """Decide whether progress output should be shown.

    Reads ``sift_client.config.show_progress``. A value other than ``None`` is
    returned as-is; ``None`` means "not configured" and falls back to
    ``is_sync``.

    Args:
        is_sync: Fallback used when no global setting is configured.

    Returns:
        The global setting if it is set, otherwise ``is_sync``.
    """
    configured = _sift_client_module.config.show_progress
    return is_sync if configured is None else configured
47+
48+
49+
def upload_file(
    signed_url: str,
    file_path: Path,
    *,
    rest_client: RestClient,
    show_progress: bool = False,
) -> dict:
    """Upload a file to a presigned URL.

    The file is opened in binary mode and passed as the request body through
    a ``_ProgressReader`` so that bytes read during the POST advance the
    progress bar. The file name is sent in a ``Content-Disposition`` header.

    Args:
        signed_url: The presigned URL to upload to.
        file_path: Path to the file to upload.
        rest_client: The SDK rest client to use for the upload.
        show_progress: If True, display a progress bar during upload; the bar
            is disabled otherwise.

    Returns:
        The parsed JSON response from the server.

    Raises:
        Exception: Whatever ``response.raise_for_status()`` raises when the
            upload request was not successful.
    """
    filename = file_path.name
    total_bytes = file_path.stat().st_size
    upload_headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
    bar_options = {
        "title": f"Upload [{filename}]",
        "spinner": "dots_waves",
        "spinner_length": 7,
        "unit": "B",
        "scale": "SI",
        "disable": not show_progress,
    }

    with alive_bar(total_bytes, **bar_options) as bar:
        with open(file_path, "rb") as source:
            # The wrapper reports each chunk read during the POST to the bar.
            body = _ProgressReader(source, bar)
            response = rest_client.post(signed_url, data=body, headers=upload_headers)
            response.raise_for_status()
            return response.json()
90+
91+
1792
def download_file(
1893
signed_url: str,
1994
output_path: Path,
@@ -82,3 +157,27 @@ def extract_zip(zip_path: Path, output_dir: Path, *, delete_zip: bool = True) ->
82157
except OSError:
83158
warnings.warn(f"Failed to delete zip file '{zip_path}'", SiftWarning, stacklevel=2)
84159
return [output_dir / name for name in names if not name.endswith("/")]
160+
161+
162+
def extract_parquet_footer(path: Path) -> tuple[bytes, int]:
    """Extract the Parquet footer bytes and compute the footer offset.

    The last 8 bytes of the file are read as a 4-byte little-endian footer
    length followed by the 4-byte magic ``PAR1``. The footer is the
    ``footer_len`` bytes immediately preceding that 8-byte tail.

    Args:
        path: Path to the Parquet file.

    Returns:
        A tuple of (footer_bytes, footer_offset), where ``footer_offset`` is
        the byte position in the file at which the footer starts.

    Raises:
        ValueError: If the file is not a valid Parquet file: it is too short
            to hold the 8-byte tail, the trailing magic bytes are missing, or
            the declared footer length does not fit in the file.
    """
    tail_size = 8  # 4-byte footer length + 4-byte magic
    with open(path, "rb") as f:
        # Seeking to the end yields the size without a second stat call.
        file_size = f.seek(0, os.SEEK_END)
        if file_size < tail_size:
            # A negative seek past the start would raise OSError instead.
            raise ValueError(f"Invalid Parquet file: missing magic bytes in {path}")

        f.seek(file_size - tail_size)
        footer_tail = f.read(tail_size)
        footer_len = struct.unpack("<I", footer_tail[:4])[0]
        if footer_tail[4:] != b"PAR1":
            raise ValueError(f"Invalid Parquet file: missing magic bytes in {path}")

        footer_offset = file_size - footer_len - tail_size
        if footer_offset < 0:
            raise ValueError(
                f"Invalid Parquet file: footer length {footer_len} exceeds file size in {path}"
            )

        f.seek(footer_offset)
        footer_bytes = f.read(footer_len)
    return footer_bytes, footer_offset

0 commit comments

Comments
 (0)