Skip to content

Commit d52246d

Browse files
authored
rust(feature): sift stream send api updates (#519)
1 parent d178675 commit d52246d

19 files changed

Lines changed: 1329 additions & 218 deletions

File tree

rust/crates/sift_stream/examples/backups-only/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ async fn run() -> Result<(), Box<dyn Error>> {
102102
tokio::time::sleep(Duration::from_millis(100)).await;
103103
}
104104

105-
// Next, stream telemetry to backup files using the [`SiftStream::send_requests_nonblocking`] method
105+
// Next, stream telemetry to backup files using the [`SiftStream::try_send_requests`] method
106106
// and the [`FlowBuilder`] to build the flow.
107107
//
108108
// This approach is more performant, and also provides methods to set the channel value via
@@ -116,15 +116,15 @@ async fn run() -> Result<(), Box<dyn Error>> {
116116
let run_id = sift_stream.run().unwrap().run_id.clone();
117117
for i in 0..360 {
118118
// Build the flow using the [`FlowBuilder`] and send it to
119-
// Sift using the [`SiftStream::send_requests_nonblocking`] method.
119+
// Sift using the [`SiftStream::try_send_requests`] method.
120120
let mut flow_builder = FlowBuilder::new(&descriptor);
121121
flow_builder.attach_run_id(&run_id);
122122
flow_builder
123123
.set_with_key("joint-angle-encoder", f64::from(i).sin())
124124
.unwrap();
125125

126126
sift_stream
127-
.send_requests_nonblocking(vec![flow_builder.request(TimeValue::now())])
127+
.try_send_requests(vec![flow_builder.request(TimeValue::now())])
128128
.unwrap();
129129

130130
// For demonstrative purposes, adding a contrived wait to get 10Hz data.

rust/crates/sift_stream/examples/ingestion-tutorial/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
148148
// NOTE: This approach uses `Flow` and `SiftStream::send()` for ease of use, and will
149149
// provide acceptable performance for most users
150150
// In cases where additional performance is required, a separate, more performant method
151-
// is also available that uses `FlowBuilder` and `SiftStream::send_requests_nonblocking`
151+
// is also available that uses `FlowBuilder` and `SiftStream::try_send_requests`
152152
// See `examples/quick-start/` for an example using this alternate approach
153153
let start = std::time::Instant::now();
154154
while start.elapsed() < INGEST_DURATION {

rust/crates/sift_stream/examples/quick-start/main.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ async fn run() -> Result<(), Box<dyn Error>> {
9393
tokio::time::sleep(Duration::from_millis(100)).await;
9494
}
9595

96-
// Next, stream telemetry to Sift using the [`SiftStream::send_requests_nonblocking`] method
96+
// Next, stream telemetry to Sift using the [`SiftStream::try_send_requests`] method
9797
// and the [`FlowBuilder`] to build the flow.
9898
//
9999
// This approach is more performant, and also provides methods to set the channel value via
@@ -107,7 +107,7 @@ async fn run() -> Result<(), Box<dyn Error>> {
107107
let run_id = sift_stream.run().unwrap().run_id.clone();
108108
for i in 0..360 {
109109
// Build the flow using the [`FlowBuilder`] and send it to
110-
// Sift using the [`SiftStream::send_requests_nonblocking`] method.
110+
// Sift using the [`SiftStream::try_send_requests`] method.
111111
let mut flow_builder = FlowBuilder::new(&descriptor);
112112
flow_builder.attach_run_id(&run_id);
113113
flow_builder
@@ -116,7 +116,7 @@ async fn run() -> Result<(), Box<dyn Error>> {
116116

117117
// Send telemetry to Sift.
118118
sift_stream
119-
.send_requests_nonblocking(vec![flow_builder.request(TimeValue::now())])
119+
.try_send_requests(vec![flow_builder.request(TimeValue::now())])
120120
.unwrap();
121121

122122
// For demonstrative purposes, adding a contrived wait to get 10Hz data.

rust/crates/sift_stream/src/backup/disk/async_manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1122,7 +1122,7 @@ mod test {
11221122

11231123
// The backup file should have been rotated.
11241124
assert!(
1125-
backup_manager.file_ctx_buffer.len() > 0,
1125+
!backup_manager.file_ctx_buffer.is_empty(),
11261126
"backup files should be present"
11271127
);
11281128
assert!(

rust/crates/sift_stream/src/backup/disk/file_writer.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -452,16 +452,16 @@ mod tests {
452452
}
453453
}
454454

455-
if writer.should_rotate_file() {
456-
if let Some(ctx) = writer.rotate_file().await.expect("failed to rotate file") {
457-
rotated_files.push(ctx.file_path);
458-
}
455+
if writer.should_rotate_file()
456+
&& let Some(ctx) = writer.rotate_file().await.expect("failed to rotate file")
457+
{
458+
rotated_files.push(ctx.file_path);
459459
}
460460
}
461461

462462
// Should have created multiple files
463463
if !rotated_files.is_empty() {
464-
assert!(rotated_files.len() > 0);
464+
assert!(!rotated_files.is_empty());
465465
for file_path in &rotated_files {
466466
assert!(file_path.exists());
467467
}

rust/crates/sift_stream/src/lib.rs

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,87 @@
237237
//!
238238
//! Anything that falls outside of that will require changing the client-key.
239239
//!
240+
//! ## Sending Telemetry
241+
//!
242+
//! [`SiftStream`] exposes four methods for delivering telemetry. They differ only in whether
243+
//! they apply backpressure (blocking) or return immediately (non-blocking), and in whether
244+
//! they accept a high-level [`Flow`] or pre-encoded raw requests:
245+
//!
246+
//! | Method | Blocks? | Input |
247+
//! |---|---|---|
248+
//! | [`send`](stream::SiftStream::send) | Yes | [`Flow`] or any [`Encodeable`](stream::Encodeable) |
249+
//! | [`send_requests`](stream::SiftStream::send_requests) | Yes | Pre-encoded requests |
250+
//! | [`try_send`](stream::SiftStream::try_send) | No | [`Flow`] or any [`Encodeable`](stream::Encodeable) |
251+
//! | [`try_send_requests`](stream::SiftStream::try_send_requests) | No | Pre-encoded requests |
252+
//!
253+
//! ### Backpressure with `send`
254+
//!
255+
//! [`send`](stream::SiftStream::send) awaits until the backing channel has capacity, then
256+
//! delivers the message. Use this when you want the producer to slow down naturally when
257+
//! the pipeline is under load — the simplest and most common choice.
258+
//!
259+
//! ```ignore
260+
//! // Awaits until the channel has room; backpressure is applied automatically.
261+
//! sift_stream.send(Flow::new(
262+
//! "robotic-arm",
263+
//! TimeValue::now(),
264+
//! &[ChannelValue::new("joint-angle-encoder", 7.2_f64)],
265+
//! )).await?;
266+
//! ```
267+
//!
268+
//! On error, [`SiftStreamSendError`] is returned. Call `into_inner()` on the
269+
//! [`ChannelClosed`](stream::SiftStreamSendError::ChannelClosed) variant to recover the
270+
//! undelivered message.
271+
//!
272+
//! ### Non-blocking sends with `try_send`
273+
//!
274+
//! [`try_send`](stream::SiftStream::try_send) returns immediately regardless of channel
275+
//! state. If the channel is full it returns [`TrySendError::Full`] with the message; if
276+
//! the channel is closed it returns [`TrySendError::Closed`]. Use this in tight loops or
277+
//! real-time contexts where blocking even briefly is unacceptable.
278+
//!
279+
//! ```ignore
280+
//! match sift_stream.try_send(Flow::new(
281+
//! "robotic-arm",
282+
//! TimeValue::now(),
283+
//! &[ChannelValue::new("joint-angle-encoder", 7.2_f64)],
284+
//! )) {
285+
//! Ok(()) => {}
286+
//! Err(SiftStreamTrySendError::Channel(TrySendError::Full(msg))) => {
287+
//! // Channel is busy — drop this sample or buffer it for later.
288+
//! drop(msg);
289+
//! }
290+
//! Err(e) => return Err(e.into()),
291+
//! }
292+
//! ```
293+
//!
294+
//! ### Pre-encoded batch sends
295+
//!
296+
//! [`send_requests`](stream::SiftStream::send_requests) and
297+
//! [`try_send_requests`](stream::SiftStream::try_send_requests) accept pre-encoded
298+
//! [`IngestWithConfigDataStreamRequest`](sift_rs::ingest::v1::IngestWithConfigDataStreamRequest)
299+
//! values built with [`FlowBuilder`]. This skips the per-call encoding step and is the
300+
//! highest-throughput option.
301+
//!
302+
//! ```ignore
303+
//! let descriptor = sift_stream.get_flow_descriptor("robotic-arm").unwrap();
304+
//! let run_id = sift_stream.run().unwrap().run_id.clone();
305+
//!
306+
//! let mut builder = FlowBuilder::new(&descriptor);
307+
//! builder.attach_run_id(&run_id);
308+
//! builder.set_with_key("joint-angle-encoder", 7.2_f64).unwrap();
309+
//!
310+
//! // Blocking batch send with backpressure:
311+
//! sift_stream.send_requests(vec![builder.request(TimeValue::now())]).await?;
312+
//!
313+
//! // Non-blocking batch send:
314+
//! sift_stream.try_send_requests(vec![builder.request(TimeValue::now())])?;
315+
//! ```
316+
//!
317+
//! On the first failure, `send_requests` / `try_send_requests` stop iterating and return
318+
//! **all** undelivered messages — the failing one plus any not yet attempted — inside the
319+
//! error so nothing is silently dropped.
320+
//!
240321
//! ## Retry Policy
241322
//!
242323
//! At the time of writing this crate, [tonic](https://docs.rs/tonic/latest/tonic/)
@@ -456,6 +537,7 @@ pub use stream::{
456537
file_backup::FileBackup,
457538
ingestion_config::{Flow, IngestionConfigEncoder, LiveStreaming},
458539
},
540+
send_error::{SendError, SiftStreamSendError, SiftStreamTrySendError, TrySendError},
459541
time::TimeValue,
460542
};
461543

rust/crates/sift_stream/src/stream/builder/config_loader.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -253,7 +253,6 @@ mod tests {
253253
..Default::default()
254254
},
255255
],
256-
..Default::default()
257256
}
258257
}
259258

@@ -373,7 +372,6 @@ mod tests {
373372
data_type: ChannelDataType::Double.into(),
374373
..Default::default()
375374
}],
376-
..Default::default()
377375
}];
378376
let form = create_test_ingestion_config_form(asset_name, client_key, existing_flows);
379377

0 commit comments

Comments
 (0)