Skip to content

Commit e130536

Browse files
authored
python(feat): Add channel archive/unarchive support (#483)
1 parent 2668395 commit e130536

4 files changed

Lines changed: 157 additions & 22 deletions

File tree

python/lib/sift_client/_internal/low_level_wrappers/channels.py

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
from typing import TYPE_CHECKING, Any, cast
55

66
from sift.channels.v3.channels_pb2 import (
7+
BatchArchiveChannelsRequest,
8+
BatchUnarchiveChannelsRequest,
79
GetChannelRequest,
810
GetChannelResponse,
911
ListChannelsRequest,
@@ -117,3 +119,21 @@ async def list_all_channels(
117119
order_by=order_by,
118120
max_results=max_results,
119121
)
122+
123+
async def batch_archive_channels(self, channel_ids: list[str]) -> None:
124+
"""Batch archive channels by setting active to false.
125+
126+
Args:
127+
channel_ids: The channel IDs to archive.
128+
"""
129+
request = BatchArchiveChannelsRequest(channel_ids=channel_ids)
130+
await self._grpc_client.get_stub(ChannelServiceStub).BatchArchiveChannels(request)
131+
132+
async def batch_unarchive_channels(self, channel_ids: list[str]) -> None:
133+
"""Batch unarchive channels by setting active to true.
134+
135+
Args:
136+
channel_ids: The channel IDs to unarchive.
137+
"""
138+
request = BatchUnarchiveChannelsRequest(channel_ids=channel_ids)
139+
await self._grpc_client.get_stub(ChannelServiceStub).BatchUnarchiveChannels(request)

python/lib/sift_client/_tests/resources/test_channels.py

Lines changed: 68 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@
77
- Error handling and edge cases
88
"""
99

10+
import asyncio
11+
import uuid
12+
from urllib.parse import urljoin
13+
1014
import pytest
15+
import requests
1116

1217
from sift_client import SiftClient
1318
from sift_client.resources import ChannelsAPI, ChannelsAPIAsync
@@ -183,21 +188,6 @@ async def test_list_with_limit(self, channels_api_async):
183188
assert isinstance(channels_3, list)
184189
assert len(channels_3) <= 3
185190

186-
# TODO: active channel test
187-
# @pytest.mark.asyncio
188-
# async def test_list_include_archived(self, channels_api_async):
189-
# """Test channel listing with archived channels included."""
190-
# # Test without archived channels (default)
191-
# channels_active = await channels_api_async.list_(limit=5, include_archived=False)
192-
# assert isinstance(channels_active, list)
193-
#
194-
# # Test with archived channels included
195-
# channels_all = await channels_api_async.list_(limit=5, include_archived=True)
196-
# assert isinstance(channels_all, list)
197-
#
198-
# # Should have at least as many channels when including archived
199-
# assert len(channels_all) >= len(channels_active)
200-
201191
@pytest.mark.asyncio
202192
async def test_list_with_time_filters(self, channels_api_async):
203193
"""Test channel listing with time-based filters."""
@@ -240,6 +230,69 @@ async def test_find_multiple_raises_error(self, channels_api_async):
240230
with pytest.raises(ValueError, match="Multiple"):
241231
await channels_api_async.find(name_contains="test", limit=5)
242232

233+
class TestArchive:
234+
"""Tests for the async archive method."""
235+
236+
@pytest.mark.asyncio
237+
async def test_create_archive_unarchive_flow(self, channels_api_async, test_channel):
238+
"""Create a channel via REST schemaless ingest, then archive/unarchive via channels API; verify at each step with find."""
239+
asset_name = test_channel.asset.name
240+
asset_id = test_channel.asset_id
241+
unique_name = f"archive-test-channel-{uuid.uuid4().hex}"
242+
243+
rest_client = channels_api_async.client.rest_client
244+
rest_url = urljoin(rest_client.base_url, "api/v2/ingest")
245+
api_key = rest_client._config.api_key
246+
247+
# Create the channel by ingesting a single data point (schemaless).
248+
#
249+
# This is currently the simplest way to create a channel. Simply
250+
# creating a channel schema is not sufficient since schemaless channels
251+
# that have no data are filtered out of the `ListChannels` response.
252+
payload = {
253+
"asset_name": asset_name,
254+
"data": [
255+
{
256+
"timestamp": "2024-11-06T10:27:20-07:00",
257+
"values": [
258+
{"channel": unique_name, "value": 1},
259+
],
260+
}
261+
],
262+
}
263+
resp = requests.post(
264+
rest_url,
265+
headers={
266+
"Authorization": f"Bearer {api_key}",
267+
"Content-Type": "application/json",
268+
},
269+
json=payload,
270+
timeout=30,
271+
)
272+
resp.raise_for_status()
273+
274+
# Retry find until the channel is visible.
275+
created = None
276+
for _ in range(20):
277+
created = await channels_api_async.find(name=unique_name, asset=asset_id)
278+
if created is not None:
279+
break
280+
await asyncio.sleep(0.5)
281+
assert created is not None, f"Channel {unique_name} did not appear after ingest"
282+
283+
await channels_api_async.archive([created])
284+
found_archived = await channels_api_async.find(
285+
name=unique_name, asset=asset_id, archived=True
286+
)
287+
assert found_archived is not None
288+
289+
await channels_api_async.unarchive([created])
290+
found_active = await channels_api_async.find(name=unique_name, asset=asset_id)
291+
assert found_active is not None
292+
293+
# Cleanup by archiving the channel again
294+
await channels_api_async.archive([created])
295+
243296
# TODO: data retrieval tests
244297
# class TestGetData:
245298
# """Tests for the async get_data method."""

python/lib/sift_client/resources/channels.py

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,30 @@
1919
from sift_client.sift_types.channel import Channel
2020

2121

22+
def _channel_ids_from_list(items: list[str | Channel]) -> list[str]:
23+
"""Resolve a list of channel IDs or Channel objects to a list of channel IDs.
24+
25+
Args:
26+
items: List of channel IDs (str) or Channel objects.
27+
28+
Returns:
29+
List of channel ID strings.
30+
31+
Raises:
32+
ValueError: If any Channel object has no id set.
33+
"""
34+
ids: list[str] = []
35+
for item in items:
36+
if isinstance(item, str):
37+
ids.append(item)
38+
else:
39+
try:
40+
ids.append(item._id_or_error)
41+
except ValueError:
42+
raise ValueError("One or more Channel objects have no id set.") from None
43+
return ids
44+
45+
2246
class ChannelsAPIAsync(ResourceBase):
2347
"""High-level API for interacting with channels.
2448
@@ -75,7 +99,7 @@ async def list_(
7599
run: Run | str | None = None,
76100
# common filters
77101
description_contains: str | None = None,
78-
include_archived: bool | None = None,
102+
archived: bool | None = None,
79103
filter_query: str | None = None,
80104
order_by: str | None = None,
81105
limit: int | None = None,
@@ -96,7 +120,7 @@ async def list_(
96120
assets: Filter channels associated with these Assets or asset IDs.
97121
run: Filter channels associated with this Run or run ID.
98122
description_contains: Partial description of the channel.
99-
include_archived: If True, include archived channels in results.
123+
archived: If True, searches for archived channels.
100124
filter_query: Explicit CEL query to filter channels.
101125
order_by: Field and direction to order results by.
102126
limit: Maximum number of channels to return. If None, returns all matches.
@@ -117,7 +141,6 @@ async def list_(
117141
*self._build_common_cel_filters(
118142
description_contains=description_contains,
119143
filter_query=filter_query,
120-
include_archived=include_archived,
121144
),
122145
]
123146
if channel_ids:
@@ -133,9 +156,10 @@ async def list_(
133156
if run is not None:
134157
run_id = run.id_ if isinstance(run, Run) else run
135158
filter_parts.append(cel.equals("run_id", run_id))
159+
136160
# This is opposite of usual archived state
137-
if include_archived is not None:
138-
filter_parts.append(cel.equals("active", not include_archived))
161+
if archived is not None:
162+
filter_parts.append(cel.equals("active", not archived))
139163

140164
query_filter = cel.and_(*filter_parts)
141165

@@ -163,6 +187,26 @@ async def find(self, **kwargs) -> Channel | None:
163187
return channels[0]
164188
return None
165189

190+
async def archive(self, channels: list[str | Channel]) -> None:
191+
"""Batch archive channels by setting active to false.
192+
193+
Args:
194+
channels: List of channel IDs or Channel objects to archive. If a Channel
195+
has no id set, raises ValueError.
196+
"""
197+
channel_ids = _channel_ids_from_list(channels)
198+
await self._low_level_client.batch_archive_channels(channel_ids)
199+
200+
async def unarchive(self, channels: list[str | Channel]) -> None:
201+
"""Batch unarchive channels by setting active to true.
202+
203+
Args:
204+
channels: List of channel IDs or Channel objects to unarchive. If a Channel
205+
has no id set, raises ValueError.
206+
"""
207+
channel_ids = _channel_ids_from_list(channels)
208+
await self._low_level_client.batch_unarchive_channels(channel_ids)
209+
166210
def _ensure_data_low_level_client(self):
167211
"""Ensure that the data low level client is initialized. Separated out like this to not require large dependencies (pandas/pyarrow) for the client if not fetching data."""
168212
if self._data_low_level_client is None:

python/lib/sift_client/resources/sync_stubs/__init__.pyi

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -407,6 +407,15 @@ class ChannelsAPI:
407407
...
408408

409409
def _run(self, coro): ...
410+
def archive(self, channels: list[str | Channel]) -> None:
411+
"""Batch archive channels by setting active to false.
412+
413+
Args:
414+
channels: List of channel IDs or Channel objects to archive. If a Channel
415+
has no id set, raises ValueError.
416+
"""
417+
...
418+
410419
def find(self, **kwargs) -> Channel | None:
411420
"""Find a single channel matching the given query. Takes the same arguments as `list`. If more than one channel is found,
412421
raises an error.
@@ -484,7 +493,7 @@ class ChannelsAPI:
484493
assets: list[str | Asset] | None = None,
485494
run: Run | str | None = None,
486495
description_contains: str | None = None,
487-
include_archived: bool | None = None,
496+
archived: bool | None = None,
488497
filter_query: str | None = None,
489498
order_by: str | None = None,
490499
limit: int | None = None,
@@ -505,7 +514,7 @@ class ChannelsAPI:
505514
assets: Filter channels associated with these Assets or asset IDs.
506515
run: Filter channels associated with this Run or run ID.
507516
description_contains: Partial description of the channel.
508-
include_archived: If True, include archived channels in results.
517+
archived: If True, searches for archived channels.
509518
filter_query: Explicit CEL query to filter channels.
510519
order_by: Field and direction to order results by.
511520
limit: Maximum number of channels to return. If None, returns all matches.
@@ -515,6 +524,15 @@ class ChannelsAPI:
515524
"""
516525
...
517526

527+
def unarchive(self, channels: list[str | Channel]) -> None:
528+
"""Batch unarchive channels by setting active to true.
529+
530+
Args:
531+
channels: List of channel IDs or Channel objects to unarchive. If a Channel
532+
has no id set, raises ValueError.
533+
"""
534+
...
535+
518536
class FileAttachmentsAPI:
519537
"""Sync counterpart to `FileAttachmentsAPIAsync`.
520538

0 commit comments

Comments
 (0)