-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathruns.py
More file actions
225 lines (186 loc) · 7.52 KB
/
Copy pathruns.py
File metadata and controls
225 lines (186 loc) · 7.52 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, cast
from sift.runs.v2.runs_pb2 import (
CreateAdhocRunRequest,
CreateAdhocRunResponse,
CreateAutomaticRunAssociationForAssetsRequest,
CreateRunResponse,
GetRunRequest,
GetRunResponse,
ListRunsRequest,
ListRunsResponse,
StopRunRequest,
UpdateRunRequest,
UpdateRunResponse,
)
from sift.runs.v2.runs_pb2_grpc import RunServiceStub
from sift_client._internal.low_level_wrappers.base import DEFAULT_PAGE_SIZE, LowLevelClientBase
from sift_client._internal.util.timestamp import to_pb_timestamp
from sift_client.sift_types.run import Run, RunCreate, RunUpdate
from sift_client.transport import WithGrpcClient
if TYPE_CHECKING:
from datetime import datetime
from sift_client.transport.grpc_transport import GrpcClient
# Configure logging
logger = logging.getLogger(__name__)
class RunsLowLevelClient(LowLevelClientBase, WithGrpcClient):
"""Low-level client for the RunsAPI.
This class provides a thin wrapper around the autogenerated bindings for the RunsAPI.
"""
def __init__(self, grpc_client: GrpcClient):
"""Initialize the RunsLowLevelClient.
Args:
grpc_client: The gRPC client to use for making API calls.
"""
super().__init__(grpc_client)
async def get_run(self, run_id: str) -> Run:
"""Get a run by run_id.
Args:
run_id: The run ID to get.
Returns:
The Run.
Raises:
ValueError: If run_id is not provided.
"""
request = GetRunRequest(run_id=run_id)
response = await self._grpc_client.get_stub(RunServiceStub).GetRun(request)
grpc_run = cast("GetRunResponse", response).run
return Run._from_proto(grpc_run)
async def list_runs(
self,
*,
page_size: int | None = DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
) -> tuple[list[Run], str]:
"""List runs with optional filtering and pagination.
Args:
page_size: The maximum number of runs to return.
page_token: A page token for pagination.
query_filter: A CEL filter string.
order_by: How to order the retrieved runs.
Returns:
A tuple of (runs, next_page_token).
"""
request_kwargs: dict[str, Any] = {}
if page_size is not None:
request_kwargs["page_size"] = page_size
if page_token is not None:
request_kwargs["page_token"] = page_token
if query_filter is not None:
request_kwargs["filter"] = query_filter
if order_by is not None:
request_kwargs["order_by"] = order_by
request = ListRunsRequest(**request_kwargs)
response = await self._grpc_client.get_stub(RunServiceStub).ListRuns(request)
response = cast("ListRunsResponse", response)
runs = [Run._from_proto(run) for run in response.runs]
return runs, response.next_page_token
async def list_all_runs(
self,
*,
query_filter: str | None = None,
order_by: str | None = None,
page_size: int | None = DEFAULT_PAGE_SIZE,
max_results: int | None = None,
) -> list[Run]:
"""List all runs with optional filtering.
Args:
query_filter: A CEL filter string.
order_by: How to order the retrieved runs.
max_results: Maximum number of results to return.
Returns:
A list of all matching runs.
"""
return await self._handle_pagination(
self.list_runs,
kwargs={"query_filter": query_filter},
order_by=order_by,
max_results=max_results,
page_size=page_size,
)
async def create_run(self, *, create: RunCreate) -> Run:
request_proto = create.to_proto()
response = await self._grpc_client.get_stub(RunServiceStub).CreateRun(request_proto)
grpc_run = cast("CreateRunResponse", response).run
return Run._from_proto(grpc_run)
async def update_run(self, update: RunUpdate) -> Run:
grpc_run, update_mask = update.to_proto_with_mask()
request = UpdateRunRequest(run=grpc_run, update_mask=update_mask)
response = await self._grpc_client.get_stub(RunServiceStub).UpdateRun(request)
updated_grpc_run = cast("UpdateRunResponse", response).run
return Run._from_proto(updated_grpc_run)
async def stop_run(self, run_id: str) -> None:
"""Stop a run by setting its stop time to the current time.
Args:
run_id: The ID of the run to stop.
Raises:
ValueError: If run_id is not provided.
"""
if not run_id:
raise ValueError("run_id must be provided")
request = StopRunRequest(run_id=run_id)
await self._grpc_client.get_stub(RunServiceStub).StopRun(request)
async def create_automatic_run_association_for_assets(
self, run_id: str, asset_names: list[str]
) -> None:
"""Associate assets with a run for automatic data ingestion.
Args:
run_id: The ID of the run.
asset_names: List of asset names to associate.
Raises:
ValueError: If run_id or asset_names are not provided.
"""
if not run_id:
raise ValueError("run_id must be provided")
if not asset_names:
raise ValueError("asset_names must be provided")
request = CreateAutomaticRunAssociationForAssetsRequest(
run_id=run_id, asset_names=asset_names
)
await self._grpc_client.get_stub(RunServiceStub).CreateAutomaticRunAssociationForAssets(
request
)
async def create_adhoc_run(
self,
*,
name: str,
description: str | None = None,
asset_ids: list[str],
start_time: datetime | None = None,
stop_time: datetime | None = None,
tag_names: list[str] | None = None,
metadata: dict[str, str | float | bool] | None = None,
client_key: str | None = None,
) -> Run:
"""Create an adhoc run.
Args:
name: The name of the run.
description: Optional description of the run.
asset_ids: List of asset IDs to associate with the run.
start_time: Optional start time of the run.
stop_time: Optional stop time of the run.
tag_names: Optional list of tag names to associate with the run.
metadata: Optional metadata to associate with the run.
client_key: Optional client key for the run.
Returns:
The created Run.
Raises:
ValueError: If name is not provided or if start_time/stop_time are invalid.
"""
from sift_client.util.metadata import metadata_dict_to_proto
request = CreateAdhocRunRequest(
name=name,
description=description or "",
start_time=to_pb_timestamp(start_time) if start_time else None,
stop_time=to_pb_timestamp(stop_time) if stop_time else None,
asset_ids=asset_ids,
tags=tag_names,
metadata=metadata_dict_to_proto(metadata) if metadata else None,
client_key=client_key,
)
response = await self._grpc_client.get_stub(RunServiceStub).CreateAdhocRun(request)
grpc_run = cast("CreateAdhocRunResponse", response).run
return Run._from_proto(grpc_run)