Skip to content

Commit 852c953

Browse files
authored
python(refactor): update parquet import example (#591)
1 parent b0da0cf commit 852c953

14 files changed

Lines changed: 152 additions & 31 deletions

File tree

python/examples/data_import/parquet/flat_dataset/.env-example

Lines changed: 0 additions & 3 deletions
This file was deleted.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
SIFT_GRPC_URI=
2+
SIFT_REST_URI=
3+
SIFT_API_KEY=
4+
ASSET_NAME=
Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,54 @@
1+
"""Import a Parquet (flat dataset) file into Sift."""
2+
13
import os
24

35
from dotenv import load_dotenv
4-
from sift_py.data_import.parquet import ParquetUploadService
5-
from sift_py.data_import.status import DataImportService
6-
from sift_py.data_import.time_format import TimeFormatType
7-
from sift_py.rest import SiftRestConfig
6+
from sift_client import SiftClient
7+
from sift_client.sift_types.data_import import DataTypeKey
88

99
if __name__ == "__main__":
10-
"""
11-
Example usage for uploading a Parquet (flat dataset).
12-
"""
1310
load_dotenv()
1411

15-
sift_uri = os.getenv("SIFT_API_URI")
16-
assert sift_uri, "expected 'SIFT_API_URI' environment variable to be set"
12+
grpc_uri = os.getenv("SIFT_GRPC_URI")
13+
assert grpc_uri, "expected 'SIFT_GRPC_URI' environment variable to be set"
14+
15+
rest_uri = os.getenv("SIFT_REST_URI")
16+
assert rest_uri, "expected 'SIFT_REST_URI' environment variable to be set"
1717

1818
apikey = os.getenv("SIFT_API_KEY")
1919
assert apikey, "expected 'SIFT_API_KEY' environment variable to be set"
2020

2121
asset_name = os.getenv("ASSET_NAME")
2222
assert asset_name, "expected 'ASSET_NAME' environment variable to be set"
2323

24-
rest_config: SiftRestConfig = {
25-
"uri": sift_uri,
26-
"apikey": apikey,
27-
}
24+
client = SiftClient(api_key=apikey, grpc_url=grpc_uri, rest_url=rest_uri)
2825

29-
parquet_upload_service = ParquetUploadService(rest_config)
30-
31-
import_service: DataImportService = parquet_upload_service.flat_dataset_upload(
32-
asset_name=asset_name,
33-
run_name="Example Parquet Upload",
34-
path="sample_data.parquet",
35-
time_path="timestamp",
36-
time_format=TimeFormatType.ABSOLUTE_UNIX_NANOSECONDS,
26+
# Auto-detect the config and import the file. Parquet requires the layout
27+
# (data_type) to be specified since the extension alone is ambiguous.
28+
import_job = client.data_import.import_from_path(
29+
"sample_data.parquet",
30+
asset=asset_name,
31+
data_type=DataTypeKey.PARQUET_FLATDATASET,
3732
)
3833

39-
data_import = import_service.get_data_import()
40-
print(data_import.model_dump_json(indent=1))
41-
42-
print("Waiting for upload to complete...")
43-
import_service.wait_until_complete()
44-
print("Upload example complete!")
34+
import_job.wait_until_complete()
35+
36+
# If the auto-detected config doesn't quite match your file, inspect it and patch
37+
# it before importing. Common fixes: change the time column, override a
38+
# column's data type, or drop a column that shouldn't be imported.
39+
#
40+
# config = client.data_import.detect_config(
41+
# "sample_data.parquet",
42+
# data_type=DataTypeKey.PARQUET_FLATDATASET,
43+
# )
44+
# print(config) # inspect what was auto-detected
45+
#
46+
# # Example: drop a column from the import
47+
# config.data_columns = [dc for dc in config.data_columns if dc.path != "channel_0"]
48+
#
49+
# import_job = client.data_import.import_from_path(
50+
# "sample_data.parquet",
51+
# asset=asset_name,
52+
# config=config,
53+
# )
54+
# import_job.wait_until_complete()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
python-dotenv
2+
sift-stack-py
Binary file not shown.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
SIFT_GRPC_URI=
2+
SIFT_REST_URI=
3+
SIFT_API_KEY=
4+
ASSET_NAME=
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
"""Import a Parquet file with multiple channels (one channel per row) into Sift."""
2+
3+
import os
4+
5+
from dotenv import load_dotenv
6+
from sift_client import SiftClient
7+
from sift_client.sift_types.data_import import (
8+
ParquetMultiChannelConfig,
9+
ParquetSingleChannelPerRowImportConfig,
10+
ParquetTimeColumn,
11+
)
12+
13+
if __name__ == "__main__":
14+
load_dotenv()
15+
16+
grpc_uri = os.getenv("SIFT_GRPC_URI")
17+
assert grpc_uri, "expected 'SIFT_GRPC_URI' environment variable to be set"
18+
19+
rest_uri = os.getenv("SIFT_REST_URI")
20+
assert rest_uri, "expected 'SIFT_REST_URI' environment variable to be set"
21+
22+
apikey = os.getenv("SIFT_API_KEY")
23+
assert apikey, "expected 'SIFT_API_KEY' environment variable to be set"
24+
25+
asset_name = os.getenv("ASSET_NAME")
26+
assert asset_name, "expected 'ASSET_NAME' environment variable to be set"
27+
28+
client = SiftClient(api_key=apikey, grpc_url=grpc_uri, rest_url=rest_uri)
29+
30+
# SCPR (single channel per row) requires declaring the channel layout
31+
# explicitly. Here each row identifies its channel via a name column, so set
32+
# multi_channel with the name and value column paths.
33+
config = ParquetSingleChannelPerRowImportConfig(
34+
asset_name=asset_name,
35+
time_column=ParquetTimeColumn(path="timestamp"),
36+
multi_channel=ParquetMultiChannelConfig(
37+
name_path="channel",
38+
data_path="value",
39+
),
40+
)
41+
42+
import_job = client.data_import.import_from_path(
43+
"sample_data.parquet",
44+
config=config,
45+
)
46+
47+
import_job.wait_until_complete()
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
python-dotenv
2+
sift-stack-py
Binary file not shown.
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
SIFT_GRPC_URI=
2+
SIFT_REST_URI=
3+
SIFT_API_KEY=
4+
ASSET_NAME=

0 commit comments

Comments
 (0)