Commit b8f13b2

rust(feat): Adds a new SiftStream mode for only backup file writes (#416)
1 parent 76c1b41 commit b8f13b2

23 files changed

Lines changed: 2847 additions & 769 deletions

rust/crates/sift_error/src/lib.rs

Lines changed: 3 additions & 0 deletions
@@ -116,6 +116,8 @@ pub enum ErrorKind {
     UpdateRunError,
     /// Indicates that the program was unable to retrieve the ingestion config being requested.
     RetrieveIngestionConfigError,
+    /// Indicates that the program was unable to encode the message being requested.
+    EncodeMessageError,
     /// Indicates a failure to create a run.
     CreateRunError,
     /// Indicates a failure to create an ingestion config.
@@ -203,6 +205,7 @@ impl fmt::Display for ErrorKind {
             Self::UpdateAssetError => write!(f, "UpdateAssetError"),
             Self::RetrieveRunError => write!(f, "RetrieveRunError"),
             Self::RetrieveIngestionConfigError => write!(f, "RetrieveIngestionConfigError"),
+            Self::EncodeMessageError => write!(f, "EncodeMessageError"),
             Self::EmptyResponseError => write!(f, "EmptyResponseError"),
             Self::NotFoundError => write!(f, "NotFoundError"),
             Self::CreateRunError => write!(f, "CreateRunError"),
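
The two hunks above follow one pattern: a new variant is added to the enum, and a matching arm is added to its `Display` implementation. A minimal sketch of that pattern in isolation follows. `MiniErrorKind` is a hypothetical three-variant stand-in, not the crate's actual `ErrorKind`, which has many more variants.

```rust
use std::fmt;

/// Hypothetical, trimmed-down stand-in for `sift_error::ErrorKind`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MiniErrorKind {
    RetrieveIngestionConfigError,
    /// The variant this commit introduces.
    EncodeMessageError,
    CreateRunError,
}

impl fmt::Display for MiniErrorKind {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // The match has no wildcard arm, so adding a variant without
        // a corresponding arm here is a compile error.
        match self {
            Self::RetrieveIngestionConfigError => write!(f, "RetrieveIngestionConfigError"),
            Self::EncodeMessageError => write!(f, "EncodeMessageError"),
            Self::CreateRunError => write!(f, "CreateRunError"),
        }
    }
}
```

Because the match is exhaustive, the compiler enforces that both hunks land together, assuming the real implementation also avoids a catch-all arm.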

rust/crates/sift_stream/Cargo.toml

Lines changed: 1 addition & 0 deletions
@@ -29,6 +29,7 @@ chrono = { workspace = true }
 futures-core = "0.3.31"
 tokio-stream = { version = "0.1.17", features = ["sync"] }
 async-channel = "2.2"
+async-trait = "^0.1"
 prost = "^0.13"
 dirs = "6.0.0"
 uuid = { version = "1.16.0", features = ["v4"] }

rust/crates/sift_stream/README.md

Lines changed: 41 additions & 9 deletions
@@ -27,17 +27,48 @@ See the [examples](https://github.com/sift-stack/sift/tree/main/rust/crates/sift
 ### Main Entry Points

 - **SiftStreamBuilder**: Configures and builds SiftStream instances with various options
-- **SiftStream<IngestionConfigMode>**: Main streaming interface that users interact with
-- **IngestionConfigMode**: Core streaming implementation that manages the task system
+- **SiftStream<E, T>**: Generic streaming interface where `E: Encoder` (encodes data) and `T: Transport` (transmits data)
+- **SiftStream<IngestionConfigEncoder, LiveStreaming>**: Main streaming interface for real-time streaming to Sift (default)
+- **SiftStream<IngestionConfigEncoder, FileBackup>**: Streaming interface for writing data only to backup files (no live streaming)
+
+### Architecture Overview
+
+SiftStream uses a separation of concerns between **encoding** (how data is structured) and **transport** (how data is transmitted):
+
+- **Encoder** trait: Defines how data is encoded/structured. The encoder is responsible for:
+  - Converting user-provided data (e.g., `Flow` messages) into the appropriate message format
+  - Managing flow descriptors and ingestion configuration
+  - Providing metrics snapshots
+  - Currently implemented by `IngestionConfigEncoder` for ingestion-config-based encoding
+
+- **Transport** trait: Defines how encoded messages are transmitted. The transport is responsible for:
+  - Sending encoded messages to their destination
+  - Managing the transmission mechanism (gRPC streams, file writing, etc.)
+  - Handling cleanup and shutdown
+  - Currently implemented by `LiveStreaming` (gRPC streaming) and `FileBackup` (file writing)
+
+- **Encodeable** trait: Types that can be encoded by an encoder (e.g., `Flow`, `FlowBuilder`)
+
+This design allows for future extensibility: new encoding schemes or transport mechanisms can be added independently without affecting the other component.
+
+### Transport Modes
+
+SiftStream supports two different transport modes:
+
+- **LiveStreaming**: The default transport that streams data directly to Sift via gRPC. This transport supports real-time streaming, optional disk backups, checkpointing, and retry policies. Use `SiftStreamBuilder::build()` to create a stream with this transport.
+
+- **FileBackup**: A specialized transport that only writes telemetry data to backup files on disk without streaming to Sift. This transport is useful for offline data collection, batch processing, or scenarios with unreliable network connectivity. Use `SiftStreamBuilder::build_file_backup()` to create a stream with this transport. Note that this transport requires a `RecoveryStrategy::RetryWithBackups` configuration.

 ### Task System

-The SiftStream architecture consists of three main async tasks that work together to provide reliable data streaming:
+The SiftStream architecture when using `LiveStreaming` transport consists of three main async tasks that work together to provide reliable data streaming:

 1. **Backup Manager Task** - Handles backup file creation and management
 2. **Ingestion Task** - Manages gRPC streaming to Sift
 3. **Re-ingestion Task** - Handles re-ingestion of backup files when failures occur

+**Note**: `FileBackup` transport does not use the task system architecture, as it only writes to disk files without streaming to Sift.
+
 ## Control Messages

 Control messages are low-frequency messages sent between tasks via broadcast channels to coordinate checkpointing,
@@ -254,14 +285,15 @@ otherwise data loss may occur.

 ### Normal Operation Flow

-1. **User sends data** → `SiftStream::send()`
-2. **Data validation** → Flow cache lookup
-3. **Message ID assignment** → Each message receives a unique, monotonically increasing ID
-4. **Dual routing** → Both `ingestion_tx` and `backup_tx` channels
-5. **Parallel processing**:
+1. **User sends data** → `SiftStream::send()` with an `Encodeable` type (e.g., `Flow`)
+2. **Encoding** → `Encoder` converts the data to the appropriate message format
+3. **Transport** → `Transport` sends the encoded message
+   - For `LiveStreaming`: Message ID assignment → Dual routing to `ingestion_tx` and `backup_tx` channels
+   - For `FileBackup`: Direct writing to backup files
+4. **Parallel processing** (LiveStreaming only):
    - Ingestion task → gRPC stream → Sift
    - Backup task → Backup files
-6. **Checkpoint completion** → Cleanup or re-ingestion
+5. **Checkpoint completion** → Cleanup or re-ingestion (LiveStreaming only)

 ### Failure Recovery Flow

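
The encoder/transport split and the normal operation flow described in the README changes can be sketched with std-only, synchronous code. Everything below is a simplified illustration under stated assumptions: `MiniFlow`, `LineEncoder`, `MemoryBackup`, `DualRoute`, and `MiniStream` are hypothetical names, errors are plain strings, and the traits are synchronous, whereas the crate's own traits are presumably async (the commit adds `async-trait`) and carry more responsibilities such as metrics and shutdown.

```rust
use std::sync::mpsc::{channel, Receiver, Sender};

/// Hypothetical stand-in for a user-facing `Flow`.
pub struct MiniFlow {
    pub name: String,
    pub value: f64,
}

/// Encoding: how data is structured.
pub trait Encoder {
    type Message;
    fn encode(&self, flow: &MiniFlow) -> Result<Self::Message, String>;
}

/// Transport: how an encoded message is transmitted.
pub trait Transport<M> {
    fn send(&mut self, message: M) -> Result<(), String>;
}

/// Encodes a flow as a single `name=value` text line.
pub struct LineEncoder;

impl Encoder for LineEncoder {
    type Message = String;
    fn encode(&self, flow: &MiniFlow) -> Result<String, String> {
        if flow.name.is_empty() {
            return Err("flow name must not be empty".to_string());
        }
        Ok(format!("{}={}", flow.name, flow.value))
    }
}

/// Backup-only transport: appends every message to an in-memory "file".
#[derive(Default)]
pub struct MemoryBackup {
    pub lines: Vec<String>,
}

impl Transport<String> for MemoryBackup {
    fn send(&mut self, message: String) -> Result<(), String> {
        self.lines.push(message);
        Ok(())
    }
}

/// Live-style transport: assigns a monotonically increasing ID, then
/// routes each message to both an ingestion and a backup channel.
pub struct DualRoute {
    next_id: u64,
    ingestion_tx: Sender<(u64, String)>,
    backup_tx: Sender<(u64, String)>,
}

impl DualRoute {
    pub fn new() -> (Self, Receiver<(u64, String)>, Receiver<(u64, String)>) {
        let (ingestion_tx, ingestion_rx) = channel();
        let (backup_tx, backup_rx) = channel();
        let route = Self { next_id: 0, ingestion_tx, backup_tx };
        (route, ingestion_rx, backup_rx)
    }
}

impl Transport<String> for DualRoute {
    fn send(&mut self, message: String) -> Result<(), String> {
        let id = self.next_id;
        self.next_id += 1;
        self.ingestion_tx.send((id, message.clone())).map_err(|e| e.to_string())?;
        self.backup_tx.send((id, message)).map_err(|e| e.to_string())
    }
}

/// Generic stream over any encoder/transport pair.
pub struct MiniStream<E, T> {
    encoder: E,
    transport: T,
}

impl<E: Encoder, T: Transport<E::Message>> MiniStream<E, T> {
    pub fn new(encoder: E, transport: T) -> Self {
        Self { encoder, transport }
    }

    /// Encode first, then hand the encoded message to the transport.
    pub fn send(&mut self, flow: &MiniFlow) -> Result<(), String> {
        let message = self.encoder.encode(flow)?;
        self.transport.send(message)
    }

    /// Consumes the stream and returns its transport for inspection.
    pub fn into_transport(self) -> T {
        self.transport
    }
}
```

In this sketch, `MiniStream::new(LineEncoder, MemoryBackup::default())` plays the role of the backup-only mode, while `MiniStream::new(LineEncoder, route)` with a `DualRoute` plays the role of live streaming. Swapping the transport does not touch the encoder, which is the extensibility property the README describes.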
rust/crates/sift_stream/benches/message_to_ingest_req.rs

Lines changed: 8 additions & 19 deletions
@@ -28,6 +28,7 @@ use rand::thread_rng;
 use std::hint::black_box;

 use sift_rs::ingestion_configs::v2::{ChannelConfig, FlowConfig};
+use sift_rs::runs::v2::Run;
 use sift_stream::stream::mode::ingestion_config::Flow;
 use sift_stream::{
     ChannelDataType, ChannelValue, FlowDescriptor, TimeValue, Value,
@@ -120,7 +121,7 @@ fn flow_randomized(name: &str, flow_config: &FlowConfig) -> Flow {
 // Configuration constants - these can be adjusted to test different scenarios
 const NUM_CHANNELS_PER_FLOW: usize = 2000; // Number of channels per flow
 const INGESTION_CONFIG_ID: &str = "benchmark-config";
-const RUN_ID: Option<String> = None;
+const RUN: Option<&Run> = None;

 fn benchmark_message_to_ingest_req_direct(c: &mut Criterion) {
     // Create a flow with ordered channel values (matching the first flow config)
@@ -131,7 +132,7 @@ fn benchmark_message_to_ingest_req_direct(c: &mut Criterion) {
             black_box(message_to_ingest_req_direct(
                 &message,
                 INGESTION_CONFIG_ID,
-                RUN_ID.clone(),
+                RUN,
             ))
         })
     });
@@ -145,7 +146,7 @@ fn benchmark_message_to_ingest_req_ordered(c: &mut Criterion) {
     let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap();

     c.bench_function("message_to_ingest_req_ordered", |b| {
-        b.iter(|| black_box(message_to_ingest_req(&message, RUN_ID.clone(), &descriptor)))
+        b.iter(|| black_box(message_to_ingest_req(&message, RUN, &descriptor)))
     });
 }

@@ -157,7 +158,7 @@ fn benchmark_message_to_ingest_req_randomized(c: &mut Criterion) {
     let descriptor = FlowDescriptor::try_from((INGESTION_CONFIG_ID, flow)).unwrap();

     c.bench_function("message_to_ingest_req_randomized", |b| {
-        b.iter(|| black_box(message_to_ingest_req(&message, RUN_ID.clone(), &descriptor)))
+        b.iter(|| black_box(message_to_ingest_req(&message, RUN, &descriptor)))
     });
 }

@@ -179,30 +180,18 @@ fn benchmark_message_to_ingest_req_varying_sizes(c: &mut Criterion) {
                 black_box(message_to_ingest_req_direct(
                     &message_ordered,
                     INGESTION_CONFIG_ID,
-                    RUN_ID.clone(),
+                    RUN,
                 ))
             })
         });

         group.bench_function(&format!("ordered_{num_channels}_channels"), |b| {
-            b.iter(|| {
-                black_box(message_to_ingest_req(
-                    &message_ordered,
-                    RUN_ID.clone(),
-                    &descriptor,
-                ))
-            })
+            b.iter(|| black_box(message_to_ingest_req(&message_ordered, RUN, &descriptor)))
         });

         // Test randomized scenario
         group.bench_function(&format!("randomized_{num_channels}_channels"), |b| {
-            b.iter(|| {
-                black_box(message_to_ingest_req(
-                    &message_randomized,
-                    RUN_ID.clone(),
-                    &descriptor,
-                ))
-            })
+            b.iter(|| black_box(message_to_ingest_req(&message_randomized, RUN, &descriptor)))
         });
     }

Lines changed: 141 additions & 0 deletions
@@ -0,0 +1,141 @@
+use sift_rs::metadata;
+use sift_stream::{
+    ChannelConfig, ChannelDataType, ChannelValue, Credentials, DiskBackupPolicy, Flow, FlowBuilder,
+    FlowConfig, IngestionConfigForm, RecoveryStrategy, RetryPolicy, RunForm, SiftStreamBuilder,
+    TimeValue,
+};
+use std::{
+    env,
+    error::Error,
+    path::PathBuf,
+    process::ExitCode,
+    time::{Duration, SystemTime, UNIX_EPOCH},
+};
+use tracing_subscriber::filter::EnvFilter;
+
+#[tokio::main]
+async fn main() -> ExitCode {
+    tracing_subscriber::fmt()
+        .with_target(false)
+        .with_env_filter(EnvFilter::from_default_env())
+        .init();
+
+    match run().await {
+        Ok(()) => ExitCode::SUCCESS,
+        Err(err) => {
+            eprintln!("{err}");
+            ExitCode::FAILURE
+        }
+    }
+}
+
+async fn run() -> Result<(), Box<dyn Error>> {
+    let credentials = Credentials::Config {
+        apikey: env::var("SIFT_API_KEY").unwrap(),
+        uri: env::var("SIFT_URI").unwrap(),
+    };
+
+    // Define the schema of your telemetry
+    let ingestion_config = IngestionConfigForm {
+        asset_name: "MarsRover0".into(),
+        client_key: "mars-rover0-ingestion-config-v1".into(),
+        flows: vec![FlowConfig {
+            name: "robotic-arm".into(),
+            channels: vec![ChannelConfig {
+                name: "joint-angle-encoder".into(),
+                description: "measures the angular position of the arm’s joints".into(),
+                data_type: ChannelDataType::Double.into(),
+                unit: "degrees".into(),
+                ..Default::default()
+            }],
+        }],
+    };
+
+    let ts = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .map(|d| d.as_millis())
+        .unwrap();
+
+    // Create metadata using the metadata macro
+    let metadata = metadata![
+        ("test_number", 5.0),
+        ("is_simulation", true),
+        ("location", "SiftHQ"),
+    ];
+
+    // Define an optional run to group together data for this period of telemetry ingestion.
+    let run = RunForm {
+        name: format!("[MarsRover0].{ts}"),
+        client_key: format!("mars-rover-sim-{ts}"),
+        description: Some("simulation run".into()),
+        tags: Some(vec!["simulation".into()]),
+        metadata: Some(metadata),
+    };
+
+    let recovery_strategy = RecoveryStrategy::RetryWithBackups {
+        retry_policy: RetryPolicy::default(),
+        disk_backup_policy: DiskBackupPolicy {
+            backups_dir: Some(PathBuf::from("/tmp/sift_backup")),
+            ..Default::default()
+        },
+    };
+
+    // Initialize your Sift Stream
+    let mut sift_stream = SiftStreamBuilder::new(credentials)
+        .ingestion_config(ingestion_config)
+        .recovery_strategy(recovery_strategy)
+        .attach_run(run)
+        .build_file_backup()
+        .await?;
+
+    // Stream telemetry to backup files using the [`SiftStream::send`] method.
+    for i in 0..360 {
+        let flow = Flow::new(
+            "robotic-arm",
+            TimeValue::now(),
+            &[ChannelValue::new("joint-angle-encoder", f64::from(i).sin())],
+        );
+
+        sift_stream.send(flow).await.unwrap();
+
+        // For demonstrative purposes, adding a contrived wait to get 10Hz data.
+        tokio::time::sleep(Duration::from_millis(100)).await;
+    }
+
+    // Next, stream telemetry to backup files using the [`SiftStream::send_requests_nonblocking`] method
+    // and the [`FlowBuilder`] to build the flow.
+    //
+    // This approach is more performant, and also provides methods to set the channel value via
+    // the channel index instead of the key, which further improves performance by avoiding
+    // hashing operations on the channel key.
+    //
+    // However, this approach does require setting the run ID on the flow builder instead of
+    // letting the [`SiftStream`] handle it. Though this can be useful if using a single [`SiftStream`]
+    // to send data for multiple runs/assets at one time.
+    let descriptor = sift_stream.get_flow_descriptor("robotic-arm").unwrap();
+    let run_id = sift_stream.run().unwrap().run_id.clone();
+    for i in 0..360 {
+        // Build the flow using the [`FlowBuilder`] and send it to
+        // Sift using the [`SiftStream::send_requests_nonblocking`] method.
+        let mut flow_builder = FlowBuilder::new(&descriptor);
+        flow_builder.attach_run_id(&run_id);
+        flow_builder
+            .set_with_key("joint-angle-encoder", f64::from(i).sin())
+            .unwrap();
+
+        sift_stream
+            .send_requests_nonblocking(vec![flow_builder.request(TimeValue::now())])
+            .unwrap();
+
+        // For demonstrative purposes, adding a contrived wait to get 10Hz data.
+        tokio::time::sleep(Duration::from_millis(100)).await;
+    }
+
+    // Gracefully terminate your stream
+    sift_stream
+        .finish()
+        .await
+        .expect("failed to gracefully terminate Sift stream");
+
+    Ok(())
+}
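
The comment in the example above says that setting a channel value by index avoids hashing the channel key. The sketch below illustrates why, using hypothetical types: `MiniDescriptor` and `MiniFlowBuilder` are stand-ins, and `set_with_index` is an assumed name for illustration. Only `set_with_key` appears in the commit, and the real `FlowBuilder` may be organized differently.

```rust
use std::collections::HashMap;

/// Hypothetical descriptor: maps channel names to slots in a value buffer.
pub struct MiniDescriptor {
    index_by_key: HashMap<String, usize>,
}

impl MiniDescriptor {
    pub fn new(channel_names: &[&str]) -> Self {
        let index_by_key = channel_names
            .iter()
            .enumerate()
            .map(|(index, name)| (name.to_string(), index))
            .collect();
        Self { index_by_key }
    }

    /// One hash lookup; callers can do this once and reuse the index.
    pub fn index_of(&self, key: &str) -> Option<usize> {
        self.index_by_key.get(key).copied()
    }
}

/// Hypothetical builder holding one optional value per channel slot.
pub struct MiniFlowBuilder<'a> {
    descriptor: &'a MiniDescriptor,
    values: Vec<Option<f64>>,
}

impl<'a> MiniFlowBuilder<'a> {
    pub fn new(descriptor: &'a MiniDescriptor) -> Self {
        let values = vec![None; descriptor.index_by_key.len()];
        Self { descriptor, values }
    }

    /// Key-based set: hashes the key on every call, then writes the slot.
    pub fn set_with_key(&mut self, key: &str, value: f64) -> Result<(), String> {
        let index = self
            .descriptor
            .index_of(key)
            .ok_or_else(|| format!("unknown channel: {key}"))?;
        self.set_with_index(index, value)
    }

    /// Index-based set: a bounds-checked slot write with no hashing.
    pub fn set_with_index(&mut self, index: usize, value: f64) -> Result<(), String> {
        let slot = self
            .values
            .get_mut(index)
            .ok_or_else(|| format!("index out of range: {index}"))?;
        *slot = Some(value);
        Ok(())
    }

    pub fn values(&self) -> &[Option<f64>] {
        &self.values
    }
}
```

In a hot loop, a caller would resolve `index_of` once before the loop and call the index-based setter per message, so the per-message cost drops from a hash lookup to a bounds check.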

rust/crates/sift_stream/examples/quick-start/main.rs

Lines changed: 32 additions & 2 deletions
@@ -1,6 +1,6 @@
 use sift_rs::metadata;
 use sift_stream::{
-    ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowConfig,
+    ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowBuilder, FlowConfig,
     IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder, TimeValue,
 };
 use std::{
@@ -78,7 +78,7 @@ async fn run() -> Result<(), Box<dyn Error>> {
         .build()
         .await?;

-    // Stream telemetry to Sift
+    // Stream telemetry to Sift using the [`SiftStream::send`] method.
     for i in 0..360 {
         let flow = Flow::new(
             "robotic-arm",
@@ -93,6 +93,36 @@ async fn run() -> Result<(), Box<dyn Error>> {
         tokio::time::sleep(Duration::from_millis(100)).await;
     }

+    // Next, stream telemetry to Sift using the [`SiftStream::send_requests_nonblocking`] method
+    // and the [`FlowBuilder`] to build the flow.
+    //
+    // This approach is more performant, and also provides methods to set the channel value via
+    // the channel index instead of the key, which further improves performance by avoiding
+    // hashing operations on the channel key.
+    //
+    // However, this approach does require setting the run ID on the flow builder instead of
+    // letting the [`SiftStream`] handle it. Though this can be useful if using a single [`SiftStream`]
+    // to send data for multiple runs/assets at one time.
+    let descriptor = sift_stream.get_flow_descriptor("robotic-arm").unwrap();
+    let run_id = sift_stream.run().unwrap().run_id.clone();
+    for i in 0..360 {
+        // Build the flow using the [`FlowBuilder`] and send it to
+        // Sift using the [`SiftStream::send_requests_nonblocking`] method.
+        let mut flow_builder = FlowBuilder::new(&descriptor);
+        flow_builder.attach_run_id(&run_id);
+        flow_builder
+            .set_with_key("joint-angle-encoder", f64::from(i).sin())
+            .unwrap();
+
+        // Send telemetry to Sift.
+        sift_stream
+            .send_requests_nonblocking(vec![flow_builder.request(TimeValue::now())])
+            .unwrap();
+
+        // For demonstrative purposes, adding a contrived wait to get 10Hz data.
+        tokio::time::sleep(Duration::from_millis(100)).await;
+    }
+
     // Gracefully terminate your stream
     sift_stream
         .finish()

rust/crates/sift_stream/src/backup/disk/async_manager.rs

Lines changed: 1 addition & 1 deletion
@@ -94,7 +94,7 @@ struct CheckpointInfo {
 }

 /// Disk-based backup with async ingestion implementation.
-pub struct AsyncBackupsManager {
+pub(crate) struct AsyncBackupsManager {
     /// Configuration for how to manage backups.
     backup_config: BackupConfig,
