Skip to content

Commit 44b05a7

Browse files
authored
Rust(chore): New ingest tutorial (#489)
1 parent 186a3d5 commit 44b05a7

2 files changed

Lines changed: 191 additions & 0 deletions

File tree

rust/crates/sift_stream/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ unstable = []
4646

4747
[dev-dependencies]
4848
async-trait = "^0.1"
49+
dotenvy = "0.15.7"
4950
hyper-util = "0.1.10"
5051
tempdir = "0.3.7"
5152
tokio-stream = "0.1.17"
Lines changed: 190 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,190 @@
1+
// Streams simulated vehicle velocity and temperature telemetry, generated using random values to mimic onboard vehicle sensors, to Sift for up to 10 minutes.
2+
//
3+
// This example demonstrates the complete streaming ingestion lifecycle:
4+
// - Authenticate with Sift
5+
// - Define a telemetry schema (Flow + Channels)
6+
// - Create an Asset and Run
7+
// - Open a streaming ingestion session
8+
// - Send timestamped flows in real time
9+
//
10+
// The program runs for INGEST_DURATION
11+
12+
use sift_stream::{
13+
ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowConfig,
14+
IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder, TimeValue,
15+
};
16+
use std::{
17+
env,
18+
error::Error,
19+
time::{Duration, SystemTime, UNIX_EPOCH},
20+
};
21+
22+
// Define configuration constants
23+
// ---------------------------------------------------------------------
24+
/// FLOW_NAME identifies the telemetry schema of a flow inside Sift.
25+
const FLOW_NAME: &str = "vehicle_metrics";
26+
/// SEND_INTERVAL controls sampling frequency.
27+
const SEND_INTERVAL: Duration = Duration::from_millis(500);
28+
/// INGEST_DURATION controls how long we send data before exiting
29+
const INGEST_DURATION: Duration = Duration::from_mins(10);
30+
31+
/// Helper function to generate unique names
32+
/// ---------------------------------------------------------------------
33+
/// Sift Assets and Runs should have unique names.
34+
/// This helper creates a timestamp + short random suffix to prevent collisions.
35+
fn make_unique_suffix() -> String {
36+
format!(
37+
"{}_{:x}",
38+
SystemTime::now()
39+
.duration_since(UNIX_EPOCH)
40+
.expect("system time before UNIX EPOCH")
41+
.as_secs(),
42+
rand::random::<u32>()
43+
)
44+
}
45+
46+
/// Main function
47+
/// ---------------------------------------------------------------------
48+
/// All ingestion logic lives inside this async function.
49+
#[tokio::main]
50+
async fn main() -> Result<(), Box<dyn Error>> {
51+
// Sift stream uses the tracing crate for logging, which we can enable
52+
// to see internal sift stream logs
53+
tracing_subscriber::fmt().init();
54+
55+
tracing::info!("Starting streaming session.");
56+
57+
// Create unique Asset and Run names
58+
// -----------------------------------------------------------------
59+
// An Asset represents the telemetry-producing system.
60+
// A Run represents a single data collection session for that Asset.
61+
let suffix = make_unique_suffix();
62+
let asset_name = format!("robot_vehicle_{suffix}");
63+
let run_name = format!("{asset_name}_run");
64+
65+
// Load authentication from .env
66+
// -----------------------------------------------------------------
67+
// We load credentials from a .env file instead of hardcoding them.
68+
// These values are required to establish authenticated communication
69+
// with the gRPC endpoint of your Sift environment.
70+
dotenvy::dotenv()?;
71+
let credentials = Credentials::Config {
72+
apikey: env::var("SIFT_API_KEY").unwrap(),
73+
uri: env::var("SIFT_GRPC_URL").unwrap(),
74+
};
75+
76+
// Define telemetry signals (Channels) within a Flow
77+
// -----------------------------------------------------------------
78+
// A FlowConfig defines the telemetry schema.
79+
// Each ChannelConfig defines:
80+
// - name (signal identifier)
81+
// - unit (measurement unit)
82+
// - data_type (numeric, string, etc.)
83+
// - description (a human-readable explanation of what the Channel (signal) represents and how it should be interpreted)
84+
//
85+
// All telemetry sent to Sift must conform to this schema.
86+
let flow_config = FlowConfig {
87+
name: FLOW_NAME.into(),
88+
channels: vec![
89+
ChannelConfig {
90+
name: "velocity".into(),
91+
unit: "m/s".into(),
92+
data_type: ChannelDataType::Double.into(),
93+
description: "The velocity Channel streams real-time speed measurements of the vehicle in meters per second (m/s) as double-precision numeric values.".into(),
94+
..Default::default()
95+
},
96+
ChannelConfig {
97+
name: "temperature".into(),
98+
unit: "C".into(),
99+
data_type: ChannelDataType::Double.into(),
100+
description: "The temperature Channel streams real-time temperature readings of the vehicle in degrees Celsius (°C) as double-precision numeric values.".into(),
101+
..Default::default()
102+
},
103+
]
104+
};
105+
106+
// Create ingestion configuration
107+
// -----------------------------------------------------------------
108+
// IngestionConfigForm requires:
109+
// - An Asset
110+
// - A client key identifier expected to be unique across the user's organization
111+
// - One or more Flow configs
112+
let ingestion_client_key = format!("{asset_name}_v1");
113+
let ingestion_config = IngestionConfigForm {
114+
asset_name: asset_name.clone(),
115+
client_key: ingestion_client_key,
116+
flows: vec![flow_config],
117+
};
118+
119+
// Create Run
120+
// -----------------------------------------------------------------
121+
// RunForm defines the session that will group all incoming flows.
122+
// While not strictly necessary for ingestion, Runs are useful for organizing
123+
// data from one or more Assets for a given period of time (such as a specific test,
124+
// or daily ops)
125+
// Requires a unique client_key, which we'll set to the same as the run name in this case
126+
let run = RunForm {
127+
name: run_name.clone(),
128+
client_key: run_name,
129+
..Default::default()
130+
};
131+
132+
// Initialize Sift Stream
133+
// -----------------------------------------------------------------
134+
// SiftStream is built using SiftStreamBuilder, which much be supplied with the user credentials
135+
// We will also provide the following:
136+
// - The ingestion config defining the telemetry schema we plan to send
137+
// - A default recovery strategy (Retry only, with no file backups of ingested data)
138+
// - A run to attach incoming data to
139+
let mut sift_stream = SiftStreamBuilder::new(credentials)
140+
.ingestion_config(ingestion_config)
141+
.recovery_strategy(RecoveryStrategy::default())
142+
.attach_run(run)
143+
.build()
144+
.await?;
145+
146+
// Stream telemetry to Sift using the SiftStream::send method for INGEST_DURATION
147+
// -----------------------------------------------------------------
148+
// NOTE: This approach uses `Flow` and `SiftStream::send()` for ease of use, and will
149+
// provide acceptable performance for most users
150+
// In cases where additional performance is required, a separate, more performant method
151+
// is also available that uses `FlowBuilder` and `SiftStream::send_requests_nonblocking`
152+
// See `examples/quick-start/` for an example using this alternate approach
153+
let start = std::time::Instant::now();
154+
while start.elapsed() < INGEST_DURATION {
155+
// Generate mock telemetry values
156+
// ---------------------------------------------------------
157+
// In a real system, these would come from sensors,
158+
// hardware interfaces, or production metrics.
159+
let velocity = rand::random::<f64>() * 10.0;
160+
let temperature = rand::random::<f64>() * 20.0 + 20.0;
161+
162+
// Create a Flow object that matches the FlowConfig schema
163+
// ---------------------------------------------------------
164+
// Requires the flow name, a timestamp, and channel values
165+
let flow = Flow::new(
166+
FLOW_NAME,
167+
TimeValue::now(),
168+
&[
169+
ChannelValue::new("velocity", velocity),
170+
ChannelValue::new("temperature", temperature),
171+
],
172+
);
173+
174+
// Send telemetry to Sift using Sift Stream
175+
// ---------------------------------------------------------
176+
// Each call sends the flow to a queue within Sift Stream for transmission to Sift.
177+
// Sift Stream uses a checkpoint system to verify this data reaches Sift, retrying if necessary.
178+
sift_stream.send(flow).await?;
179+
180+
// For demonstrative purposes, add a wait to send data at approximately the SEND_INTERVAL
181+
tokio::time::sleep(SEND_INTERVAL).await;
182+
}
183+
184+
// Calling finish() on sift stream ensures we properly send any queued data before gracefully closing the gRPC stream
185+
sift_stream.finish().await?;
186+
187+
tracing::info!("Streaming session complete.");
188+
189+
Ok(())
190+
}

0 commit comments

Comments
 (0)