Skip to content

Commit ab91090

Browse files
authored
rust(feat): Update sift-stream-bindings to support FlowDescriptor/Flo… (#406)
1 parent 31e9cf6 commit ab91090

12 files changed

Lines changed: 431 additions & 332 deletions

File tree

python/pyproject.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ all = [
5959
'pyOpenSSL<24.0.0',
6060
'pyarrow>=17.0.0',
6161
'rosbags~=0.0',
62-
'sift-stream-bindings>=0.2.0-rc2',
62+
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
6363
'types-pyOpenSSL<24.0.0',
6464
]
6565
build = [
@@ -100,7 +100,7 @@ dev-all = [
100100
'pytest==8.2.2',
101101
'rosbags~=0.0',
102102
'ruff~=0.12.10',
103-
'sift-stream-bindings>=0.2.0-rc2',
103+
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
104104
'tomlkit~=0.13.3',
105105
'types-pyOpenSSL<24.0.0',
106106
]
@@ -153,7 +153,7 @@ docs-build = [
153153
'pytest==8.2.2',
154154
'rosbags~=0.0',
155155
'ruff~=0.12.10',
156-
'sift-stream-bindings>=0.2.0-rc2',
156+
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
157157
'tomlkit~=0.13.3',
158158
'types-pyOpenSSL<24.0.0',
159159
]
@@ -176,10 +176,10 @@ rosbags = [
176176
'rosbags~=0.0',
177177
]
178178
sift-stream = [
179-
'sift-stream-bindings>=0.2.0-rc2',
179+
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
180180
]
181181
sift-stream-bindings = [
182-
'sift-stream-bindings>=0.2.0-rc2',
182+
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
183183
]
184184
tdms = [
185185
'npTDMS~=1.9',
@@ -215,7 +215,7 @@ docs = ["mkdocs",
215215
openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"]
216216
tdms = ["npTDMS~=1.9"]
217217
rosbags = ["rosbags~=0.0"]
218-
sift-stream = ["sift-stream-bindings>=0.2.0-rc2"]
218+
sift-stream = ["sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4"]
219219
hdf5 = ["h5py~=3.11", "polars~=1.8"]
220220
data-review = ["pyarrow>=17.0.0"]
221221

rust/crates/sift_stream/benches/message_to_ingest_req.rs

Lines changed: 19 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use std::hint::black_box;
3030
use sift_rs::ingestion_configs::v2::{ChannelConfig, FlowConfig};
3131
use sift_stream::stream::mode::ingestion_config::Flow;
3232
use sift_stream::{
33-
ChannelDataType, ChannelValue, TimeValue, Value,
33+
ChannelDataType, ChannelValue, FlowDescriptor, TimeValue, Value,
3434
stream::mode::bench::{message_to_ingest_req, message_to_ingest_req_direct},
3535
};
3636

@@ -118,11 +118,9 @@ fn flow_randomized(name: &str, flow_config: &FlowConfig) -> Flow {
118118
}
119119

120120
// Configuration constants - these can be adjusted to test different scenarios
121-
const NUM_FLOWS: usize = 10; // Number of flow configs to create
122121
const NUM_CHANNELS_PER_FLOW: usize = 2000; // Number of channels per flow
123122
const INGESTION_CONFIG_ID: &str = "benchmark-config";
124123
const RUN_ID: Option<String> = None;
125-
const FLOW_TO_RANDOMIZE: usize = 8;
126124

127125
fn benchmark_message_to_ingest_req_direct(c: &mut Criterion) {
128126
// Create a flow with ordered channel values (matching the first flow config)
@@ -140,61 +138,42 @@ fn benchmark_message_to_ingest_req_direct(c: &mut Criterion) {
140138
}
141139

142140
fn benchmark_message_to_ingest_req_ordered(c: &mut Criterion) {
143-
// Create flow configs
144-
let mut flow_configs = Vec::with_capacity(NUM_FLOWS);
145-
for i in 0..NUM_FLOWS {
146-
flow_configs.push(flow_config(&format!("flow_{i}"), NUM_CHANNELS_PER_FLOW));
147-
}
141+
// Create a flow with ordered channel values.
142+
let flow = flow_config("my_benchmark_flow", NUM_CHANNELS_PER_FLOW);
143+
let message = flow_ordered("my_benchmark_flow", &flow);
148144

149-
// Create a flow with ordered channel values (matching the first flow config)
150-
let message = flow_ordered("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
145+
let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap();
151146

152147
c.bench_function("message_to_ingest_req_ordered", |b| {
153-
b.iter(|| {
154-
black_box(message_to_ingest_req(
155-
&message,
156-
INGESTION_CONFIG_ID,
157-
RUN_ID.clone(),
158-
&flow_configs,
159-
))
160-
})
148+
b.iter(|| black_box(message_to_ingest_req(&message, RUN_ID.clone(), &descriptor)))
161149
});
162150
}
163151

164152
fn benchmark_message_to_ingest_req_randomized(c: &mut Criterion) {
165-
// Create flow configs
166-
let mut flow_configs = Vec::with_capacity(NUM_FLOWS);
167-
for i in 0..NUM_FLOWS {
168-
flow_configs.push(flow_config(&format!("flow_{i}"), NUM_CHANNELS_PER_FLOW));
169-
}
153+
// Create a flow with randomized channel values.
154+
let flow = flow_config("my_benchmark_flow", NUM_CHANNELS_PER_FLOW);
155+
let message = flow_randomized("my_benchmark_flow", &flow);
170156

171-
// Create a flow with randomized channel values (matching the first flow config)
172-
let message = flow_randomized("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
157+
let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap();
173158

174159
c.bench_function("message_to_ingest_req_randomized", |b| {
175-
b.iter(|| {
176-
black_box(message_to_ingest_req(
177-
&message,
178-
INGESTION_CONFIG_ID,
179-
RUN_ID.clone(),
180-
&flow_configs,
181-
))
182-
})
160+
b.iter(|| black_box(message_to_ingest_req(&message, RUN_ID.clone(), &descriptor)))
183161
});
184162
}
185163

186164
fn benchmark_message_to_ingest_req_varying_sizes(c: &mut Criterion) {
187165
let mut group = c.benchmark_group("message_to_ingest_req_varying_sizes");
188166

189167
for &num_channels in &[5, 10, 100, 1000, 5000] {
168+
let flow_name = format!("flow_{num_channels}");
169+
190170
// Create flow configs with varying channel counts
191-
let mut flow_configs = Vec::with_capacity(NUM_FLOWS);
192-
for i in 0..NUM_FLOWS {
193-
flow_configs.push(flow_config(&format!("flow_{i}"), num_channels));
194-
}
171+
let flow = flow_config(&flow_name, num_channels);
172+
let message_ordered = flow_ordered(&flow_name, &flow);
173+
let message_randomized = flow_randomized(&flow_name, &flow);
174+
let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap();
195175

196176
// Test direct scenario
197-
let message_ordered = flow_ordered("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
198177
group.bench_function(&format!("direct_{num_channels}_channels"), |b| {
199178
b.iter(|| {
200179
black_box(message_to_ingest_req_direct(
@@ -209,22 +188,19 @@ fn benchmark_message_to_ingest_req_varying_sizes(c: &mut Criterion) {
209188
b.iter(|| {
210189
black_box(message_to_ingest_req(
211190
&message_ordered,
212-
INGESTION_CONFIG_ID,
213191
RUN_ID.clone(),
214-
&flow_configs,
192+
&descriptor,
215193
))
216194
})
217195
});
218196

219197
// Test randomized scenario
220-
let message_randomized = flow_randomized("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
221198
group.bench_function(&format!("randomized_{num_channels}_channels"), |b| {
222199
b.iter(|| {
223200
black_box(message_to_ingest_req(
224201
&message_randomized,
225-
INGESTION_CONFIG_ID,
226202
RUN_ID.clone(),
227-
&flow_configs,
203+
&descriptor,
228204
))
229205
})
230206
});

rust/crates/sift_stream/src/stream/mode/bench.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,15 @@
1+
use crate::stream::flow::FlowDescriptor;
12
use crate::stream::mode::ingestion_config::Flow;
23
use crate::{IngestionConfigMode, SiftStream};
3-
use sift_rs::ingestion_configs::v2::FlowConfig;
44

55
/// Unstable wrapper around [SiftStream::message_to_ingest_req] used for benchmarking purposes.
66
#[inline]
77
pub fn message_to_ingest_req(
88
message: &Flow,
9-
ingestion_config_id: &str,
109
run_id: Option<String>,
11-
flows: &[FlowConfig],
10+
descriptor: &FlowDescriptor<String>,
1211
) -> Option<sift_rs::ingest::v1::IngestWithConfigDataStreamRequest> {
13-
SiftStream::<IngestionConfigMode>::message_to_ingest_req(
14-
message,
15-
ingestion_config_id,
16-
run_id,
17-
flows,
18-
)
12+
SiftStream::<IngestionConfigMode>::message_to_ingest_req(message, run_id, descriptor)
1913
}
2014

2115
/// Unstable wrapper around [SiftStream::message_to_ingest_req_direct] used for benchmarking purposes.

0 commit comments

Comments
 (0)