Skip to content

Commit 8983a32

Browse files
committed
python(feat): Update sift-stream bindings for FlowBuilder support
1 parent db0fc7c commit 8983a32

13 files changed

Lines changed: 464 additions & 354 deletions

File tree

python/lib/sift_client/_internal/low_level_wrappers/ingestion.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,11 @@
3838
from sift_stream_bindings import (
3939
DurationPy,
4040
FlowConfigPy,
41+
FlowDescriptorPy,
4142
FlowPy,
4243
IngestionConfigFormPy,
4344
IngestWithConfigDataStreamRequestPy,
45+
IngestWithConfigDataStreamRequestWrapperPy,
4446
MetadataPy,
4547
RecoveryStrategyPy,
4648
RunFormPy,
@@ -129,12 +131,10 @@ class IngestionConfigStreamingLowLevelClient(LowLevelClientBase):
129131
DEFAULT_MAX_LOG_FILES = 7 # Equal to 1 week of logs
130132
DEFAULT_LOGFILE_PREFIX = "sift_stream_bindings.log"
131133
_sift_stream_instance: SiftStreamPy
132-
_known_flows: dict[str, FlowConfig]
133134

134-
def __init__(self, sift_stream_instance: SiftStreamPy, known_flows: dict[str, FlowConfig]):
135+
def __init__(self, sift_stream_instance: SiftStreamPy):
135136
super().__init__()
136137
self._sift_stream_instance = sift_stream_instance
137-
self._known_flows = known_flows
138138

139139
@classmethod
140140
async def create_sift_stream_instance(
@@ -194,12 +194,7 @@ async def create_sift_stream_instance(
194194

195195
sift_stream_instance = await builder.build()
196196

197-
known_flows = {
198-
flow_name: FlowConfig._from_rust_config(flow)
199-
for flow_name, flow in sift_stream_instance.get_flows().items()
200-
}
201-
202-
return cls(sift_stream_instance, known_flows)
197+
return cls(sift_stream_instance)
203198

204199
async def send(self, flow: FlowPy):
205200
await self._sift_stream_instance.send(flow)
@@ -210,12 +205,16 @@ async def batch_send(self, flows: Iterable[FlowPy]):
210205
async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy]):
211206
await self._sift_stream_instance.send_requests(requests)
212207

208+
def send_requests_nonblocking(
209+
self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]
210+
):
211+
self._sift_stream_instance.send_requests_nonblocking(requests)
212+
213+
def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy:
214+
return self._sift_stream_instance.get_flow_descriptor(flow_name)
215+
213216
async def add_new_flows(self, flow_configs: list[FlowConfigPy]):
214217
await self._sift_stream_instance.add_new_flows(flow_configs)
215-
self._known_flows = {
216-
flow_name: FlowConfig._from_rust_config(flow)
217-
for flow_name, flow in self._sift_stream_instance.get_flows().items()
218-
}
219218

220219
async def attach_run(self, run_selector: RunSelectorPy):
221220
await self._sift_stream_instance.attach_run(run_selector)

python/lib/sift_client/resources/ingestion.py

