Skip to content

Commit 6f485d7

Browse files
authored
rust/support setting empty value in flow builder (#451)
1 parent 737efa8 commit 6f485d7

5 files changed

Lines changed: 270 additions & 51 deletions

File tree

rust/crates/sift_stream/src/stream/channel.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ pub struct ChannelEnum(pub u32);
5454
/// ```
5555
#[derive(Debug, PartialEq, Clone)]
5656
pub enum Value {
57+
Empty,
5758
Bool(bool),
5859
String(String),
5960
Float(f32),
@@ -69,6 +70,7 @@ pub enum Value {
6970
impl Value {
7071
pub(crate) fn pb_data_type(&self) -> ChannelDataType {
7172
match self {
73+
Value::Empty => ChannelDataType::Unspecified,
7274
Value::Bool(_) => ChannelDataType::Bool,
7375
Value::String(_) => ChannelDataType::String,
7476
Value::Double(_) => ChannelDataType::Double,
@@ -84,6 +86,7 @@ impl Value {
8486

8587
pub(crate) fn pb_value(&self) -> Type {
8688
match self {
89+
Value::Empty => Type::Empty(pbjson_types::Empty {}),
8790
Value::Bool(val) => Type::Bool(*val),
8891
Value::String(val) => Type::String(val.clone()),
8992
Value::Double(val) => Type::Double(*val),
@@ -135,6 +138,12 @@ impl ChannelValue {
135138
}
136139
}
137140

141+
impl From<()> for Value {
142+
fn from(_: ()) -> Self {
143+
Value::Empty
144+
}
145+
}
146+
138147
impl From<bool> for Value {
139148
fn from(value: bool) -> Self {
140149
Value::Bool(value)
@@ -209,6 +218,15 @@ impl From<&[u8]> for Value {
209218

210219
#[test]
211220
fn test_channel_value_conversion() {
221+
let empty_value = ChannelValue::new("channel", ());
222+
assert_eq!(
223+
ChannelValue {
224+
name: String::from("channel"),
225+
value: Value::Empty
226+
},
227+
empty_value
228+
);
229+
212230
let bool_value = ChannelValue::new("channel", true);
213231
assert_eq!(
214232
ChannelValue {

rust/crates/sift_stream/src/stream/flow/mod.rs

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -250,21 +250,25 @@ where
250250
V: Into<Value>,
251251
{
252252
let value = value.into();
253-
let pb_data_type = value.pb_data_type();
254253
let pb_value = value.pb_value();
255254

256-
// Since the [ChannelIndex] is only created by the [FlowDescriptor], we can safely
257-
// assume that the index is valid and index directly into the `field_types` vector.
258-
let expected_data_type = self.flow_descriptor.field_types[index.0];
255+
// If the value is not empty, validate that the value has the correct data type.
256+
if !matches!(value, Value::Empty) {
257+
let pb_data_type = value.pb_data_type();
259258

260-
// Validate that the value has the correct data type.
261-
if expected_data_type != pb_data_type {
262-
return Err(Error::new_msg(
263-
ErrorKind::ArgumentValidationError,
264-
format!(
265-
"value has incorrect data type, expected {expected_data_type:?}, got {pb_data_type:?}"
266-
),
267-
));
259+
// Since the [ChannelIndex] is only created by the [FlowDescriptor], we can safely
260+
// assume that the index is valid and index directly into the `field_types` vector.
261+
let expected_data_type = self.flow_descriptor.field_types[index.0];
262+
263+
// Validate that the value has the correct data type.
264+
if expected_data_type != pb_data_type {
265+
return Err(Error::new_msg(
266+
ErrorKind::ArgumentValidationError,
267+
format!(
268+
"value has incorrect data type, expected {expected_data_type:?}, got {pb_data_type:?}"
269+
),
270+
));
271+
}
268272
}
269273

270274
// Update the value.

rust/crates/sift_stream/src/stream/flow/test.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -450,6 +450,30 @@ fn test_flow_builder_set_wrong_type() {
450450
);
451451
}
452452

453+
#[test]
454+
fn test_flow_builder_set_empty_value() {
455+
let mut builder = FlowDescriptorBuilder::new("config_id", "test_flow");
456+
let temp_idx = builder.add("temperature", ChannelDataType::Double);
457+
458+
let descriptor = builder.build();
459+
let mut flow_builder = FlowBuilder::new(&descriptor);
460+
461+
// First set to a non-empty value.
462+
assert!(flow_builder.set(temp_idx, 23.0_f64).is_ok());
463+
464+
// Try to set back to an empty value.
465+
assert!(flow_builder.set(temp_idx, ()).is_ok());
466+
467+
// Verify the value was set correctly by building a request
468+
let now = TimeValue::now();
469+
let request = flow_builder.request(now);
470+
assert_eq!(request.channel_values.len(), 1);
471+
assert_eq!(
472+
request.channel_values[0].r#type,
473+
Some(Type::Empty(pbjson_types::Empty {}))
474+
);
475+
}
476+
453477
#[test]
454478
fn test_flow_builder_all_value_types() {
455479
let mut builder = FlowDescriptorBuilder::new("config_id", "all_types_flow");

rust/crates/sift_stream_bindings/sift_stream_bindings.pyi

Lines changed: 140 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,15 @@ class ChannelBitFieldElementPy:
6565

6666
@typing.final
6767
class ChannelConfigPy:
68+
r"""
69+
Python binding for [`ChannelConfig`](sift_rs::ingestion_configs::v2::ChannelConfig).
70+
71+
This is a thin wrapper around the Rust `ChannelConfig` type. For detailed documentation,
72+
see [`ChannelConfig`](sift_rs::ingestion_configs::v2::ChannelConfig).
73+
74+
A `ChannelConfig` defines the schema for a single telemetry channel, including its
75+
name, data type, unit, and description.
76+
"""
6877
name: builtins.str
6978
unit: builtins.str
7079
description: builtins.str
@@ -75,6 +84,12 @@ class ChannelConfigPy:
7584

7685
@typing.final
7786
class ChannelEnumPy:
87+
r"""
88+
Python binding for channel enum values.
89+
90+
Represents a specific enumeration value for an enum channel. Enum channels use
91+
numeric values to represent discrete states.
92+
"""
7893
def __new__(cls, val:builtins.int) -> ChannelEnumPy: ...
7994

8095
@typing.final
@@ -89,6 +104,15 @@ class ChannelIndexPy:
89104

90105
@typing.final
91106
class ChannelValuePy:
107+
r"""
108+
Python binding for [`ChannelValue`](sift_stream::ChannelValue).
109+
110+
This is a thin wrapper around the Rust `ChannelValue` type. For detailed documentation,
111+
see [`ChannelValue`](sift_stream::ChannelValue).
112+
113+
A `ChannelValue` pairs a channel name with its typed value, used when constructing
114+
[`Flow`](sift_stream::Flow) instances.
115+
"""
92116
name: builtins.str
93117
value: ValuePy
94118
def __new__(cls, name:builtins.str, value:ValuePy) -> ChannelValuePy: ...
@@ -134,6 +158,15 @@ class CheckpointMetricsSnapshotPy:
134158

135159
@typing.final
136160
class DiskBackupPolicyPy:
161+
r"""
162+
Python binding for [`DiskBackupPolicy`](sift_stream::backup::DiskBackupPolicy).
163+
164+
This is a thin wrapper around the Rust `DiskBackupPolicy` type. For detailed documentation,
165+
see [`DiskBackupPolicy`](sift_stream::backup::DiskBackupPolicy).
166+
167+
A disk backup policy configures how telemetry data is backed up to disk, including
168+
backup directory, file size limits, and retention policies.
169+
"""
137170
backups_dir: typing.Optional[builtins.str]
138171
max_backup_file_size: builtins.int
139172
rolling_file_policy: RollingFilePolicyPy
@@ -170,6 +203,15 @@ class FlowBuilderPy:
170203

171204
@typing.final
172205
class FlowConfigPy:
206+
r"""
207+
Python binding for [`FlowConfig`](sift_rs::ingestion_configs::v2::FlowConfig).
208+
209+
This is a thin wrapper around the Rust `FlowConfig` type. For detailed documentation,
210+
see [`FlowConfig`](sift_rs::ingestion_configs::v2::FlowConfig).
211+
212+
A `FlowConfig` defines the schema for a flow, which is a named group of channels
213+
that are often telemetered together.
214+
"""
173215
name: builtins.str
174216
channels: builtins.list[ChannelConfigPy]
175217
def __new__(cls, name:builtins.str, channels:typing.Sequence[ChannelConfigPy]) -> FlowConfigPy: ...
@@ -201,6 +243,15 @@ class FlowDescriptorPy:
201243

202244
@typing.final
203245
class FlowPy:
246+
r"""
247+
Python binding for [`Flow`](sift_stream::Flow).
248+
249+
This is a thin wrapper around the Rust `Flow` type. For detailed documentation,
250+
see [`Flow`](sift_stream::Flow).
251+
252+
A `Flow` represents a single telemetry message containing channel values that share
253+
a common timestamp.
254+
"""
204255
def __new__(cls, flow_name:builtins.str, timestamp:TimeValuePy, values:typing.Sequence[ChannelValuePy]) -> FlowPy: ...
205256

206257
@typing.final
@@ -246,6 +297,15 @@ class IngestWithConfigDataStreamRequestWrapperPy:
246297

247298
@typing.final
248299
class IngestionConfigFormPy:
300+
r"""
301+
Python binding for [`IngestionConfigForm`](sift_stream::stream::builder::IngestionConfigForm).
302+
303+
This is a thin wrapper around the Rust `IngestionConfigForm` type. For detailed documentation,
304+
see [`IngestionConfigForm`](sift_stream::stream::builder::IngestionConfigForm).
305+
306+
An `IngestionConfigForm` is used to create a new ingestion config or retrieve an existing
307+
one based on the `client_key`. It defines the schema of an asset's telemetry.
308+
"""
249309
asset_name: builtins.str
250310
flows: builtins.list[FlowConfigPy]
251311
client_key: builtins.str
@@ -259,6 +319,17 @@ class MetadataPy:
259319

260320
@typing.final
261321
class RecoveryStrategyPy:
322+
r"""
323+
Python binding for [`RecoveryStrategy`](sift_stream::stream::builder::RecoveryStrategy).
324+
325+
This is a thin wrapper around the Rust `RecoveryStrategy` enum. For detailed documentation,
326+
see [`RecoveryStrategy`](sift_stream::stream::builder::RecoveryStrategy).
327+
328+
A recovery strategy defines how the stream handles errors and failures, including
329+
retry policies and optional disk backups.
330+
331+
Note: PyO3 doesn't support nested enums, so this is implemented as a struct wrapper.
332+
"""
262333
@staticmethod
263334
def retry_only(retry_policy:RetryPolicyPy) -> RecoveryStrategyPy: ...
264335
@staticmethod
@@ -268,6 +339,15 @@ class RecoveryStrategyPy:
268339

269340
@typing.final
270341
class RetryPolicyPy:
342+
r"""
343+
Python binding for [`RetryPolicy`](sift_stream::RetryPolicy).
344+
345+
This is a thin wrapper around the Rust `RetryPolicy` type. For detailed documentation,
346+
see [`RetryPolicy`](sift_stream::RetryPolicy).
347+
348+
A retry policy configures the retry behavior of a Sift stream, including the number
349+
of attempts and exponential backoff parameters.
350+
"""
271351
max_attempts: builtins.int
272352
initial_backoff: DurationPy
273353
max_backoff: DurationPy
@@ -284,6 +364,16 @@ class RollingFilePolicyPy:
284364

285365
@typing.final
286366
class RunFormPy:
367+
r"""
368+
Python binding for [`RunForm`](sift_stream::stream::builder::RunForm).
369+
370+
This is a thin wrapper around the Rust `RunForm` type. For detailed documentation,
371+
see [`RunForm`](sift_stream::stream::builder::RunForm).
372+
373+
A `RunForm` is used to create a new run or retrieve an existing run based on the
374+
`client_key`. If a run with the given `client_key` exists, it will be updated
375+
with any changed fields.
376+
"""
287377
name: builtins.str
288378
client_key: builtins.str
289379
description: typing.Optional[builtins.str]
@@ -300,6 +390,15 @@ class RunSelectorPy:
300390

301391
@typing.final
302392
class SiftStreamBuilderPy:
393+
r"""
394+
Python binding for [`SiftStreamBuilder`](sift_stream::stream::builder::SiftStreamBuilder).
395+
396+
This is a thin wrapper around the Rust `SiftStreamBuilder` type. For detailed documentation,
397+
see [`SiftStreamBuilder`](sift_stream::stream::builder::SiftStreamBuilder).
398+
399+
The builder provides a fluent API for configuring and creating a [`SiftStreamPy`] instance
400+
with various options including ingestion configs, retry policies, checkpoint intervals, and more.
401+
"""
303402
uri: builtins.str
304403
apikey: builtins.str
305404
enable_tls: builtins.bool
@@ -333,6 +432,15 @@ class SiftStreamMetricsSnapshotPy:
333432

334433
@typing.final
335434
class SiftStreamPy:
435+
r"""
436+
Python binding for [`SiftStream`](sift_stream::SiftStream).
437+
438+
This is a thin wrapper around the Rust `SiftStream` type. For detailed documentation,
439+
see [`SiftStream`](sift_stream::SiftStream).
440+
441+
The Python binding provides the same functionality as the Rust type, with methods
442+
adapted for Python's async/await syntax.
443+
"""
336444
def send(self, flow:FlowPy) -> typing.Any: ...
337445
def batch_send(self, flows:typing.Any) -> typing.Any: ...
338446
def send_requests(self, requests:typing.Sequence[IngestWithConfigDataStreamRequestPy]) -> typing.Any: ...
@@ -348,6 +456,16 @@ class SiftStreamPy:
348456

349457
@typing.final
350458
class TimeValuePy:
459+
r"""
460+
Python binding for [`TimeValue`](sift_stream::stream::time::TimeValue).
461+
462+
This is a thin wrapper around the Rust `TimeValue` type. For detailed documentation,
463+
see [`TimeValue`](sift_stream::stream::time::TimeValue).
464+
465+
`TimeValue` represents a timestamp that can be constructed from various time
466+
representations (Unix timestamps, RFC3339 strings, etc.). All times are stored
467+
and transmitted as UTC.
468+
"""
351469
def __new__(cls) -> TimeValuePy: ...
352470
@staticmethod
353471
def from_timestamp(secs:builtins.int, nsecs:builtins.int) -> TimeValuePy: ...
@@ -362,26 +480,38 @@ class TimeValuePy:
362480

363481
@typing.final
364482
class ValuePy:
483+
r"""
484+
Python binding for [`Value`](sift_stream::Value).
485+
486+
This is a thin wrapper around the Rust `Value` enum. For detailed documentation,
487+
see [`Value`](sift_stream::Value).
488+
489+
`Value` represents a typed value emitted by a channel, supporting all standard
490+
telemetry data types (bool, numbers, strings, enums, bitfields).
491+
"""
365492
@staticmethod
366-
def Bool(value:builtins.bool) -> ValuePy: ...
493+
def Empty() -> ValuePy: ...
367494
@staticmethod
368-
def String(value:builtins.str) -> ValuePy: ...
495+
def Bool(value:typing.Optional[builtins.bool]) -> ValuePy: ...
369496
@staticmethod
370-
def Float(value:builtins.float) -> ValuePy: ...
497+
def String(value:typing.Optional[builtins.str]) -> ValuePy: ...
371498
@staticmethod
372-
def Double(value:builtins.float) -> ValuePy: ...
499+
def Float(value:typing.Optional[builtins.float]) -> ValuePy: ...
373500
@staticmethod
374-
def Int32(value:builtins.int) -> ValuePy: ...
501+
def Double(value:typing.Optional[builtins.float]) -> ValuePy: ...
375502
@staticmethod
376-
def Int64(value:builtins.int) -> ValuePy: ...
503+
def Int32(value:typing.Optional[builtins.int]) -> ValuePy: ...
377504
@staticmethod
378-
def Uint32(value:builtins.int) -> ValuePy: ...
505+
def Int64(value:typing.Optional[builtins.int]) -> ValuePy: ...
379506
@staticmethod
380-
def Uint64(value:builtins.int) -> ValuePy: ...
507+
def Uint32(value:typing.Optional[builtins.int]) -> ValuePy: ...
381508
@staticmethod
382-
def Enum(value:builtins.int) -> ValuePy: ...
509+
def Uint64(value:typing.Optional[builtins.int]) -> ValuePy: ...
383510
@staticmethod
384-
def BitField(value:typing.Sequence[builtins.int]) -> ValuePy: ...
511+
def Enum(value:typing.Optional[builtins.int]) -> ValuePy: ...
512+
@staticmethod
513+
def BitField(value:typing.Optional[typing.Sequence[builtins.int]]) -> ValuePy: ...
514+
def is_empty(self) -> builtins.bool: ...
385515
def is_bool(self) -> builtins.bool: ...
386516
def is_string(self) -> builtins.bool: ...
387517
def is_float(self) -> builtins.bool: ...

0 commit comments

Comments
 (0)