-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathchannels.py
More file actions
119 lines (96 loc) · 3.76 KB
/
Copy pathchannels.py
File metadata and controls
119 lines (96 loc) · 3.76 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, cast
from sift.channels.v3.channels_pb2 import (
GetChannelRequest,
GetChannelResponse,
ListChannelsRequest,
ListChannelsResponse,
)
from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client.sift_types.channel import Channel
from sift_client.transport import WithGrpcClient
if TYPE_CHECKING:
from sift_client.transport.grpc_transport import GrpcClient
# Configure logging
logger = logging.getLogger(__name__)
CHANNELS_DEFAULT_PAGE_SIZE = 10_000
class ChannelsLowLevelClient(LowLevelClientBase, WithGrpcClient):
"""Low-level client for the ChannelsAPI.
This class provides a thin wrapper around the autogenerated bindings for the ChannelsAPI.
"""
def __init__(self, grpc_client: GrpcClient):
"""Initialize the ChannelsLowLevelClient.
Args:
grpc_client: The gRPC client to use for making API calls.
"""
super().__init__(grpc_client)
async def get_channel(self, channel_id: str) -> Channel:
"""Get a channel by channel_id.
Args:
channel_id: The channel ID to get.
Returns:
The Channel.
Raises:
ValueError: If channel_id is not provided.
"""
request = GetChannelRequest(channel_id=channel_id)
response = await self._grpc_client.get_stub(ChannelServiceStub).GetChannel(request)
grpc_channel = cast("GetChannelResponse", response).channel
channel = Channel._from_proto(grpc_channel)
return channel
async def list_channels(
self,
*,
page_size: int | None = CHANNELS_DEFAULT_PAGE_SIZE,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
) -> tuple[list[Channel], str]:
"""List channels with optional filtering and pagination.
Args:
page_size: The maximum number of channels to return.
page_token: A page token for pagination.
query_filter: A CEL filter string.
order_by: How to order the retrieved channels.
Returns:
A tuple of (channels, next_page_token).
"""
request_kwargs: dict[str, Any] = {}
if query_filter:
request_kwargs["filter"] = query_filter
if order_by:
request_kwargs["order_by"] = order_by
if page_size:
request_kwargs["page_size"] = page_size
if page_token:
request_kwargs["page_token"] = page_token
request = ListChannelsRequest(**request_kwargs)
response = await self._grpc_client.get_stub(ChannelServiceStub).ListChannels(request)
response = cast("ListChannelsResponse", response)
channels = [Channel._from_proto(channel) for channel in response.channels]
return channels, response.next_page_token
async def list_all_channels(
self,
*,
query_filter: str | None = None,
order_by: str | None = None,
page_size: int | None = CHANNELS_DEFAULT_PAGE_SIZE,
max_results: int | None = None,
) -> list[Channel]:
"""List all channels with optional filtering.
Args:
query_filter: A CEL filter string.
order_by: How to order the retrieved channels.
max_results: Maximum number of results to return.
Returns:
A list of all matching channels.
"""
return await self._handle_pagination(
self.list_channels,
kwargs={"query_filter": query_filter},
page_size=page_size,
order_by=order_by,
max_results=max_results,
)