-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathassets.py
More file actions
105 lines (88 loc) · 3.83 KB
/
Copy pathassets.py
File metadata and controls
105 lines (88 loc) · 3.83 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
from __future__ import annotations
from typing import Any, cast
from sift.assets.v1.assets_pb2 import (
ArchiveAssetRequest,
ArchiveAssetResponse,
GetAssetRequest,
GetAssetResponse,
ListAssetsRequest,
ListAssetsResponse,
UpdateAssetRequest,
UpdateAssetResponse,
)
from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
from sift_client._internal.low_level_wrappers.base import (
DEFAULT_PAGE_SIZE,
LowLevelClientBase,
)
from sift_client.sift_types.asset import Asset, AssetUpdate
from sift_client.transport import GrpcClient, WithGrpcClient
class AssetsLowLevelClient(LowLevelClientBase, WithGrpcClient):
    """Low-level client for the AssetsAPI.

    A thin wrapper around the autogenerated bindings for the AssetsAPI: each
    method builds a request message, awaits the matching ``AssetServiceStub``
    call, and converts the response before handing it back.
    """

    def __init__(self, grpc_client: GrpcClient):
        """Initialize the AssetsLowLevelClient.

        Args:
            grpc_client: The gRPC client to use for making API calls.
        """
        super().__init__(grpc_client)

    async def get_asset(self, asset_id: str) -> Asset:
        """Fetch a single asset by its ID.

        Args:
            asset_id: The ID sent in the ``GetAssetRequest``.

        Returns:
            The Asset built from the ``asset`` field of the response.
        """
        get_request = GetAssetRequest(asset_id=asset_id)
        stub = self._grpc_client.get_stub(AssetServiceStub)
        raw_response = await stub.GetAsset(get_request)
        typed_response = cast("GetAssetResponse", raw_response)
        return Asset._from_proto(typed_response.asset)

    async def list_all_assets(
        self,
        query_filter: str | None = None,
        order_by: str | None = None,
        max_results: int | None = None,
        page_size: int | None = DEFAULT_PAGE_SIZE,
    ) -> list[Asset]:
        """List all results matching the given query.

        Delegates to ``self._handle_pagination``, passing ``self.list_assets``
        as the function that fetches each page.

        Args:
            query_filter: The CEL query filter.
            order_by: The field to order by.
            max_results: The maximum number of results to return.
            page_size: The number of results to return per page.

        Returns:
            A list of Assets matching the given query.
        """
        # The filter travels in ``kwargs``; the remaining options are passed
        # to the pagination helper as its own keyword arguments.
        page_fetcher_kwargs = {"query_filter": query_filter}
        every_asset = await self._handle_pagination(
            self.list_assets,
            kwargs=page_fetcher_kwargs,
            page_size=page_size,
            order_by=order_by,
            max_results=max_results,
        )
        return every_asset

    async def list_assets(
        self,
        page_size: int | None = DEFAULT_PAGE_SIZE,
        page_token: str | None = None,
        query_filter: str | None = None,
        order_by: str | None = None,
    ) -> tuple[list[Asset], str]:
        """Fetch one page of assets.

        Args:
            page_size: The number of results to request for this page.
            page_token: The token identifying which page to request.
            query_filter: The CEL query filter, sent as the request's ``filter`` field.
            order_by: The field to order by.

        Returns:
            A tuple of (the Assets on this page, the response's ``next_page_token``).
        """
        # Only arguments that were actually supplied are set on the request;
        # anything left as None is omitted entirely.
        optional_fields: dict[str, Any] = {
            "page_size": page_size,
            "page_token": page_token,
            "filter": query_filter,
            "order_by": order_by,
        }
        request_kwargs: dict[str, Any] = {
            field: value for field, value in optional_fields.items() if value is not None
        }

        list_request = ListAssetsRequest(**request_kwargs)
        stub = self._grpc_client.get_stub(AssetServiceStub)
        raw_response = await stub.ListAssets(list_request)
        typed_response = cast("ListAssetsResponse", raw_response)

        page_of_assets = [Asset._from_proto(proto) for proto in typed_response.assets]
        return page_of_assets, typed_response.next_page_token

    async def update_asset(self, update: AssetUpdate) -> Asset:
        """Apply an update to an asset.

        Args:
            update: The update to send; its ``to_proto_with_mask()`` supplies
                both the asset message and the update mask for the request.

        Returns:
            The Asset built from the ``asset`` field of the response.
        """
        proto_asset, field_mask = update.to_proto_with_mask()
        update_request = UpdateAssetRequest(asset=proto_asset, update_mask=field_mask)
        stub = self._grpc_client.get_stub(AssetServiceStub)
        raw_response = await stub.UpdateAsset(update_request)
        typed_response = cast("UpdateAssetResponse", raw_response)
        return Asset._from_proto(typed_response.asset)

    async def archive_asset(self, asset_id: str, archive_runs: bool = False) -> list[str] | None:
        """Archive an asset.

        Args:
            asset_id: The ID sent in the ``ArchiveAssetRequest``.
            archive_runs: Forwarded unchanged as the request's ``archive_runs`` field.

        Returns:
            The ``archived_runs`` field of the response.
        """
        archive_request = ArchiveAssetRequest(asset_id=asset_id, archive_runs=archive_runs)
        stub = self._grpc_client.get_stub(AssetServiceStub)
        raw_response = await stub.ArchiveAsset(archive_request)
        typed_response = cast("ArchiveAssetResponse", raw_response)
        return typed_response.archived_runs