-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathchannel.py
More file actions
50 lines (42 loc) · 1.58 KB
/
channel.py
File metadata and controls
50 lines (42 loc) · 1.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import functools
import warnings
from typing import List, Optional, cast
from sift.channels.v3.channels_pb2 import Channel as ChannelPb
from sift.channels.v3.channels_pb2 import ListChannelsRequest, ListChannelsResponse
from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
def channel_fqn(name: str, component: Optional[str] = None) -> str:
    """
    Builds the fully qualified name of a channel.

    Args:
        name: The channel name.
        component: Deprecated. When given as a non-empty string it is prepended
            to `name`, separated by a dot. Passing any value other than `None`
            (including an empty string) emits a `DeprecationWarning`.

    Returns:
        `name` when `component` is `None` or empty, otherwise `"{component}.{name}"`.
    """
    if component is not None:
        warnings.warn(
            "`component` is deprecated. This function should only be used for compatibility with legacy code.",
            DeprecationWarning,  # Ignored by default unless attributed to code in __main__
            stacklevel=2,  # Attribute the warning to the caller that passed `component`
        )
    return f"{component}.{name}" if component else name
@functools.cache
def get_channels(
    channel_service: ChannelServiceStub,
    filter: str,
    page_size: int = 1_000,
    page_token: str = "",
) -> List[ChannelPb]:
    """
    Queries all channels with the given filter. Filter must be a CEL expression.

    Starting at `page_token`, pages of at most `page_size` channels are requested
    from `channel_service.ListChannels` until the service returns an empty
    `next_page_token`; the channels of every page are concatenated in the order
    they were received.

    Results are memoized by `functools.cache`, keyed on the arguments (which must
    therefore be hashable). Repeated calls with the same arguments return the
    same list object without contacting the service again, so callers should
    treat the returned list as read-only.

    Args:
        channel_service: Stub used to issue the `ListChannels` calls.
        filter: CEL expression forwarded unchanged in every request.
        page_size: Page size forwarded in every request.
        page_token: Token of the first page to fetch; empty string to start from
            the beginning.

    Returns:
        All channels from the starting page through the last page.
    """
    channels_pb: List[ChannelPb] = []
    next_page_token = page_token

    while True:
        # Each request must carry the token returned by the previous response;
        # reusing the initial `page_token` would refetch the same page forever.
        req = ListChannelsRequest(
            filter=filter,
            page_size=page_size,
            page_token=next_page_token,
        )
        res = cast(ListChannelsResponse, channel_service.ListChannels(req))
        channels_pb.extend(res.channels)

        next_page_token = res.next_page_token
        if not next_page_token:
            # An empty token marks the last page.
            break

    return channels_pb