-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathchannels.py
More file actions
278 lines (238 loc) · 10.6 KB
/
channels.py
File metadata and controls
278 lines (238 loc) · 10.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
from __future__ import annotations
from typing import TYPE_CHECKING
from sift_client._internal.low_level_wrappers.channels import ChannelsLowLevelClient
from sift_client.resources._base import ResourceBase
from sift_client.sift_types.asset import Asset
from sift_client.sift_types.run import Run
from sift_client.util import cel_utils as cel
if TYPE_CHECKING:
import re
from datetime import datetime
import pandas as pd
import pyarrow as pa
from sift_client.client import SiftClient
from sift_client.sift_types.channel import Channel
def _channel_ids_from_list(items: list[str | Channel]) -> list[str]:
    """Normalize a mixed list of channel IDs and Channel objects into plain IDs.

    Strings are taken to already be channel IDs and are passed through as-is;
    anything else is asked for its ID via ``_id_or_error``. Input order is kept.

    Args:
        items: Channel IDs (str) and/or Channel objects.

    Returns:
        The channel ID strings, in the same order as ``items``.

    Raises:
        ValueError: If any Channel object has no id set.
    """

    def _resolve(entry: str | Channel) -> str:
        # Plain strings are already IDs.
        if isinstance(entry, str):
            return entry
        try:
            return entry._id_or_error
        except ValueError:
            # Swap the per-object error for one generic message and drop the
            # original context so callers see a single, clean exception.
            raise ValueError("One or more Channel objects have no id set.") from None

    return [_resolve(entry) for entry in items]