Lines changed: 27 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@
1818
from sift_stream_bindings import (
1919
DiskBackupPolicyPy,
2020
DurationPy,
21+
FlowDescriptorPy,
2122
FlowPy,
2223
IngestionConfigFormPy,
2324
IngestWithConfigDataStreamRequestPy,
25+
IngestWithConfigDataStreamRequestWrapperPy,
2426
MetadataPy,
2527
RecoveryStrategyPy,
2628
RetryPolicyPy,
@@ -491,6 +493,31 @@ async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy
491493
"""
492494
await self._low_level_client.send_requests(requests)
493495

496+
def send_requests_nonblocking(
497+
self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]
498+
):
499+
"""Send data in a manner identical to the raw gRPC service for ingestion-config based streaming.
500+
501+
This method offers a way to send data that matches the raw gRPC service interface. You are
502+
expected to handle channel value ordering as well as empty values correctly.
503+
504+
Important:
505+
If using this interface, you should use `FlowBuilderPy::request` to ensure proper
506+
building of the request.
507+
508+
Args:
509+
requests: List of ingestion requests to send to Sift.
510+
"""
511+
self._low_level_client.send_requests_nonblocking(requests)
512+
513+
def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy:
514+
"""Retrieve a flow descriptor by name.
515+
516+
Args:
517+
flow_name: The name of the flow descriptor to retrieve.
518+
"""
519+
return self._low_level_client.get_flow_descriptor(flow_name)
520+
494521
async def add_new_flows(self, flow_configs: list[FlowConfig]):
495522
"""Modify the existing ingestion config by adding new flows that weren't accounted for during initialization.
496523
@@ -575,23 +602,6 @@ def get_metrics_snapshot(self) -> SiftStreamMetricsSnapshotPy:
575602
"""
576603
return self._low_level_client.get_metrics_snapshot()
577604

578-
def get_flow_config(self, flow_name: str) -> FlowConfig:
579-
"""Retrieve a flow configuration by name.
580-
581-
Args:
582-
flow_name: The name of the flow configuration to retrieve.
583-
584-
Returns:
585-
The FlowConfig associated with the given flow name.
586-
587-
Raises:
588-
KeyError: If the flow name is not found in the known flows.
589-
"""
590-
flow_config = self._low_level_client._known_flows.get(flow_name)
591-
if flow_config is None:
592-
raise KeyError(f"FlowConfig {flow_name} is unknown to the ingestion client")
593-
return flow_config
594-
595605
async def __aenter__(self):
596606
return self
597607

rust/crates/sift_stream/benches/message_to_ingest_req.rs

Lines changed: 19 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ use std::hint::black_box;
3030
use sift_rs::ingestion_configs::v2::{ChannelConfig, FlowConfig};
3131
use sift_stream::stream::mode::ingestion_config::Flow;
3232
use sift_stream::{
33-
ChannelDataType, ChannelValue, TimeValue, Value,
33+
ChannelDataType, ChannelValue, FlowDescriptor, TimeValue, Value,
3434
stream::mode::bench::{message_to_ingest_req, message_to_ingest_req_direct},
3535
};
3636

@@ -118,11 +118,9 @@ fn flow_randomized(name: &str, flow_config: &FlowConfig) -> Flow {
118118
}
119119

120120
// Configuration constants - these can be adjusted to test different scenarios
121-
const NUM_FLOWS: usize = 10; // Number of flow configs to create
122121
const NUM_CHANNELS_PER_FLOW: usize = 2000; // Number of channels per flow
123122
const INGESTION_CONFIG_ID: &str = "benchmark-config";
124123
const RUN_ID: Option<String> = None;
125-
const FLOW_TO_RANDOMIZE: usize = 8;
126124

127125
fn benchmark_message_to_ingest_req_direct(c: &mut Criterion) {
128126
// Create a flow with ordered channel values (matching the first flow config)
@@ -140,61 +138,42 @@ fn benchmark_message_to_ingest_req_direct(c: &mut Criterion) {
140138
}
141139

142140
fn benchmark_message_to_ingest_req_ordered(c: &mut Criterion) {
143-
// Create flow configs
144-
let mut flow_configs = Vec::with_capacity(NUM_FLOWS);
145-
for i in 0..NUM_FLOWS {
146-
flow_configs.push(flow_config(&format!("flow_{i}"), NUM_CHANNELS_PER_FLOW));
147-
}
141+
// Create a flow with ordered channel values.
142+
let flow = flow_config("my_benchmark_flow", NUM_CHANNELS_PER_FLOW);
143+
let message = flow_ordered("my_benchmark_flow", &flow);
148144

149-
// Create a flow with ordered channel values (matching the first flow config)
150-
let message = flow_ordered("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
145+
let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap();
151146

152147
c.bench_function("message_to_ingest_req_ordered", |b| {
153-
b.iter(|| {
154-
black_box(message_to_ingest_req(
155-
&message,
156-
INGESTION_CONFIG_ID,
157-
RUN_ID.clone(),
158-
&flow_configs,
159-
))
160-
})
148+
b.iter(|| black_box(message_to_ingest_req(&message, RUN_ID.clone(), &descriptor)))
161149
});
162150
}
163151

164152
fn benchmark_message_to_ingest_req_randomized(c: &mut Criterion) {
165-
// Create flow configs
166-
let mut flow_configs = Vec::with_capacity(NUM_FLOWS);
167-
for i in 0..NUM_FLOWS {
168-
flow_configs.push(flow_config(&format!("flow_{i}"), NUM_CHANNELS_PER_FLOW));
169-
}
153+
// Create a flow with randomized channel values.
154+
let flow = flow_config("my_benchmark_flow", NUM_CHANNELS_PER_FLOW);
155+
let message = flow_randomized("my_benchmark_flow", &flow);
170156

171-
// Create a flow with randomized channel values (matching the first flow config)
172-
let message = flow_randomized("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
157+
let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap();
173158

174159
c.bench_function("message_to_ingest_req_randomized", |b| {
175-
b.iter(|| {
176-
black_box(message_to_ingest_req(
177-
&message,
178-
INGESTION_CONFIG_ID,
179-
RUN_ID.clone(),
180-
&flow_configs,
181-
))
182-
})
160+
b.iter(|| black_box(message_to_ingest_req(&message, RUN_ID.clone(), &descriptor)))
183161
});
184162
}
185163

186164
fn benchmark_message_to_ingest_req_varying_sizes(c: &mut Criterion) {
187165
let mut group = c.benchmark_group("message_to_ingest_req_varying_sizes");
188166

189167
for &num_channels in &[5, 10, 100, 1000, 5000] {
168+
let flow_name = format!("flow_{num_channels}");
169+
190170
// Create flow configs with varying channel counts
191-
let mut flow_configs = Vec::with_capacity(NUM_FLOWS);
192-
for i in 0..NUM_FLOWS {
193-
flow_configs.push(flow_config(&format!("flow_{i}"), num_channels));
194-
}
171+
let flow = flow_config(&flow_name, num_channels);
172+
let message_ordered = flow_ordered(&flow_name, &flow);
173+
let message_randomized = flow_randomized(&flow_name, &flow);
174+
let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap();
195175

196176
// Test direct scenario
197-
let message_ordered = flow_ordered("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
198177
group.bench_function(&format!("direct_{num_channels}_channels"), |b| {
199178
b.iter(|| {
200179
black_box(message_to_ingest_req_direct(
@@ -209,22 +188,19 @@ fn benchmark_message_to_ingest_req_varying_sizes(c: &mut Criterion) {
209188
b.iter(|| {
210189
black_box(message_to_ingest_req(
211190
&message_ordered,
212-
INGESTION_CONFIG_ID,
213191
RUN_ID.clone(),
214-
&flow_configs,
192+
&descriptor,
215193
))
216194
})
217195
});
218196

219197
// Test randomized scenario
220-
let message_randomized = flow_randomized("flow_0", &flow_configs[FLOW_TO_RANDOMIZE]);
221198
group.bench_function(&format!("randomized_{num_channels}_channels"), |b| {
222199
b.iter(|| {
223200
black_box(message_to_ingest_req(
224201
&message_randomized,
225-
INGESTION_CONFIG_ID,
226202
RUN_ID.clone(),
227-
&flow_configs,
203+
&descriptor,
228204
))
229205
})
230206
});

rust/crates/sift_stream/src/stream/mode/bench.rs

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,15 @@
1+
use crate::stream::flow::FlowDescriptor;
12
use crate::stream::mode::ingestion_config::Flow;
23
use crate::{IngestionConfigMode, SiftStream};
3-
use sift_rs::ingestion_configs::v2::FlowConfig;
44

55
/// Unstable wrapper around [SiftStream::message_to_ingest_req] used for benchmarking purposes.
66
#[inline]
77
pub fn message_to_ingest_req(
88
message: &Flow,
9-
ingestion_config_id: &str,
109
run_id: Option<String>,
11-
flows: &[FlowConfig],
10+
descriptor: &FlowDescriptor<String>,
1211
) -> Option<sift_rs::ingest::v1::IngestWithConfigDataStreamRequest> {
13-
SiftStream::<IngestionConfigMode>::message_to_ingest_req(
14-
message,
15-
ingestion_config_id,
16-
run_id,
17-
flows,
18-
)
12+
SiftStream::<IngestionConfigMode>::message_to_ingest_req(message, run_id, descriptor)
1913
}
2014

2115
/// Unstable wrapper around [SiftStream::message_to_ingest_req_direct] used for benchmarking purposes.

0 commit comments

Comments
 (0)