Skip to content

Commit 188f22c

Browse files
authored
rust(feat): Run and Asset Tags and Metadata (#319)
1 parent 9fad5db commit 188f22c

9 files changed

Lines changed: 913 additions & 18 deletions

File tree

rust/crates/sift_error/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,8 @@ pub enum ErrorKind {
108108
RetrieveRunError,
109109
/// Indicates that the program was unable to retrieve the asset being requested.
110110
RetrieveAssetError,
111+
/// Indicates that the program was unable to update the asset being requested.
112+
UpdateAssetError,
111113
/// Indicates a failure to update a run.
112114
UpdateRunError,
113115
/// Indicates that the program was unable to retrieve the ingestion config being requested.
@@ -193,6 +195,7 @@ impl fmt::Display for ErrorKind {
193195
Self::GrpcConnectError => write!(f, "GrpcConnectError"),
194196
Self::RetriesExhausted => write!(f, "RetriesExhausted"),
195197
Self::RetrieveAssetError => write!(f, "RetrieveAssetError"),
198+
Self::UpdateAssetError => write!(f, "UpdateAssetError"),
196199
Self::RetrieveRunError => write!(f, "RetrieveRunError"),
197200
Self::RetrieveIngestionConfigError => write!(f, "RetrieveIngestionConfigError"),
198201
Self::EmptyResponseError => write!(f, "EmptyResponseError"),

rust/crates/sift_rs/src/wrappers/assets.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,13 @@
11
use std::ops::{Deref, DerefMut};
22

33
use async_trait::async_trait;
4+
use pbjson_types::FieldMask;
45
use sift_connect::SiftChannel;
56
use sift_error::prelude::*;
67

7-
use crate::assets::v1::{Asset, GetAssetRequest, asset_service_client::AssetServiceClient};
8+
use crate::assets::v1::{
9+
Asset, GetAssetRequest, UpdateAssetRequest, asset_service_client::AssetServiceClient,
10+
};
811

912
/// Return an implementation of [AssetServiceWrapper] which also exposes methods from the
1013
/// raw [AssetServiceClient].
@@ -17,6 +20,9 @@ pub fn new_asset_service(grpc_channel: SiftChannel) -> impl AssetServiceWrapper
1720
pub trait AssetServiceWrapper: Deref<Target = AssetServiceClient<SiftChannel>> + DerefMut {
1821
/// Retrieves an asset by ID
1922
async fn try_get_asset_by_id(&mut self, asset_id: &str) -> Result<Asset>;
23+
24+
/// Update an asset
25+
async fn try_update_asset(&mut self, asset: Asset, update_mask: Vec<String>) -> Result<Asset>;
2026
}
2127

2228
/// A convenience wrapper around [AssetServiceClient].
@@ -37,6 +43,21 @@ impl AssetServiceWrapper for AssetServiceImpl {
3743
Error::new_empty_response("unexpected empty response from AssetService/GetAsset")
3844
})
3945
}
46+
47+
async fn try_update_asset(&mut self, asset: Asset, update_mask: Vec<String>) -> Result<Asset> {
48+
let req = UpdateAssetRequest {
49+
asset: Some(asset),
50+
update_mask: Some(FieldMask { paths: update_mask }),
51+
};
52+
let resp = self
53+
.update_asset(req)
54+
.await
55+
.map_err(|e| Error::new(ErrorKind::UpdateAssetError, e))?;
56+
57+
resp.into_inner().asset.ok_or_else(|| {
58+
Error::new_empty_response("unexpected empty response from AssetService/UpdateAsset")
59+
})
60+
}
4061
}
4162

