Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions sdk/eventhubs/azure_messaging_eventhubs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ azure_identity.workspace = true
azure_messaging_eventhubs = { path = ".", features = [
"in_memory_checkpoint_store",
] }
clap.workspace = true
criterion.workspace = true
dotenvy = "0.15.7"
fe2o3-amqp = { workspace = true, features = ["tracing"] }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"] }
Comment on lines 37 to 45
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change adds clap/dotenvy dev-dependencies for the new stress-test CLI, but the crate is still missing a [[test]] section to declare the stress_tests integration test as harness = false (and ideally required-features = [...] to prevent it from running by default). Without that, the new tests/stress_tests.rs (which defines main) won’t compile as an integration test, and/or will run unexpectedly in CI.

Copilot uses AI. Check for mistakes.
Expand All @@ -50,5 +52,9 @@ default = ["azure_core_amqp/default"]
name = "benchmarks"
harness = false

[[test]]
name = "stress_tests"
harness = false

[lints]
workspace = true
86 changes: 86 additions & 0 deletions sdk/eventhubs/azure_messaging_eventhubs/tests/stress_tests.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
// Copyright (c) Microsoft Corporation. All Rights reserved
// Licensed under the MIT license.

//! Main stress test runner for Azure Event Hubs
//!
//! This binary runs stress tests with a custom harness (harness = false). Each
//! stress test registers its own CLI options and runner so adding new tests
//! only requires updating the registry below.

use clap::{ArgMatches, Command};
use futures::future::BoxFuture;
use std::{error::Error, process};

// Load the stress tests modules from the stress_tests directory
#[path = "stress_tests/basic_publish_read_test.rs"]
mod basic_publish_read_test;
#[path = "stress_tests/continuous_send_receive_stress.rs"]
mod continuous_send_receive_stress;

use basic_publish_read_test::basic_publish_read_spec;
use continuous_send_receive_stress::continuous_send_receive_spec;

type StressTestFuture = BoxFuture<'static, Result<(), Box<dyn Error + Send + Sync>>>;

#[derive(Clone)]
struct StressTestSpec {
name: &'static str,
description: &'static str,
configure: fn(Command) -> Command,
run: fn(ArgMatches) -> StressTestFuture,
}

#[tokio::main]
async fn main() {
// Initialize tracing for test output
tracing_subscriber::fmt()
Comment on lines +33 to +36
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests/stress_tests.rs defines its own main and is intended to be a custom-harness test binary, but there is no [[test]] entry in this crate’s Cargo.toml with harness = false for the stress_tests target. As-is, this integration test will fail to compile due to the default libtest harness generating its own main. Add a [[test]] name = "stress_tests" section with harness = false (and keep the file as the entrypoint).

Copilot uses AI. Check for mistakes.
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();

let matches = build_cli().get_matches();

if let Some((subcommand, sub_matches)) = matches.subcommand() {
if let Some(spec) = registry().into_iter().find(|s| s.name == subcommand) {
if let Err(e) = (spec.run)(sub_matches.clone()).await {
eprintln!("{} FAILED: {}", spec.name, e);
process::exit(1);
}
return;
}

eprintln!("Unknown test: {}", subcommand);
process::exit(1);
}

// No subcommand: run all with default args
println!("Running all stress tests with default settings...");
for spec in registry() {
let default_matches = (spec.configure)(Command::new(spec.name))
.no_binary_name(true)
.get_matches_from(Vec::<&str>::new());

if let Err(e) = (spec.run)(default_matches).await {
eprintln!("{} FAILED: {}", spec.name, e);
process::exit(1);
}
}
Comment on lines +55 to +66
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Even with a harness = false test target, cargo test will execute this binary by default, but it requires live env vars and (by default) runs a 72h scenario. This is very likely to break local cargo test runs and CI. Consider gating the test target behind required-features (e.g., required-features = ["stress-tests"]) and adding a non-default feature, or moving this CLI into examples/ or a separate crate/binary so it never runs as part of normal tests.

Suggested change
// No subcommand: run all with default args
println!("Running all stress tests with default settings...");
for spec in registry() {
let default_matches = (spec.configure)(Command::new(spec.name))
.no_binary_name(true)
.get_matches_from(Vec::<&str>::new());
if let Err(e) = (spec.run)(default_matches).await {
eprintln!("{} FAILED: {}", spec.name, e);
process::exit(1);
}
}
// No subcommand: do not run stress tests implicitly.
let mut cmd = build_cli();
let _ = cmd.print_help();
eprintln!();
eprintln!();
eprintln!(
"No stress test selected. Specify a subcommand to run a stress test explicitly."
);

Copilot uses AI. Check for mistakes.
Comment on lines +55 to +66
Copy link

Copilot AI Apr 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When no subcommand is provided, main runs all stress tests with default args. With continuous_send_receive_stress defaulting to 72 hours, cargo test --test stress_tests can unexpectedly run for days. Consider requiring an explicit subcommand (or using a much shorter default duration when invoked via the “run all” path) to avoid accidental long-running executions.

Copilot uses AI. Check for mistakes.
}

