Skip to content

Commit 833f092

Browse files
authored
rust(feat): Adds FlowDescriptor and FlowBuilder to improve performance (#396)
1 parent 8a733a6 commit 833f092

6 files changed

Lines changed: 953 additions & 37 deletions

File tree

rust/crates/sift_stream/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,7 @@ pub use stream::{
409409
RetryPolicy, SiftStream,
410410
builder::{IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder},
411411
channel::{ChannelValue, Value},
412+
flow::{ChannelIndex, FlowBuilder, FlowDescriptor, FlowDescriptorBuilder},
412413
mode::ingestion_config::{Flow, IngestionConfigMode},
413414
time::TimeValue,
414415
};

rust/crates/sift_stream/src/stream/channel.rs

Lines changed: 33 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,38 @@ pub enum Value {
2828
BitField(Vec<u8>),
2929
}
3030

31+
impl Value {
32+
pub(crate) fn pb_data_type(&self) -> ChannelDataType {
33+
match self {
34+
Value::Bool(_) => ChannelDataType::Bool,
35+
Value::String(_) => ChannelDataType::String,
36+
Value::Double(_) => ChannelDataType::Double,
37+
Value::Float(_) => ChannelDataType::Float,
38+
Value::Int32(_) => ChannelDataType::Int32,
39+
Value::Int64(_) => ChannelDataType::Int64,
40+
Value::Uint32(_) => ChannelDataType::Uint32,
41+
Value::Uint64(_) => ChannelDataType::Uint64,
42+
Value::Enum(_) => ChannelDataType::Enum,
43+
Value::BitField(_) => ChannelDataType::BitField,
44+
}
45+
}
46+
47+
pub(crate) fn pb_value(&self) -> Type {
48+
match self {
49+
Value::Bool(val) => Type::Bool(*val),
50+
Value::String(val) => Type::String(val.clone()),
51+
Value::Double(val) => Type::Double(*val),
52+
Value::Float(val) => Type::Float(*val),
53+
Value::Int32(val) => Type::Int32(*val),
54+
Value::Int64(val) => Type::Int64(*val),
55+
Value::Uint32(val) => Type::Uint32(*val),
56+
Value::Uint64(val) => Type::Uint64(*val),
57+
Value::Enum(val) => Type::Enum(*val),
58+
Value::BitField(val) => Type::BitField(val.clone()),
59+
}
60+
}
61+
}
62+
3163
impl ChannelValue {
3264
/// Creates a [ChannelValue] for a channel of name `name`.
3365
///
@@ -43,38 +75,8 @@ impl ChannelValue {
4375
}
4476
}
4577

46-
pub(crate) fn empty_pb() -> Type {
47-
Type::Empty(pbjson_types::Empty {})
48-
}
49-
50-
pub(crate) fn pb_data_type(&self) -> i32 {
51-
match self.value {
52-
Value::Bool(_) => i32::from(ChannelDataType::Bool),
53-
Value::String(_) => i32::from(ChannelDataType::String),
54-
Value::Double(_) => i32::from(ChannelDataType::Double),
55-
Value::Float(_) => i32::from(ChannelDataType::Float),
56-
Value::Int32(_) => i32::from(ChannelDataType::Int32),
57-
Value::Int64(_) => i32::from(ChannelDataType::Int64),
58-
Value::Uint32(_) => i32::from(ChannelDataType::Uint32),
59-
Value::Uint64(_) => i32::from(ChannelDataType::Uint64),
60-
Value::Enum(_) => i32::from(ChannelDataType::Enum),
61-
Value::BitField(_) => i32::from(ChannelDataType::BitField),
62-
}
63-
}
64-
6578
pub(crate) fn pb_value(&self) -> Type {
66-
match self.value {
67-
Value::Bool(val) => Type::Bool(val),
68-
Value::String(ref val) => Type::String(val.clone()),
69-
Value::Double(val) => Type::Double(val),
70-
Value::Float(val) => Type::Float(val),
71-
Value::Int32(val) => Type::Int32(val),
72-
Value::Int64(val) => Type::Int64(val),
73-
Value::Uint32(val) => Type::Uint32(val),
74-
Value::Uint64(val) => Type::Uint64(val),
75-
Value::Enum(val) => Type::Enum(val),
76-
Value::BitField(ref val) => Type::BitField(val.clone()),
77-
}
79+
self.value.pb_value()
7880
}
7981
}
8082

