diff --git a/Cargo.lock b/Cargo.lock index 27592fc1181..7d9651b14e6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -409,7 +409,9 @@ dependencies = [ "azure_core_test", "azure_identity", "azure_messaging_eventhubs", + "clap", "criterion", + "dotenvy", "fe2o3-amqp", "futures", "rand 0.10.1", diff --git a/sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml b/sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml index 992466050d1..f2673fb9095 100644 --- a/sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml +++ b/sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml @@ -37,7 +37,9 @@ azure_identity.workspace = true azure_messaging_eventhubs = { path = ".", features = [ "in_memory_checkpoint_store", ] } +clap.workspace = true criterion.workspace = true +dotenvy = "0.15.7" fe2o3-amqp = { workspace = true, features = ["tracing"] } tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] } @@ -50,5 +52,9 @@ default = ["azure_core_amqp/default"] name = "benchmarks" harness = false +[[test]] +name = "stress_tests" +harness = false + [lints] workspace = true diff --git a/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests.rs b/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests.rs new file mode 100644 index 00000000000..af86774a48d --- /dev/null +++ b/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests.rs @@ -0,0 +1,86 @@ +// Copyright (c) Microsoft Corporation. All Rights reserved +// Licensed under the MIT license. + +//! Main stress test runner for Azure Event Hubs +//! +//! This binary runs stress tests with a custom harness (harness = false). Each +//! stress test registers its own CLI options and runner so adding new tests +//! only requires updating the registry below. 
+ +use clap::{ArgMatches, Command}; +use futures::future::BoxFuture; +use std::{error::Error, process}; + +// Load the stress tests modules from the stress_tests directory +#[path = "stress_tests/basic_publish_read_test.rs"] +mod basic_publish_read_test; +#[path = "stress_tests/continuous_send_receive_stress.rs"] +mod continuous_send_receive_stress; + +use basic_publish_read_test::basic_publish_read_spec; +use continuous_send_receive_stress::continuous_send_receive_spec; + +type StressTestFuture = BoxFuture<'static, Result<(), Box>>; + +#[derive(Clone)] +struct StressTestSpec { + name: &'static str, + description: &'static str, + configure: fn(Command) -> Command, + run: fn(ArgMatches) -> StressTestFuture, +} + +#[tokio::main] +async fn main() { + // Initialize tracing for test output + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let matches = build_cli().get_matches(); + + if let Some((subcommand, sub_matches)) = matches.subcommand() { + if let Some(spec) = registry().into_iter().find(|s| s.name == subcommand) { + if let Err(e) = (spec.run)(sub_matches.clone()).await { + eprintln!("{} FAILED: {}", spec.name, e); + process::exit(1); + } + return; + } + + eprintln!("Unknown test: {}", subcommand); + process::exit(1); + } + + // No subcommand: run all with default args + println!("Running all stress tests with default settings..."); + for spec in registry() { + let default_matches = (spec.configure)(Command::new(spec.name)) + .no_binary_name(true) + .get_matches_from(Vec::<&str>::new()); + + if let Err(e) = (spec.run)(default_matches).await { + eprintln!("{} FAILED: {}", spec.name, e); + process::exit(1); + } + } +} + +fn registry() -> Vec { + vec![basic_publish_read_spec(), continuous_send_receive_spec()] +} + +fn build_cli() -> Command { + let mut cmd = Command::new("stress_tests") + .about("Azure Event Hubs Stress Tests") + .long_about( + "This binary runs stress tests for Azure Event Hubs with 
configurable parameters.", + ); + + for spec in registry() { + let sub = (spec.configure)(Command::new(spec.name).about(spec.description)); + cmd = cmd.subcommand(sub); + } + + cmd +} diff --git a/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/README.md b/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/README.md new file mode 100644 index 00000000000..320e5854e5e --- /dev/null +++ b/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/README.md @@ -0,0 +1,76 @@ +# Event Hubs Stress Tests + +This folder contains the custom (non-harness) stress tests for Azure Event Hubs. Two scenarios ship today, each exposed as a clap subcommand: + +1. **basic_publish_read_test** – bounded publish/read throughput + validation. +2. **continuous_send_receive_stress** – long-running durability/consistency loop (72h default) inspired by the C# sample. + +## Overview + +Each test registers a `StressTestSpec` (name, description, `configure(Command)`, `run(ArgMatches)`). The entrypoint (`stress_tests.rs`, `harness = false`) builds the CLI from the registry and either: + +- Runs a specific subcommand if provided. +- Runs all registered tests sequentially with their default arguments when no subcommand is given. + +Both tests use `dotenvy` to load credentials and run under `tokio` with structured logging via `tracing_subscriber`. + +## Running the Test + +### Running (custom harness) + +These tests disable the standard test harness (`harness = false`) and rely on clap/dotenvy. 
Use the stress test binary: + +```bash +# Run all registered stress tests with their defaults +cargo test --test stress_tests + +# Run a specific subcommand +cargo test --test stress_tests basic_publish_read_test +cargo test --test stress_tests continuous_send_receive_stress + +# Pass custom args to a subcommand +cargo test --test stress_tests -- basic_publish_read_test --events 1000 --producers 2 --consumers 2 +cargo test --test stress_tests -- continuous_send_receive_stress --duration-hours 4 --min-batch 10 --max-batch 50 + +# More logs +RUST_LOG=info cargo test --test stress_tests + +# Discover options +cargo test --test stress_tests -- --help +cargo test --test stress_tests -- basic_publish_read_test --help +cargo test --test stress_tests -- continuous_send_receive_stress --help +``` + +#### Command Line Options + +##### **Basic Publish/Read** + +- `--events, -e` (default: 100): Number of events to publish and read +- `--producers, -p` (default: 1): Concurrent producer tasks +- `--consumers, -c` (default: 1): Concurrent consumer tasks +- `--timeout, -t` (default: 120s): End-to-end timeout +- `--event-size` (default: 512 bytes): Payload size +- `--batch-size` (default: 10): Events per batch + +##### **Continuous Send/Receive** + +- `--duration-hours` (default: 72): Duration of the run +- `--min-batch` (default: 20): Minimum batch size +- `--max-batch` (default: 100): Maximum batch size +- `--min-delay` (default: 1s): Minimum delay between batches +- `--max-delay` (default: 10s): Maximum delay between batches + +## Environment Variables + +The following environment variables are required (loaded via `dotenvy`, so a `.env` file works): + +- `EVENTHUBS_HOST`: Event Hubs namespace hostname (e.g., "my-namespace.servicebus.windows.net") +- `EVENTHUB_NAME`: Event Hub name +- `RUST_LOG`: Log filter (optional). When unset, only `ERROR`-level events are printed, so set `RUST_LOG=info` to see progress and results. Set it in the process environment: the filter is initialized before `.env` is loaded. + +## Behavior & Metrics (high level) + +- **basic_publish_read_test**: Publishes a bounded set, tracks per-task publish counts, measures 
publish/consume durations and throughput, and validates end-to-end completion/count-based results (collects errors on timeouts or task join failures). It does not currently compare consumed events against the originally published payloads. +- **continuous_send_receive_stress**: Long-running loop that emits random batches across partitions, periodically logs heartbeat metrics (sent/received, missing, corrupted body/properties, producer/consumer failures), and emits a final summary including lost events. + +Both tests log via `tracing` and exit non-zero on failure to make CI-friendly verdicts. diff --git a/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/basic_publish_read_test.rs b/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/basic_publish_read_test.rs new file mode 100644 index 00000000000..e6dabce8aab --- /dev/null +++ b/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/basic_publish_read_test.rs @@ -0,0 +1,703 @@ +// Copyright (c) Microsoft Corporation. All Rights reserved +// Licensed under the MIT license. 
+ +use super::StressTestSpec; +use azure_messaging_eventhubs::{ + models::{EventData, MessageId}, + ConsumerClient, EventDataBatchOptions, OpenReceiverOptions, ProducerClient, StartLocation, + StartPosition, +}; +use clap::{Arg, ArgMatches, Command}; +use futures::stream::StreamExt; +use std::{ + collections::HashMap, + error::Error, + sync::{Arc, Mutex}, + time::{Duration, Instant}, +}; +use tokio::time::timeout; +use tracing::{info, warn}; + +/// Configuration for the basic publish/read stress test +#[derive(Debug, Clone)] +pub struct TestConfig { + /// Number of events to publish and read + pub event_count: usize, + /// Number of concurrent producer tasks + pub producer_count: usize, + /// Number of concurrent consumer tasks + pub consumer_count: usize, + /// Timeout for the entire test + pub test_timeout: Duration, + /// Size of each event payload in bytes + pub event_size: usize, + /// Batch size for publishing events + pub batch_size: usize, +} + +impl Default for TestConfig { + fn default() -> Self { + Self { + event_count: 1000, + producer_count: 2, + consumer_count: 2, + test_timeout: Duration::from_secs(300), // 5 minutes + event_size: 1024, // 1KB + batch_size: 100, + } + } +} + +/// Metrics collected during the stress test +#[derive(Debug, Default)] +pub struct TestMetrics { + pub events_published: usize, + pub events_consumed: usize, + pub publish_duration: Duration, + pub consume_duration: Duration, + pub publish_throughput: f64, // events per second + pub consume_throughput: f64, // events per second + pub errors: Vec, +} + +pub async fn run_stress_test_inner( + host: String, + eventhub: String, + credential: Arc, + config: TestConfig, +) -> Result> { + // Wrap the entire stress test with the configured timeout + let result = timeout(config.test_timeout, async { + run_stress_test_inner_impl(host, eventhub, credential, config).await + }) + .await; + + match result { + Ok(metrics) => metrics, + Err(_) => Err("Stress test timed out".into()), + } +} + 
+pub fn basic_publish_read_spec() -> StressTestSpec { + StressTestSpec { + name: "basic_publish_read_test", + description: "Bounded publish/read throughput validation", + configure: |cmd: Command| { + cmd.arg( + Arg::new("events") + .long("events") + .short('e') + .help("Number of events to publish and read") + .value_parser(clap::value_parser!(u32)) + .default_value("100"), + ) + .arg( + Arg::new("producers") + .long("producers") + .short('p') + .help("Number of concurrent producer tasks") + .value_parser(clap::value_parser!(u32)) + .default_value("1"), + ) + .arg( + Arg::new("consumers") + .long("consumers") + .short('c') + .help("Number of concurrent consumer tasks") + .value_parser(clap::value_parser!(u32)) + .default_value("1"), + ) + .arg( + Arg::new("timeout") + .long("timeout") + .short('t') + .help("Timeout for the entire test in seconds") + .value_parser(clap::value_parser!(u64)) + .default_value("120"), + ) + .arg( + Arg::new("event-size") + .long("event-size") + .help("Size of each event payload in bytes") + .value_parser(clap::value_parser!(u32)) + .default_value("512"), + ) + .arg( + Arg::new("batch-size") + .long("batch-size") + .help("Batch size for publishing events") + .value_parser(clap::value_parser!(u32)) + .default_value("10"), + ) + }, + run: |matches: ArgMatches| Box::pin(async move { run_standalone(matches).await }), + } +} + +async fn run_standalone(matches: ArgMatches) -> Result<(), Box> { + use azure_identity::{DeveloperToolsCredential, DeveloperToolsCredentialOptions}; + use dotenvy::{dotenv, var}; + + dotenv().ok(); + + let host = + var("EVENTHUBS_HOST").map_err(|_| "EVENTHUBS_HOST environment variable is required")?; + let eventhub = + var("EVENTHUB_NAME").map_err(|_| "EVENTHUB_NAME environment variable is required")?; + + let credential: Arc = + DeveloperToolsCredential::new(Some(DeveloperToolsCredentialOptions::default()))?; + + let config = TestConfig { + event_count: *matches.get_one::("events").unwrap_or(&100) as usize, + 
producer_count: *matches.get_one::("producers").unwrap_or(&1) as usize, + consumer_count: *matches.get_one::("consumers").unwrap_or(&1) as usize, + test_timeout: std::time::Duration::from_secs( + *matches.get_one::("timeout").unwrap_or(&120), + ), + event_size: *matches.get_one::("event-size").unwrap_or(&512) as usize, + batch_size: *matches.get_one::("batch-size").unwrap_or(&10) as usize, + }; + + info!("Running basic publish/read with config: {:?}", config); + + let metrics = run_stress_test_inner(host, eventhub, credential, config).await?; + + log_test_results(&metrics); + validate_test_results(&metrics)?; + + Ok(()) +} + +async fn run_stress_test_inner_impl( + host: String, + eventhub: String, + credential: Arc, + config: TestConfig, +) -> Result> { + let mut metrics = TestMetrics::default(); + + // Create a unique test run identifier + let test_id = format!( + "stress-{}", + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() + ); + info!("Test ID: {}", test_id); + + // Track published events for validation + let published_events = Arc::new(Mutex::new(HashMap::::new())); + let consumed_events = Arc::new(Mutex::new(HashMap::::new())); + + // Start consumers first + let consume_start = Instant::now(); + let consumer_handles = start_consumers( + host.clone(), + eventhub.clone(), + credential.clone(), + &config, + consumed_events.clone(), + ) + .await?; + + // Wait a moment for consumers to be ready + tokio::time::sleep(Duration::from_secs(2)).await; + + // Start producers + let publish_start = Instant::now(); + let producer_handles = start_producers( + host, + eventhub, + credential, + &config, + test_id, + published_events.clone(), + ) + .await?; + + // Wait for all producers to complete + for handle in producer_handles { + match handle.await { + Ok(Ok(events_published)) => { + metrics.events_published += events_published; + } + Ok(Err(e)) => { + warn!("Producer error: {:?}", e); + metrics.errors.push(format!("Producer 
error: {:?}", e)); + } + Err(e) => { + warn!("Producer join error: {:?}", e); + metrics.errors.push(format!("Producer join error: {:?}", e)); + } + } + } + metrics.publish_duration = publish_start.elapsed(); + + info!( + "Published {} events in {:?}", + metrics.events_published, metrics.publish_duration + ); + + // Wait for consumers to finish (with timeout) + let consume_timeout = Duration::from_secs(60); + let consume_result = timeout(consume_timeout, async { + for handle in consumer_handles { + match handle.await { + Ok(Ok(events_consumed)) => { + metrics.events_consumed += events_consumed; + } + Ok(Err(e)) => { + warn!("Consumer error: {:?}", e); + metrics.errors.push(format!("Consumer error: {:?}", e)); + } + Err(e) => { + warn!("Consumer join error: {:?}", e); + metrics.errors.push(format!("Consumer join error: {:?}", e)); + } + } + } + Ok::<(), Box>(()) + }) + .await; + + match consume_result { + Ok(_) => { + metrics.consume_duration = consume_start.elapsed(); + } + Err(_) => { + warn!("Consumer timeout - some events may not have been consumed"); + metrics.consume_duration = consume_timeout; + // Get current count + let consumed = consumed_events.lock().unwrap(); + metrics.events_consumed = consumed.len(); + } + } + + // Calculate throughput + if metrics.publish_duration.as_secs_f64() > 0.0 { + metrics.publish_throughput = + metrics.events_published as f64 / metrics.publish_duration.as_secs_f64(); + } + if metrics.consume_duration.as_secs_f64() > 0.0 { + metrics.consume_throughput = + metrics.events_consumed as f64 / metrics.consume_duration.as_secs_f64(); + } + + Ok(metrics) +} + +async fn start_producers( + host: String, + eventhub: String, + credential: Arc, + config: &TestConfig, + test_id: String, + published_events: Arc>>, +) -> Result< + Vec>>>, + Box, +> { + let base_events_per_producer = config.event_count / config.producer_count; + let remainder = config.event_count % config.producer_count; + let mut handles = Vec::new(); + + for producer_id in 
0..config.producer_count { + let host = host.clone(); + let eventhub = eventhub.clone(); + let credential = credential.clone(); + let config = config.clone(); + let test_id = test_id.clone(); + let published_events = published_events.clone(); + + // Distribute remainder across first producers + let events_for_this_producer = + base_events_per_producer + if producer_id < remainder { 1 } else { 0 }; + + let handle = tokio::spawn(async move { + let ctx = ProducerContext { + host, + eventhub, + credential, + }; + run_producer_task( + ctx, + producer_id, + events_for_this_producer, + &config, + test_id, + published_events, + ) + .await + }); + handles.push(handle); + } + + Ok(handles) +} + +struct ProducerContext { + host: String, + eventhub: String, + credential: Arc, +} + +async fn run_producer_task( + ctx: ProducerContext, + producer_id: usize, + event_count: usize, + config: &TestConfig, + test_id: String, + published_events: Arc>>, +) -> Result> { + let producer = ProducerClient::builder() + .with_application_id(format!("stress-producer-{}", producer_id)) + .open(ctx.host.as_str(), ctx.eventhub.as_str(), ctx.credential) + .await + .map_err(|e| Box::new(e) as Box)?; + + let mut events_published = 0; + let payload = "x".repeat(config.event_size); + + // Send events in batches + let mut batch_start = 0; + while batch_start < event_count { + let batch_end = std::cmp::min(batch_start + config.batch_size, event_count); + + let mut batch = producer + .create_batch(Some(EventDataBatchOptions::default())) + .await + .map_err(|e| Box::new(e) as Box)?; + + for i in batch_start..batch_end { + let event_id = format!("{}-producer-{}-event-{}", test_id, producer_id, i); + let event_data = EventData::builder() + .with_body(payload.as_bytes()) + .with_message_id(i as u64) + .add_property("event_id".to_string(), event_id.clone()) + .build(); + + // Track the event + { + let mut published = published_events.lock().unwrap(); + published.insert(event_id.clone(), payload.clone()); + } + 
+ if !batch + .try_add_event_data(event_data, None) + .map_err(|e| Box::new(e) as Box)? + { + // Batch is full, send it and start a new one + if !batch.is_empty() { + let batch_len = batch.len(); + producer + .send_batch(batch, None) + .await + .map_err(|e| Box::new(e) as Box)?; + events_published += batch_len; + } + + // Create a new batch and add the current event + batch = producer + .create_batch(Some(EventDataBatchOptions::default())) + .await + .map_err(|e| Box::new(e) as Box)?; + let event_data = EventData::builder() + .with_body(payload.as_bytes()) + .with_message_id(i as u64) + .add_property("event_id".to_string(), event_id) + .build(); + batch + .try_add_event_data(event_data, None) + .map_err(|e| Box::new(e) as Box)?; + } + } + + // Send the final batch if it has events + if !batch.is_empty() { + let batch_len = batch.len(); + producer + .send_batch(batch, None) + .await + .map_err(|e| Box::new(e) as Box)?; + events_published += batch_len; + } + + batch_start = batch_end; + + // Small delay between batches to avoid overwhelming + tokio::time::sleep(Duration::from_millis(10)).await; + } + + info!( + "Producer {} published {} events", + producer_id, events_published + ); + Ok(events_published) +} + +async fn start_consumers( + host: String, + eventhub: String, + credential: Arc, + config: &TestConfig, + consumed_events: Arc>>, +) -> Result< + Vec>>>, + Box, +> { + // Discover partitions up-front so we can spawn one task per partition. + // Each partition gets its own dedicated consumer client and polling loop, + // which avoids the stream-multiplexing issues that come with select_all. 
+ let discovery_client = ConsumerClient::builder() + .with_application_id("stress-consumer-discovery".to_string()) + .open(host.as_str(), eventhub.clone(), credential.clone()) + .await + .map_err(|e| Box::new(e) as Box)?; + + let eventhub_properties = discovery_client + .get_eventhub_properties() + .await + .map_err(|e| Box::new(e) as Box)?; + + let partition_ids = eventhub_properties.partition_ids.clone(); + if partition_ids.is_empty() { + return Err(Box::new(std::io::Error::other( + "Event Hub has no partitions", + ))); + } + + info!( + "Discovered {} partitions: {:?} (consumer_count hint: {})", + partition_ids.len(), + partition_ids, + config.consumer_count, + ); + + let mut handles = Vec::new(); + let total_expected_events = config.event_count; + + // Spawn one consumer task per partition for full coverage + for (idx, partition_id) in partition_ids.into_iter().enumerate() { + let host = host.clone(); + let eventhub = eventhub.clone(); + let credential = credential.clone(); + let consumed_events = consumed_events.clone(); + + let handle = tokio::spawn(async move { + run_consumer_task( + host, + eventhub, + credential, + idx, + partition_id, + total_expected_events, + consumed_events, + ) + .await + }); + handles.push(handle); + } + + Ok(handles) +} + +async fn run_consumer_task( + host: String, + eventhub: String, + credential: Arc, + consumer_id: usize, + partition_id: String, + total_expected_events: usize, + consumed_events: Arc>>, +) -> Result> { + let consumer = ConsumerClient::builder() + .with_application_id(format!("stress-consumer-{}", consumer_id)) + .open(host.as_str(), eventhub, credential) + .await + .map_err(|e| Box::new(e) as Box)?; + + let receiver = consumer + .open_receiver_on_partition( + partition_id.clone(), + Some(OpenReceiverOptions { + start_position: Some(StartPosition { + location: StartLocation::Latest, + ..Default::default() + }), + ..Default::default() + }), + ) + .await + .map_err(|e| Box::new(e) as Box)?; + + info!( + "Consumer {} 
listening on partition {}", + consumer_id, partition_id + ); + + let mut receive_stream = receiver.stream_events(); + let mut events_consumed = 0; + let start_time = Instant::now(); + let timeout_duration = Duration::from_secs(120); + let mut consecutive_timeouts = 0; + let max_consecutive_timeouts = 6; + + while start_time.elapsed() < timeout_duration { + // Check shared consumed count to see if all expected events have been received across all consumers + { + let consumed = consumed_events.lock().unwrap(); + if consumed.len() >= total_expected_events { + break; + } + } + + match timeout(Duration::from_secs(10), receive_stream.next()).await { + Ok(Some(Ok(partition_event))) => { + consecutive_timeouts = 0; + let event_data = partition_event.event_data(); + + // Try to get event_id from properties first + let event_id = if let Some(properties) = event_data.properties() { + if let Some(id) = properties.get("event_id") { + match id { + azure_core_amqp::AmqpSimpleValue::String(s) => s.clone(), + azure_core_amqp::AmqpSimpleValue::ULong(u) => u.to_string(), + azure_core_amqp::AmqpSimpleValue::Long(l) => l.to_string(), + azure_core_amqp::AmqpSimpleValue::Int(i) => i.to_string(), + azure_core_amqp::AmqpSimpleValue::UInt(ui) => ui.to_string(), + _ => format!("event-{}", events_consumed), + } + } else { + format!("event-{}", events_consumed) + } + } else if let Some(message_id) = event_data.message_id() { + match message_id { + MessageId::String(s) => s.clone(), + MessageId::Uuid(u) => u.to_string(), + MessageId::Ulong(u) => u.to_string(), + MessageId::Binary(b) => String::from_utf8_lossy(b).to_string(), + } + } else { + format!("event-{}", events_consumed) + }; + + let body = String::from_utf8_lossy(event_data.body().unwrap_or(&[])); + + // Track the consumed event + { + let mut consumed = consumed_events.lock().unwrap(); + consumed.insert(event_id.clone(), body.to_string()); + } + + events_consumed += 1; + + if events_consumed % 100 == 0 { + info!( + "Consumer {} consumed 
{} events", + consumer_id, events_consumed + ); + } + } + Ok(Some(Err(e))) => { + warn!("Consumer {} error receiving event: {:?}", consumer_id, e); + } + Ok(None) => { + info!("Consumer {} stream ended", consumer_id); + break; + } + Err(_) => { + consecutive_timeouts += 1; + info!( + "Consumer {} timeout waiting for events ({}/{})", + consumer_id, consecutive_timeouts, max_consecutive_timeouts + ); + if consecutive_timeouts >= max_consecutive_timeouts { + info!( + "Consumer {} exiting after {} consecutive timeouts", + consumer_id, consecutive_timeouts + ); + break; + } + } + } + } + + info!( + "Consumer {} finished with {} events consumed", + consumer_id, events_consumed + ); + Ok(events_consumed) +} + +pub fn log_test_results(metrics: &TestMetrics) { + info!("=== Basic Publish/Read Stress Test Results ==="); + info!("Events Published: {}", metrics.events_published); + info!("Events Consumed: {}", metrics.events_consumed); + info!("Publish Duration: {:?}", metrics.publish_duration); + info!("Consume Duration: {:?}", metrics.consume_duration); + info!( + "Publish Throughput: {:.2} events/sec", + metrics.publish_throughput + ); + info!( + "Consume Throughput: {:.2} events/sec", + metrics.consume_throughput + ); + + if !metrics.errors.is_empty() { + warn!("Errors encountered: {:?}", metrics.errors); + } +} + +pub fn validate_test_results(metrics: &TestMetrics) -> Result<(), Box> { + // Basic validation - we should have published some events + if metrics.events_published == 0 { + return Err("No events were published".into()); + } + + // We should have consumed at least some events (allowing for some loss in stress scenarios) + let min_expected_consumed = metrics.events_published / 2; // Allow 50% loss for stress test + if metrics.events_consumed < min_expected_consumed { + return Err(format!( + "Too few events consumed: {} < {} (minimum expected)", + metrics.events_consumed, min_expected_consumed + ) + .into()); + } + + // Throughput should be reasonable (at least 1 
event per second) + if metrics.publish_throughput < 1.0 { + return Err(format!( + "Publish throughput too low: {:.2} events/sec", + metrics.publish_throughput + ) + .into()); + } + + info!("Stress test validation passed"); + Ok(()) +} + +#[cfg(test)] +mod tests { + #[test] + fn test_config_default() { + use super::TestConfig; + let config = TestConfig::default(); + assert_eq!(config.event_count, 1000); + assert_eq!(config.producer_count, 2); + assert_eq!(config.consumer_count, 2); + } + + #[test] + fn test_metrics_default() { + use super::TestMetrics; + let metrics = TestMetrics::default(); + assert_eq!(metrics.events_published, 0); + assert_eq!(metrics.events_consumed, 0); + assert_eq!(metrics.publish_throughput, 0.0); + assert_eq!(metrics.consume_throughput, 0.0); + } +} diff --git a/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/continuous_send_receive_stress.rs b/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/continuous_send_receive_stress.rs new file mode 100644 index 00000000000..2024ac6485e --- /dev/null +++ b/sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests/continuous_send_receive_stress.rs @@ -0,0 +1,517 @@ +// Copyright (c) Microsoft Corporation. All Rights reserved +// Licensed under the MIT license. 
+ +// cspell:ignore rngs Seedable + +use super::StressTestSpec; +use azure_core_amqp::AmqpSimpleValue; +use azure_messaging_eventhubs::{ + models::EventData, ConsumerClient, EventDataBatchOptions, OpenReceiverOptions, ProducerClient, + StartLocation, StartPosition, +}; +use clap::{Arg, ArgMatches, Command}; +use futures::StreamExt; +use rand::{rngs::StdRng, RngExt, SeedableRng}; +use std::{ + collections::HashMap, + error::Error, + sync::Arc, + time::{Duration, Instant}, +}; +use tokio::sync::Mutex; +use tracing::{info, warn}; + +#[derive(Debug, Clone)] +pub struct ContinuousStressConfig { + pub duration: Duration, + pub min_batch_size: usize, + pub max_batch_size: usize, + pub min_delay_secs: u64, + pub max_delay_secs: u64, +} + +impl Default for ContinuousStressConfig { + fn default() -> Self { + Self { + duration: Duration::from_secs(72 * 60 * 60), + min_batch_size: 20, + max_batch_size: 100, + min_delay_secs: 1, + max_delay_secs: 10, + } + } +} + +#[derive(Debug, Default, Clone)] +pub struct ContinuousStressMetrics { + pub batches_sent: u64, + pub events_sent: u64, + pub events_received: u64, + pub corrupted_body_failures: u64, + pub corrupted_properties_failures: u64, + pub producer_failures: u64, + pub consumer_failures: u64, + pub lost_events: usize, +} + +#[derive(Debug, Clone)] +struct ExpectedEvent { + batch_index: u64, + batch_size: u32, + index: u32, +} + +struct SharedState { + missing_events: Mutex>, + last_sequence: Mutex>, + metrics: Mutex, + start_instant: Instant, +} + +impl SharedState { + fn new() -> Self { + Self { + missing_events: Mutex::new(HashMap::new()), + last_sequence: Mutex::new(HashMap::new()), + metrics: Mutex::new(ContinuousStressMetrics::default()), + start_instant: Instant::now(), + } + } +} + +/// Run a continuous send/receive stress test similar to the C# sample. 
+pub async fn run_continuous_stress( + host: String, + eventhub: String, + credential: Arc, + config: ContinuousStressConfig, +) -> Result> { + let state = Arc::new(SharedState::new()); + + let producer = ProducerClient::builder() + .with_application_id("continuous-stress-producer".to_string()) + .open(host.as_str(), eventhub.as_str(), credential.clone()) + .await?; + let producer = Arc::new(producer); + + let partition_ids = producer.get_eventhub_properties().await?.partition_ids; + + let end_instant = Instant::now() + config.duration; + let sender = tokio::spawn(send_loop( + producer.clone(), + state.clone(), + config.clone(), + end_instant, + )); + + let mut receiver_handles = Vec::new(); + for partition_id in partition_ids { + let state = state.clone(); + let host = host.clone(); + let eventhub = eventhub.clone(); + let credential = credential.clone(); + + receiver_handles.push(tokio::spawn(async move { + receive_loop(host, eventhub, credential, partition_id, state, end_instant).await + })); + } + + let reporter = { + let reporter_state = state.clone(); + tokio::spawn(async move { + periodic_report(reporter_state, end_instant).await; + }) + }; + + let sender_result = sender + .await + .unwrap_or_else(|e| Err(Box::new(e) as Box)); + + let mut receiver_results = Vec::new(); + for handle in receiver_handles { + receiver_results.push( + handle + .await + .unwrap_or_else(|e| Err(Box::new(e) as Box)), + ); + } + + reporter.await.ok(); + + if let Err(e) = sender_result { + let mut metrics = state.metrics.lock().await; + metrics.producer_failures += 1; + warn!("Producer failure: {:?}", e); + } + + for result in receiver_results { + if let Err(e) = result { + let mut metrics = state.metrics.lock().await; + metrics.consumer_failures += 1; + warn!("Consumer failure: {:?}", e); + } + } + + finalize_metrics(state).await +} + +pub fn continuous_send_receive_spec() -> StressTestSpec { + StressTestSpec { + name: "continuous_send_receive_stress", + description: "Long-running 
send/receive durability loop", + configure: |cmd: Command| { + cmd.arg( + Arg::new("duration-hours") + .long("duration-hours") + .help("Duration of the continuous stress test in hours") + .value_parser(clap::value_parser!(u64)) + .default_value("72"), + ) + .arg( + Arg::new("min-batch") + .long("min-batch") + .help("Minimum batch size for continuous stress test") + .value_parser(clap::value_parser!(u32)) + .default_value("20"), + ) + .arg( + Arg::new("max-batch") + .long("max-batch") + .help("Maximum batch size for continuous stress test") + .value_parser(clap::value_parser!(u32)) + .default_value("100"), + ) + .arg( + Arg::new("min-delay") + .long("min-delay") + .help("Minimum delay between batches (seconds) for continuous stress test") + .value_parser(clap::value_parser!(u64)) + .default_value("1"), + ) + .arg( + Arg::new("max-delay") + .long("max-delay") + .help("Maximum delay between batches (seconds) for continuous stress test") + .value_parser(clap::value_parser!(u64)) + .default_value("10"), + ) + }, + run: |matches: ArgMatches| Box::pin(async move { run_standalone(matches).await }), + } +} + +async fn run_standalone(matches: ArgMatches) -> Result<(), Box> { + use azure_identity::{DeveloperToolsCredential, DeveloperToolsCredentialOptions}; + use dotenvy::{dotenv, var}; + + dotenv().ok(); + + let host = + var("EVENTHUBS_HOST").map_err(|_| "EVENTHUBS_HOST environment variable is required")?; + let eventhub = + var("EVENTHUB_NAME").map_err(|_| "EVENTHUB_NAME environment variable is required")?; + + let credential: Arc = + DeveloperToolsCredential::new(Some(DeveloperToolsCredentialOptions::default()))?; + + let duration_hours = *matches.get_one::("duration-hours").unwrap_or(&72); + let min_batch_size = *matches.get_one::("min-batch").unwrap_or(&20) as usize; + let max_batch_size = *matches.get_one::("max-batch").unwrap_or(&100) as usize; + let min_delay = *matches.get_one::("min-delay").unwrap_or(&1); + let max_delay = 
*matches.get_one::("max-delay").unwrap_or(&10); + + let config = ContinuousStressConfig { + duration: Duration::from_secs(duration_hours * 60 * 60), + min_batch_size: min_batch_size.min(max_batch_size), + max_batch_size: max_batch_size.max(min_batch_size), + min_delay_secs: min_delay.min(max_delay), + max_delay_secs: max_delay.max(min_delay), + }; + + info!("Running continuous stress with config: {:?}", config); + + let metrics = run_continuous_stress(host, eventhub, credential, config).await?; + + info!( + "Continuous results: batches={}, sent={}, received={}, lost={}, corrupted_body={}, corrupted_props={}, producer_failures={}, consumer_failures={}", + metrics.batches_sent, + metrics.events_sent, + metrics.events_received, + metrics.lost_events, + metrics.corrupted_body_failures, + metrics.corrupted_properties_failures, + metrics.producer_failures, + metrics.consumer_failures + ); + + Ok(()) +} + +async fn finalize_metrics( + state: Arc, +) -> Result> { + let missing = state.missing_events.lock().await; + let mut metrics = state.metrics.lock().await; + metrics.lost_events = missing.len(); + Ok(metrics.clone()) +} + +async fn send_loop( + producer: Arc, + state: Arc, + config: ContinuousStressConfig, + end_instant: Instant, +) -> Result<(), Box> { + let seed: [u8; 32] = rand::random(); + let mut rng = StdRng::from_seed(seed); + let mut batch_index: u64 = 0; + + while Instant::now() < end_instant { + let batch_size = rng.random_range(config.min_batch_size..=config.max_batch_size); + let mut batch = producer + .create_batch(Some(EventDataBatchOptions::default())) + .await?; + + for event_index in 0..batch_size { + let key = format!( + "evt-{}-{}-{}", + batch_index, + event_index, + rng.random::() + ); + let expected = ExpectedEvent { + batch_index, + batch_size: batch_size as u32, + index: event_index as u32, + }; + + let event = EventData::builder() + .with_body(key.as_bytes()) + .add_property("batch_index".to_string(), expected.batch_index) + 
.add_property("batch_size".to_string(), expected.batch_size) + .add_property("index".to_string(), expected.index) + .with_message_id(key.clone()) + .build(); + + match batch.try_add_event_data(event, None)? { + true => { + // Event was successfully added, record it as expected + let mut missing = state.missing_events.lock().await; + missing.insert(key, expected); + } + false => { + // Batch is full, send it and create a new one + producer.send_batch(batch, None).await?; + + { + let mut metrics = state.metrics.lock().await; + metrics.batches_sent += 1; + } + + batch = producer + .create_batch(Some(EventDataBatchOptions::default())) + .await?; + + // Add the current event to the new batch + let event = EventData::builder() + .with_body(key.as_bytes()) + .add_property("batch_index".to_string(), expected.batch_index) + .add_property("batch_size".to_string(), expected.batch_size) + .add_property("index".to_string(), expected.index) + .with_message_id(key.clone()) + .build(); + + batch.try_add_event_data(event, None)?; + + // Record it as expected now that it's in a batch + let mut missing = state.missing_events.lock().await; + missing.insert(key, expected); + } + } + } + + producer.send_batch(batch, None).await?; + + { + let mut metrics = state.metrics.lock().await; + metrics.batches_sent += 1; + metrics.events_sent += batch_size as u64; + + // Light heartbeat so we know the sender is making progress without overwhelming logs. 
+ if metrics.batches_sent % 10 == 0 { + info!( + "Sender progress: batches sent={}, events sent={} (last batch size={})", + metrics.batches_sent, metrics.events_sent, batch_size + ); + } + } + + batch_index += 1; + + let delay = rng.random_range(config.min_delay_secs..=config.max_delay_secs); + tokio::time::sleep(Duration::from_secs(delay)).await; + } + + Ok(()) +} + +async fn receive_loop( + host: String, + eventhub: String, + credential: Arc, + partition_id: String, + state: Arc, + end_instant: Instant, +) -> Result<(), Box> { + let consumer = ConsumerClient::builder() + .with_application_id(format!("continuous-stress-consumer-{}", &partition_id)) + .open(host.as_str(), eventhub, credential) + .await?; + + let start_position = { + let last = state.last_sequence.lock().await; + if let Some(seq) = last.get(&partition_id) { + StartPosition { + location: StartLocation::SequenceNumber(*seq), + ..Default::default() + } + } else { + StartPosition { + location: StartLocation::Latest, + ..Default::default() + } + } + }; + + let receiver = consumer + .open_receiver_on_partition( + partition_id.clone(), + Some(OpenReceiverOptions { + start_position: Some(start_position), + ..Default::default() + }), + ) + .await?; + + let mut stream = receiver.stream_events(); + + while Instant::now() < end_instant { + match tokio::time::timeout(Duration::from_secs(10), stream.next()).await { + Ok(Some(Ok(partition_event))) => { + let event_data = partition_event.event_data(); + let body = String::from_utf8_lossy(event_data.body().unwrap_or(&[])).to_string(); + + if let Some(expected) = { + let mut missing = state.missing_events.lock().await; + missing.remove(&body) + } { + if !properties_match(event_data, &expected) { + let mut metrics = state.metrics.lock().await; + metrics.corrupted_properties_failures += 1; + warn!( + "Partition {} received event with mismatched properties", + partition_id + ); + } else { + let mut metrics = state.metrics.lock().await; + metrics.events_received += 1; + 
+ // Light heartbeat to show consumer activity. + if metrics.events_received % 500 == 0 { + info!( + "Consumer {} progress: events received={}, pending={}", + partition_id, + metrics.events_received, + state.missing_events.lock().await.len() + ); + } + } + } else { + let mut metrics = state.metrics.lock().await; + metrics.corrupted_body_failures += 1; + warn!("Partition {} received unknown event body", partition_id); + } + + if let Some(sequence) = partition_event.sequence_number() { + let mut last = state.last_sequence.lock().await; + last.insert(partition_id.clone(), sequence); + } + } + Ok(Some(Err(err))) => { + let mut metrics = state.metrics.lock().await; + metrics.consumer_failures += 1; + warn!("Consumer for partition {} error: {:?}", partition_id, err); + } + Ok(None) => { + break; + } + Err(_) => { + // Timeout waiting for events; continue until end_instant + } + } + } + + Ok(()) +} + +fn properties_match(event_data: &EventData, expected: &ExpectedEvent) -> bool { + let properties = match event_data.properties() { + Some(props) => props, + None => return false, + }; + + let batch_index = extract_u64(properties.get("batch_index")); + let batch_size = extract_u64(properties.get("batch_size")); + let index = extract_u64(properties.get("index")); + + batch_index == Some(expected.batch_index) + && batch_size == Some(expected.batch_size as u64) + && index == Some(expected.index as u64) +} + +fn extract_u64(value: Option<&AmqpSimpleValue>) -> Option { + match value? 
{ + AmqpSimpleValue::UByte(v) => Some(*v as u64), + AmqpSimpleValue::UShort(v) => Some(*v as u64), + AmqpSimpleValue::UInt(v) => Some(*v as u64), + AmqpSimpleValue::ULong(v) => Some(*v), + AmqpSimpleValue::Byte(v) => (*v).try_into().ok(), + AmqpSimpleValue::Short(v) => (*v).try_into().ok(), + AmqpSimpleValue::Int(v) => (*v).try_into().ok(), + AmqpSimpleValue::Long(v) => (*v).try_into().ok(), + AmqpSimpleValue::Boolean(v) => Some(*v as u64), + AmqpSimpleValue::Float(v) => Some((*v as f64).round() as u64), + AmqpSimpleValue::Double(v) => Some((*v).round() as u64), + AmqpSimpleValue::Char(v) => Some(*v as u64), + _ => None, + } +} + +async fn periodic_report(state: Arc, end_instant: Instant) { + let mut last_print = Instant::now(); + + while Instant::now() < end_instant { + if last_print.elapsed() >= Duration::from_secs(30) { + let metrics = state.metrics.lock().await.clone(); + let missing = state.missing_events.lock().await.len(); + let elapsed = Instant::now().saturating_duration_since(state.start_instant); + + info!( + "Elapsed: {:?}, batches sent: {}, events sent: {}, received: {}, lost (pending): {}, corrupted body: {}, corrupted props: {}, producer failures: {}, consumer failures: {}", + elapsed, + metrics.batches_sent, + metrics.events_sent, + metrics.events_received, + missing, + metrics.corrupted_body_failures, + metrics.corrupted_properties_failures, + metrics.producer_failures, + metrics.consumer_failures + ); + + last_print = Instant::now(); + } + + tokio::time::sleep(Duration::from_secs(5)).await; + } +}