-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathshared.py
More file actions
57 lines (47 loc) · 1.67 KB
/
Copy pathshared.py
File metadata and controls
57 lines (47 loc) · 1.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from typing import List, Optional, Tuple, Union, cast
from sift.assets.v1.assets_pb2 import Asset, ListAssetsRequest, ListAssetsResponse
from sift.assets.v1.assets_pb2_grpc import AssetServiceStub
from sift_py._internal.cel import cel_in
def list_assets_impl(
    _asset_service_stub: AssetServiceStub,
    names: Optional[Union[Tuple[str, ...], List[str]]] = None,
    ids: Optional[Union[Tuple[str, ...], List[str]]] = None,
) -> List[Asset]:
    """
    Lists assets in an organization, filtered by name or by ID.

    Only one filter is applied per call: a non-empty ``names`` takes
    precedence and ``ids`` is then ignored; otherwise a non-empty ``ids`` is
    used. When neither is provided (or both are empty) no request is sent and
    an empty list is returned.

    Args:
        _asset_service_stub: The asset service stub to use.
        names: Optional collection of names to filter by.
        ids: Optional collection of IDs to filter by. Only consulted when
            ``names`` is ``None`` or empty.

    Returns:
        A list of assets matching the criteria, accumulated across every
        page returned by the service.
    """

    def fetch_all_pages(cel_filter: str) -> List[Asset]:
        """Calls ``ListAssets`` repeatedly with ``cel_filter`` until no page token remains."""
        collected: List[Asset] = []
        page_token = ""  # An empty token is sent on the first request.
        while True:
            request = ListAssetsRequest(
                filter=cel_filter,
                page_size=1_000,
                page_token=page_token,
            )
            response = cast(ListAssetsResponse, _asset_service_stub.ListAssets(request))
            collected.extend(response.assets)
            # An empty next_page_token ends pagination.
            if not response.next_page_token:
                return collected
            page_token = response.next_page_token

    # ``None`` and empty collections are both falsy, so no normalisation is
    # needed before choosing which filter to apply.
    if names:
        return fetch_all_pages(cel_in("name", names))
    if ids:
        return fetch_all_pages(cel_in("asset_id", ids))

    # Nothing to filter by: skip the request entirely.
    return []