rust/crates/sift_stream/src/stream/flow/mod.rs

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,297 @@
1+
use std::collections::HashMap;
2+
use std::hash::Hash;
3+
14
use sift_error::prelude::*;
5+
use sift_rs::common::r#type::v1::ChannelDataType;
6+
use sift_rs::ingest::v1::IngestWithConfigDataChannelValue;
7+
use sift_rs::ingest::v1::{
8+
IngestWithConfigDataStreamRequest, ingest_with_config_data_channel_value::Type,
9+
};
210
use sift_rs::ingestion_configs::v2::FlowConfig;
311

12+
use crate::{TimeValue, Value};
13+
14+
/// Represents the index of a channel in a flow.
15+
///
16+
/// This provides a convenient and performant way to access the value at the given channel index
17+
/// when building a new flow.
18+
///
19+
/// This type is only created by the [`FlowDescriptorBuilder`] when adding a new channel to the
20+
/// flow ensuring that the index is safe to use.
21+
#[derive(Copy, Clone, PartialEq, Eq, Hash)]
22+
pub struct ChannelIndex(usize);
23+
24+
/// Describes the schema of a flow, providing a convenient, performant, and correct way to
25+
/// build the flow being described.
26+
///
27+
/// The descriptor itself is immutable, to ensure that the flow is constructed correctly
28+
/// since successful ingestion requires Sift and the client to agree on the schema of the flow.
29+
///
30+
/// While the key `K` can be arbitrary, it is recommended to use a trivial key that avoids
31+
/// allocations, such as a `usize` or `u32`, though for convenience, a string (the channel
32+
/// name) could also be used and will still help minimize additional string allocations.
33+
///
34+
/// # Example
35+
///
36+
/// ```rust
37+
/// use sift_stream::{FlowDescriptor, FlowDescriptorBuilder, FlowBuilder, ChannelDataType};
38+
///
39+
/// let mut flow_descriptor_builder = FlowDescriptorBuilder::new("ingestion_config_id", "my_flow_name");
40+
/// let my_channel_idx = flow_descriptor_builder.add("my_channel_key", ChannelDataType::String);
41+
/// let my_other_channel_idx = flow_descriptor_builder.add("my_other_channel_key", ChannelDataType::Uint64);
42+
///
43+
/// let flow_descriptor = flow_descriptor_builder.build();
44+
///
45+
/// let mut flow_builder = FlowBuilder::new(&flow_descriptor);
46+
/// flow_builder.set(my_channel_idx, "my_value".to_string());
47+
/// flow_builder.set_with_key("my_other_channel_key", 123_u64);
48+
/// ```
49+
#[derive(Clone)]
50+
pub struct FlowDescriptor<K> {
51+
/// The name of the flow.
52+
name: String,
53+
54+
/// The ID of the ingestion config that this flow belongs to.
55+
ingestion_config_id: String,
56+
57+
/// The data types of the channels in the flow which will be used
58+
/// to validate the values when building a new flow.
59+
field_types: Vec<ChannelDataType>,
60+
61+
/// A mapping of arbitrary keys to the index of the channel in the flow.
62+
///
63+
/// Ideally the key should be a trivial key that avoids allocations, though
64+
/// for convenience, a string (the channel name) could also be used.
65+
index_map: HashMap<K, ChannelIndex>,
66+
}
67+
68+
impl<K> FlowDescriptor<K>
69+
where
70+
K: Eq + Hash,
71+
{
72+
/// Initializes a new flow descriptor with the provided ingestion config ID and flow name.
73+
fn new(ingestion_config_id: impl Into<String>, name: impl Into<String>) -> Self {
74+
Self {
75+
ingestion_config_id: ingestion_config_id.into(),
76+
name: name.into(),
77+
field_types: Vec::new(),
78+
index_map: HashMap::new(),
79+
}
80+
}
81+
82+
/// Gets the type of the channel with the given key.
83+
pub fn get<Q>(&self, key: &Q) -> Option<ChannelDataType>
84+
where
85+
K: core::borrow::Borrow<Q>,
86+
Q: Eq + Hash + ?Sized,
87+
{
88+
let index = self.index_map.get(key)?.0;
89+
Some(self.field_types[index])
90+
}
91+
92+
/// Gets the mapping of keys to channel indices.
93+
pub fn mapping(&self) -> &HashMap<K, ChannelIndex> {
94+
&self.index_map
95+
}
96+
}
97+
98+
/// Builds a [`FlowDescriptor`], which defines the schema of a flow.
99+
///
100+
/// The builder is mutable, to allow for the addition of channels to the flow descriptor
101+
/// while the descriptor itself is immutable, ensuring that the described flow will be
102+
/// constructed correctly.
103+
pub struct FlowDescriptorBuilder<K> {
104+
flow_descriptor: FlowDescriptor<K>,
105+
}
106+
107+
impl<K> FlowDescriptorBuilder<K>
108+
where
109+
K: Eq + Hash,
110+
{
111+
/// Initializes a new [`FlowDescriptorBuilder`] with the provided ingestion config ID and flow name.
112+
pub fn new(ingestion_config_id: impl Into<String>, name: impl Into<String>) -> Self {
113+
Self {
114+
flow_descriptor: FlowDescriptor::new(ingestion_config_id, name),
115+
}
116+
}
117+
118+
/// Adds a new channel to the flow.
119+
///
120+
/// This returns the index of the channel in the flow. This index can then be used to
121+
/// access the value at the given channel index when building a new flow.
122+
pub fn add(&mut self, key: K, field_type: ChannelDataType) -> ChannelIndex {
123+
let index = self.flow_descriptor.field_types.len();
124+
self.flow_descriptor.field_types.push(field_type);
125+
126+
self.flow_descriptor
127+
.index_map
128+
.insert(key, ChannelIndex(index));
129+
130+
ChannelIndex(index)
131+
}
132+
133+
/// Builds the [`FlowDescriptor`] from the builder.
134+
pub fn build(self) -> FlowDescriptor<K> {
135+
self.flow_descriptor
136+
}
137+
}
138+
139+
impl<S> TryFrom<(S, &'_ FlowConfig)> for FlowDescriptor<String>
140+
where
141+
S: ToString,
142+
{
143+
type Error = Error;
144+
145+
fn try_from((ingestion_config_id, flow_config): (S, &'_ FlowConfig)) -> Result<Self> {
146+
let mut builder =
147+
FlowDescriptorBuilder::new(ingestion_config_id.to_string(), flow_config.name.clone());
148+
for channel in flow_config.channels.iter() {
149+
let data_type = ChannelDataType::try_from(channel.data_type).map_err(|_| {
150+
Error::new_msg(
151+
ErrorKind::ArgumentValidationError,
152+
format!(
153+
"invalid data type {:?} for channel {}",
154+
channel.data_type, channel.name
155+
),
156+
)
157+
})?;
158+
159+
builder.add(channel.name.clone(), data_type);
160+
}
161+
Ok(builder.build())
162+
}
163+
}
164+
165+
impl<S> TryFrom<(S, FlowConfig)> for FlowDescriptor<String>
166+
where
167+
S: ToString,
168+
{
169+
type Error = Error;
170+
171+
fn try_from((ingestion_config_id, flow_config): (S, FlowConfig)) -> Result<Self> {
172+
let mut builder =
173+
FlowDescriptorBuilder::new(ingestion_config_id.to_string(), flow_config.name);
174+
for channel in flow_config.channels {
175+
let data_type = ChannelDataType::try_from(channel.data_type).map_err(|_| {
176+
Error::new_msg(
177+
ErrorKind::ArgumentValidationError,
178+
format!(
179+
"invalid data type {:?} for channel {}",
180+
channel.data_type, channel.name
181+
),
182+
)
183+
})?;
184+
185+
builder.add(channel.name, data_type);
186+
}
187+
Ok(builder.build())
188+
}
189+
}
190+
191+
/// Builder to assist in constructing a flow, utilizing the flow descriptor
192+
/// to ensure that the flow is constructed correctly (i.e. values in the
193+
/// correct order and the correct data type).
194+
///
195+
/// By using the builder and the flow descriptor, the channel names are not
196+
/// necessary, which helps improve performance.
197+
pub struct FlowBuilder<'a, K> {
198+
/// The flow descriptor which defines the value schema of the flow.
199+
flow_descriptor: &'a FlowDescriptor<K>,
200+
201+
/// The values of the flow, where the index of the value corresponds to
202+
/// the index of the channel in the [`FlowDescriptor`].
203+
values: Vec<IngestWithConfigDataChannelValue>,
204+
205+
/// The run ID of the flow; an empty string until one is attached.
206+
run_id: String,
207+
}
208+
209+
impl<K> FlowBuilder<'_, K> {
210+
/// Builds an [IngestWithConfigDataStreamRequest], consuming the builder.
211+
pub fn request(self, now: TimeValue) -> IngestWithConfigDataStreamRequest {
212+
IngestWithConfigDataStreamRequest {
213+
ingestion_config_id: self.flow_descriptor.ingestion_config_id.clone(),
214+
flow: self.flow_descriptor.name.clone(),
215+
timestamp: Some(now.0),
216+
channel_values: self.values,
217+
run_id: self.run_id,
218+
..Default::default()
219+
}
220+
}
221+
}
222+
223+
impl<'a, K> FlowBuilder<'a, K>
224+
where
225+
K: Eq + Hash,
226+
{
227+
/// Initializes a new flow builder with the provided flow descriptor.
228+
pub fn new(flow_descriptor: &'a FlowDescriptor<K>) -> Self {
229+
let values = vec![
230+
IngestWithConfigDataChannelValue {
231+
r#type: Some(Type::Empty(pbjson_types::Empty {}))
232+
};
233+
flow_descriptor.field_types.len()
234+
];
235+
Self {
236+
flow_descriptor,
237+
values,
238+
run_id: String::new(),
239+
}
240+
}
241+
242+
/// Attaches a run ID to the flow.
243+
pub fn attach_run_id(&mut self, run_id: impl Into<String>) {
244+
self.run_id = run_id.into();
245+
}
246+
247+
/// Sets the value of the channel at the given index, returning an error on a data type mismatch.
248+
pub fn set<V>(&mut self, index: ChannelIndex, value: V) -> Result<()>
249+
where
250+
V: Into<Value>,
251+
{
252+
let value = value.into();
253+
let pb_data_type = value.pb_data_type();
254+
let pb_value = value.pb_value();
255+
256+
// Since the [ChannelIndex] is only created by the [FlowDescriptor], we can safely
257+
// assume that the index is valid and index directly into the `field_types` vector. NOTE(review): an index from a different descriptor could be out of bounds here — confirm.
258+
let expected_data_type = self.flow_descriptor.field_types[index.0];
259+
260+
// Validate that the value has the correct data type.
261+
if expected_data_type != pb_data_type {
262+
return Err(Error::new_msg(
263+
ErrorKind::ArgumentValidationError,
264+
format!(
265+
"value has incorrect data type, expected {expected_data_type:?}, got {pb_data_type:?}"
266+
),
267+
));
268+
}
269+
270+
// Update the value.
271+
self.values[index.0].r#type = Some(pb_value);
272+
273+
Ok(())
274+
}
275+
276+
/// Sets the value of the channel with the given key.
277+
pub fn set_with_key<Q, V>(&mut self, key: &Q, value: V) -> Result<()>
278+
where
279+
K: core::borrow::Borrow<Q>,
280+
Q: Eq + Hash + ?Sized,
281+
V: Into<Value>,
282+
{
283+
// Get the index of the channel with the given key.
284+
let Some(index) = self.flow_descriptor.index_map.get(key) else {
285+
return Err(Error::new_msg(
286+
ErrorKind::NotFoundError,
287+
"provided key was not found in flow descriptor",
288+
));
289+
};
290+
291+
self.set(*index, value)
292+
}
293+
}
294+
4295
#[cfg(test)]
5296
mod test;
6297

0 commit comments

Comments
 (0)