Skip to content
Open
5 changes: 4 additions & 1 deletion sdk/cosmos/.cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
"centralindia",
"centralus",
"centraluseuap",
"cgroupv",
"cgroupv2",
"changefeed",
"chinaeast",
"chinanorth",
Expand Down Expand Up @@ -162,6 +164,7 @@
"usdodsouthcentral",
"usdodsouthwest",
"usdodwestcentral",
"usec",
"USGOV",
"usgovarizona",
"usgoviowa",
Expand Down Expand Up @@ -196,4 +199,4 @@
]
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
//! Create operation.

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use azure_data_cosmos::clients::ContainerClient;
use azure_data_cosmos::options::ItemWriteOptions;
use rand::RngExt;
use uuid::Uuid;

use super::{Operation, PerfItem};
use super::{extract_backend_duration, Operation, PerfItem};
use crate::seed::{SeededItem, SharedItems};

/// Creates a new item with a unique ID and partition key.
Expand All @@ -36,7 +37,7 @@ impl Operation for CreateItemOperation {
"CreateItem"
}

async fn execute(&self, container: &ContainerClient) -> azure_core::Result<()> {
async fn execute(&self, container: &ContainerClient) -> azure_core::Result<Option<Duration>> {
let id = Uuid::new_v4().to_string();
let partition_key = Uuid::new_v4().to_string();
let value = rand::rng().random_range(0..u64::MAX);
Expand All @@ -48,11 +49,12 @@ impl Operation for CreateItemOperation {
payload: "perf-test-created".to_string(),
};

container
let response = container
.create_item(&item.partition_key, &id, &item, self.options.clone())
.await?;
let backend = extract_backend_duration(response.headers());

self.items.push(SeededItem { id, partition_key });
Ok(())
Ok(backend)
}
}
28 changes: 27 additions & 1 deletion sdk/cosmos/azure_data_cosmos_perf/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,15 @@ mod read_item;
mod upsert_item;

use async_trait::async_trait;
use azure_core::http::headers::{HeaderName, Headers};
use azure_data_cosmos::clients::ContainerClient;
use azure_data_cosmos::options::{
ExcludedRegions, ItemReadOptions, ItemWriteOptions, OperationOptions,
};
use azure_data_cosmos::regions::Region;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use std::time::Duration;

use crate::config::{Config, ExcludeRegionsScope};
pub use crate::operations::create_item::CreateItemOperation;
Expand All @@ -29,6 +31,23 @@ pub use crate::operations::read_item::ReadItemOperation;
pub use crate::operations::upsert_item::UpsertItemOperation;
use crate::seed::SharedItems;

/// `x-ms-request-duration-ms` — server-reported request processing time in
/// milliseconds (floating-point string).
const REQUEST_DURATION_MS: HeaderName = HeaderName::from_static("x-ms-request-duration-ms");

/// Extracts the server-reported request duration from a Cosmos response.
///
/// Returns `None` when the header is absent (e.g., on responses served
/// from cache or when the gateway omitted the diagnostic header), when its
/// value does not parse as `f64`, or when the parsed value is negative or
/// non-finite.
pub(crate) fn extract_backend_duration(headers: &Headers) -> Option<Duration> {
    let raw = headers.get_optional_str(&REQUEST_DURATION_MS)?;
    let ms: f64 = raw.parse().ok()?;
    if ms.is_finite() && ms >= 0.0 {
        Some(Duration::from_secs_f64(ms / 1000.0))
    } else {
        None
    }
}

