-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathchannels.py
More file actions
127 lines (103 loc) · 3.9 KB
/
channels.py
File metadata and controls
127 lines (103 loc) · 3.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
from __future__ import annotations
import logging
from typing import Any, cast
from sift.channels.v3.channels_pb2 import (
GetChannelRequest,
GetChannelResponse,
ListChannelsRequest,
ListChannelsResponse,
)
from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client.transport import WithGrpcClient
from sift_client.transport.grpc_transport import GrpcClient
from sift_client.types.channel import Channel
# Configure logging
logger = logging.getLogger(__name__)
CHANNELS_DEFAULT_PAGE_SIZE = 10_000
class ChannelsLowLevelClient(LowLevelClientBase, WithGrpcClient):
"""
Low-level client for the ChannelsAPI.
This class provides a thin wrapper around the autogenerated bindings for the ChannelsAPI.
"""
def __init__(self, grpc_client: GrpcClient):
"""
Initialize the ChannelsLowLevelClient.
Args:
grpc_client: The gRPC client to use for making API calls.
"""
super().__init__(grpc_client)
async def get_channel(self, channel_id: str) -> Channel:
"""
Get a channel by channel_id.
Args:
channel_id: The channel ID to get.
Returns:
The Channel.
Raises:
ValueError: If channel_id is not provided.
"""
request = GetChannelRequest(channel_id=channel_id)
response = await self._grpc_client.get_stub(ChannelServiceStub).GetChannel(request)
grpc_channel = cast(GetChannelResponse, response).channel
channel = Channel._from_proto(grpc_channel)
return channel
async def list_channels(
self,
*,
page_size: int | None = None,
page_token: str | None = None,
query_filter: str | None = None,
order_by: str | None = None,
) -> tuple[list[Channel], str]:
"""
List channels with optional filtering and pagination.
Args:
page_size: The maximum number of channels to return.
page_token: A page token for pagination.
query_filter: A CEL filter string.
order_by: How to order the retrieved channels.
Returns:
A tuple of (channels, next_page_token).
"""
request_kwargs: dict[str, Any] = {}
if query_filter:
request_kwargs["filter"] = query_filter
if order_by:
request_kwargs["order_by"] = order_by
if page_size:
request_kwargs["page_size"] = page_size
if page_token:
request_kwargs["page_token"] = page_token
request = ListChannelsRequest(**request_kwargs)
response = await self._grpc_client.get_stub(ChannelServiceStub).ListChannels(request)
response = cast(ListChannelsResponse, response)
channels = [Channel._from_proto(channel) for channel in response.channels]
return channels, response.next_page_token
async def list_all_channels(
self,
*,
query_filter: str | None = None,
order_by: str | None = None,
max_results: int | None = None,
) -> list[Channel]:
"""
List all channels with optional filtering.
Args:
query_filter: A CEL filter string.
order_by: How to order the retrieved channels.
max_results: Maximum number of results to return.
Returns:
A list of all matching channels.
"""
# Channels default page size is 10,000 so lower it if we're passing max_results
page_size = None
if max_results is not None and max_results <= CHANNELS_DEFAULT_PAGE_SIZE:
page_size = max_results
return await self._handle_pagination(
self.list_channels,
kwargs={"query_filter": query_filter},
page_size=page_size,
order_by=order_by,
max_results=max_results,
)