Skip to content

Commit b3c52b5

Browse files
authored
rust(bugfix): Prefer streaming data on channel full instead of erroring (#363)
* Log warning when backup data channel is full instead of returning to caller. * Bump rc version and add changelog entry.
1 parent 68b08c8 commit b3c52b5

6 files changed

Lines changed: 82 additions & 48 deletions

File tree

Cargo.toml

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ members = [
1212

1313
[workspace.package]
1414
authors = ["Sift Software Engineers <engineering@siftstack.com>"]
15-
version = "0.7.0-rc.1"
15+
version = "0.7.0-rc.2"
1616
edition = "2024"
1717
categories = ["aerospace", "science::robotics"]
1818
homepage = "https://github.com/sift-stack/sift"
@@ -26,10 +26,10 @@ chrono = { version = "0.4.39", default-features = false, features = ["clock"] }
2626
pbjson-types = "^0.7"
2727
tonic = { version = "^0.12", features = ["gzip"] }
2828

29-
sift_connect = { version = "0.7.0-rc.1", path = "rust/crates/sift_connect" }
30-
sift_rs = { version = "0.7.0-rc.1", path = "rust/crates/sift_rs" }
31-
sift_error = { version = "0.7.0-rc.1", path = "rust/crates/sift_error" }
32-
sift_stream = { version = "0.7.0-rc.1", path = "rust/crates/sift_stream" }
29+
sift_connect = { version = "0.7.0-rc.2", path = "rust/crates/sift_connect" }
30+
sift_rs = { version = "0.7.0-rc.2", path = "rust/crates/sift_rs" }
31+
sift_error = { version = "0.7.0-rc.2", path = "rust/crates/sift_error" }
32+
sift_stream = { version = "0.7.0-rc.2", path = "rust/crates/sift_stream" }
3333

3434
sift_stream_bindings = { version = "0.1.0", path = "rust/crates/sift_stream_bindings" }
3535

rust/CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,15 @@ All notable changes to this project will be documented in this file.
33

44
This project adheres to [Semantic Versioning](http://semver.org/).
55

6+
## [v0.7.0-rc.2] - November 3, 2025
7+
### What's New
8+
#### Improvements For Constrained Environments
9+
Compression support has been added for streaming data into Sift. This can be useful in low-bandwidth or
10+
network-constrained environments. Since compression adds overhead to streaming, it is not recommended for
11+
high-throughput streaming systems.
12+
13+
Additionally, changes have been made to ensure data is streamed to Sift even when writing backup files lags
14+
behind ingestion.
615

716
## [v0.7.0-rc.1] - October 24, 2025
817
### What's New

rust/crates/sift_stream/README.md

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -111,19 +111,23 @@ error handling, and shutdown processes.
111111
- **backup_tx/backup_rx**: Bounded channel for backup data messages
112112

113113
**IMPORTANT**:
114-
Data reliability is among the most important requirements of sift-stream, thus if the data channel used for backups
115-
becomes full, an error is returned to the caller. This is in contrast to the data channel used for the primary
116-
ingestion into Sift -- if this channel becomes full, the oldest data will be removed in favor of streaming newer
117-
data. The data removed during this process will have been backed up to disk and will be re-ingested at the next
118-
checkpoint.
114+
Data reliability is among the most important goals of sift-stream. However, if the backup system falls
115+
behind (e.g. because of slow disk writes), a warning will be logged (when the `tracing` feature is enabled) but data will still be sent for ingestion.
116+
Streaming data in this situation is preferred over preventing it entirely. It is nevertheless **highly**
117+
recommended that disk write speeds be high enough for the data stream being backed up, so that data
118+
is reliably backed up.
119+
120+
In contrast, if the primary ingestion channel becomes full, the oldest data will be removed in favor of
121+
streaming newer data. The data removed during this process will be forwarded to the backup system and
122+
will be re-ingested at the next checkpoint.
119123

120124
### Control Channel
121-
- **control_tx/control_rx**: Broadcast channel (1,024 capacity) for low-frequency control messages
125+
- **control_tx/control_rx**: Broadcast channel for low-frequency control messages
122126

123127
### Channel Capacities
124128

125129
The default capacities are as follows:
126-
- **data**: 10,240
130+
- **data**: 102,400 (applies to both the ingestion and the backup data channels)
127131
- **control**: 1,024
128132

129133
These can be configured however, based on individual streaming needs.

rust/crates/sift_stream/src/stream/builder.rs

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,8 @@ pub struct SiftStreamBuilder<C> {
5656
asset_tags: Option<Vec<String>>,
5757
asset_metadata: Option<Vec<MetadataValue>>,
5858
control_channel_capacity: usize,
59-
data_channel_capacity: usize,
59+
ingestion_data_channel_capacity: usize,
60+
backup_data_channel_capacity: usize,
6061
enable_compression_for_ingestion: bool,
6162

6263
// Either `run` or `run_id`. If both are provided then the `run_id` will be prioritized.
@@ -136,10 +137,17 @@ where
136137
self
137138
}
138139

139-
/// Sets the data channel capacity. See the [top-level documentation](crate#checkpoints)
140+
/// Sets the ingestion data channel capacity. See the [top-level documentation](crate#checkpoints)
140141
/// for further details.
141-
pub fn data_channel_capacity(mut self, capacity: usize) -> SiftStreamBuilder<C> {
142-
self.data_channel_capacity = capacity;
142+
pub fn ingestion_data_channel_capacity(mut self, capacity: usize) -> SiftStreamBuilder<C> {
143+
self.ingestion_data_channel_capacity = capacity;
144+
self
145+
}
146+
147+
/// Sets the backup data channel capacity. See the [top-level documentation](crate#checkpoints)
148+
/// for further details.
149+
pub fn backup_data_channel_capacity(mut self, capacity: usize) -> SiftStreamBuilder<C> {
150+
self.backup_data_channel_capacity = capacity;
143151
self
144152
}
145153

@@ -222,7 +230,8 @@ impl SiftStreamBuilder<IngestionConfigMode> {
222230
asset_tags: None,
223231
asset_metadata: None,
224232
control_channel_capacity: CONTROL_CHANNEL_CAPACITY,
225-
data_channel_capacity: DATA_CHANNEL_CAPACITY,
233+
ingestion_data_channel_capacity: DATA_CHANNEL_CAPACITY,
234+
backup_data_channel_capacity: DATA_CHANNEL_CAPACITY,
226235
}
227236
}
228237

@@ -242,7 +251,8 @@ impl SiftStreamBuilder<IngestionConfigMode> {
242251
asset_tags: None,
243252
asset_metadata: None,
244253
control_channel_capacity: CONTROL_CHANNEL_CAPACITY,
245-
data_channel_capacity: DATA_CHANNEL_CAPACITY,
254+
ingestion_data_channel_capacity: DATA_CHANNEL_CAPACITY,
255+
backup_data_channel_capacity: DATA_CHANNEL_CAPACITY,
246256
}
247257
}
248258

@@ -360,7 +370,8 @@ impl SiftStreamBuilder<IngestionConfigMode> {
360370
enable_compression_for_ingestion,
361371
recovery_config,
362372
control_channel_capacity: self.control_channel_capacity,
363-
data_channel_capacity: self.data_channel_capacity,
373+
ingestion_data_channel_capacity: self.ingestion_data_channel_capacity,
374+
backup_data_channel_capacity: self.backup_data_channel_capacity,
364375
};
365376

366377
SiftStream::<IngestionConfigMode>::new(ingestion_config, flows, run, task_config, metrics)

rust/crates/sift_stream/src/stream/mode/ingestion_config.rs

Lines changed: 19 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -132,17 +132,15 @@ impl SiftStream<IngestionConfigMode> {
132132
/// to register the flow before calling [SiftStream::send]; otherwise users should monitor the
133133
/// Sift DLQ either in the Sift UI or Sift API to ensure successful transmission.
134134
///
135-
/// In the case where the underlying error was closed due to an error, this method will invoke
136-
/// the configured [RetryPolicy] to attempt to reconnect and resend data to Sift. If the amount
137-
/// of retry attempts exceeds the maximum configured, then an [ErrorKind::RetriesExhausted] is
138-
/// returned. If backups are enabled, then all messages since the last successful checkpoint
139-
/// will be reingested.
135+
/// When "sending" messages, the message will first be sent to the backup system. This system
136+
/// is used to back up data to disk until the data is confirmed received by Sift. If streaming
137+
/// encounters errors, the backed-up data will be re-ingested, ensuring all data is received
138+
/// by Sift.
140139
///
141-
/// Lastly, if the underlying stream was gracefully closed due to a checkpoint, this method
142-
/// will automatically establish a new connection.
140+
/// If the backup system has fallen behind and the backup queue/channel is full, the message
141+
/// is not backed up, but this method still proceeds to send it to Sift.
143142
///
144-
/// TODO: To preserve the API, this function is remaining async even though it no longer has
145-
/// any `await`s. This should be changed to a blocking function in the next major version.
143+
/// This ensures data is sent to Sift even if the backup system is lagging.
146144
pub async fn send(&mut self, message: Flow) -> Result<()> {
147145
self.metrics.messages_received.increment();
148146

@@ -227,15 +225,18 @@ impl SiftStream<IngestionConfigMode> {
227225
dropped_for_ingestion: false,
228226
};
229227

230-
// Send the message for backup first. If this fails, return an error.
228+
// Send the message for backup first. If this fails, log a warning and continue.
231229
//
232-
// Failure to backup leads to data loss and should be treated as very critical.
233-
self.mode
234-
.stream_system
235-
.backup_tx
236-
.try_send(data_msg.clone())
237-
.map_err(|e| Error::new(ErrorKind::StreamError, e))
238-
.context("failed to send data to backup task system")?;
230+
// Failure to back up can lead to data loss, though it is preferable to attempt
231+
// to stream the message to Sift rather than return the error and prevent both.
232+
if let Err(e) = self.mode.stream_system.backup_tx.try_send(data_msg.clone()) {
233+
#[cfg(feature = "tracing")]
234+
tracing::warn!(
235+
sift_stream_id = self.mode.sift_stream_id.to_string(),
236+
"failed to send data to backup system, data will still be streamed to Sift: {e}"
237+
);
238+
}
239+
239240
self.metrics.messages_sent_to_backup.increment();
240241

241242
// Send the message for ingestion.
@@ -267,7 +268,7 @@ impl SiftStream<IngestionConfigMode> {
267268
}
268269
Err(e) => Err(Error::new_msg(
269270
ErrorKind::StreamError,
270-
format!("queueing data for ingestion failed: {}", e),
271+
format!("queueing data for ingestion failed: {e}"),
271272
)),
272273
}
273274
}

rust/crates/sift_stream/src/stream/tasks.rs

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use tokio::{sync::broadcast, task::JoinHandle, time::Instant};
1616
use uuid::Uuid;
1717

1818
/// Capacity for the data channel.
19-
pub(crate) const DATA_CHANNEL_CAPACITY: usize = 1024 * 10;
19+
pub(crate) const DATA_CHANNEL_CAPACITY: usize = 1024 * 100;
2020

2121
/// Capacity for the control channel.
2222
pub(crate) const CONTROL_CHANNEL_CAPACITY: usize = 1024;
@@ -66,7 +66,8 @@ pub(crate) struct TaskConfig {
6666
pub(crate) enable_compression_for_ingestion: bool,
6767
pub(crate) recovery_config: RecoveryConfig,
6868
pub(crate) control_channel_capacity: usize,
69-
pub(crate) data_channel_capacity: usize,
69+
pub(crate) ingestion_data_channel_capacity: usize,
70+
pub(crate) backup_data_channel_capacity: usize,
7071
}
7172

7273
/// Data message with stream ID for routing
@@ -92,8 +93,9 @@ pub(crate) fn start_tasks(config: TaskConfig) -> Result<StreamSystem> {
9293
let (control_tx, _control_rx) = broadcast::channel(config.control_channel_capacity);
9394

9495
// Create the ingestion and backup data channels for high-frequency data messages
95-
let (ingestion_tx, ingestion_rx) = async_channel::bounded(config.data_channel_capacity);
96-
let (backup_tx, backup_rx) = async_channel::bounded(config.data_channel_capacity);
96+
let (ingestion_tx, ingestion_rx) =
97+
async_channel::bounded(config.ingestion_data_channel_capacity);
98+
let (backup_tx, backup_rx) = async_channel::bounded(config.backup_data_channel_capacity);
9799

98100
// Clone the sender for each task
99101
let backup_control_tx = control_tx.clone();
@@ -498,7 +500,8 @@ mod tests {
498500
checkpoint_interval,
499501
enable_compression_for_ingestion: false,
500502
control_channel_capacity: 128,
501-
data_channel_capacity: 128,
503+
ingestion_data_channel_capacity: 128,
504+
backup_data_channel_capacity: 128,
502505
recovery_config: RecoveryConfig {
503506
retry_policy: RetryPolicy::default(),
504507
backups_enabled: true,
@@ -557,7 +560,8 @@ mod tests {
557560
checkpoint_interval,
558561
enable_compression_for_ingestion: false,
559562
control_channel_capacity: 128,
560-
data_channel_capacity: 128,
563+
ingestion_data_channel_capacity: 128,
564+
backup_data_channel_capacity: 128,
561565
recovery_config: RecoveryConfig {
562566
retry_policy: RetryPolicy::default(),
563567
backups_enabled: true,
@@ -610,7 +614,8 @@ mod tests {
610614
checkpoint_interval,
611615
enable_compression_for_ingestion: false,
612616
control_channel_capacity: 128,
613-
data_channel_capacity: 128,
617+
ingestion_data_channel_capacity: 128,
618+
backup_data_channel_capacity: 128,
614619
recovery_config: RecoveryConfig {
615620
retry_policy: RetryPolicy::default(),
616621
backups_enabled: true,
@@ -673,7 +678,8 @@ mod tests {
673678
checkpoint_interval: Duration::from_secs(60),
674679
enable_compression_for_ingestion: false,
675680
control_channel_capacity: 128,
676-
data_channel_capacity: 128,
681+
ingestion_data_channel_capacity: 128,
682+
backup_data_channel_capacity: 128,
677683
recovery_config: RecoveryConfig {
678684
retry_policy: RetryPolicy::default(),
679685
backups_enabled: true,
@@ -750,7 +756,8 @@ mod tests {
750756
checkpoint_interval,
751757
enable_compression_for_ingestion: false,
752758
control_channel_capacity: 128,
753-
data_channel_capacity: 128,
759+
ingestion_data_channel_capacity: 128,
760+
backup_data_channel_capacity: 128,
754761
recovery_config: RecoveryConfig {
755762
retry_policy: RetryPolicy {
756763
max_attempts: 3,
@@ -843,7 +850,8 @@ mod tests {
843850
checkpoint_interval,
844851
enable_compression_for_ingestion: false,
845852
control_channel_capacity: 128,
846-
data_channel_capacity: 128,
853+
ingestion_data_channel_capacity: 128,
854+
backup_data_channel_capacity: 128,
847855
recovery_config: RecoveryConfig {
848856
retry_policy: RetryPolicy::default(),
849857
backups_enabled: true,
@@ -914,7 +922,8 @@ mod tests {
914922
checkpoint_interval,
915923
enable_compression_for_ingestion: false,
916924
control_channel_capacity: 128,
917-
data_channel_capacity: 128,
925+
ingestion_data_channel_capacity: 128,
926+
backup_data_channel_capacity: 128,
918927
recovery_config: RecoveryConfig {
919928
retry_policy: RetryPolicy::default(),
920929
backups_enabled: true,

0 commit comments

Comments
 (0)