/// A single executable perf test operation.
///
/// Implementations are expected to be stateless or use interior mutability.
Expand All @@ -39,7 +58,14 @@ pub trait Operation: Send + Sync {
fn name(&self) -> &'static str;

/// Executes one instance of the operation.
async fn execute(&self, container: &ContainerClient) -> azure_core::Result<()>;
///
/// Returns `Ok(Some(d))` when the server reported a processing duration
/// via the `x-ms-request-duration-ms` response header (this is the
/// backend latency surfaced separately from the client-observed
/// wall-clock latency). Returns `Ok(None)` when no backend duration
/// could be observed (multi-page query streams may aggregate, see
/// individual implementations).
async fn execute(&self, container: &ContainerClient) -> azure_core::Result<Option<Duration>>;
}

/// The item type used for seeding, reading, querying, and upserting.
Expand Down
21 changes: 16 additions & 5 deletions sdk/cosmos/azure_data_cosmos_perf/src/operations/query_items.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
//! Single-partition query operation.

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use azure_data_cosmos::clients::ContainerClient;
use azure_data_cosmos::Query;
use futures::StreamExt;

use super::Operation;
use super::{extract_backend_duration, Operation};
use crate::seed::SharedItems;

/// Runs a single-partition query against a random seeded partition key.
Expand All @@ -31,18 +32,28 @@ impl Operation for QueryItemsOperation {
"QueryItems"
}

