Skip to content

Commit f2835f9

Browse files
authored
rust(features): add new flow after stream initialization + stream init from grpc channel (#229)
1 parent 7c7a37c commit f2835f9

8 files changed

Lines changed: 186 additions & 37 deletions

File tree

rust/crates/sift_connect/src/grpc/config.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,13 @@ pub const SIFT_CONFIG_NAME: &str = "sift.toml";
88
/// query the corresponding table from [`SIFT_CONFIG_NAME`] located at
99
/// [these locations](https://docs.rs/dirs/6.0.0/dirs/fn.config_local_dir.html)
1010
/// depending on your operating system. If `None` is provided, then the top-level table is used.
11-
#[derive(Debug)]
11+
#[derive(Debug, Clone)]
1212
pub enum Credentials {
1313
Profile(Option<String>),
1414
Config { uri: String, apikey: String },
1515
}
1616

17-
#[derive(Default)]
17+
#[derive(Default, Clone)]
1818
pub(crate) struct SiftChannelConfig {
1919
pub uri: String,
2020
pub apikey: String,

rust/crates/sift_error/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,8 @@ pub enum ErrorKind {
106106
GrpcConnectError,
107107
/// Indicates that the program was unable to retrieve the run being requested.
108108
RetrieveRunError,
109+
/// Indicates that the program was unable to retrieve the asset being requested.
110+
RetrieveAssetError,
109111
/// Indicates a failure to update a run.
110112
UpdateRunError,
111113
/// Indicates that the program was unable to retrieve the ingestion config being requested.
@@ -190,6 +192,7 @@ impl fmt::Display for ErrorKind {
190192
match self {
191193
Self::GrpcConnectError => write!(f, "GrpcConnectError"),
192194
Self::RetriesExhausted => write!(f, "RetriesExhausted"),
195+
Self::RetrieveAssetError => write!(f, "RetrieveAssetError"),
193196
Self::RetrieveRunError => write!(f, "RetrieveRunError"),
194197
Self::RetrieveIngestionConfigError => write!(f, "RetrieveIngestionConfigError"),
195198
Self::EmptyResponseError => write!(f, "EmptyResponseError"),
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::ops::{Deref, DerefMut};
2+
3+
use async_trait::async_trait;
4+
use sift_connect::SiftChannel;
5+
use sift_error::prelude::*;
6+
7+
use crate::assets::v1::{Asset, GetAssetRequest, asset_service_client::AssetServiceClient};
8+
9+
/// Return an implementation of [AssetServiceWrapper] which also exposes methods from the
10+
/// raw [AssetServiceClient].
11+
pub fn new_asset_service(grpc_channel: SiftChannel) -> impl AssetServiceWrapper {
12+
AssetServiceImpl(AssetServiceClient::new(grpc_channel))
13+
}
14+
15+
/// Convenience methods
16+
#[async_trait]
17+
pub trait AssetServiceWrapper: Deref<Target = AssetServiceClient<SiftChannel>> + DerefMut {
18+
/// Retrieves an asset by ID
19+
async fn try_get_asset_by_id(&mut self, asset_id: &str) -> Result<Asset>;
20+
}
21+
22+
/// A convenience wrapper around [AssetServiceClient].
23+
struct AssetServiceImpl(AssetServiceClient<SiftChannel>);
24+
25+
#[async_trait]
26+
impl AssetServiceWrapper for AssetServiceImpl {
27+
async fn try_get_asset_by_id(&mut self, asset_id: &str) -> Result<Asset> {
28+
let req = GetAssetRequest {
29+
asset_id: asset_id.into(),
30+
};
31+
let resp = self
32+
.get_asset(req)
33+
.await
34+
.map_err(|e| Error::new(ErrorKind::RetrieveAssetError, e))?;
35+
36+
resp.into_inner().asset.ok_or_else(|| {
37+
Error::new_empty_response("unexpected empty response from AssetService/GetAsset")
38+
})
39+
}
40+
}
41+
42+
impl Deref for AssetServiceImpl {
43+
type Target = AssetServiceClient<SiftChannel>;
44+
45+
fn deref(&self) -> &Self::Target {
46+
&self.0
47+
}
48+
}
49+
50+
impl DerefMut for AssetServiceImpl {
51+
fn deref_mut(&mut self) -> &mut Self::Target {
52+
&mut self.0
53+
}
54+
}

rust/crates/sift_rs/src/wrappers/ingestion_configs.rs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -78,11 +78,6 @@ impl IngestionConfigServiceWrapper for IngestionConfigServiceImpl {
7878
"ingestion config client key cannot be blank",
7979
));
8080
}
81-
if flows.is_empty() {
82-
return Err(Error::new_arg_error(
83-
"cannot create an ingestion config with no flows",
84-
));
85-
}
8681

8782
self.create_ingestion_config(CreateIngestionConfigRequest {
8883
asset_name: asset_name.to_string(),

rust/crates/sift_rs/src/wrappers/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
/// Offers a wrapper over Sift's assets API.
2+
pub mod assets;
3+
14
/// Offers a wrapper over Sift's ingestion configs API.
25
pub mod ingestion_configs;
36

rust/crates/sift_stream/src/backup/disk/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ where
8787
};
8888
let backups_dir = backups_root.join(new_dir_name);
8989

90-
match fs::create_dir(&backups_dir) {
90+
match fs::create_dir_all(&backups_dir) {
9191
Err(err) if err.kind() != IoErrorKind::AlreadyExists => {
9292
return Err(Error::new(ErrorKind::BackupsError, err))
9393
.with_context(|| format!("failed to create directory for backups at {}", backups_dir.display()))

rust/crates/sift_stream/src/stream/builder.rs

Lines changed: 97 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use sift_rs::{
1111
ping::v1::{PingRequest, ping_service_client::PingServiceClient},
1212
runs::v2::Run,
1313
wrappers::{
14+
assets::{AssetServiceWrapper, new_asset_service},
1415
ingestion_configs::{IngestionConfigServiceWrapper, new_ingestion_config_service},
1516
runs::{RunServiceWrapper, new_run_service},
1617
},
@@ -39,7 +40,8 @@ pub const DEFAULT_CHECKPOINT_INTERVAL: Duration = Duration::from_secs(60);
3940
/// [tokio](https://docs.rs/tokio/latest/tokio/) asynchronous runtime is required, otherwise
4041
/// attempts to call [SiftStreamBuilder::build] will panic.
4142
pub struct SiftStreamBuilder<C> {
42-
credentials: Credentials,
43+
credentials: Option<Credentials>,
44+
channel: Option<SiftChannel>,
4345
recovery_strategy: Option<RecoveryStrategy>,
4446
checkpoint_interval: Duration,
4547
ingestion_config: Option<IngestionConfigForm>,
@@ -171,7 +173,8 @@ where
171173
self
172174
}
173175

174-
/// Disables TLS. Useful for testing.
176+
/// Disables TLS. Useful for testing. This is ignored if [SiftStreamBuilder::from_channel] is
177+
/// used to initialize the builder.
175178
pub fn disable_tls(mut self) -> SiftStreamBuilder<C> {
176179
self.enable_tls = false;
177180
self
@@ -286,10 +289,26 @@ where
286289

287290
/// Builds a [SiftStream] specifically for ingestion-config based streaming.
288291
impl SiftStreamBuilder<IngestionConfigMode> {
289-
/// Initializes a new builder for ingestion-config-based streaming.
292+
/// Initializes a new builder for ingestion-config-based streaming from [Credentials].
290293
pub fn new(credentials: Credentials) -> SiftStreamBuilder<IngestionConfigMode> {
291294
SiftStreamBuilder {
292-
credentials,
295+
credentials: Some(credentials),
296+
channel: None,
297+
enable_tls: true,
298+
ingestion_config: None,
299+
run: None,
300+
run_id: None,
301+
kind: PhantomData,
302+
checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
303+
recovery_strategy: None,
304+
}
305+
}
306+
307+
/// Initializes a new builder for ingestion-config-based streaming from a [SiftChannel].
308+
pub fn from_channel(channel: SiftChannel) -> SiftStreamBuilder<IngestionConfigMode> {
309+
SiftStreamBuilder {
310+
credentials: None,
311+
channel: Some(channel),
293312
enable_tls: true,
294313
ingestion_config: None,
295314
run: None,
@@ -304,26 +323,37 @@ impl SiftStreamBuilder<IngestionConfigMode> {
304323
/// streaming.
305324
pub async fn build(self) -> Result<SiftStream<IngestionConfigMode>> {
306325
let SiftStreamBuilder {
307-
credentials,
308326
checkpoint_interval,
327+
channel: grpc_channel,
328+
credentials,
329+
enable_tls,
309330
ingestion_config,
331+
recovery_strategy,
310332
run,
311333
run_id,
312-
recovery_strategy,
313-
enable_tls,
314334
..
315335
} = self;
316336

317337
let Some(ingestion_config) = ingestion_config else {
318338
return Err(Error::new_arg_error("ingestion_config is required"));
319339
};
320340

321-
let mut sift_channel_builder = SiftChannelBuilder::new(credentials);
341+
let channel = match grpc_channel {
342+
Some(ch) => ch,
343+
None if credentials.is_some() => {
344+
let mut sift_channel_builder = SiftChannelBuilder::new(credentials.unwrap());
322345

323-
if enable_tls {
324-
sift_channel_builder = sift_channel_builder.use_tls(true);
325-
}
326-
let channel = sift_channel_builder.build()?;
346+
if enable_tls {
347+
sift_channel_builder = sift_channel_builder.use_tls(true);
348+
}
349+
sift_channel_builder.build()?
350+
}
351+
None => {
352+
return Err(Error::new_arg_error(
353+
"either credentials or a gRPC channel must be provided",
354+
));
355+
}
356+
};
327357

328358
// Since the gRPC connection is lazy, we'll connect right away and ensure the connection is
329359
// valid.
@@ -420,7 +450,8 @@ impl SiftStreamBuilder<IngestionConfigMode> {
420450
#[cfg(feature = "tracing")]
421451
tracing::info_span!("load_ingestion_config");
422452

423-
let mut ingestion_config_service = new_ingestion_config_service(grpc_channel);
453+
let mut ingestion_config_service = new_ingestion_config_service(grpc_channel.clone());
454+
let mut asset_service = new_asset_service(grpc_channel);
424455

425456
let IngestionConfigForm {
426457
asset_name,
@@ -437,24 +468,31 @@ impl SiftStreamBuilder<IngestionConfigMode> {
437468
.try_create_ingestion_config(&asset_name, &client_key, &flows)
438469
.await?;
439470

440-
let flows = ingestion_config_service
441-
.try_filter_flows(&ingestion_config.ingestion_config_id, "")
442-
.await?;
471+
let new_flows = {
472+
if flows.is_empty() {
473+
Vec::new()
474+
} else {
475+
ingestion_config_service
476+
.try_filter_flows(&ingestion_config.ingestion_config_id, "")
477+
.await?
478+
}
479+
};
443480

444481
#[cfg(feature = "tracing")]
445482
{
446-
let flow_names = flows
447-
.iter()
448-
.map(|f| f.name.as_str())
449-
.collect::<Vec<&str>>()
450-
.join(",");
451-
tracing::info!(
452-
ingestion_config_id = ingestion_config.ingestion_config_id,
453-
flow_names = flow_names,
454-
"created new ingestion config"
455-
);
483+
if !new_flows.is_empty() {
484+
let flow_names = new_flows
485+
.iter()
486+
.map(|f| f.name.as_str())
487+
.collect::<Vec<&str>>()
488+
.join(",");
489+
tracing::info!(
490+
ingestion_config_id = ingestion_config.ingestion_config_id,
491+
flow_names = flow_names,
492+
"created new ingestion config"
493+
);
494+
}
456495
}
457-
458496
Ok((ingestion_config, flows))
459497
}
460498
Err(err) => Err(err),
@@ -466,13 +504,32 @@ impl SiftStreamBuilder<IngestionConfigMode> {
466504
"an existing ingestion config was found with the provided client-key"
467505
);
468506

507+
let asset = asset_service
508+
.try_get_asset_by_id(&ingestion_config.asset_id)
509+
.await
510+
.context("failed to retrieve asset specified by ingestion config")?;
511+
512+
if asset.name != asset_name {
513+
return Err(Error::new_msg(
514+
ErrorKind::IncompatibleIngestionConfigChange,
515+
format!(
516+
"local ingestion config references asset '{asset_name}' but this existing config in Sift refers to asset '{}'",
517+
asset.name
518+
),
519+
));
520+
}
521+
469522
let flow_names = flows
470523
.iter()
471524
.map(|f| format!("'{}'", f.name))
472525
.collect::<Vec<String>>()
473526
.join(",");
474527

475-
let filter = format!("flow_name in [{flow_names}]");
528+
let filter = flow_names
529+
.is_empty()
530+
.then(String::new)
531+
.unwrap_or_else(|| format!("flow_name in [{flow_names}]"));
532+
476533
let existing_flows = ingestion_config_service
477534
.try_filter_flows(&ingestion_config.ingestion_config_id, &filter)
478535
.await?;
@@ -534,3 +591,15 @@ impl SiftStreamBuilder<IngestionConfigMode> {
534591
}
535592
}
536593
}
594+
595+
/// Converts [Credentials] into a builder; equivalent to calling [SiftStreamBuilder::new].
impl From<Credentials> for SiftStreamBuilder<IngestionConfigMode> {
    fn from(credentials: Credentials) -> Self {
        Self::new(credentials)
    }
}
600+
601+
impl From<SiftChannel> for SiftStreamBuilder<IngestionConfigMode> {
602+
fn from(value: SiftChannel) -> Self {
603+
Self::from_channel(value)
604+
}
605+
}

rust/crates/sift_stream/src/stream/mode/ingestion_config.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ use sift_rs::{
1313
},
1414
ingestion_configs::v2::{FlowConfig, IngestionConfig},
1515
runs::v2::Run,
16+
wrappers::ingestion_configs::{IngestionConfigServiceWrapper, new_ingestion_config_service},
1617
};
1718
use std::{
1819
collections::HashMap,
@@ -298,6 +299,29 @@ impl SiftStream<IngestionConfigMode> {
298299
}
299300
}
300301

302+
/// Modify the existing ingestion config by adding new flows that weren't accounted for during
303+
/// initialization.
304+
pub async fn add_new_flows(&mut self, flow_configs: &[FlowConfig]) -> Result<()> {
305+
new_ingestion_config_service(self.grpc_channel.clone())
306+
.try_create_flows(
307+
&self.mode.ingestion_config.ingestion_config_id,
308+
flow_configs,
309+
)
310+
.await
311+
.context("SiftStream::add_new_flows")?;
312+
313+
for flow_config in flow_configs {
314+
self.mode
315+
.flows_by_name
316+
.entry(flow_config.name.clone())
317+
.and_modify(|flows| flows.push(flow_config.clone()))
318+
.or_insert_with(|| vec![flow_config.clone()]);
319+
320+
tracing::info!(flow = flow_config.name, "successfully registered new flow");
321+
}
322+
Ok(())
323+
}
324+
301325
async fn backup_data(&mut self, req: &IngestWithConfigDataStreamRequest) -> Result<()> {
302326
if let Some(backups_manager) = self.mode.backups_manager.as_mut() {
303327
match backups_manager {
@@ -709,7 +733,8 @@ impl SiftStream<IngestionConfigMode> {
709733
})
710734
}
711735

712-
/// Flows passed into this function should have names match `flow_name`.
736+
/// Flows passed into this function should have names that match `flow_name`. The only case
737+
/// in which this returns `None` is if there is no [FlowConfig] for the given `message`.
713738
pub(crate) fn message_to_ingest_req(
714739
message: &Flow,
715740
ingestion_config_id: &str,

0 commit comments

Comments
 (0)