Skip to content

Commit a26719b

Browse files
committed
python(feat): add data import api
1 parent 54b3545 commit a26719b

8 files changed

Lines changed: 884 additions & 0 deletions

File tree

Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from typing import TYPE_CHECKING, cast
5+
6+
from sift.data_imports.v2.data_imports_pb2 import (
7+
CreateDataImportFromUploadRequest,
8+
CreateDataImportFromUploadResponse,
9+
CreateDataImportFromUrlRequest,
10+
CreateDataImportFromUrlResponse,
11+
DetectConfigRequest,
12+
DetectConfigResponse,
13+
GetDataImportRequest,
14+
GetDataImportResponse,
15+
ListDataImportsRequest,
16+
ListDataImportsResponse,
17+
RetryDataImportRequest,
18+
)
19+
from sift.data_imports.v2.data_imports_pb2_grpc import DataImportServiceStub
20+
21+
from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
22+
from sift_client._internal.util.executor import run_sync_function
23+
from sift_client.sift_types.data_import import CsvImportConfig, DataImport
24+
from sift_client.transport import WithGrpcClient, WithRestClient
25+
26+
if TYPE_CHECKING:
27+
from pathlib import Path
28+
29+
from sift.data_imports.v2.data_imports_pb2 import DataTypeKey
30+
31+
from sift_client.transport.grpc_transport import GrpcClient
32+
from sift_client.transport.rest_transport import RestClient
33+
34+
# Union of all supported config types. Extend this as new formats are added.
35+
ImportConfig = CsvImportConfig
36+
37+
38+
def _set_config_on_request(
39+
request: CreateDataImportFromUploadRequest | CreateDataImportFromUrlRequest,
40+
config: ImportConfig,
41+
) -> None:
42+
"""Set the appropriate config field on a proto request based on the config type."""
43+
if isinstance(config, CsvImportConfig):
44+
request.csv_config.CopyFrom(config._to_proto())
45+
else:
46+
raise TypeError(f"Unsupported import config type: {type(config).__name__}")
47+
48+
49+
logger = logging.getLogger(__name__)
50+
51+
52+
class DataImportsLowLevelClient(LowLevelClientBase, WithGrpcClient, WithRestClient):
53+
"""Low-level client for the DataImportService.
54+
55+
This class provides a thin wrapper around the autogenerated bindings for the DataImportsAPI.
56+
"""
57+
58+
def __init__(self, grpc_client: GrpcClient, rest_client: RestClient):
59+
WithGrpcClient.__init__(self, grpc_client=grpc_client)
60+
WithRestClient.__init__(self, rest_client=rest_client)
61+
62+
async def create_from_upload(self, config: ImportConfig) -> tuple[str, str]:
63+
"""Create a data import and get back a presigned upload URL.
64+
65+
Args:
66+
config: The import configuration.
67+
68+
Returns:
69+
A tuple of (data_import_id, upload_url).
70+
"""
71+
request = CreateDataImportFromUploadRequest()
72+
_set_config_on_request(request, config)
73+
response = await self._grpc_client.get_stub(
74+
DataImportServiceStub
75+
).CreateDataImportFromUpload(request)
76+
response = cast("CreateDataImportFromUploadResponse", response)
77+
return response.data_import_id, response.upload_url
78+
79+
async def upload_file(self, upload_url: str, file_path: Path) -> None:
80+
"""Upload a file to a presigned URL.
81+
82+
Runs the synchronous HTTP POST in a thread pool to avoid blocking
83+
the event loop.
84+
85+
Args:
86+
upload_url: The presigned URL to upload to.
87+
file_path: Path to the file to upload.
88+
"""
89+
rest_client = self._rest_client
90+
91+
def _do_upload() -> None:
92+
with open(file_path, "rb") as f:
93+
response = rest_client.post(upload_url, data=f)
94+
response.raise_for_status()
95+
96+
await run_sync_function(_do_upload)
97+
98+
async def create_from_url(self, url: str, config: ImportConfig) -> str:
99+
"""Create a data import from a remote URL.
100+
101+
Args:
102+
url: The URL to import from (HTTP or S3).
103+
config: The import configuration.
104+
105+
Returns:
106+
The data_import_id.
107+
"""
108+
request = CreateDataImportFromUrlRequest(url=url)
109+
_set_config_on_request(request, config)
110+
response = await self._grpc_client.get_stub(DataImportServiceStub).CreateDataImportFromUrl(
111+
request
112+
)
113+
response = cast("CreateDataImportFromUrlResponse", response)
114+
return response.data_import_id
115+
116+
async def get(self, data_import_id: str) -> DataImport:
117+
"""Get a data import by ID.
118+
119+
Args:
120+
data_import_id: The ID of the data import.
121+
122+
Returns:
123+
The DataImport.
124+
"""
125+
request = GetDataImportRequest(data_import_id=data_import_id)
126+
response = await self._grpc_client.get_stub(DataImportServiceStub).GetDataImport(request)
127+
response = cast("GetDataImportResponse", response)
128+
return DataImport._from_proto(response.data_import)
129+
130+
async def list_(
131+
self,
132+
*,
133+
page_size: int | None = None,
134+
page_token: str | None = None,
135+
query_filter: str = "",
136+
order_by: str = "",
137+
) -> tuple[list[DataImport], str]:
138+
"""List data imports with optional filtering and pagination.
139+
140+
Args:
141+
page_size: Maximum number of results per page.
142+
page_token: Token for the next page of results.
143+
query_filter: CEL filter string.
144+
order_by: Ordering string (e.g. "created_date desc").
145+
146+
Returns:
147+
A tuple of (list of DataImports, next_page_token).
148+
"""
149+
request = ListDataImportsRequest(
150+
filter=query_filter,
151+
order_by=order_by,
152+
)
153+
if page_size is not None:
154+
request.page_size = page_size
155+
if page_token:
156+
request.page_token = page_token
157+
158+
response = await self._grpc_client.get_stub(DataImportServiceStub).ListDataImports(request)
159+
response = cast("ListDataImportsResponse", response)
160+
data_imports = [DataImport._from_proto(di) for di in response.data_imports]
161+
return data_imports, response.next_page_token
162+
163+
async def list_all(
164+
self,
165+
*,
166+
query_filter: str = "",
167+
order_by: str = "",
168+
max_results: int | None = None,
169+
) -> list[DataImport]:
170+
"""List all data imports, handling pagination automatically.
171+
172+
Args:
173+
query_filter: CEL filter string.
174+
order_by: Ordering string (e.g. "created_date desc").
175+
max_results: Maximum total results to return.
176+
177+
Returns:
178+
A list of all matching DataImports.
179+
"""
180+
return await self._handle_pagination(
181+
func=self.list_,
182+
kwargs={"query_filter": query_filter, "order_by": order_by},
183+
max_results=max_results,
184+
)
185+
186+
async def retry(self, data_import_id: str) -> None:
187+
"""Retry a failed data import.
188+
189+
Only works for URL-based imports in a failed state.
190+
191+
Args:
192+
data_import_id: The ID of the data import to retry.
193+
"""
194+
request = RetryDataImportRequest(data_import_id=data_import_id)
195+
await self._grpc_client.get_stub(DataImportServiceStub).RetryDataImport(request)
196+
197+
async def detect_config(
198+
self, data: bytes, data_type_key: DataTypeKey.ValueType
199+
) -> DetectConfigResponse:
200+
"""Call the DetectConfig RPC to auto-detect import configuration.
201+
202+
Args:
203+
data: A sample of the file content (e.g. the first 64 KiB).
204+
data_type_key: The file type hint.
205+
206+
Returns:
207+
The raw DetectConfigResponse proto. The caller (resource API)
208+
is responsible for converting to a sift_type.
209+
"""
210+
request = DetectConfigRequest(data=data, type=data_type_key)
211+
response = await self._grpc_client.get_stub(DataImportServiceStub).DetectConfig(request)
212+
return cast("DetectConfigResponse", response)

