Skip to content

Commit a27f060

Browse files
authored
rust(feat): Add batch_send to sift_stream_bindings (#384)
1 parent efb5db2 commit a27f060

2 files changed

Lines changed: 37 additions & 1 deletion

File tree

rust/crates/sift_stream_bindings/sift_stream_bindings.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,7 @@ class SiftStreamMetricsSnapshotPy:
270270
@typing.final
271271
class SiftStreamPy:
272272
def send(self, flow:FlowPy) -> typing.Any: ...
273+
def batch_send(self, flows:typing.Any) -> typing.Any: ...
273274
def send_requests(self, requests:typing.Sequence[IngestWithConfigDataStreamRequestPy]) -> typing.Any: ...
274275
def get_metrics_snapshot(self) -> SiftStreamMetricsSnapshotPy: ...
275276
def add_new_flows(self, flow_configs:typing.Sequence[FlowConfigPy]) -> typing.Any: ...

rust/crates/sift_stream_bindings/src/stream/mod.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use crate::metrics::SiftStreamMetricsSnapshotPy;
1010
use crate::stream::channel::ChannelValuePy;
1111
use crate::stream::config::{FlowConfigPy, RunSelectorPy};
1212
use crate::stream::time::TimeValuePy;
13-
use pyo3::prelude::*;
13+
use pyo3::{prelude::*, types::PyIterator};
1414
use pyo3_async_runtimes::tokio::future_into_py;
1515
use pyo3_stub_gen::derive::*;
1616
use sift_stream::{Flow, FlowConfig, IngestionConfigMode, SiftStream};
@@ -71,6 +71,41 @@ impl SiftStreamPy {
7171
Ok(awaitable.into())
7272
}
7373

74+
// Function to take in a python iterable of PyFlow and call send on each item
75+
// Can allow more performant sending of data from python to SiftStream
76+
pub fn batch_send<'py>(
77+
&self,
78+
py: Python<'py>,
79+
flows: &Bound<'_, PyAny>,
80+
) -> PyResult<Py<PyAny>> {
81+
let flow_iter = PyIterator::from_object(flows)?;
82+
let mut flows_vec = Vec::new();
83+
for item in flow_iter {
84+
flows_vec.push(item?.extract::<FlowPy>()?);
85+
}
86+
87+
let inner = self.inner.clone();
88+
89+
let awaitable = future_into_py(py, async move {
90+
let mut guard = inner.lock().await;
91+
let stream = guard.as_mut().ok_or_else(|| {
92+
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
93+
"Stream has been consumed by finish()",
94+
)
95+
})?;
96+
97+
for flow in flows_vec {
98+
match stream.send(flow.into()).await {
99+
Ok(_) => (),
100+
Err(e) => return Err(SiftErrorWrapper(e).into()),
101+
}
102+
}
103+
Ok(())
104+
})?;
105+
106+
Ok(awaitable.into())
107+
}
108+
74109
pub fn send_requests(
75110
&self,
76111
py: Python,

0 commit comments

Comments
 (0)