Skip to content

Commit efb5db2

Browse files
authored
rust(feat): Add get_flows to SiftStream (#382)
1 parent 4e98435 commit efb5db2

6 files changed

Lines changed: 87 additions & 3 deletions

File tree

rust/crates/sift_stream/src/stream/mode/ingestion_config.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,24 @@ impl SiftStream<IngestionConfigMode> {
309309
Ok(())
310310
}
311311

312+
/// Get a copy of the current flow configs known to SiftStream as a HashMap keyed to the flow name.
313+
/// This includes flows provided at initialization, and any existing configs
314+
/// previously registered in Sift
315+
pub fn get_flows(&self) -> HashMap<String, FlowConfig> {
316+
// Currently we get the first FlowConfig provided in the Vec to match how send() validates flows
317+
self.mode
318+
.flows_by_name
319+
.iter()
320+
.filter_map(|(k, v)| {
321+
if v.is_empty() {
322+
None
323+
} else {
324+
Some((k.clone(), v[0].clone()))
325+
}
326+
})
327+
.collect()
328+
}
329+
312330
/// Attach a run to the stream. Any data provided through [SiftStream::send] after return
313331
/// of this function will be associated with the run.
314332
pub async fn attach_run(&mut self, run_selector: RunSelector) -> Result<()> {

rust/crates/sift_stream_bindings/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,9 @@ path = "src/bin/stub_gen.rs"
2121
[dependencies]
2222
pyo3 = { version = "0.25.0" }
2323
pyo3-stub-gen = { version = "0.10.0" }
24-
sift_rs = { version = "0.7.0-rc.2" }
25-
sift_stream = { version = "0.7.0-rc.2", features = ["metrics-unstable"] }
26-
sift_error = { version = "0.7.0-rc.2" }
24+
sift_rs = { workspace = true }
25+
sift_stream = { workspace = true, features = ["metrics-unstable"] }
26+
sift_error = { workspace = true }
2727
tokio = { version = "1.43.0", features = ["rt", "sync", "time", "macros"] }
2828
pyo3-async-runtimes = { version = "0.25.0", features = ["tokio-runtime"] }
2929
pbjson-types = { workspace = true }

rust/crates/sift_stream_bindings/sift_stream_bindings.pyi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ class SiftStreamPy:
273273
def send_requests(self, requests:typing.Sequence[IngestWithConfigDataStreamRequestPy]) -> typing.Any: ...
274274
def get_metrics_snapshot(self) -> SiftStreamMetricsSnapshotPy: ...
275275
def add_new_flows(self, flow_configs:typing.Sequence[FlowConfigPy]) -> typing.Any: ...
276+
def get_flows(self) -> builtins.dict[builtins.str, FlowConfigPy]: ...
276277
def attach_run(self, run_selector:RunSelectorPy) -> typing.Any: ...
277278
def detach_run(self) -> None: ...
278279
def run(self) -> typing.Optional[builtins.str]: ...

rust/crates/sift_stream_bindings/src/stream/channel.rs

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,12 +83,33 @@ impl From<ChannelBitFieldElementPy> for ChannelBitFieldElement {
8383
}
8484
}
8585

86+
impl From<ChannelBitFieldElement> for ChannelBitFieldElementPy {
87+
fn from(value: ChannelBitFieldElement) -> Self {
88+
ChannelBitFieldElementPy {
89+
inner: value.clone(),
90+
name: value.name,
91+
index: value.index,
92+
bit_count: value.bit_count,
93+
}
94+
}
95+
}
96+
8697
impl From<ChannelEnumTypePy> for ChannelEnumType {
8798
fn from(value: ChannelEnumTypePy) -> Self {
8899
value.inner
89100
}
90101
}
91102

103+
impl From<ChannelEnumType> for ChannelEnumTypePy {
104+
fn from(value: ChannelEnumType) -> Self {
105+
ChannelEnumTypePy {
106+
inner: value.clone(),
107+
name: value.name,
108+
key: value.key,
109+
}
110+
}
111+
}
112+
92113
impl From<ChannelDataType> for ChannelDataTypePy {
93114
fn from(data_type: ChannelDataType) -> Self {
94115
match data_type {

rust/crates/sift_stream_bindings/src/stream/config.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,35 @@ impl From<FlowConfigPy> for sift_rs::ingestion_configs::v2::FlowConfig {
127127
}
128128
}
129129

130+
impl From<sift_rs::ingestion_configs::v2::FlowConfig> for FlowConfigPy {
131+
fn from(config: sift_rs::ingestion_configs::v2::FlowConfig) -> Self {
132+
FlowConfigPy {
133+
inner: config.clone(),
134+
name: config.name,
135+
channels: config.channels.into_iter().map(|c| c.into()).collect(),
136+
}
137+
}
138+
}
139+
140+
impl From<ChannelConfig> for ChannelConfigPy {
141+
fn from(config: ChannelConfig) -> Self {
142+
let data_type_py = config.data_type().into();
143+
ChannelConfigPy {
144+
inner: config.clone(),
145+
name: config.name,
146+
unit: config.unit,
147+
description: config.description,
148+
data_type: data_type_py,
149+
enum_types: config.enum_types.into_iter().map(|ce| ce.into()).collect(),
150+
bit_field_elements: config
151+
.bit_field_elements
152+
.into_iter()
153+
.map(|bfe| bfe.into())
154+
.collect(),
155+
}
156+
}
157+
}
158+
130159
// PyO3 Method Implementations
131160
#[gen_stub_pymethods]
132161
#[pymethods]

rust/crates/sift_stream_bindings/src/stream/mod.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use pyo3::prelude::*;
1414
use pyo3_async_runtimes::tokio::future_into_py;
1515
use pyo3_stub_gen::derive::*;
1616
use sift_stream::{Flow, FlowConfig, IngestionConfigMode, SiftStream};
17+
use std::collections::HashMap;
1718
use std::sync::Arc;
1819
use tokio::sync::Mutex;
1920

@@ -135,6 +136,20 @@ impl SiftStreamPy {
135136
Ok(awaitable.into())
136137
}
137138

139+
pub fn get_flows(&self) -> PyResult<HashMap<String, FlowConfigPy>> {
140+
let inner_guard = self.inner.blocking_lock();
141+
let sift_stream = inner_guard.as_ref().ok_or_else(|| {
142+
PyErr::new::<pyo3::exceptions::PyRuntimeError, _>(
143+
"Stream has been consumed by finish()",
144+
)
145+
})?;
146+
Ok(sift_stream
147+
.get_flows()
148+
.into_iter()
149+
.map(|(k, v)| (k, v.into()))
150+
.collect())
151+
}
152+
138153
pub fn attach_run(&self, py: Python, run_selector: RunSelectorPy) -> PyResult<Py<PyAny>> {
139154
let inner = self.inner.clone();
140155

0 commit comments

Comments
 (0)