python/lib/sift_client/client.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@
99
ChannelsAPIAsync,
1010
DataExportAPI,
1111
DataExportAPIAsync,
12+
DataImportAPI,
13+
DataImportAPIAsync,
1214
FileAttachmentsAPI,
1315
FileAttachmentsAPIAsync,
1416
IngestionAPIAsync,
@@ -110,6 +112,9 @@ class SiftClient(
110112
data_export: DataExportAPI
111113
"""Instance of the Data Export API for making synchronous requests."""
112114

115+
data_import: DataImportAPI
116+
"""Instance of the Data Import API for making synchronous requests."""
117+
113118
async_: AsyncAPIs
114119
"""Accessor for the asynchronous APIs. All asynchronous APIs are available as attributes on this accessor."""
115120

@@ -159,6 +164,7 @@ def __init__(
159164
self.tags = TagsAPI(self)
160165
self.test_results = TestResultsAPI(self)
161166
self.data_export = DataExportAPI(self)
167+
self.data_import = DataImportAPI(self)
162168

163169
# Accessor for the asynchronous APIs
164170
self.async_ = AsyncAPIs(
@@ -175,6 +181,7 @@ def __init__(
175181
tags=TagsAPIAsync(self),
176182
test_results=TestResultsAPIAsync(self),
177183
data_export=DataExportAPIAsync(self),
184+
data_import=DataImportAPIAsync(self),
178185
)
179186

180187
@property

python/lib/sift_client/resources/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,7 @@ async def main():
162162
from sift_client.resources.runs import RunsAPIAsync
163163
from sift_client.resources.tags import TagsAPIAsync
164164
from sift_client.resources.test_results import TestResultsAPIAsync
165+
from sift_client.resources.data_imports import DataImportAPIAsync
165166
from sift_client.resources.exports import DataExportAPIAsync
166167

167168
# ruff: noqa All imports needs to be imported before sync_stubs to avoid circular import
@@ -178,6 +179,7 @@ async def main():
178179
TestResultsAPI,
179180
FileAttachmentsAPI,
180181
DataExportAPI,
182+
DataImportAPI,
181183
)
182184

183185
import sys
@@ -215,4 +217,6 @@ async def main():
215217
"TracingConfig",
216218
"DataExportAPI",
217219
"DataExportAPIAsync",
220+
"DataImportAPI",
221+
"DataImportAPIAsync",
218222
]

0 commit comments

Comments
 (0)