Skip to content

Commit 2a5d0f8

Browse files
authored
python(feat): Updates for sift-stream-bindings, FlowBuilder, FlowDesc… (#407)
1 parent 0dce6cd commit 2a5d0f8

3 files changed

Lines changed: 45 additions & 36 deletions

File tree

python/lib/sift_client/_internal/low_level_wrappers/ingestion.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@
3838
from sift_stream_bindings import (
3939
DurationPy,
4040
FlowConfigPy,
41+
FlowDescriptorPy,
4142
FlowPy,
4243
IngestionConfigFormPy,
4344
IngestWithConfigDataStreamRequestPy,
45+
IngestWithConfigDataStreamRequestWrapperPy,
4446
MetadataPy,
4547
RecoveryStrategyPy,
4648
RunFormPy,
@@ -129,12 +131,10 @@ class IngestionConfigStreamingLowLevelClient(LowLevelClientBase):
129131
DEFAULT_MAX_LOG_FILES = 7 # Equal to 1 week of logs
130132
DEFAULT_LOGFILE_PREFIX = "sift_stream_bindings.log"
131133
_sift_stream_instance: SiftStreamPy
132-
_known_flows: dict[str, FlowConfig]
133134

134-
def __init__(self, sift_stream_instance: SiftStreamPy, known_flows: dict[str, FlowConfig]):
135+
def __init__(self, sift_stream_instance: SiftStreamPy):
135136
super().__init__()
136137
self._sift_stream_instance = sift_stream_instance
137-
self._known_flows = known_flows
138138

139139
@classmethod
140140
async def create_sift_stream_instance(
@@ -194,12 +194,7 @@ async def create_sift_stream_instance(
194194

195195
sift_stream_instance = await builder.build()
196196

197-
known_flows = {
198-
flow_name: FlowConfig._from_rust_config(flow)
199-
for flow_name, flow in sift_stream_instance.get_flows().items()
200-
}
201-
202-
return cls(sift_stream_instance, known_flows)
197+
return cls(sift_stream_instance)
203198

204199
async def send(self, flow: FlowPy):
205200
await self._sift_stream_instance.send(flow)
@@ -210,12 +205,16 @@ async def batch_send(self, flows: Iterable[FlowPy]):
210205
async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy]):
211206
await self._sift_stream_instance.send_requests(requests)
212207

208+
def send_requests_nonblocking(
209+
self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]
210+
):
211+
self._sift_stream_instance.send_requests_nonblocking(requests)
212+
213+
def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy:
214+
return self._sift_stream_instance.get_flow_descriptor(flow_name)
215+
213216
async def add_new_flows(self, flow_configs: list[FlowConfigPy]):
214217
await self._sift_stream_instance.add_new_flows(flow_configs)
215-
self._known_flows = {
216-
flow_name: FlowConfig._from_rust_config(flow)
217-
for flow_name, flow in self._sift_stream_instance.get_flows().items()
218-
}
219218

220219
async def attach_run(self, run_selector: RunSelectorPy):
221220
await self._sift_stream_instance.attach_run(run_selector)

python/lib/sift_client/resources/ingestion.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
from sift_stream_bindings import (
1919
DiskBackupPolicyPy,
2020
DurationPy,
21+
FlowDescriptorPy,
2122
FlowPy,
2223
IngestionConfigFormPy,
2324
IngestWithConfigDataStreamRequestPy,
25+
IngestWithConfigDataStreamRequestWrapperPy,
2426
MetadataPy,
2527
RecoveryStrategyPy,
2628
RetryPolicyPy,
@@ -491,6 +493,31 @@ async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy
491493
"""
492494
await self._low_level_client.send_requests(requests)
493495

496+
def send_requests_nonblocking(
497+
self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]
498+
):
499+
"""Send data in a manner identical to the raw gRPC service for ingestion-config based streaming.
500+
501+
This method offers a way to send data that matches the raw gRPC service interface. You are
502+
expected to handle channel value ordering as well as empty values correctly.
503+
504+
Important:
505+
If using this interface, you should use `FlowBuilderPy::request` to ensure proper
506+
building of the request.
507+
508+
Args:
509+
requests: List of ingestion requests to send to Sift.
510+
"""
511+
self._low_level_client.send_requests_nonblocking(requests)
512+
513+
def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy:
514+
"""Retrieve a flow descriptor by name.
515+
516+
Args:
517+
flow_name: The name of the flow descriptor to retrieve.
518+
"""
519+
return self._low_level_client.get_flow_descriptor(flow_name)
520+
494521
async def add_new_flows(self, flow_configs: list[FlowConfig]):
495522
"""Modify the existing ingestion config by adding new flows that weren't accounted for during initialization.
496523
@@ -575,23 +602,6 @@ def get_metrics_snapshot(self) -> SiftStreamMetricsSnapshotPy:
575602
"""
576603
return self._low_level_client.get_metrics_snapshot()
577604

578-
def get_flow_config(self, flow_name: str) -> FlowConfig:
579-
"""Retrieve a flow configuration by name.
580-
581-
Args:
582-
flow_name: The name of the flow configuration to retrieve.
583-
584-
Returns:
585-
The FlowConfig associated with the given flow name.
586-
587-
Raises:
588-
KeyError: If the flow name is not found in the known flows.
589-
"""
590-
flow_config = self._low_level_client._known_flows.get(flow_name)
591-
if flow_config is None:
592-
raise KeyError(f"FlowConfig {flow_name} is unknown to the ingestion client")
593-
return flow_config
594-
595605
async def __aenter__(self):
596606
return self
597607

python/pyproject.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ all = [
5959
'pyOpenSSL<24.0.0',
6060
'pyarrow>=17.0.0',
6161
'rosbags~=0.0',
62-
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
62+
'sift-stream-bindings>=0.2.0-rc4',
6363
'types-pyOpenSSL<24.0.0',
6464
]
6565
build = [
@@ -100,7 +100,7 @@ dev-all = [
100100
'pytest==8.2.2',
101101
'rosbags~=0.0',
102102
'ruff~=0.12.10',
103-
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
103+
'sift-stream-bindings>=0.2.0-rc4',
104104
'tomlkit~=0.13.3',
105105
'types-pyOpenSSL<24.0.0',
106106
]
@@ -153,7 +153,7 @@ docs-build = [
153153
'pytest==8.2.2',
154154
'rosbags~=0.0',
155155
'ruff~=0.12.10',
156-
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
156+
'sift-stream-bindings>=0.2.0-rc4',
157157
'tomlkit~=0.13.3',
158158
'types-pyOpenSSL<24.0.0',
159159
]
@@ -176,10 +176,10 @@ rosbags = [
176176
'rosbags~=0.0',
177177
]
178178
sift-stream = [
179-
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
179+
'sift-stream-bindings>=0.2.0-rc4',
180180
]
181181
sift-stream-bindings = [
182-
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
182+
'sift-stream-bindings>=0.2.0-rc4',
183183
]
184184
tdms = [
185185
'npTDMS~=1.9',
@@ -215,7 +215,7 @@ docs = ["mkdocs",
215215
openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"]
216216
tdms = ["npTDMS~=1.9"]
217217
rosbags = ["rosbags~=0.0"]
218-
sift-stream = ["sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4"]
218+
sift-stream = ["sift-stream-bindings>=0.2.0-rc4"]
219219
hdf5 = ["h5py~=3.11", "polars~=1.8"]
220220
data-review = ["pyarrow>=17.0.0"]
221221

0 commit comments

Comments
 (0)