Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 177 additions & 0 deletions driver/examples/stream_processing_example.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
//! Demonstrates the full lifecycle of an Atlas Stream Processing (ASP) stream
//! processor using [`mongodb::stream_processing::StreamProcessingClient`].
//! It creates, starts, samples, stops, and drops a processor.
//!
//! Requirements:
//!
//! - An Atlas Stream Processing workspace with a hostname matching the pattern
//! `atlas-stream-<workspaceId>-<suffix>.<region>.a.query.mongodb.net` (or `.mongodb-<env>.net`
//! for staging).
//! - A user with the `atlasAdmin` role.
//! - Two connections registered in the workspace:
//! - `sample_stream_solar` (built-in sample source)
//! - `__testLog` (built-in test sink)
//!
//! Use the `MONGODB_STREAM_PROCESSING_URI` environment variable to specify
//! the workspace connection string (including username/password). Run with:
//!
//! ```text
//! MONGODB_STREAM_PROCESSING_URI='mongodb://user:pass@atlas-stream-….a.query.mongodb.net/' \
//! cargo run -p mongodb --example stream_processing_example
//! ```

use std::{
process::ExitCode,
time::{Duration, Instant},
};

use mongodb::{
bson::{doc, oid::ObjectId},
error::Result,
stream_processing::{
GetStreamProcessorSamplesOptions,
StreamProcessingClient,
StreamProcessors,
},
};

#[tokio::main]
async fn main() -> ExitCode {
let uri = match std::env::var("MONGODB_STREAM_PROCESSING_URI") {
Ok(uri) if !uri.is_empty() => uri,
_ => {
eprintln!("This example requires an Atlas Stream Processing workspace endpoint.");
eprintln!("Set MONGODB_STREAM_PROCESSING_URI to the workspace connection string.");
return ExitCode::from(1);
}
};

if !StreamProcessingClient::is_workspace_uri(&uri) {
eprintln!("MONGODB_STREAM_PROCESSING_URI does not look like a workspace endpoint.");
eprintln!(
"Expected hostname pattern: atlas-stream-*.<region>.a.query.mongodb.net (or \
.mongodb-stage.net for Atlas staging)"
);
return ExitCode::from(1);
}

let client = match StreamProcessingClient::with_uri_str(&uri).await {
Ok(c) => c,
Err(e) => {
eprintln!("StreamProcessingClient::with_uri_str failed: {e}");
return ExitCode::from(1);
}
};

let processors = client.stream_processors();
let name = format!("rustdriver_demo_{}", ObjectId::new().to_hex());

println!("Workspace: {uri}");
println!("Processor: {name}\n");

match run_lifecycle(&processors, &name).await {
Ok(()) => {
println!("OK.");
ExitCode::SUCCESS
}
Err(e) => {
eprintln!("\nFAILED: {e}");
// Best-effort cleanup so we don't leave processors behind.
if processors.get(&name).drop().await.is_ok() {
eprintln!("(cleaned up processor {name})");
}
ExitCode::from(1)
}
}
}

async fn run_lifecycle(processors: &StreamProcessors, name: &str) -> Result<()> {
let pipeline = vec![
doc! { "$source": { "connectionName": "sample_stream_solar" } },
doc! { "$emit": { "connectionName": "__testLog", "topic": "rust-driver-demo" } },
];

// 1. create
println!("[1/6] create({name})");
processors.create(name, pipeline, None).await?;
let info = processors.get_info(name).await?;
println!(" state={}\n", info.state);

// 2. start
println!("[2/6] start()");
let processor = processors.get(name);
processor.start(None).await?;
let state = wait_for_state(processors, name, "STARTED", Duration::from_secs(30)).await?;
println!(" state={state}\n");
if state != "STARTED" {
return Err(mongodb::error::Error::custom(format!(
"processor did not reach STARTED within 30s (got {state})"
)));
}

// 3. stats
println!("[3/6] stats()");
let stats = processor.stats(None).await?;
println!(" {stats:?}\n");

// 4. sample
println!("[4/6] samples()");
let opened = processor
.samples(GetStreamProcessorSamplesOptions::builder().limit(5).build())
.await?;
println!(
" open cursorId={} docs={}",
opened.cursor_id,
opened.documents.len()
);

if !opened.is_exhausted() {
// Give the stream a moment to produce something.
tokio::time::sleep(Duration::from_secs(2)).await;
let batch = processor
.samples(
GetStreamProcessorSamplesOptions::builder()
.cursor_id(opened.cursor_id)
.batch_size(5)
.build(),
)
.await?;
println!(
" batch cursorId={} docs={}",
batch.cursor_id,
batch.documents.len()
);
for (i, doc) in batch.documents.iter().enumerate() {
println!(" [{i}] {doc:?}");
}
}
println!();

// 5. stop
println!("[5/6] stop()");
processor.stop().await?;
let state = processors.get_info(name).await?.state;
println!(" state={state}\n");

// 6. drop
println!("[6/6] drop()");
processor.drop().await?;
println!(" dropped\n");

Ok(())
}