async fn execute(&self, container: &ContainerClient) -> azure_core::Result<()> {
async fn execute(&self, container: &ContainerClient) -> azure_core::Result<Option<Duration>> {
let item = self.items.random();
let pk = &item.partition_key;

let query =
Query::from("SELECT * FROM c WHERE c.partition_key = @pk").with_parameter("@pk", pk)?;

let mut stream = container.query_items::<serde_json::Value>(query, pk, None)?;
let mut stream = container
.query_items::<serde_json::Value>(query, pk, None)?
.into_pages();

// Sum backend durations across pages so a multi-page query reports
// the total server processing time, mirroring how the client-observed
// elapsed wraps the entire stream consumption.
let mut backend_total: Option<Duration> = None;
while let Some(result) = stream.next().await {
result?;
let page = result?;
if let Some(d) = extract_backend_duration(page.headers()) {
backend_total = Some(backend_total.unwrap_or_default() + d);
}
}

Ok(())
Ok(backend_total)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,13 @@
//! Point read operation.

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use azure_data_cosmos::clients::ContainerClient;
use azure_data_cosmos::options::ItemReadOptions;

use super::Operation;
use super::{extract_backend_duration, Operation};
use crate::seed::SharedItems;

/// Reads a random seeded item by ID and partition key.
Expand All @@ -31,12 +32,12 @@ impl Operation for ReadItemOperation {
"ReadItem"
}

async fn execute(&self, container: &ContainerClient) -> azure_core::Result<()> {
async fn execute(&self, container: &ContainerClient) -> azure_core::Result<Option<Duration>> {
let item = self.items.random();

container
let response = container
.read_item::<serde_json::Value>(&item.partition_key, &item.id, self.options.clone())
.await?;
Ok(())
Ok(extract_backend_duration(response.headers()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@
//! Upsert operation.

use std::sync::Arc;
use std::time::Duration;

use async_trait::async_trait;
use azure_data_cosmos::clients::ContainerClient;
use azure_data_cosmos::options::ItemWriteOptions;
use rand::RngExt;

use super::{Operation, PerfItem};
use super::{extract_backend_duration, Operation, PerfItem};
use crate::seed::SharedItems;

/// Upserts an item into a random seeded partition.
Expand All @@ -32,7 +33,7 @@ impl Operation for UpsertItemOperation {
"UpsertItem"
}

async fn execute(&self, container: &ContainerClient) -> azure_core::Result<()> {
async fn execute(&self, container: &ContainerClient) -> azure_core::Result<Option<Duration>> {
let seeded = self.items.random();
let value = rand::rng().random_range(0..u64::MAX);

Expand All @@ -43,9 +44,9 @@ impl Operation for UpsertItemOperation {
payload: "perf-test-payload".to_string(),
};

container
let response = container
.upsert_item(&item.partition_key, &seeded.id, &item, self.options.clone())
.await?;
Ok(())
Ok(extract_backend_duration(response.headers()))
}
}
33 changes: 29 additions & 4 deletions sdk/cosmos/azure_data_cosmos_perf/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,11 +51,28 @@ struct PerfResult {
p50_ms: f64,
p90_ms: f64,
p99_ms: f64,
/// Server-reported request processing latency parsed from
/// `x-ms-request-duration-ms` response header. `None` for intervals
/// without backend samples.
#[serde(skip_serializing_if = "Option::is_none")]
backend_min_ms: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
backend_max_ms: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
backend_mean_ms: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
backend_p50_ms: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
backend_p90_ms: Option<f64>,
#[serde(skip_serializing_if = "Option::is_none")]
backend_p99_ms: Option<f64>,
cpu_percent: f32,
memory_bytes: u64,
system_cpu_percent: f32,
system_total_memory_bytes: u64,
system_used_memory_bytes: u64,
#[serde(skip_serializing_if = "Option::is_none")]
cgroup_cpu_percent: Option<f32>,
Comment thread
tvaron3 marked this conversation as resolved.
// Tokio runtime metrics (present only when tokio-metrics feature is enabled)
#[serde(skip_serializing_if = "Option::is_none")]
tokio_workers: Option<u64>,
Expand Down Expand Up @@ -279,8 +296,8 @@ pub async fn run(config: RunConfig) {

let op_start = Instant::now();
match op.execute(&container).await {
Ok(()) => {
stats.record_latency(op.name(), op_start.elapsed());
Ok(backend) => {
stats.record_latency(op.name(), op_start.elapsed(), backend);
}
Err(e) => {
stats.record_error(op.name());
Expand Down Expand Up @@ -345,17 +362,18 @@ async fn upsert_results(
let now = time::OffsetDateTime::now_utc()
.format(&time::format_description::well_known::Rfc3339)
.expect("RFC 3339 formatting should never fail");
let (cpu, mem, sys_cpu, sys_total, sys_used) = metrics
let (cpu, mem, sys_cpu, sys_total, sys_used, cgroup_cpu) = metrics
.map(|m| {
(
m.cpu_percent,
m.memory_bytes,
m.system_cpu_percent,
m.system_total_memory_bytes,
m.system_used_memory_bytes,
m.cgroup_cpu_percent,
)
})
.unwrap_or((0.0, 0, 0.0, 0, 0));
.unwrap_or((0.0, 0, 0.0, 0, 0, None));

for s in summaries {
let result = PerfResult {
Expand All @@ -374,11 +392,18 @@ async fn upsert_results(
p50_ms: s.p50.as_secs_f64() * 1000.0,
p90_ms: s.p90.as_secs_f64() * 1000.0,
p99_ms: s.p99.as_secs_f64() * 1000.0,
backend_min_ms: s.backend_min.map(|d| d.as_secs_f64() * 1000.0),
backend_max_ms: s.backend_max.map(|d| d.as_secs_f64() * 1000.0),
backend_mean_ms: s.backend_mean.map(|d| d.as_secs_f64() * 1000.0),
backend_p50_ms: s.backend_p50.map(|d| d.as_secs_f64() * 1000.0),
backend_p90_ms: s.backend_p90.map(|d| d.as_secs_f64() * 1000.0),
backend_p99_ms: s.backend_p99.map(|d| d.as_secs_f64() * 1000.0),
cpu_percent: cpu,
memory_bytes: mem,
system_cpu_percent: sys_cpu,
system_total_memory_bytes: sys_total,
system_used_memory_bytes: sys_used,
cgroup_cpu_percent: cgroup_cpu,
tokio_workers: tokio_fields.map(|t| t.workers),
tokio_busy_pct: tokio_fields.map(|t| t.busy_pct),
tokio_park_count: tokio_fields.map(|t| t.park_count),
Expand Down
Loading
Loading