Skip to content

Commit 68b08c8

Browse files
authored
rust(feat): Add gzip compression support in sift-stream ingestion (#360)
1 parent 86672a1 commit 68b08c8

5 files changed

Lines changed: 85 additions & 25 deletions

File tree

Cargo.toml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ members = [
1212

1313
[workspace.package]
1414
authors = ["Sift Software Engineers <engineering@siftstack.com>"]
15-
version = "0.6.0"
15+
version = "0.7.0-rc.1"
1616
edition = "2024"
1717
categories = ["aerospace", "science::robotics"]
1818
homepage = "https://github.com/sift-stack/sift"
@@ -24,12 +24,12 @@ license = "MIT"
2424
[workspace.dependencies]
2525
chrono = { version = "0.4.39", default-features = false, features = ["clock"] }
2626
pbjson-types = "^0.7"
27-
tonic = { version = "^0.12" }
27+
tonic = { version = "^0.12", features = ["gzip"] }
2828

29-
sift_connect = { version = "0.6.0", path = "rust/crates/sift_connect" }
30-
sift_rs = { version = "0.6.0", path = "rust/crates/sift_rs" }
31-
sift_error = { version = "0.6.0", path = "rust/crates/sift_error" }
32-
sift_stream = { version = "0.6.0", path = "rust/crates/sift_stream" }
29+
sift_connect = { version = "0.7.0-rc.1", path = "rust/crates/sift_connect" }
30+
sift_rs = { version = "0.7.0-rc.1", path = "rust/crates/sift_rs" }
31+
sift_error = { version = "0.7.0-rc.1", path = "rust/crates/sift_error" }
32+
sift_stream = { version = "0.7.0-rc.1", path = "rust/crates/sift_stream" }
3333

3434
sift_stream_bindings = { version = "0.1.0", path = "rust/crates/sift_stream_bindings" }
3535

rust/crates/sift_rs/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,4 @@ pub use r#gen::sift::*;
5050
pub mod wrappers;
5151

5252
pub use sift_connect::{Credentials, SiftChannel, SiftChannelBuilder};
53+
pub use tonic::codec::CompressionEncoding;

rust/crates/sift_stream/src/backup/disk/async_manager.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use async_channel::{Receiver, Sender};
88
use chrono::Utc;
99
use prost::Message as PbMessage;
1010
use sift_error::prelude::*;
11+
use sift_rs::CompressionEncoding;
1112
use sift_rs::{SiftChannel, ingest::v1::ingest_service_client::IngestServiceClient};
1213
use std::time::Duration;
1314
use std::{
@@ -407,6 +408,7 @@ pub(crate) struct BackupIngestTask {
407408
control_rx: broadcast::Receiver<ControlMessage>,
408409
to_reingest_rx: Receiver<PathBuf>,
409410
to_reingest_tx: Sender<PathBuf>,
411+
enable_compression_for_ingestion: bool,
410412
grpc_channel: SiftChannel,
411413
retry_policy: RetryPolicy,
412414
retain_backups: bool,
@@ -417,6 +419,7 @@ impl BackupIngestTask {
417419
pub(crate) fn new(
418420
control_rx: broadcast::Receiver<ControlMessage>,
419421
grpc_channel: SiftChannel,
422+
enable_compression_for_ingestion: bool,
420423
retry_policy: RetryPolicy,
421424
retain_backups: bool,
422425
metrics: Arc<SiftStreamMetrics>,
@@ -426,6 +429,7 @@ impl BackupIngestTask {
426429
control_rx,
427430
to_reingest_rx,
428431
to_reingest_tx,
432+
enable_compression_for_ingestion,
429433
grpc_channel,
430434
retry_policy,
431435
retain_backups,
@@ -442,6 +446,7 @@ impl BackupIngestTask {
442446
self.to_reingest_rx.clone(),
443447
self.grpc_channel.clone(),
444448
self.retry_policy,
449+
self.enable_compression_for_ingestion,
445450
self.retain_backups,
446451
self.metrics.clone(),
447452
));
@@ -493,11 +498,19 @@ impl BackupIngestTask {
493498
to_reingest_rx: Receiver<PathBuf>,
494499
grpc_channel: SiftChannel,
495500
retry_policy: RetryPolicy,
501+
enable_compression_for_ingestion: bool,
496502
retain_backups: bool,
497503
metrics: Arc<SiftStreamMetrics>,
498504
) -> Result<()> {
499505
let mut client = IngestServiceClient::new(grpc_channel);
500506

507+
// If compression is enabled, add the compression codecs to the client.
508+
if enable_compression_for_ingestion {
509+
client = client
510+
.send_compressed(CompressionEncoding::Gzip)
511+
.accept_compressed(CompressionEncoding::Gzip);
512+
}
513+
501514
while let Ok(backup_file_path) = to_reingest_rx.recv().await {
502515
metrics
503516
.backups
@@ -1578,6 +1591,7 @@ mod test {
15781591
let (to_reingest_tx, to_reingest_rx) = async_channel::bounded(1024);
15791592
let retry_policy = RetryPolicy::default();
15801593
let retain_backups = false;
1594+
let enable_compression_for_ingestion = false;
15811595
let (grpc_channel, mock_service) =
15821596
crate::test::create_mock_grpc_channel_with_service().await;
15831597
let metrics = Arc::new(SiftStreamMetrics::default());
@@ -1595,6 +1609,7 @@ mod test {
15951609
to_reingest_rx,
15961610
grpc_channel,
15971611
retry_policy,
1612+
enable_compression_for_ingestion,
15981613
retain_backups,
15991614
metrics
16001615
)
@@ -1637,6 +1652,7 @@ mod test {
16371652
let (to_reingest_tx, to_reingest_rx) = async_channel::bounded(1024);
16381653
let retry_policy = RetryPolicy::default();
16391654
let retain_backups = true;
1655+
let enable_compression_for_ingestion = false;
16401656
let (grpc_channel, mock_service) =
16411657
crate::test::create_mock_grpc_channel_with_service().await;
16421658
let metrics = Arc::new(SiftStreamMetrics::default());
@@ -1654,6 +1670,7 @@ mod test {
16541670
to_reingest_rx,
16551671
grpc_channel,
16561672
retry_policy,
1673+
enable_compression_for_ingestion,
16571674
retain_backups,
16581675
metrics
16591676
)
@@ -1701,6 +1718,7 @@ mod test {
17011718
backoff_multiplier: 5,
17021719
};
17031720
let retain_backups = false;
1721+
let enable_compression_for_ingestion = false;
17041722
let (grpc_channel, mock_service) =
17051723
crate::test::create_mock_grpc_channel_with_service().await;
17061724
let metrics = Arc::new(SiftStreamMetrics::default());
@@ -1722,6 +1740,7 @@ mod test {
17221740
to_reingest_rx,
17231741
grpc_channel,
17241742
retry_policy,
1743+
enable_compression_for_ingestion,
17251744
retain_backups,
17261745
metrics.clone()
17271746
)
@@ -1776,6 +1795,7 @@ mod test {
17761795
backoff_multiplier: 5,
17771796
};
17781797
let retain_backups = false;
1798+
let enable_compression_for_ingestion = false;
17791799
let (grpc_channel, mock_service) =
17801800
crate::test::create_mock_grpc_channel_with_service().await;
17811801
let metrics = Arc::new(SiftStreamMetrics::default());
@@ -1797,6 +1817,7 @@ mod test {
17971817
to_reingest_rx,
17981818
grpc_channel,
17991819
retry_policy,
1820+
enable_compression_for_ingestion,
18001821
retain_backups,
18011822
metrics.clone()
18021823
)
@@ -1846,13 +1867,15 @@ mod test {
18461867
// Create the re-ingestion task.
18471868
let retry_policy = RetryPolicy::default();
18481869
let retain_backups = false;
1870+
let enable_compression_for_ingestion = false;
18491871
let (grpc_channel, mock_service) =
18501872
crate::test::create_mock_grpc_channel_with_service().await;
18511873
let metrics = Arc::new(SiftStreamMetrics::default());
18521874

18531875
let reingest_task = BackupIngestTask::new(
18541876
control_rx,
18551877
grpc_channel,
1878+
enable_compression_for_ingestion,
18561879
retry_policy,
18571880
retain_backups,
18581881
metrics,
@@ -1926,13 +1949,15 @@ mod test {
19261949
// Create the re-ingestion task.
19271950
let retry_policy = RetryPolicy::default();
19281951
let retain_backups = false;
1952+
let enable_compression_for_ingestion = false;
19291953
let (grpc_channel, _mock_service) =
19301954
crate::test::create_mock_grpc_channel_with_service().await;
19311955
let metrics = Arc::new(SiftStreamMetrics::default());
19321956

19331957
let reingest_task = BackupIngestTask::new(
19341958
control_rx,
19351959
grpc_channel,
1960+
enable_compression_for_ingestion,
19361961
retry_policy,
19371962
retain_backups,
19381963
metrics,

rust/crates/sift_stream/src/stream/builder.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@ pub struct SiftStreamBuilder<C> {
5757
asset_metadata: Option<Vec<MetadataValue>>,
5858
control_channel_capacity: usize,
5959
data_channel_capacity: usize,
60+
enable_compression_for_ingestion: bool,
6061

6162
// Either `run` or `run_id`. If both are provided then the `run_id` will be prioritized.
6263
run: Option<RunForm>,
@@ -167,6 +168,17 @@ where
167168
self
168169
}
169170

171+
/// Sets whether compression is enabled.
172+
///
173+
/// Currently only gzip is supported.
174+
///
175+
/// WARNING: Compression adds additional overhead both on the client and server, so can reduce
176+
/// the overall throughput of a stream. It is not recommended to enable compression by default.
177+
pub fn enable_compression_for_ingestion(mut self, enable: bool) -> SiftStreamBuilder<C> {
178+
self.enable_compression_for_ingestion = enable;
179+
self
180+
}
181+
170182
/// Disables TLS. Useful for testing. This is ignored if [SiftStreamBuilder::from_channel] is
171183
/// used to initialize the builder.
172184
pub fn disable_tls(mut self) -> SiftStreamBuilder<C> {
@@ -200,6 +212,7 @@ impl SiftStreamBuilder<IngestionConfigMode> {
200212
credentials: Some(credentials),
201213
channel: None,
202214
enable_tls: true,
215+
enable_compression_for_ingestion: false,
203216
ingestion_config: None,
204217
run: None,
205218
run_id: None,
@@ -219,6 +232,7 @@ impl SiftStreamBuilder<IngestionConfigMode> {
219232
credentials: None,
220233
channel: Some(channel),
221234
enable_tls: true,
235+
enable_compression_for_ingestion: false,
222236
ingestion_config: None,
223237
run: None,
224238
run_id: None,
@@ -240,6 +254,7 @@ impl SiftStreamBuilder<IngestionConfigMode> {
240254
channel: grpc_channel,
241255
credentials,
242256
enable_tls,
257+
enable_compression_for_ingestion,
243258
ingestion_config,
244259
recovery_strategy,
245260
run,
@@ -342,6 +357,7 @@ impl SiftStreamBuilder<IngestionConfigMode> {
342357
grpc_channel: channel,
343358
metrics: metrics.clone(),
344359
checkpoint_interval,
360+
enable_compression_for_ingestion,
345361
recovery_config,
346362
control_channel_capacity: self.control_channel_capacity,
347363
data_channel_capacity: self.data_channel_capacity,

rust/crates/sift_stream/src/stream/tasks.rs

Lines changed: 37 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,9 @@ use crate::{
77
use async_channel;
88
use sift_connect::SiftChannel;
99
use sift_error::prelude::*;
10-
use sift_rs::ingest::v1::{
11-
IngestWithConfigDataStreamRequest, ingest_service_client::IngestServiceClient,
10+
use sift_rs::{
11+
CompressionEncoding,
12+
ingest::v1::{IngestWithConfigDataStreamRequest, ingest_service_client::IngestServiceClient},
1213
};
1314
use std::{path::PathBuf, pin::Pin, sync::Arc, time::Duration};
1415
use tokio::{sync::broadcast, task::JoinHandle, time::Instant};
@@ -47,31 +48,32 @@ pub(crate) enum ControlMessage {
4748
}
4849

4950
#[derive(Clone)]
50-
pub struct RecoveryConfig {
51-
pub retry_policy: RetryPolicy,
52-
pub backups_enabled: bool,
53-
pub backups_directory: String,
54-
pub backups_prefix: String,
55-
pub backup_policy: DiskBackupPolicy,
51+
pub(crate) struct RecoveryConfig {
52+
pub(crate) retry_policy: RetryPolicy,
53+
pub(crate) backups_enabled: bool,
54+
pub(crate) backups_directory: String,
55+
pub(crate) backups_prefix: String,
56+
pub(crate) backup_policy: DiskBackupPolicy,
5657
}
5758

5859
/// Configuration for the task-based SiftStream
5960
#[derive(Clone)]
60-
pub struct TaskConfig {
61-
pub sift_stream_id: Uuid,
62-
pub grpc_channel: SiftChannel,
63-
pub metrics: Arc<SiftStreamMetrics>,
64-
pub checkpoint_interval: Duration,
65-
pub recovery_config: RecoveryConfig,
66-
pub control_channel_capacity: usize,
67-
pub data_channel_capacity: usize,
61+
pub(crate) struct TaskConfig {
62+
pub(crate) sift_stream_id: Uuid,
63+
pub(crate) grpc_channel: SiftChannel,
64+
pub(crate) metrics: Arc<SiftStreamMetrics>,
65+
pub(crate) checkpoint_interval: Duration,
66+
pub(crate) enable_compression_for_ingestion: bool,
67+
pub(crate) recovery_config: RecoveryConfig,
68+
pub(crate) control_channel_capacity: usize,
69+
pub(crate) data_channel_capacity: usize,
6870
}
6971

7072
/// Data message with stream ID for routing
7173
#[derive(Debug, Clone)]
72-
pub struct DataMessage {
73-
pub request: IngestWithConfigDataStreamRequest,
74-
pub dropped_for_ingestion: bool,
74+
pub(crate) struct DataMessage {
75+
pub(crate) request: IngestWithConfigDataStreamRequest,
76+
pub(crate) dropped_for_ingestion: bool,
7577
}
7678

7779
/// Handles for the three main tasks
@@ -150,6 +152,7 @@ pub(crate) fn start_tasks(config: TaskConfig) -> Result<StreamSystem> {
150152
let reingestion_task = BackupIngestTask::new(
151153
reingestion_control_tx.subscribe(),
152154
reingestion_config.grpc_channel,
155+
reingestion_config.enable_compression_for_ingestion,
153156
reingest_retry_policy,
154157
reingestion_config
155158
.recovery_config
@@ -233,6 +236,14 @@ impl IngestionTask {
233236
// any race conditions in that task being polled for the first time and other
234237
// events occurring in the system.
235238
let mut client = IngestServiceClient::new(self.config.grpc_channel.clone());
239+
240+
// If compression is enabled, add the compression codecs to the client.
241+
if self.config.enable_compression_for_ingestion {
242+
client = client
243+
.send_compressed(CompressionEncoding::Gzip)
244+
.accept_compressed(CompressionEncoding::Gzip);
245+
}
246+
236247
let data_stream = DataStream::new(
237248
self.data_rx.clone(),
238249
self.control_tx.clone(),
@@ -485,6 +496,7 @@ mod tests {
485496
grpc_channel,
486497
metrics: metrics.clone(),
487498
checkpoint_interval,
499+
enable_compression_for_ingestion: false,
488500
control_channel_capacity: 128,
489501
data_channel_capacity: 128,
490502
recovery_config: RecoveryConfig {
@@ -543,6 +555,7 @@ mod tests {
543555
grpc_channel,
544556
metrics: metrics.clone(),
545557
checkpoint_interval,
558+
enable_compression_for_ingestion: false,
546559
control_channel_capacity: 128,
547560
data_channel_capacity: 128,
548561
recovery_config: RecoveryConfig {
@@ -595,6 +608,7 @@ mod tests {
595608
grpc_channel,
596609
metrics: metrics.clone(),
597610
checkpoint_interval,
611+
enable_compression_for_ingestion: false,
598612
control_channel_capacity: 128,
599613
data_channel_capacity: 128,
600614
recovery_config: RecoveryConfig {
@@ -657,6 +671,7 @@ mod tests {
657671
grpc_channel,
658672
metrics: metrics.clone(),
659673
checkpoint_interval: Duration::from_secs(60),
674+
enable_compression_for_ingestion: false,
660675
control_channel_capacity: 128,
661676
data_channel_capacity: 128,
662677
recovery_config: RecoveryConfig {
@@ -733,6 +748,7 @@ mod tests {
733748
grpc_channel,
734749
metrics: metrics.clone(),
735750
checkpoint_interval,
751+
enable_compression_for_ingestion: false,
736752
control_channel_capacity: 128,
737753
data_channel_capacity: 128,
738754
recovery_config: RecoveryConfig {
@@ -825,6 +841,7 @@ mod tests {
825841
grpc_channel,
826842
metrics: metrics.clone(),
827843
checkpoint_interval,
844+
enable_compression_for_ingestion: false,
828845
control_channel_capacity: 128,
829846
data_channel_capacity: 128,
830847
recovery_config: RecoveryConfig {
@@ -895,6 +912,7 @@ mod tests {
895912
grpc_channel,
896913
metrics: metrics.clone(),
897914
checkpoint_interval,
915+
enable_compression_for_ingestion: false,
898916
control_channel_capacity: 128,
899917
data_channel_capacity: 128,
900918
recovery_config: RecoveryConfig {

0 commit comments

Comments
 (0)