class ChannelsAPIAsync(ResourceBase):
    """High-level API for interacting with channels.

    This class provides a Pythonic, notebook-friendly interface for interacting with the ChannelsAPI.
    It handles automatic handling of gRPC services, seamless type conversion, and clear error handling.

    All methods in this class use the Channel class from the low-level wrapper, which is a user-friendly
    representation of a channel using standard Python data structures and types.
    """

    def __init__(self, sift_client: SiftClient) -> None:
        """Initialize the ChannelsAPI.

        Args:
            sift_client: The Sift client to use.
        """
        super().__init__(sift_client)
        self._low_level_client = ChannelsLowLevelClient(grpc_client=self.client.grpc_client)
        # Created lazily by _ensure_data_low_level_client() so that the data
        # dependencies are only imported when data is actually fetched.
        self._data_low_level_client = None

    async def get(
        self,
        *,
        channel_id: str,
    ) -> Channel:
        """Get a Channel.

        Args:
            channel_id: The ID of the channel.

        Returns:
            The Channel.
        """
        channel = await self._low_level_client.get_channel(channel_id=channel_id)
        return self._apply_client_to_instance(channel)

    async def list_(
        self,
        *,
        name: str | None = None,
        names: list[str] | None = None,
        name_contains: str | None = None,
        name_regex: str | re.Pattern | None = None,
        # self ids
        channel_ids: list[str] | None = None,
        # created/modified ranges
        created_after: datetime | None = None,
        created_before: datetime | None = None,
        modified_after: datetime | None = None,
        modified_before: datetime | None = None,
        # channel specific
        asset: Asset | str | None = None,
        assets: list[str | Asset] | None = None,
        run: Run | str | None = None,
        # common filters
        description_contains: str | None = None,
        archived: bool | None = None,
        filter_query: str | None = None,
        order_by: str | None = None,
        limit: int | None = None,
        page_size: int | None = None,
    ) -> list[Channel]:
        """List channels with optional filtering.

        All provided filters are combined with a logical AND.

        Args:
            name: Exact name of the channel.
            names: List of channel names to filter by.
            name_contains: Partial name of the channel.
            name_regex: Regular expression to filter channels by name.
            channel_ids: Filter to channels with any of these IDs.
            created_after: Filter channels created after this datetime. Note: This is related to the channel creation time, not the timestamp of the underlying data.
            created_before: Filter channels created before this datetime. Note: This is related to the channel creation time, not the timestamp of the underlying data.
            modified_after: Filter channels modified after this datetime.
            modified_before: Filter channels modified before this datetime.
            asset: Filter channels associated with this Asset or asset ID.
            assets: Filter channels associated with these Assets or asset IDs.
            run: Filter channels associated with this Run or run ID.
            description_contains: Partial description of the channel.
            archived: If True, searches for archived channels.
            filter_query: Explicit CEL query to filter channels.
            order_by: Field and direction to order results by.
            limit: Maximum number of channels to return. If None, returns all matches.
            page_size: Number of results to fetch per request. Lower this if you hit gRPC
                message size limits on responses. If None, uses the server default.

        Returns:
            A list of Channels that matches the filter criteria.

        Raises:
            ValueError: If an Asset or Run object is passed that has no id set
                (raised by its ``_id_or_error`` accessor).
        """
        filter_parts = [
            *self._build_name_cel_filters(
                name=name, names=names, name_contains=name_contains, name_regex=name_regex
            ),
            *self._build_time_cel_filters(
                created_after=created_after,
                created_before=created_before,
                modified_after=modified_after,
                modified_before=modified_before,
            ),
            *self._build_common_cel_filters(
                description_contains=description_contains,
                filter_query=filter_query,
            ),
        ]
        if channel_ids:
            filter_parts.append(cel.in_("channel_id", channel_ids))
        if asset is not None:
            asset_id = asset._id_or_error if isinstance(asset, Asset) else asset
            filter_parts.append(cel.equals("asset_id", asset_id))
        if assets:
            asset_ids = [
                entry._id_or_error if isinstance(entry, Asset) else entry for entry in assets
            ]
            filter_parts.append(cel.in_("asset_id", asset_ids))
        if run is not None:
            # Use _id_or_error (as for assets and in get_data) so that a Run
            # without an id fails loudly instead of filtering on run_id == None.
            run_id = run._id_or_error if isinstance(run, Run) else run
            filter_parts.append(cel.equals("run_id", run_id))
        # This is opposite of usual archived state: channels are filtered on an
        # "active" flag, so archived=True maps to active == False.
        if archived is not None:
            filter_parts.append(cel.equals("active", not archived))

        query_filter = cel.and_(*filter_parts)
        channels = await self._low_level_client.list_all_channels(
            query_filter=query_filter or None,
            order_by=order_by,
            max_results=limit,
            # Only forward page_size when given so the low-level default applies otherwise.
            **({"page_size": page_size} if page_size is not None else {}),
        )
        return self._apply_client_to_instances(channels)

    async def find(self, **kwargs) -> Channel | None:
        """Find a single channel matching the given query.

        Takes the same arguments as `list_`.

        Args:
            **kwargs: Keyword arguments to pass to `list_`.

        Returns:
            The Channel found or None.

        Raises:
            ValueError: If more than one channel matches the query.
        """
        channels = await self.list_(**kwargs)
        if len(channels) > 1:
            raise ValueError(f"Multiple ({len(channels)}) channels found for query")
        if channels:
            return channels[0]
        return None

    async def archive(self, channels: list[str | Channel]) -> None:
        """Batch archive channels by setting active to false.

        Args:
            channels: List of channel IDs or Channel objects to archive. If a Channel
                has no id set, raises ValueError.
        """
        channel_ids = _channel_ids_from_list(channels)
        await self._low_level_client.batch_archive_channels(channel_ids)

    async def unarchive(self, channels: list[str | Channel]) -> None:
        """Batch unarchive channels by setting active to true.

        Args:
            channels: List of channel IDs or Channel objects to unarchive. If a Channel
                has no id set, raises ValueError.
        """
        channel_ids = _channel_ids_from_list(channels)
        await self._low_level_client.batch_unarchive_channels(channel_ids)

    def _ensure_data_low_level_client(self) -> None:
        """Ensure that the data low level client is initialized.

        Separated out like this to not require large dependencies (pandas/pyarrow)
        for the client if not fetching data.
        """
        if self._data_low_level_client is None:
            # Imported here, not at module level, to keep the import optional.
            from sift_client._internal.low_level_wrappers.data import DataLowLevelClient

            self._data_low_level_client = DataLowLevelClient(grpc_client=self.client.grpc_client)

    async def get_data(
        self,
        *,
        channels: list[Channel],
        run: Run | str | None = None,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        limit: int | None = None,
        ignore_cache: bool = False,
    ) -> dict[str, pd.DataFrame]:
        """Get data for one or more channels.

        Args:
            channels: The channels to get data for.
            run: The Run or run_id to get data for.
            start_time: The start time to get data for.
            end_time: The end time to get data for.
            limit: The maximum number of data points to return. Will be in increments of page_size or default page size defined by the call if no page_size is provided.
            ignore_cache: Whether to ignore cached data and fetch fresh data from the server.

        Returns:
            A dictionary mapping channel names to pandas DataFrames containing the channel data.

        Raises:
            ValueError: If a Run object is passed that has no id set
                (raised by its ``_id_or_error`` accessor).
        """
        self._ensure_data_low_level_client()
        run_id = run._id_or_error if isinstance(run, Run) else run
        return await self._data_low_level_client.get_channel_data(  # type: ignore
            channels=channels,
            run_id=run_id,
            start_time=start_time,
            end_time=end_time,
            max_results=limit,
            ignore_cache=ignore_cache,
        )

    async def get_data_as_arrow(
        self,
        *,
        channels: list[Channel],
        run: Run | str | None = None,
        start_time: datetime | None = None,
        end_time: datetime | None = None,
        limit: int | None = None,
        ignore_cache: bool = False,
    ) -> dict[str, pa.Table]:
        """Get data for one or more channels as pyarrow tables.

        Fetches the data through `get_data` and converts each resulting pandas
        DataFrame to a pyarrow Table.

        Args:
            channels: The channels to get data for.
            run: The Run or run_id to get data for.
            start_time: The start time to get data for.
            end_time: The end time to get data for.
            limit: The maximum number of data points to return (see `get_data`).
            ignore_cache: Whether to ignore cached data and fetch fresh data from the server.

        Returns:
            A dictionary mapping channel names to pyarrow Tables containing the channel data.

        Raises:
            ValueError: If a Run object is passed that has no id set
                (raised by its ``_id_or_error`` accessor inside `get_data`).
        """
        from pyarrow import Table as ArrowTable

        # Pass `run` through untouched: get_data resolves it with _id_or_error.
        # Resolving it here via `run.id_` would turn a Run without an id into
        # None, silently dropping the run filter.
        data = await self.get_data(
            channels=channels,
            run=run,
            start_time=start_time,
            end_time=end_time,
            limit=limit,
            ignore_cache=ignore_cache,
        )
        return {name: ArrowTable.from_pandas(frame) for name, frame in data.items()}