|
1 | 1 | use super::super::{SiftStream, SiftStreamMode, channel::ChannelValue, time::TimeValue}; |
2 | 2 | use crate::{ |
| 3 | + FlowDescriptor, |
3 | 4 | metrics::SiftStreamMetrics, |
4 | 5 | stream::{ |
5 | 6 | run::{RunSelector, load_run_by_form, load_run_by_id}, |
@@ -213,6 +214,27 @@ impl SiftStream<IngestionConfigMode> { |
213 | 214 | Ok(()) |
214 | 215 | } |
215 | 216 |
|
| 217 | + /// This method offers a way to send data in a manner that's identical to the raw |
| 218 | + /// [`gRPC service`] for ingestion-config based streaming. Users are expected to handle |
| 219 | + /// channel value ordering as well as empty values correctly. |
| 220 | + /// |
| 221 | + /// ### Important |
| 222 | + /// |
| 223 | + /// Note if using this interface, you should use [FlowBuilder::request] to ensure proper |
| 224 | + /// building of the request. |
| 225 | + /// |
| 226 | + /// [`gRPC service`]: https://github.com/sift-stack/sift/blob/main/protos/sift/ingest/v1/ingest.proto#L11 |
| 227 | + pub fn send_requests_nonblocking<I>(&mut self, requests: I) -> Result<()> |
| 228 | + where |
| 229 | + I: IntoIterator<Item = IngestWithConfigDataStreamRequest>, |
| 230 | + { |
| 231 | + for req in requests { |
| 232 | + self.metrics.messages_received.increment(); |
| 233 | + self.send_impl(req)?; |
| 234 | + } |
| 235 | + Ok(()) |
| 236 | + } |
| 237 | + |
216 | 238 | /// Concerned with sending the actual ingest request to [DataStream] which will then write it |
217 | 239 | /// to the gRPC stream. If backups are enabled, the request will be backed up as well. |
218 | 240 | fn send_impl(&mut self, request: IngestWithConfigDataStreamRequest) -> Result<()> { |
@@ -368,6 +390,28 @@ impl SiftStream<IngestionConfigMode> { |
368 | 390 | .collect() |
369 | 391 | } |
370 | 392 |
|
| 393 | + /// Get the flow descriptor for a given flow name. |
| 394 | + pub fn get_flow_descriptor(&self, flow_name: &str) -> Result<FlowDescriptor<String>> { |
| 395 | + let Some(flow) = self.mode.flows_by_name.get(flow_name) else { |
| 396 | + return Err(Error::new_msg( |
| 397 | + ErrorKind::NotFoundError, |
| 398 | + format!("flow '{}' not found", flow_name), |
| 399 | + )); |
| 400 | + }; |
| 401 | + |
| 402 | + if flow.is_empty() { |
| 403 | + return Err(Error::new_msg( |
| 404 | + ErrorKind::NotFoundError, |
| 405 | + format!("flow '{}' not found", flow_name), |
| 406 | + )); |
| 407 | + } |
| 408 | + |
| 409 | + FlowDescriptor::try_from(( |
| 410 | + self.mode.ingestion_config.ingestion_config_id.clone(), |
| 411 | + &flow[0], |
| 412 | + )) |
| 413 | + } |
| 414 | + |
371 | 415 | /// Attach a run to the stream. Any data provided through [SiftStream::send] after return |
372 | 416 | /// of this function will be associated with the run. |
373 | 417 | pub async fn attach_run(&mut self, run_selector: RunSelector) -> Result<()> { |
|
0 commit comments