Skip to content

Commit aa16167

Browse files
authored
FD-74: Add channels(ingest and fetch) and rules to sift_client. (#275)
1 parent a345fcb commit aa16167

38 files changed

Lines changed: 3919 additions & 79 deletions

.github/workflows/python_ci.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ jobs:
2727
- name: Pip install
2828
run: |
2929
python -m pip install --upgrade pip
30-
pip install '.[development,openssl,tdms,rosbags,hdf5]'
30+
pip install '.[development,openssl,tdms,rosbags,hdf5,sift-stream]'
3131
- name: Lint
3232
run: |
3333
ruff check

python/lib/sift_client/__init__.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,10 +203,22 @@ async def get_asset_async():
203203
204204
"""
205205

206+
import logging
207+
import sys
208+
206209
from sift_client.client import SiftClient
207210
from sift_client.transport import SiftConnectionConfig
208211

209212
__all__ = [
210213
"SiftClient",
211214
"SiftConnectionConfig",
212215
]
216+
217+
logger = logging.getLogger("sift_client")
218+
logging.basicConfig(
219+
level=logging.ERROR, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
220+
)
221+
222+
223+
handler = logging.StreamHandler(sys.stdout)
224+
logger.addHandler(handler)

python/lib/sift_client/_internal/low_level_wrappers/__init__.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,18 @@
22
from sift_client._internal.low_level_wrappers.calculated_channels import (
33
CalculatedChannelsLowLevelClient,
44
)
5+
from sift_client._internal.low_level_wrappers.channels import ChannelsLowLevelClient
6+
from sift_client._internal.low_level_wrappers.ingestion import IngestionLowLevelClient
57
from sift_client._internal.low_level_wrappers.ping import PingLowLevelClient
8+
from sift_client._internal.low_level_wrappers.rules import RulesLowLevelClient
69
from sift_client._internal.low_level_wrappers.runs import RunsLowLevelClient
710

811
__all__ = [
912
"AssetsLowLevelClient",
1013
"CalculatedChannelsLowLevelClient",
14+
"ChannelsLowLevelClient",
15+
"IngestionLowLevelClient",
1116
"PingLowLevelClient",
17+
"RulesLowLevelClient",
1218
"RunsLowLevelClient",
1319
]

python/lib/sift_client/_internal/low_level_wrappers/calculated_channels.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
from sift.calculated_channels.v2.calculated_channels_pb2_grpc import CalculatedChannelServiceStub
2424

2525
from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
26-
from sift_client.transport.grpc_transport import GrpcClient
26+
from sift_client.transport import GrpcClient, WithGrpcClient
2727
from sift_client.types.calculated_channel import (
2828
CalculatedChannel,
2929
CalculatedChannelUpdate,
@@ -33,7 +33,7 @@
3333
logger = logging.getLogger(__name__)
3434

3535

36-
class CalculatedChannelsLowLevelClient(LowLevelClientBase):
36+
class CalculatedChannelsLowLevelClient(LowLevelClientBase, WithGrpcClient):
3737
"""
3838
Low-level client for the CalculatedChannelsAPI.
3939
@@ -47,7 +47,7 @@ def __init__(self, grpc_client: GrpcClient):
4747
Args:
4848
grpc_client: The gRPC client to use for making API calls.
4949
"""
50-
self._grpc_client = grpc_client
50+
super().__init__(grpc_client)
5151

5252
async def get_calculated_channel(
5353
self,
Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,127 @@
1+
from __future__ import annotations
2+
3+
import logging
4+
from typing import Any, cast
5+
6+
from sift.channels.v3.channels_pb2 import (
7+
GetChannelRequest,
8+
GetChannelResponse,
9+
ListChannelsRequest,
10+
ListChannelsResponse,
11+
)
12+
from sift.channels.v3.channels_pb2_grpc import ChannelServiceStub
13+
14+
from sift_client._internal.low_level_wrappers.base import LowLevelClientBase
15+
from sift_client.transport import WithGrpcClient
16+
from sift_client.transport.grpc_transport import GrpcClient
17+
from sift_client.types.channel import Channel
18+
19+
# Configure logging
20+
logger = logging.getLogger(__name__)
21+
22+
CHANNELS_DEFAULT_PAGE_SIZE = 10_000
23+
24+
25+
class ChannelsLowLevelClient(LowLevelClientBase, WithGrpcClient):
26+
"""
27+
Low-level client for the ChannelsAPI.
28+
29+
This class provides a thin wrapper around the autogenerated bindings for the ChannelsAPI.
30+
"""
31+
32+
def __init__(self, grpc_client: GrpcClient):
33+
"""
34+
Initialize the ChannelsLowLevelClient.
35+
36+
Args:
37+
grpc_client: The gRPC client to use for making API calls.
38+
"""
39+
super().__init__(grpc_client)
40+
41+
async def get_channel(self, channel_id: str) -> Channel:
42+
"""
43+
Get a channel by channel_id.
44+
45+
Args:
46+
channel_id: The channel ID to get.
47+
48+
Returns:
49+
The Channel.
50+
51+
Raises:
52+
ValueError: If channel_id is not provided.
53+
"""
54+
55+
request = GetChannelRequest(channel_id=channel_id)
56+
response = await self._grpc_client.get_stub(ChannelServiceStub).GetChannel(request)
57+
grpc_channel = cast(GetChannelResponse, response).channel
58+
channel = Channel._from_proto(grpc_channel)
59+
return channel
60+
61+
async def list_channels(
62+
self,
63+
*,
64+
page_size: int | None = None,
65+
page_token: str | None = None,
66+
query_filter: str | None = None,
67+
order_by: str | None = None,
68+
) -> tuple[list[Channel], str]:
69+
"""
70+
List channels with optional filtering and pagination.
71+
72+
Args:
73+
page_size: The maximum number of channels to return.
74+
page_token: A page token for pagination.
75+
query_filter: A CEL filter string.
76+
order_by: How to order the retrieved channels.
77+
78+
Returns:
79+
A tuple of (channels, next_page_token).
80+
"""
81+
82+
request_kwargs: dict[str, Any] = {}
83+
if query_filter:
84+
request_kwargs["filter"] = query_filter
85+
if order_by:
86+
request_kwargs["order_by"] = order_by
87+
if page_size:
88+
request_kwargs["page_size"] = page_size
89+
if page_token:
90+
request_kwargs["page_token"] = page_token
91+
92+
request = ListChannelsRequest(**request_kwargs)
93+
response = await self._grpc_client.get_stub(ChannelServiceStub).ListChannels(request)
94+
response = cast(ListChannelsResponse, response)
95+
96+
channels = [Channel._from_proto(channel) for channel in response.channels]
97+
return channels, response.next_page_token
98+
99+
async def list_all_channels(
100+
self,
101+
*,
102+
query_filter: str | None = None,
103+
order_by: str | None = None,
104+
max_results: int | None = None,
105+
) -> list[Channel]:
106+
"""
107+
List all channels with optional filtering.
108+
109+
Args:
110+
query_filter: A CEL filter string.
111+
order_by: How to order the retrieved channels.
112+
max_results: Maximum number of results to return.
113+
114+
Returns:
115+
A list of all matching channels.
116+
"""
117+
# Channels default page size is 10,000 so lower it if we're passing max_results
118+
page_size = None
119+
if max_results is not None and max_results <= CHANNELS_DEFAULT_PAGE_SIZE:
120+
page_size = max_results
121+
return await self._handle_pagination(
122+
self.list_channels,
123+
kwargs={"query_filter": query_filter},
124+
page_size=page_size,
125+
order_by=order_by,
126+
max_results=max_results,
127+
)

0 commit comments

Comments
 (0)