-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathcalculated_channels.py
More file actions
373 lines (327 loc) · 15 KB
/
Copy pathcalculated_channels.py
File metadata and controls
373 lines (327 loc) · 15 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
from __future__ import annotations
from typing import TYPE_CHECKING, Any
from sift_client._internal.low_level_wrappers.calculated_channels import (
CalculatedChannelsLowLevelClient,
)
from sift_client.resources._base import ResourceBase
from sift_client.sift_types.asset import Asset
from sift_client.sift_types.calculated_channel import (
CalculatedChannel,
CalculatedChannelCreate,
CalculatedChannelUpdate,
)
from sift_client.sift_types.run import Run
from sift_client.util import cel_utils as cel
if TYPE_CHECKING:
import re
from datetime import datetime
from sift_client.client import SiftClient
from sift_client.sift_types.tag import Tag
class CalculatedChannelsAPIAsync(ResourceBase):
    """High-level async API for interacting with calculated channels.

    This class provides a Pythonic, notebook-friendly interface on top of
    ``CalculatedChannelsLowLevelClient``. Every method delegates the actual
    request to the low-level client and then attaches this API's client to the
    returned ``CalculatedChannel`` objects (from ``sift_client.sift_types``),
    which are user-friendly representations built from standard Python data
    structures and types.
    """

    def __init__(self, sift_client: SiftClient):
        """Initialize the CalculatedChannelsAPI.

        Args:
            sift_client: The Sift client to use.
        """
        super().__init__(sift_client)
        self._low_level_client = CalculatedChannelsLowLevelClient(
            grpc_client=self.client.grpc_client
        )

    async def get(
        self,
        *,
        calculated_channel_id: str | None = None,
        client_key: str | None = None,
    ) -> CalculatedChannel:
        """Get a Calculated Channel.

        Args:
            calculated_channel_id: The ID of the calculated channel.
            client_key: The client key of the calculated channel.

        Returns:
            The CalculatedChannel.

        Raises:
            ValueError: If neither calculated_channel_id nor client_key is provided.
        """
        # Fail fast with the documented error instead of relying on whatever
        # the lower layers do with a request that identifies nothing.
        if calculated_channel_id is None and client_key is None:
            raise ValueError("Either calculated_channel_id or client_key must be provided")

        calculated_channel = await self._low_level_client.get_calculated_channel(
            calculated_channel_id=calculated_channel_id,
            client_key=client_key,
        )
        return self._apply_client_to_instance(calculated_channel)

    async def list_(
        self,
        *,
        name: str | None = None,
        names: list[str] | None = None,
        name_contains: str | None = None,
        name_regex: str | re.Pattern | None = None,
        # self ids
        calculated_channel_ids: list[str] | None = None,
        client_keys: list[str] | None = None,
        # created/modified ranges
        created_after: datetime | None = None,
        created_before: datetime | None = None,
        modified_after: datetime | None = None,
        modified_before: datetime | None = None,
        # created/modified users
        created_by: Any | str | None = None,
        modified_by: Any | str | None = None,
        # tags
        tags: list[Any] | list[str] | list[Tag] | None = None,
        # metadata
        metadata: list[Any] | None = None,
        # calculated channel specific
        asset: Asset | str | None = None,
        run: Run | str | None = None,
        version: int | None = None,
        # common filters
        description_contains: str | None = None,
        include_archived: bool = False,
        filter_query: str | None = None,
        order_by: str | None = None,
        limit: int | None = None,
        page_size: int | None = None,
    ) -> list[CalculatedChannel]:
        """List calculated channels with optional filtering.

        This will return the latest version. To find all versions, use
        `list_versions`.

        Args:
            name: Exact name of the calculated channel.
            names: List of calculated channel names to filter by.
            name_contains: Partial name of the calculated channel.
            name_regex: Regular expression string to filter calculated channels by name.
            calculated_channel_ids: Filter to calculated channels with any of these IDs.
            client_keys: Filter to calculated channels with any of these client keys.
            created_after: Created after this date.
            created_before: Created before this date.
            modified_after: Modified after this date.
            modified_before: Modified before this date.
            created_by: Calculated channels created by this user.
            modified_by: Calculated channels last modified by this user.
            tags: Filter calculated channels with any of these Tags or tag names.
            metadata: Filter calculated channels by metadata criteria.
            asset: Filter calculated channels associated with this Asset or asset ID.
            run: Filter calculated channels associated with this Run or run ID.
            version: The version of the calculated channel.
            description_contains: Partial description of the calculated channel.
            include_archived: Include archived calculated channels.
            filter_query: Explicit CEL query to filter calculated channels.
            order_by: How to order the retrieved calculated channels.
            limit: How many calculated channels to retrieve. If None, retrieves all matches.
            page_size: Number of results to fetch per request. Lower this if you hit gRPC
                message size limits on responses. If None, uses the server default.

        Returns:
            A list of CalculatedChannels that matches the filter.
        """
        # Filters shared with other resources are produced by the base-class
        # builders; the calculated-channel specific ones are appended below.
        filter_parts = [
            *self._build_name_cel_filters(
                name=name, names=names, name_contains=name_contains, name_regex=name_regex
            ),
            *self._build_time_cel_filters(
                created_after=created_after,
                created_before=created_before,
                modified_after=modified_after,
                modified_before=modified_before,
                created_by=created_by,
                modified_by=modified_by,
            ),
            *self._build_tags_metadata_cel_filters(tag_names=tags, metadata=metadata),
            *self._build_common_cel_filters(
                description_contains=description_contains,
                include_archived=include_archived,
                filter_query=filter_query,
            ),
        ]
        if calculated_channel_ids:
            filter_parts.append(cel.in_("calculated_channel_id", calculated_channel_ids))
        if client_keys:
            filter_parts.append(cel.in_("client_key", client_keys))
        if asset:
            # Accept either an Asset object or a raw asset ID string.
            asset_id = asset._id_or_error if isinstance(asset, Asset) else asset
            filter_parts.append(cel.equals("asset_id", asset_id))
        if run:
            # Accept either a Run object or a raw run ID string.
            run_id = run._id_or_error if isinstance(run, Run) else run
            filter_parts.append(cel.equals("run_id", run_id))
        # Compare against None rather than truthiness so an explicit
        # ``version=0`` is still applied as a filter instead of being dropped.
        if version is not None:
            filter_parts.append(cel.equals("version", version))

        query_filter = cel.and_(*filter_parts)
        calculated_channels = await self._low_level_client.list_all_calculated_channels(
            # An empty combined filter is sent as None (no filter).
            query_filter=query_filter or None,
            order_by=order_by,
            max_results=limit,
            # Only forward page_size when the caller set it, so the low-level
            # client's own default applies otherwise.
            **({"page_size": page_size} if page_size is not None else {}),  # type: ignore[arg-type]
        )
        return self._apply_client_to_instances(calculated_channels)

    async def find(self, **kwargs) -> CalculatedChannel | None:
        """Find a single calculated channel matching the given query.

        Takes the same arguments as `list_` but handles checking for multiple
        matches.

        Args:
            **kwargs: Keyword arguments to pass to `list_`.

        Returns:
            The CalculatedChannel found or None.

        Raises:
            ValueError: If more than one calculated channel matches the query.
        """
        calculated_channels = await self.list_(**kwargs)
        if len(calculated_channels) > 1:
            raise ValueError(
                f"Multiple ({len(calculated_channels)}) calculated channels found for query"
            )
        elif len(calculated_channels) == 1:
            return calculated_channels[0]
        return None

    async def create(
        self,
        create: CalculatedChannelCreate | dict,
    ) -> CalculatedChannel:
        """Create a calculated channel.

        Args:
            create: A CalculatedChannelCreate object or dictionary with configuration for
                the new calculated channel. This should include properties like name,
                expression, channel_references, etc. A dictionary is validated into a
                CalculatedChannelCreate before the request is made.

        Returns:
            The created CalculatedChannel.
        """
        if isinstance(create, dict):
            create = CalculatedChannelCreate.model_validate(create)

        # The low-level call returns a pair; only the created channel is used here.
        created_calc_channel, _ = await self._low_level_client.create_calculated_channel(
            create=create
        )
        return self._apply_client_to_instance(created_calc_channel)

    async def update(
        self,
        calculated_channel: CalculatedChannel | str,
        update: CalculatedChannelUpdate | dict,
        *,
        user_notes: str | None = None,
    ) -> CalculatedChannel:
        """Update a Calculated Channel.

        Args:
            calculated_channel: The CalculatedChannel or id of the CalculatedChannel to update.
            update: Updates to apply to the CalculatedChannel. A dictionary is validated
                into a CalculatedChannelUpdate first. Note that ``resource_id`` is set on
                the update object that is sent, so a passed-in CalculatedChannelUpdate
                instance is modified.
            user_notes: User notes for the update.

        Returns:
            The updated CalculatedChannel.
        """
        calculated_channel_id = (
            calculated_channel.id_
            if isinstance(calculated_channel, CalculatedChannel)
            else calculated_channel
        )
        if isinstance(update, dict):
            update = CalculatedChannelUpdate.model_validate(update)
        # Point the update at the target channel before sending it.
        update.resource_id = calculated_channel_id

        (
            updated_calculated_channel,
            _inapplicable_assets,  # second element of the result is not surfaced here
        ) = await self._low_level_client.update_calculated_channel(
            update=update, user_notes=user_notes
        )
        return self._apply_client_to_instance(updated_calculated_channel)

    async def archive(self, calculated_channel: str | CalculatedChannel) -> CalculatedChannel:
        """Archive a calculated channel.

        Implemented as an `update` with ``is_archived=True``.

        Args:
            calculated_channel: The id or CalculatedChannel object of the calculated channel
                to archive.

        Returns:
            The archived CalculatedChannel.
        """
        return await self.update(
            calculated_channel=calculated_channel, update=CalculatedChannelUpdate(is_archived=True)
        )

    async def unarchive(self, calculated_channel: str | CalculatedChannel) -> CalculatedChannel:
        """Unarchive a calculated channel.

        Implemented as an `update` with ``is_archived=False``.

        Args:
            calculated_channel: The id or CalculatedChannel object of the calculated channel
                to unarchive.

        Returns:
            The unarchived CalculatedChannel.
        """
        return await self.update(
            calculated_channel=calculated_channel, update=CalculatedChannelUpdate(is_archived=False)
        )

    async def list_versions(
        self,
        *,
        # self ids
        calculated_channel: CalculatedChannel | str | None = None,
        client_key: str | None = None,
        name: str | None = None,
        names: list[str] | None = None,
        name_contains: str | None = None,
        name_regex: str | re.Pattern | None = None,
        # created/modified ranges
        created_after: datetime | None = None,
        created_before: datetime | None = None,
        modified_after: datetime | None = None,
        modified_before: datetime | None = None,
        # created/modified users
        created_by: Any | str | None = None,
        modified_by: Any | str | None = None,
        # tags
        tags: list[Any] | list[str] | list[Tag] | None = None,
        # metadata
        metadata: list[Any] | None = None,
        # common filters
        description_contains: str | None = None,
        include_archived: bool = False,
        filter_query: str | None = None,
        order_by: str | None = None,
        limit: int | None = None,
        page_size: int | None = None,
    ) -> list[CalculatedChannel]:
        """List versions of a calculated channel.

        Args:
            calculated_channel: The CalculatedChannel or ID of the calculated channel to get
                versions for.
            client_key: The client key of the calculated channel.
            name: Exact name of the calculated channel.
            names: List of calculated channel names to filter by.
            name_contains: Partial name of the calculated channel.
            name_regex: Regular expression string to filter calculated channels by name.
            created_after: Filter versions created after this datetime.
            created_before: Filter versions created before this datetime.
            modified_after: Filter versions modified after this datetime.
            modified_before: Filter versions modified before this datetime.
            created_by: Filter versions created by this user or user ID.
            modified_by: Filter versions modified by this user or user ID.
            tags: Filter versions with any of these Tags or tag names.
            metadata: Filter versions by metadata criteria.
            description_contains: Partial description of the calculated channel.
            include_archived: Include archived versions.
            filter_query: Explicit CEL query to filter versions.
            order_by: How to order the retrieved versions.
            limit: Maximum number of versions to return. If None, returns all matches.
            page_size: Number of results to fetch per request. Lower this if you hit gRPC
                message size limits on responses. If None, uses the server default.

        Returns:
            A list of CalculatedChannel versions that match the filter criteria.
        """
        filter_parts = [
            *self._build_name_cel_filters(
                name=name, names=names, name_contains=name_contains, name_regex=name_regex
            ),
            *self._build_time_cel_filters(
                created_after=created_after,
                created_before=created_before,
                modified_after=modified_after,
                modified_before=modified_before,
                created_by=created_by,
                modified_by=modified_by,
            ),
            *self._build_tags_metadata_cel_filters(tag_names=tags, metadata=metadata),
            *self._build_common_cel_filters(
                description_contains=description_contains,
                include_archived=include_archived,
                filter_query=filter_query,
            ),
        ]
        query_filter = cel.and_(*filter_parts)

        # Accept either a CalculatedChannel object or a raw ID (or None).
        calculated_channel_id = (
            calculated_channel.id_
            if isinstance(calculated_channel, CalculatedChannel)
            else calculated_channel
        )
        # NOTE(review): `list_` passes its cap to the low-level client as
        # `max_results`, whereas this call uses `limit` - confirm both match the
        # low-level method signatures.
        versions = await self._low_level_client.list_all_calculated_channel_versions(
            client_key=client_key,
            calculated_channel_id=calculated_channel_id,
            # An empty combined filter is sent as None (no filter).
            query_filter=query_filter or None,
            order_by=order_by,
            limit=limit,
            # Only forward page_size when the caller set it.
            **({"page_size": page_size} if page_size is not None else {}),  # type: ignore[arg-type]
        )
        return self._apply_client_to_instances(versions)