Skip to content

Commit 8e1c90f

Browse files
authored
python(feat): Add ParquetUploadService (#315)
1 parent 8a7f5f7 commit 8e1c90f

12 files changed

Lines changed: 931 additions & 13 deletions

File tree

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
SIFT_API_URI=""
2+
SIFT_API_KEY=""
3+
ASSET_NAME=""
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
import os
2+
3+
from dotenv import load_dotenv
4+
from sift_py.data_import.parquet import ParquetUploadService
5+
from sift_py.data_import.status import DataImportService
6+
from sift_py.data_import.time_format import TimeFormatType
7+
from sift_py.rest import SiftRestConfig
8+
9+
def _require_env(name: str) -> str:
    """
    Return the value of the environment variable `name`.

    Raises:
        RuntimeError: if the variable is unset or empty.
    """
    value = os.getenv(name)
    if not value:
        # Raise explicitly instead of using `assert`: assertions are removed
        # when Python runs with -O, which would let a missing value through.
        raise RuntimeError(f"expected '{name}' environment variable to be set")
    return value


if __name__ == "__main__":
    # Example usage for uploading a Parquet file as a flat dataset.

    # Pull settings from a local .env file (if any) into the environment.
    load_dotenv()

    sift_uri = _require_env("SIFT_API_URI")
    apikey = _require_env("SIFT_API_KEY")
    asset_name = _require_env("ASSET_NAME")

    rest_config: SiftRestConfig = {
        "uri": sift_uri,
        "apikey": apikey,
    }

    parquet_upload_service = ParquetUploadService(rest_config)

    # NOTE(review): "timestamp" is presumably the name of the time column in
    # sample_data.parquet, holding Unix-epoch nanoseconds -- confirm against
    # the sample file.
    import_service: DataImportService = parquet_upload_service.flat_dataset_upload(
        asset_name=asset_name,
        run_name="Example Parquet Upload",
        path="sample_data.parquet",
        time_path="timestamp",
        time_format=TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS,
    )

    # Show the data import record as returned by the service.
    data_import = import_service.get_data_import()
    print(data_import.model_dump_json(indent=1))

    print("Waiting for upload to complete...")
    import_service.wait_until_complete()
    print("Upload example complete!")
Binary file not shown.

python/lib/sift_py/data_import/_config.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from typing_extensions import Self
88

99
from sift_py._internal.channel import channel_fqn
10+
from sift_py.data_import.parquet_complex_types import ParquetComplexTypesImportModeType
1011
from sift_py.data_import.time_format import TimeFormatType
1112
from sift_py.error import _component_deprecation_warning
1213
from sift_py.ingestion.channel import ChannelBitFieldElement, ChannelDataType, ChannelEnumType
@@ -239,3 +240,69 @@ class Hdf5DataCfg(ConfigDataModel):
239240
time_column: int = 1
240241
value_dataset: str
241242
value_column: int = 1
243+
244+
245+
class ParquetTimeColumn(ConfigTimeModel):
    """
    Time column entry of a Parquet config.

    Adds a `path` field on top of whatever `ConfigTimeModel` declares.
    """

    # NOTE(review): presumably the path/name of the time column inside the
    # Parquet file -- confirm against the import service.
    path: str
251+
252+
253+
class ParquetDataColumn(ConfigBaseModel):
    """
    Data column entry of a Parquet config.

    Pairs a column `path` with the channel configuration used for it.
    """

    # NOTE(review): presumably the path/name of the data column inside the
    # Parquet file -- confirm against the import service.
    path: str
    # Channel configuration associated with this column.
    channel_config: ConfigDataModel
260+
261+
262+
class ParquetFlatDatasetConfig(ConfigBaseModel):
    """
    Flat dataset section of a Parquet config.

    Holds one time column and a list of data columns.
    """

    # The single time column of the dataset.
    time_column: ParquetTimeColumn
    # One entry per data column to import.
    data_columns: List[ParquetDataColumn]
269+
270+
271+
class ParquetConfigImpl(ConfigBaseModel):
    """
    Defines the Parquet config spec.

    A run may be referenced by `run_name` or by `run_id`; setting both is
    rejected by `validate_config`.
    """

    asset_name: str
    # At most one of run_name / run_id may be non-empty (see validate_config).
    run_name: str = ""
    run_id: str = ""
    flat_dataset: Optional[ParquetFlatDatasetConfig] = None
    # NOTE(review): presumably the byte offset and byte length of the Parquet
    # file footer -- confirm against the import service.
    footer_offset: int
    footer_length: int
    # May be supplied as the enum or as a string; it is passed through
    # convert_complex_types_import_mode before field validation.
    complex_types_import_mode: Union[str, ParquetComplexTypesImportModeType]

    @model_validator(mode="after")
    def validate_config(self) -> Self:
        """
        Rejects configs that set both `run_name` and `run_id`.

        Raises:
            PydanticCustomError: if both fields are non-empty.
        """
        if self.run_name and self.run_id:
            raise PydanticCustomError(
                "invalid_config_error", "Only specify run_name or run_id, not both."
            )
        return self

    @field_validator("complex_types_import_mode", mode="before")
    @classmethod
    def convert_complex_types_import_mode(
        cls, raw: Union[str, ParquetComplexTypesImportModeType, None]
    ) -> Optional[str]:
        """
        Converts the provided complex_types_import_mode value to a string.

        Enum members are converted with `as_human_str()`. Strings are first
        parsed with `ParquetComplexTypesImportModeType.from_str` and, when
        recognized, converted the same way. `None` is returned unchanged and
        left for the field's own type validation to handle.

        Raises:
            PydanticCustomError: if `raw` is an unrecognized string or is
                neither a string nor a `ParquetComplexTypesImportModeType`.
        """
        if raw is None:
            return None

        # Check the enum before `str` so enum members are never re-parsed.
        if isinstance(raw, ParquetComplexTypesImportModeType):
            return raw.as_human_str()

        if isinstance(raw, str):
            value = ParquetComplexTypesImportModeType.from_str(raw)
            # from_str may yield None; in that case fall through to the error.
            if value is not None:
                return value.as_human_str()

        raise PydanticCustomError(
            "invalid_config_error", f"Invalid complex_types_import_mode: {raw}."
        )

0 commit comments

Comments
 (0)