Skip to content

Commit 3019ff9

Browse files
committed
wip
1 parent fddf5e5 commit 3019ff9

2 files changed

Lines changed: 104 additions & 46 deletions

File tree

  • python/lib

python/lib/sift_client/_internal/low_level_wrappers/runs.py

Lines changed: 104 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,19 @@ def __init__(self, grpc_client: GrpcClient):
4141
"""
4242
super().__init__(grpc_client)
4343

44-
async def get_run(self, run_id: str) -> Run:
44+
async def get_run(
45+
self,
46+
run_id: str,
47+
*,
48+
use_cache: bool = True,
49+
force_refresh: bool = False,
50+
ttl: int | None = None,
51+
) -> Run:
4552
"""Get a run by run_id.
4653
4754
Args:
4855
run_id: The run ID to get.
56+
metadata: Optional gRPC metadata including cache control.
4957
5058
Returns:
5159
The Run.
@@ -54,7 +62,14 @@ async def get_run(self, run_id: str) -> Run:
5462
ValueError: If run_id is not provided.
5563
"""
5664
request = GetRunRequest(run_id=run_id)
57-
response = await self._grpc_client.get_stub(RunServiceStub).GetRun(request)
65+
stub = self._grpc_client.get_stub(RunServiceStub)
66+
response = await self._call_with_cache(
67+
stub.GetRun,
68+
request,
69+
use_cache=use_cache,
70+
force_refresh=force_refresh,
71+
ttl=ttl,
72+
)
5873
grpc_run = cast("GetRunResponse", response).run
5974
return Run._from_proto(grpc_run)
6075

@@ -65,6 +80,9 @@ async def list_runs(
6580
page_token: str | None = None,
6681
query_filter: str | None = None,
6782
order_by: str | None = None,
83+
use_cache: bool = True,
84+
force_refresh: bool = False,
85+
ttl: int | None = None,
6886
) -> tuple[list[Run], str]:
6987
"""List runs with optional filtering and pagination.
7088
@@ -88,7 +106,14 @@ async def list_runs(
88106
request_kwargs["order_by"] = order_by
89107

90108
request = ListRunsRequest(**request_kwargs)
91-
response = await self._grpc_client.get_stub(RunServiceStub).ListRuns(request)
109+
stub = self._grpc_client.get_stub(RunServiceStub)
110+
response = await self._call_with_cache(
111+
stub.ListRuns,
112+
request,
113+
use_cache=use_cache,
114+
force_refresh=force_refresh,
115+
ttl=ttl,
116+
)
92117
response = cast("ListRunsResponse", response)
93118

94119
runs = [Run._from_proto(run) for run in response.runs]
@@ -100,6 +125,9 @@ async def list_all_runs(
100125
query_filter: str | None = None,
101126
order_by: str | None = None,
102127
max_results: int | None = None,
128+
use_cache: bool = True,
129+
force_refresh: bool = False,
130+
ttl: int | None = None,
103131
) -> list[Run]:
104132
"""List all runs with optional filtering.
105133
@@ -112,26 +140,68 @@ async def list_all_runs(
112140
A list of all matching runs.
113141
"""
114142
return await self._handle_pagination(
115-
self.list_runs,
116-
kwargs={"query_filter": query_filter},
143+
lambda **k: self.list_runs(
144+
**k,
145+
query_filter=query_filter,
146+
order_by=order_by,
147+
use_cache=use_cache,
148+
force_refresh=force_refresh,
149+
ttl=ttl,
150+
),
151+
kwargs={},
117152
order_by=order_by,
118153
max_results=max_results,
119154
)
120155

121-
async def create_run(self, *, create: RunCreate) -> Run:
156+
async def create_run(
157+
self,
158+
*,
159+
create: RunCreate,
160+
use_cache: bool = False, # Default to False for write operations
161+
force_refresh: bool = False,
162+
ttl: int | None = None,
163+
) -> Run:
122164
request_proto = create.to_proto()
123-
response = await self._grpc_client.get_stub(RunServiceStub).CreateRun(request_proto)
165+
stub = self._grpc_client.get_stub(RunServiceStub)
166+
response = await self._call_with_cache(
167+
stub.CreateRun,
168+
request_proto,
169+
use_cache=use_cache,
170+
force_refresh=force_refresh,
171+
ttl=ttl,
172+
)
124173
grpc_run = cast("CreateRunResponse", response).run
125174
return Run._from_proto(grpc_run)
126175

127-
async def update_run(self, update: RunUpdate) -> Run:
176+
async def update_run(
177+
self,
178+
update: RunUpdate,
179+
*,
180+
use_cache: bool = False, # Default to False for write operations
181+
force_refresh: bool = False,
182+
ttl: int | None = None,
183+
) -> Run:
128184
grpc_run, update_mask = update.to_proto_with_mask()
129185
request = UpdateRunRequest(run=grpc_run, update_mask=update_mask)
130-
response = await self._grpc_client.get_stub(RunServiceStub).UpdateRun(request)
186+
stub = self._grpc_client.get_stub(RunServiceStub)
187+
response = await self._call_with_cache(
188+
stub.UpdateRun,
189+
request,
190+
use_cache=use_cache,
191+
force_refresh=force_refresh,
192+
ttl=ttl,
193+
)
131194
updated_grpc_run = cast("UpdateRunResponse", response).run
132195
return Run._from_proto(updated_grpc_run)
133196

134-
async def stop_run(self, run_id: str) -> None:
197+
async def stop_run(
198+
self,
199+
run_id: str,
200+
*,
201+
use_cache: bool = False, # Default to False for write operations
202+
force_refresh: bool = False,
203+
ttl: int | None = None,
204+
) -> None:
135205
"""Stop a run by setting its stop time to the current time.
136206
137207
Args:
@@ -144,10 +214,23 @@ async def stop_run(self, run_id: str) -> None:
144214
raise ValueError("run_id must be provided")
145215

146216
request = StopRunRequest(run_id=run_id)
147-
await self._grpc_client.get_stub(RunServiceStub).StopRun(request)
217+
stub = self._grpc_client.get_stub(RunServiceStub)
218+
await self._call_with_cache(
219+
stub.StopRun,
220+
request,
221+
use_cache=use_cache,
222+
force_refresh=force_refresh,
223+
ttl=ttl,
224+
)
148225

149226
async def create_automatic_run_association_for_assets(
150-
self, run_id: str, asset_names: list[str]
227+
self,
228+
run_id: str,
229+
asset_names: list[str],
230+
*,
231+
use_cache: bool = False, # Default to False for write operations
232+
force_refresh: bool = False,
233+
ttl: int | None = None,
151234
) -> None:
152235
"""Associate assets with a run for automatic data ingestion.
153236
@@ -164,8 +247,14 @@ async def create_automatic_run_association_for_assets(
164247
raise ValueError("asset_names must be provided")
165248

166249
request = CreateAutomaticRunAssociationForAssetsRequest(
167-
run_id=run_id, asset_names=asset_names
250+
run_id=run_id,
251+
asset_names=asset_names
168252
)
169-
await self._grpc_client.get_stub(RunServiceStub).CreateAutomaticRunAssociationForAssets(
170-
request
253+
stub = self._grpc_client.get_stub(RunServiceStub)
254+
await self._call_with_cache(
255+
stub.CreateAutomaticRunAssociationForAssets,
256+
request,
257+
use_cache=use_cache,
258+
force_refresh=force_refresh,
259+
ttl=ttl,
171260
)

python/lib/sift_py/grpc/cache.py

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -200,34 +200,3 @@ def with_force_refresh(ttl: int | None = None) -> tuple[tuple[str, str], ...]:
200200
if ttl is not None:
201201
metadata.append((METADATA_CACHE_TTL, str(ttl)))
202202
return tuple(metadata)
203-
204-
205-
def ignore_cache() -> tuple[tuple[str, str], ...]:
206-
"""Ignore the cache for a gRPC request without clearing it.
207-
208-
Bypasses the cache for this request but doesn't invalidate the cached entry.
209-
The response from this request will not be cached.
210-
211-
Returns:
212-
Metadata tuple to pass to the gRPC stub method.
213-
214-
Example:
215-
metadata = ignore_cache()
216-
response = stub.GetData(request, metadata=metadata)
217-
"""
218-
return tuple()
219-
220-
221-
def without_cache() -> tuple[tuple[str, str], ...]:
222-
"""Explicitly disable caching for a gRPC request.
223-
224-
This is the default behavior when no cache metadata is provided.
225-
226-
Returns:
227-
Empty metadata tuple.
228-
229-
Example:
230-
metadata = without_cache()
231-
response = stub.GetData(request, metadata=metadata)
232-
"""
233-
return tuple()

0 commit comments

Comments
 (0)