4263
impl Deref for AssetServiceImpl {
Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
use crate::metadata::v1::{MetadataKey, MetadataKeyType};
2+
3+
pub use crate::metadata::v1::MetadataValue;
4+
pub use crate::metadata::v1::metadata_value::Value as MetadataEnumValue;
5+
6+
impl From<f64> for MetadataEnumValue {
7+
fn from(value: f64) -> MetadataEnumValue {
8+
MetadataEnumValue::NumberValue(value)
9+
}
10+
}
11+
12+
impl From<bool> for MetadataEnumValue {
13+
fn from(value: bool) -> MetadataEnumValue {
14+
MetadataEnumValue::BooleanValue(value)
15+
}
16+
}
17+
18+
impl From<String> for MetadataEnumValue {
19+
fn from(value: String) -> MetadataEnumValue {
20+
MetadataEnumValue::StringValue(value)
21+
}
22+
}
23+
24+
impl From<&str> for MetadataEnumValue {
25+
fn from(value: &str) -> MetadataEnumValue {
26+
MetadataEnumValue::StringValue(value.to_string())
27+
}
28+
}
29+
30+
impl<T: Into<MetadataEnumValue>> From<(String, T)> for MetadataValue {
31+
fn from((name, value): (String, T)) -> MetadataValue {
32+
MetadataValue::from((name.as_str(), value))
33+
}
34+
}
35+
36+
impl<T: Into<MetadataEnumValue>> From<(&str, T)> for MetadataValue {
37+
fn from((name, value): (&str, T)) -> MetadataValue {
38+
let enum_value: MetadataEnumValue = value.into();
39+
let key_type = match enum_value {
40+
MetadataEnumValue::NumberValue(_) => MetadataKeyType::Number,
41+
MetadataEnumValue::BooleanValue(_) => MetadataKeyType::Boolean,
42+
MetadataEnumValue::StringValue(_) => MetadataKeyType::String,
43+
};
44+
45+
let key = MetadataKey {
46+
name: name.to_string(),
47+
r#type: key_type.into(),
48+
archived_date: None,
49+
};
50+
51+
MetadataValue {
52+
key: Some(key),
53+
value: Some(enum_value),
54+
archived_date: None,
55+
}
56+
}
57+
}
58+
59+
/// A macro for easily creating an array of metadata to be provided to Sift.
60+
/// Returns a Vec<[MetadataValue]>
61+
///
62+
/// # Example
63+
/// ```
64+
/// let metadata = metadata![
65+
/// ("test_number", 5.0),
66+
/// ("is_simulation", true),
67+
/// ("location", "SiftHQ"),
68+
/// ];
69+
/// ```
70+
#[macro_export]
71+
macro_rules! metadata {
72+
( $( ($k:expr, $v:expr) ),* $(,)? ) => {
73+
vec![ $( ($k, $v).into() ),* ]
74+
}
75+
}

rust/crates/sift_rs/src/wrappers/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ pub mod assets;
44
/// Offers a wrapper over Sift's ingestion configs API.
55
pub mod ingestion_configs;
66

7+
/// Offers a wrapper over Sift's metadata API.
8+
pub mod metadata;
9+
710
/// Offers a wrapper over Sift's runs API.
811
pub mod runs;
912

rust/crates/sift_rs/src/wrappers/runs.rs

Lines changed: 18 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
use super::ResourceIdentifier;
2-
use crate::runs::v2::{
3-
CreateRunRequest, GetRunRequest, ListRunsRequest, Run, UpdateRunRequest,
4-
run_service_client::RunServiceClient,
2+
use crate::{
3+
metadata::v1::MetadataValue,
4+
runs::v2::{
5+
CreateRunRequest, GetRunRequest, ListRunsRequest, Run, UpdateRunRequest,
6+
run_service_client::RunServiceClient,
7+
},
58
};
69
use async_trait::async_trait;
710
use pbjson_types::FieldMask;
@@ -25,16 +28,17 @@ pub trait RunServiceWrapper: Deref<Target = RunServiceClient<SiftChannel>> + Der
2528
client_key: &str,
2629
description: &str,
2730
tags: &[String],
31+
metadata: &[MetadataValue],
2832
) -> Result<Run>;
2933

3034
/// Update a run. The `updated_run` is expected to contain the `run_id` or `client_key` used to
31-
/// identify the run to update. The `field_mask` is a list of snake_cased field names used to
35+
/// identify the run to update. The `update_mask` is a list of snake_cased field names used to
3236
/// indicate which fields should actually be updated. A list of valid field names can be found
33-
/// at [`this link`]. The [Run] returned is the updated run. If `field_masks` is empty, then no
37+
/// at [`this link`]. The [Run] returned is the updated run. If `update_mask` is empty, then no
3438
/// update is required and the `updated_run` is simply returned.
3539
///
3640
/// [`this link`]: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/runs#updaterunrequest
37-
async fn try_update_run(&mut self, updated_run: Run, field_masks: &[String]) -> Result<Run>;
41+
async fn try_update_run(&mut self, updated_run: Run, update_mask: &[String]) -> Result<Run>;
3842

3943
/// Retrieve a run by ID.
4044
async fn try_get_run_by_id(&mut self, run_id: &str) -> Result<Run>;
@@ -94,8 +98,10 @@ impl RunServiceWrapper for RunServiceWrapperImpl {
9498
client_key: &str,
9599
description: &str,
96100
tags: &[String],
101+
metadata: &[MetadataValue],
97102
) -> Result<Run> {
98103
let tags = tags.to_vec();
104+
let metadata = metadata.to_vec();
99105

100106
if name.is_empty() {
101107
return Err(Error::new_arg_error("run name cannot be blank"));
@@ -110,6 +116,7 @@ impl RunServiceWrapper for RunServiceWrapperImpl {
110116
description: description.to_string(),
111117
tags,
112118
client_key: Some(client_key.to_string()),
119+
metadata,
113120
..Default::default()
114121
})
115122
.await
@@ -123,21 +130,21 @@ impl RunServiceWrapper for RunServiceWrapperImpl {
123130
}
124131

125132
/// Update a run. The `updated_run` is expected to contain the `run_id` or `client_key` used to
126-
/// identify the run to update. The `field_mask` is a list of snake_cased field names used to
133+
/// identify the run to update. The `update_mask` is a list of snake_cased field names used to
127134
/// indicate which fields should actually be updated. A list of valid field names can be found
128-
/// at [`this link`]. The [Run] returned is the updated run. If `field_masks` is empty, then no
135+
/// at [`this link`]. The [Run] returned is the updated run. If `update_mask` is empty, then no
129136
/// update is required and the `updated_run` is simply returned.
130137
///
131138
/// [`this link`]: https://docs.siftstack.com/docs/api/grpc/protocol-buffers/runs#updaterunrequest
132-
async fn try_update_run(&mut self, updated_run: Run, field_masks: &[String]) -> Result<Run> {
133-
if field_masks.is_empty() {
139+
async fn try_update_run(&mut self, updated_run: Run, update_mask: &[String]) -> Result<Run> {
140+
if update_mask.is_empty() {
134141
return Ok(updated_run);
135142
}
136143

137144
let run = self
138145
.update_run(UpdateRunRequest {
139146
update_mask: Some(FieldMask {
140-
paths: field_masks.to_vec(),
147+
paths: update_mask.to_vec(),
141148
}),
142149
run: Some(updated_run),
143150
})

rust/crates/sift_stream/examples/quick-start/main.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use sift_rs::metadata;
12
use sift_stream::{
23
ChannelConfig, ChannelDataType, ChannelValue, Credentials, Flow, FlowConfig,
34
IngestionConfigForm, RecoveryStrategy, RunForm, SiftStreamBuilder, TimeValue,
@@ -53,12 +54,20 @@ async fn run() -> Result<(), Box<dyn Error>> {
5354
.map(|d| d.as_millis())
5455
.unwrap();
5556

57+
// Create metadata using the metadata macro
58+
let metadata = metadata![
59+
("test_number", 5.0),
60+
("is_simulation", true),
61+
("location", "SiftHQ"),
62+
];
63+
5664
// Define an optional run to group together data for this period of telemetry ingestion.
5765
let run = RunForm {
5866
name: format!("[MarsRover0].{ts}"),
5967
client_key: format!("mars-rover-sim-{ts}"),
6068
description: Some("simulation run".into()),
6169
tags: Some(vec!["simulation".into()]),
70+
metadata: Some(metadata),
6271
};
6372

6473
// Initialize your Sift Stream

0 commit comments

Comments
 (0)