async fn wait_for_state(
processors: &StreamProcessors,
name: &str,
target: &str,
timeout: Duration,
) -> Result<String> {
let deadline = Instant::now() + timeout;
let mut state = processors.get_info(name).await?.state;
while state != target && Instant::now() < deadline {
tokio::time::sleep(Duration::from_millis(500)).await;
state = processors.get_info(name).await?.state;
}
Ok(state)
}
1 change: 1 addition & 0 deletions driver/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ mod search_index;
mod selection_criteria;
mod serde_util;
mod srv;
pub mod stream_processing;
#[cfg(feature = "sync")]
pub mod sync;
#[cfg(test)]
Expand Down
1 change: 1 addition & 0 deletions driver/src/operation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub(crate) mod raw_output;
pub(crate) mod run_command;
pub(crate) mod run_cursor_command;
mod search_index;
pub(crate) mod stream_processing;
mod update;

use std::{borrow::Cow, fmt::Debug, ops::Deref};
Expand Down
10 changes: 10 additions & 0 deletions driver/src/operation/stream_processing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
//! Operation implementations for Atlas Stream Processing commands.

pub(crate) mod create_stream_processor;
pub(crate) mod drop_stream_processor;
pub(crate) mod get_more_sample_stream_processor;
pub(crate) mod get_stream_processor;
pub(crate) mod get_stream_processor_stats;
pub(crate) mod start_sample_stream_processor;
pub(crate) mod start_stream_processor;
pub(crate) mod stop_stream_processor;
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
use crate::{
bson::{doc, Document},
bson_compat::{cstr, cstr_to_str, CStr},
bson_util,
cmap::{Command, RawCommandResponse, StreamDescription},
error::Result,
operation::{append_options, ExecutionContext, OperationWithDefaults},
stream_processing::options::CreateStreamProcessorOptions,
Client,
};

#[derive(Debug)]
pub(crate) struct CreateStreamProcessor {
client: Client,
name: String,
pipeline: Vec<Document>,
options: Option<CreateStreamProcessorOptions>,
}

impl CreateStreamProcessor {
pub(crate) fn new(
client: Client,
name: String,
pipeline: Vec<Document>,
options: Option<CreateStreamProcessorOptions>,
) -> Self {
Self {
client,
name,
pipeline,
options,
}
}
}

impl OperationWithDefaults for CreateStreamProcessor {
type O = ();

const NAME: &'static CStr = cstr!("createStreamProcessor");

fn build(&mut self, _description: &StreamDescription) -> Result<Command> {
let mut body = doc! {
cstr_to_str(Self::NAME): self.name.as_str(),
"pipeline": bson_util::to_bson_array(&self.pipeline),
};

if let Some(opts) = self.options.as_ref() {
let mut sub = Document::new();
append_options(&mut sub, Some(opts))?;
if !sub.is_empty() {
body.insert("options", sub);
}
}

Ok(Command::from_operation(self, (&body).try_into()?))
}

fn handle_response<'a>(
&'a self,
_response: &'a RawCommandResponse,
_context: ExecutionContext<'a>,
) -> Result<Self::O> {
// { ok: 1 }
Ok(())
}

fn target(&self) -> super::super::OperationTarget {
super::super::OperationTarget::admin(&self.client)
}

#[cfg(feature = "opentelemetry")]
type Otel = crate::otel::Witness<Self>;
}

#[cfg(feature = "opentelemetry")]
impl crate::otel::OtelInfoDefaults for CreateStreamProcessor {}
51 changes: 51 additions & 0 deletions driver/src/operation/stream_processing/drop_stream_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
use crate::{
bson::rawdoc,
bson_compat::{cstr, CStr},
cmap::{Command, RawCommandResponse, StreamDescription},
error::Result,
operation::{ExecutionContext, OperationWithDefaults},
Client,
};

#[derive(Debug)]
pub(crate) struct DropStreamProcessor {
client: Client,
name: String,
}

impl DropStreamProcessor {
pub(crate) fn new(client: Client, name: String) -> Self {
Self { client, name }
}
}

impl OperationWithDefaults for DropStreamProcessor {
type O = ();

const NAME: &'static CStr = cstr!("dropStreamProcessor");

fn build(&mut self, _description: &StreamDescription) -> Result<Command> {
Ok(Command::from_operation(
self,
rawdoc! { Self::NAME: self.name.as_str() },
))
}

fn handle_response<'a>(
&'a self,
_response: &'a RawCommandResponse,
_context: ExecutionContext<'a>,
) -> Result<Self::O> {
Ok(())
}

fn target(&self) -> super::super::OperationTarget {
super::super::OperationTarget::admin(&self.client)
}

#[cfg(feature = "opentelemetry")]
type Otel = crate::otel::Witness<Self>;
}

#[cfg(feature = "opentelemetry")]
impl crate::otel::OtelInfoDefaults for DropStreamProcessor {}
Loading