Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -447,3 +447,136 @@ async fn wrapping_sdk_identifier_appears_on_all_requests() {
);
}
}

/// Expected cross-SDK feature-flag token on the wire for the emulator's
/// default client configuration: per-partition circuit breaker (PPCB, bit
/// `0x2`, enabled by default) OR HTTP/2 (bit `0x10`, the connection-pool
/// default) == `0x12`, encoded as `|F12`.
///
/// This is the Rust side of the cross-SDK `User-Agent` feature-flag contract
/// (see `UserAgentFeatureFlags` in `azure_data_cosmos_driver`).
const EXPECTED_FEATURE_TOKEN: &str = "|F12";

/// Verifies that the cross-SDK feature-flag token (`|F<HEX>`) is advertised in
/// the `User-Agent` header on data-plane requests, so backend telemetry can
/// bucket Rust traffic by enabled client feature.
#[tokio::test]
async fn user_agent_advertises_feature_flags_on_the_wire() {
let observer = RecordingObserver::new();
let emulator = build_emulator(observer.clone());

perform_create_and_read(emulator, None).await;

let snapshots = observer.snapshots();
let data_plane: Vec<&RequestSnapshot> = snapshots
.iter()
.filter(|s| is_item_data_plane_request(s))
.collect();

// Sanity-check that the data plane was actually exercised; otherwise the
// assertion below would be vacuously satisfied.
assert!(
data_plane.len() >= 2,
"expected at least one create_item POST and one read_item GET to reach the emulator; \
captured requests: {:?}",
snapshots
.iter()
.map(|s| (s.method, s.url.as_str(), s.user_agent.as_deref()))
.collect::<Vec<_>>(),
);

let missing: Vec<_> = data_plane
.iter()
.filter(|s| {
!s.user_agent
.as_deref()
.is_some_and(|ua| ua.ends_with(EXPECTED_FEATURE_TOKEN))
})
.map(|s| (s.method, s.url.as_str(), s.user_agent.as_deref()))
.collect();
assert!(
missing.is_empty(),
"expected every data-plane request to end with feature-flag token {EXPECTED_FEATURE_TOKEN:?}; \
requests missing the token: {missing:?}",
);
}

/// Verifies that when a [`UserAgentSuffix`] is configured, the feature-flag
/// token is appended *after* the suffix with no separating space — matching
/// the .NET/Java `userAgent + "|F" + hex` encoding — so both the operator
/// suffix and the feature bitmask survive on the wire.
#[tokio::test]
async fn user_agent_appends_feature_token_after_suffix_on_the_wire() {
const SUFFIX: &str = "myapp-westus2";

let observer = RecordingObserver::new();
let emulator = build_emulator(observer.clone());

perform_create_and_read(emulator, Some(UserAgentSuffix::new(SUFFIX))).await;

let snapshots = observer.snapshots();
let data_plane: Vec<&RequestSnapshot> = snapshots
.iter()
.filter(|s| is_item_data_plane_request(s))
.collect();

assert!(
data_plane.len() >= 2,
"expected at least one create_item POST and one read_item GET to reach the emulator; \
captured requests: {:?}",
snapshots
.iter()
.map(|s| (s.method, s.url.as_str(), s.user_agent.as_deref()))
.collect::<Vec<_>>(),
);

let expected_tail = format!("{SUFFIX}{EXPECTED_FEATURE_TOKEN}");
let missing: Vec<_> = data_plane
.iter()
.filter(|s| {
!s.user_agent
.as_deref()
.is_some_and(|ua| ua.ends_with(&expected_tail))
})
.map(|s| (s.method, s.url.as_str(), s.user_agent.as_deref()))
.collect();
assert!(
missing.is_empty(),
"expected every data-plane request to end with {expected_tail:?}; \
requests missing it: {missing:?}",
);
}

