-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathchannels.py
More file actions
139 lines (112 loc) · 4.58 KB
/
Copy pathchannels.py
File metadata and controls
139 lines (112 loc) · 4.58 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Any, cast
from sift.channels.v3.channels_pb2 import (
BatchArchiveChannelsRequest,
BatchUnarchiveChannelsRequest,
GetChannelRequest,
GetChannelResponse,
ListChannelsRequest,
ListChannelsResponse,
)
from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
from sift_client.sift_types.channel import Channel
from sift_client.transport import WithGrpcClient
if TYPE_CHECKING:
from sift_client.transport.grpc_transport import GrpcClient
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
# Default `page_size` used by `list_channels` and `list_all_channels` when the caller does not pass one.
CHANNELS_DEFAULT_PAGE_SIZE = 10_000
class ChannelsLowLevelClient(LowLevelClientBase, WithGrpcClient):
    """Low-level client for the ChannelsAPI.

    This class provides a thin wrapper around the autogenerated bindings for the ChannelsAPI.
    """

    def __init__(self, grpc_client: GrpcClient):
        """Initialize the ChannelsLowLevelClient.

        Args:
            grpc_client: The gRPC client to use for making API calls.
        """
        super().__init__(grpc_client)

    async def get_channel(self, channel_id: str) -> Channel:
        """Get a channel by channel_id.

        Args:
            channel_id: The channel ID to get.

        Returns:
            The Channel.

        Raises:
            ValueError: If channel_id is not provided.
        """
        # Enforce the documented contract locally rather than sending an
        # empty ID to the server.
        if not channel_id:
            raise ValueError("channel_id must be provided")

        request = GetChannelRequest(channel_id=channel_id)
        response = await self._grpc_client.get_stub(ChannelServiceStub).GetChannel(request)
        grpc_channel = cast("GetChannelResponse", response).channel
        return Channel._from_proto(grpc_channel)

    async def list_channels(
        self,
        *,
        page_size: int | None = CHANNELS_DEFAULT_PAGE_SIZE,
        page_token: str | None = None,
        query_filter: str | None = None,
        order_by: str | None = None,
    ) -> tuple[list[Channel], str]:
        """List a single page of channels with optional filtering and pagination.

        Arguments that are falsy (``None``, ``""`` or ``0``) are left out of the
        request entirely.

        Args:
            page_size: The maximum number of channels to return.
            page_token: A page token for pagination.
            query_filter: A CEL filter string.
            order_by: How to order the retrieved channels.

        Returns:
            A tuple of (channels, next_page_token).
        """
        # Only set the fields the caller actually supplied; `query_filter` is
        # sent under the request field name `filter`.
        request_kwargs: dict[str, Any] = {}
        if query_filter:
            request_kwargs["filter"] = query_filter
        if order_by:
            request_kwargs["order_by"] = order_by
        if page_size:
            request_kwargs["page_size"] = page_size
        if page_token:
            request_kwargs["page_token"] = page_token

        request = ListChannelsRequest(**request_kwargs)
        response = await self._grpc_client.get_stub(ChannelServiceStub).ListChannels(request)
        response = cast("ListChannelsResponse", response)
        channels = [Channel._from_proto(channel) for channel in response.channels]
        return channels, response.next_page_token

    async def list_all_channels(
        self,
        *,
        query_filter: str | None = None,
        order_by: str | None = None,
        page_size: int | None = CHANNELS_DEFAULT_PAGE_SIZE,
        max_results: int | None = None,
    ) -> list[Channel]:
        """List all channels with optional filtering.

        Delegates to ``self._handle_pagination``, passing ``list_channels`` as
        the callable to invoke.

        Args:
            query_filter: A CEL filter string.
            order_by: How to order the retrieved channels.
            page_size: The page size forwarded to the pagination helper.
            max_results: Maximum number of results to return.

        Returns:
            A list of all matching channels.
        """
        return await self._handle_pagination(
            self.list_channels,
            kwargs={"query_filter": query_filter},
            page_size=page_size,
            order_by=order_by,
            max_results=max_results,
        )

    async def batch_archive_channels(self, channel_ids: list[str]) -> None:
        """Batch archive channels by setting active to false.

        Args:
            channel_ids: The channel IDs to archive.
        """
        request = BatchArchiveChannelsRequest(channel_ids=channel_ids)
        await self._grpc_client.get_stub(ChannelServiceStub).BatchArchiveChannels(request)

    async def batch_unarchive_channels(self, channel_ids: list[str]) -> None:
        """Batch unarchive channels by setting active to true.

        Args:
            channel_ids: The channel IDs to unarchive.
        """
        request = BatchUnarchiveChannelsRequest(channel_ids=channel_ids)
        await self._grpc_client.get_stub(ChannelServiceStub).BatchUnarchiveChannels(request)