fn registry() -> Vec<StressTestSpec> {
vec![basic_publish_read_spec(), continuous_send_receive_spec()]
}

fn build_cli() -> Command {
let mut cmd = Command::new("stress_tests")
.about("Azure Event Hubs Stress Tests")
.long_about(
"This binary runs stress tests for Azure Event Hubs with configurable parameters.",
);

for spec in registry() {
let sub = (spec.configure)(Command::new(spec.name).about(spec.description));
cmd = cmd.subcommand(sub);
}

cmd
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Event Hubs Stress Tests

This folder contains the custom (non-harness) stress tests for Azure Event Hubs. Two scenarios ship today, each exposed as a clap subcommand:

1. **basic_publish_read_test** – bounded publish/read throughput + validation.
2. **continuous_send_receive_stress** – long-running durability/consistency loop (72h default) inspired by the C# sample.

## Overview

Each test registers a `StressTestSpec` (name, description, `configure(Command)`, `run(ArgMatches)`). The entrypoint (`stress_tests.rs`, `harness = false`) builds the CLI from the registry and either:

- Runs a specific subcommand if provided.
- Runs all registered tests sequentially with their default arguments when no subcommand is given.

Both tests use `dotenvy` to load credentials and run under `tokio` with structured logging via `tracing_subscriber`.

## Running the Test

### Running (custom harness)

These tests disable the standard test harness (`harness = false`) and rely on clap/dotenvy. Use the stress test binary:

```bash
# Run all registered stress tests with their defaults
cargo test --test stress_tests

# Run a specific subcommand
cargo test --test stress_tests basic_publish_read_test
cargo test --test stress_tests continuous_send_receive_stress

# Pass custom args to a subcommand
cargo test --test stress_tests -- basic_publish_read_test --events 1000 --producers 2 --consumers 2
cargo test --test stress_tests -- continuous_send_receive_stress --duration-hours 4 --min-batch 10 --max-batch 50

# More logs
RUST_LOG=info cargo test --test stress_tests

# Discover options
cargo test --test stress_tests -- --help
cargo test --test stress_tests -- basic_publish_read_test --help
cargo test --test stress_tests -- continuous_send_receive_stress --help
```

#### Command Line Options

##### **Basic Publish/Read**

- `--events, -e` (default: 100): Number of events to publish and read
- `--producers, -p` (default: 1): Concurrent producer tasks
- `--consumers, -c` (default: 1): Concurrent consumer tasks
- `--timeout, -t` (default: 120s): End-to-end timeout
- `--event-size` (default: 512 bytes): Payload size
- `--batch-size` (default: 10): Events per batch

##### **Continuous Send/Receive**

- `--duration-hours` (default: 72): Duration of the run
- `--min-batch` (default: 20): Minimum batch size
- `--max-batch` (default: 100): Maximum batch size
- `--min-delay` (default: 1s): Minimum delay between batches
- `--max-delay` (default: 10s): Maximum delay between batches

## Environment Variables

The following environment variables are required (loaded via `dotenvy`, so a `.env` file works):

- `EVENTHUBS_HOST`: Event Hubs namespace hostname (e.g., "my-namespace.servicebus.windows.net")
- `EVENTHUB_NAME`: Event Hub name
- `RUST_LOG`: Log level (optional, defaults to warnings)

## Behavior & Metrics (high level)

- **basic_publish_read_test**: Publishes a bounded set, tracks per-task publish counts, measures publish/consume durations and throughput, and validates end-to-end completion/count-based results (collects errors on timeouts or task join failures). It does not currently compare consumed events against the originally published payloads.
- **continuous_send_receive_stress**: Long-running loop that emits random batches across partitions, periodically logs heartbeat metrics (sent/received, missing, corrupted body/properties, producer/consumer failures), and emits a final summary including lost events.

Both tests log via `tracing` and exit non-zero on failure to make CI-friendly verdicts.
Loading
Loading