/// Negative control: the feature-flag token must be a genuine, separately
/// computed artifact — assert that the default `User-Agent` (no suffix) ends
/// with exactly one `|F` token and nothing trails it.
#[tokio::test]
async fn feature_flag_token_is_the_trailing_user_agent_segment() {
let observer = RecordingObserver::new();
let emulator = build_emulator(observer.clone());

perform_create_and_read(emulator, None).await;

let snapshots = observer.snapshots();
let data_plane: Vec<&RequestSnapshot> = snapshots
.iter()
.filter(|s| is_item_data_plane_request(s))
.collect();

assert!(data_plane.len() >= 2, "data plane not exercised");

for snap in &data_plane {
let ua = snap
.user_agent
.as_deref()
.expect("data-plane request carried a User-Agent");
// Exactly one feature token, and it is the final segment.
assert_eq!(
ua.matches("|F").count(),
1,
"expected exactly one feature-flag token in {ua:?}",
);
let token = &ua[ua.rfind("|F").unwrap()..];
assert_eq!(token, EXPECTED_FEATURE_TOKEN, "unexpected token in {ua:?}");
}
}
1 change: 1 addition & 0 deletions sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

### Features Added

- The `User-Agent` header now advertises enabled client features (PPCB, HTTP/2) via the cross-SDK `|F<HEX>` feature-flag token, consistent with the .NET and Java Cosmos SDKs. ([#4635](https://github.com/Azure/azure-sdk-for-rust/pull/4635))
- Added support for using a native query planning library to generate query plans locally, avoiding a Gateway round-trip on cross-partition queries. Gated behind the `__internal_native_query_plan` feature flag. ([#4554](https://github.com/Azure/azure-sdk-for-rust/pull/4554))
- Restructured the client / runtime options layering on the driver. Two new nested option groups, a per-client overrides surface on `DriverOptionsBuilder`, and a single canonical `AZURE_COSMOS_PPCB_*` namespace for partition-failover environment variables. The driver now consumes partition-failover configuration once at construction (`CosmosDriver::new` no longer fabricates an `OperationOptionsView` outside any operation context) ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)):
- Added new nested `OperationOptions::throughput_control` group (`ThroughputControlOptions` / `…Builder` / `…View`, mirroring the `ThrottlingRetryOptions` pattern). Exposes three layered fields ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)):
Expand Down
84 changes: 79 additions & 5 deletions sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::{
effective_partition_key::EffectivePartitionKey, AccountEndpoint, AccountReference,
ContainerProperties, ContainerReference, ContinuationToken, CosmosOperation,
DatabaseReference, PartitionKey, ResolvedToken, ResourceType, UserAgent,
UserAgentFeatureFlags,
},
options::{
ConnectionPoolOptions, DriverOptions, OperationOptions, OperationOptionsView,
Expand Down Expand Up @@ -1105,16 +1106,29 @@ impl CosmosDriver {
let account_endpoint = AccountEndpoint::from(&account);
let default_endpoint = CosmosEndpoint::global(account.endpoint().clone());

// Per-driver User-Agent: when the driver-level suffix override is unset,
// share the runtime's `Arc<UserAgent>` (cheap atomic refcount bump);
// when set, compute a fresh `UserAgent` from the runtime's wrapping-SDK
// identifier and the driver's suffix, owned by this driver alone.
// Per-driver User-Agent: compute the cross-SDK feature flags advertised
// in the header from this driver's effective client configuration —
// HTTP/2 (runtime connection pool) and PPCB (this driver's partition
// failover options). When the driver-level suffix override is unset and
// the flags match the runtime's base flags (the common case), share the
// runtime's `Arc<UserAgent>` (cheap atomic refcount bump). Otherwise
// compute a fresh `UserAgent` owned by this driver alone.
let feature_flags = UserAgentFeatureFlags::from_client_config(
runtime.connection_pool().is_http2_allowed(),
options
.partition_failover_options()
.circuit_breaker_enabled(),
);
let user_agent = match options.user_agent_suffix() {
Some(suffix) => Arc::new(UserAgent::from_suffix(
runtime.wrapping_sdk_identifier(),
suffix,
feature_flags,
)),
None => Arc::clone(runtime.user_agent()),
None if feature_flags == runtime.user_agent_feature_flags() => {
Arc::clone(runtime.user_agent())
}
None => Arc::new(runtime.user_agent_with_feature_flags(feature_flags)),
};

// Per-driver HTTP client factory: wrap with fault injection if rules
Expand Down Expand Up @@ -4247,6 +4261,66 @@ mod tests {
assert!(!driver.user_agent().as_str().contains("runtime-default"));
}

#[tokio::test]
async fn driver_disabling_ppcb_recomputes_user_agent_without_ppcb_bit() {
// A driver that disables PPCB (with no per-driver suffix override) must
// NOT share the runtime's base `Arc<UserAgent>`: its feature flags
// differ from the runtime's, so it recomputes its own `UserAgent` whose
// cross-SDK feature token drops the PPCB bit (0x2) while retaining
// HTTP/2 (0x10) -> `|F10`. This exercises the `None => recompute` branch
// in `CosmosDriver::new` and proves the emitted token tracks per-driver
// client configuration rather than a hardcoded value.
let factory = Arc::new(ScriptedFactory::new(std::iter::repeat_n(
ResponsePlan::Success,
10,
)));
let runtime = Arc::new(
CosmosDriverRuntimeBuilder::new()
.with_http_client_factory(factory)
.build()
.await
.unwrap(),
);

// The runtime's base header advertises HTTP/2 + PPCB by default (|F12).
assert_eq!(
runtime.user_agent_feature_flags(),
UserAgentFeatureFlags::HTTP2 | UserAgentFeatureFlags::PER_PARTITION_CIRCUIT_BREAKER,
);
assert!(
runtime.user_agent().as_str().ends_with("|F12"),
"unexpected runtime User-Agent: {}",
runtime.user_agent().as_str()
);

let driver = CosmosDriver::new(
Arc::clone(&runtime),
DriverOptionsBuilder::new(signed_test_account(
"https://account.documents.azure.com:443/",
))
.with_partition_failover_options(
crate::options::PartitionFailoverOptions::builder()
.with_circuit_breaker_enabled(false)
.build()
.unwrap(),
)
.build(),
)
.expect("CosmosDriver::new should succeed in tests");

// Distinct allocation (the recompute branch), not the shared runtime Arc.
assert!(
!Arc::ptr_eq(driver.user_agent(), runtime.user_agent()),
"driver disabling PPCB must own a distinct User-Agent Arc"
);
// PPCB bit (0x2) dropped, HTTP/2 (0x10) retained -> |F10.
assert!(
driver.user_agent().as_str().ends_with("|F10"),
"expected driver User-Agent to drop the PPCB bit (|F10): {}",
driver.user_agent().as_str()
);
}

// =========================================================================
// pre_resolve_partition_key_range_id — EPK-range seeding (#4611 fix)
//
Expand Down
94 changes: 81 additions & 13 deletions sdk/cosmos/azure_data_cosmos_driver/src/driver/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,10 @@ use std::{

use crate::{
diagnostics::ProxyConfiguration,
models::{normalize_wrapping_sdk_identifier, UserAgent},
models::{normalize_wrapping_sdk_identifier, UserAgent, UserAgentFeatureFlags},
options::{
parse_duration_millis_from_env, ConnectionPoolOptions, CorrelationId, DriverOptions,
OperationOptions, UserAgentSuffix, WorkloadId,
OperationOptions, PartitionFailoverOptions, UserAgentSuffix, WorkloadId,
},
system::{CpuMemoryMonitor, VmMetadataService},
};
Expand Down Expand Up @@ -153,6 +153,15 @@ pub struct CosmosDriverRuntime {
/// driver's own identifier.
wrapping_sdk_identifier: Option<String>,

/// Cross-SDK feature flags advertised in this runtime's base `User-Agent`.
///
/// Computed from runtime-scoped client configuration (HTTP/2 transport and
/// the default per-partition circuit breaker setting). Drivers built from
/// this runtime compare their own computed flags against this value: when
/// they match (the common case), they share the runtime's `Arc<UserAgent>`;
/// otherwise they recompute their own `User-Agent`.
user_agent_feature_flags: UserAgentFeatureFlags,

/// Shared container metadata cache used by drivers in this runtime.
container_cache: ContainerCache,

Expand Down Expand Up @@ -305,6 +314,32 @@ impl CosmosDriverRuntime {
self.wrapping_sdk_identifier.as_deref()
}

/// Returns the cross-SDK feature flags advertised in this runtime's base
/// `User-Agent` header.
pub(crate) fn user_agent_feature_flags(&self) -> UserAgentFeatureFlags {
self.user_agent_feature_flags
}

/// Recomputes a `User-Agent` from this runtime's suffix source (suffix,
/// workload id, or correlation id, in priority order) plus the supplied
/// feature flags.
///
/// Used by a driver that overrode a feature-affecting option (e.g. disabled
/// PPCB) without supplying its own suffix, so it cannot share the runtime's
/// shared `Arc<UserAgent>`.
pub(crate) fn user_agent_with_feature_flags(
&self,
feature_flags: UserAgentFeatureFlags,
) -> UserAgent {
compute_user_agent(
self.wrapping_sdk_identifier.as_deref(),
self.user_agent_suffix.as_ref(),
self.workload_id,
self.correlation_id.as_ref(),
feature_flags,
)
}

/// Returns the effective correlation dimension.
///
/// Returns `correlation_id` if set, otherwise falls back to `user_agent_suffix`.
Expand Down Expand Up @@ -532,20 +567,28 @@ impl CosmosDriverRuntimeBuilder {
/// configuration failure).
///
pub async fn build(self) -> crate::error::Result<Arc<CosmosDriverRuntime>> {
let connection_pool = self.connection_pool.unwrap_or_default();

// Compute the base feature flags advertised in the User-Agent from
// runtime-scoped client configuration. HTTP/2 comes from the connection
// pool; PPCB uses the driver default (its per-driver value is folded in
// later by `CosmosDriver::new`). PPAF is server-driven per-partition and
// therefore unknown here, so it is not advertised in the shared header.
let user_agent_feature_flags = UserAgentFeatureFlags::from_client_config(
connection_pool.is_http2_allowed(),
PartitionFailoverOptions::default().circuit_breaker_enabled(),
);

// Compute user agent from suffix/workloadId/correlationId (in priority order),
// optionally prepending a wrapping-SDK identifier.
let wrapping = self.wrapping_sdk_identifier.as_deref();
let user_agent = Arc::new(if let Some(ref suffix) = self.user_agent_suffix {
UserAgent::from_suffix(wrapping, suffix)
} else if let Some(workload_id) = self.workload_id {
UserAgent::from_workload_id(wrapping, workload_id)
} else if let Some(ref correlation_id) = self.correlation_id {
UserAgent::from_correlation_id(wrapping, correlation_id)
} else {
UserAgent::from_wrapping_sdk_identifier(wrapping)
});
let user_agent = Arc::new(compute_user_agent(
self.wrapping_sdk_identifier.as_deref(),
self.user_agent_suffix.as_ref(),
self.workload_id,
self.correlation_id.as_ref(),
user_agent_feature_flags,
));

let connection_pool = self.connection_pool.unwrap_or_default();
let proxy_configuration = ProxyConfiguration::from_env(connection_pool.proxy_allowed());
let http_client_factory: Arc<dyn HttpClientFactory> = {
#[cfg(any(
Expand Down Expand Up @@ -634,6 +677,7 @@ impl CosmosDriverRuntimeBuilder {
correlation_id: self.correlation_id,
user_agent_suffix: self.user_agent_suffix,
wrapping_sdk_identifier: self.wrapping_sdk_identifier,
user_agent_feature_flags,
container_cache: ContainerCache::new(),
account_metadata_cache: Arc::new(AccountMetadataCache::new()),
cpu_monitor,
Expand All @@ -645,6 +689,30 @@ impl CosmosDriverRuntimeBuilder {

static NEXT_RUNTIME_ID: AtomicUsize = AtomicUsize::new(0);

/// Builds a [`UserAgent`] from a suffix source (suffix, workload id, or
/// correlation id, in priority order) plus the supplied feature flags.
///
/// Shared by [`CosmosDriverRuntimeBuilder::build`] and
/// [`CosmosDriverRuntime::user_agent_with_feature_flags`] so the priority
/// ordering is defined in exactly one place.
fn compute_user_agent(
wrapping_sdk_identifier: Option<&str>,
user_agent_suffix: Option<&UserAgentSuffix>,
workload_id: Option<WorkloadId>,
correlation_id: Option<&CorrelationId>,
feature_flags: UserAgentFeatureFlags,
) -> UserAgent {
if let Some(suffix) = user_agent_suffix {
UserAgent::from_suffix(wrapping_sdk_identifier, suffix, feature_flags)
} else if let Some(workload_id) = workload_id {
UserAgent::from_workload_id(wrapping_sdk_identifier, workload_id, feature_flags)
} else if let Some(correlation_id) = correlation_id {
UserAgent::from_correlation_id(wrapping_sdk_identifier, correlation_id, feature_flags)
} else {
UserAgent::from_wrapping_sdk_identifier(wrapping_sdk_identifier, feature_flags)
}
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down
Loading