Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 114 additions & 5 deletions sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,26 @@ impl RequestExecutor for DriverRequestExecutor<'_> {
}
}

/// Newtype wrapping the endpoint probe closure so it can live in the
/// `#[derive(Debug)]` [`CosmosDriver`] struct (the `dyn Fn` itself is not
/// `Debug`). Test-only; see [`CosmosDriver::run_endpoint_probe_once_for_testing`].
#[cfg(all(
feature = "tokio",
any(test, feature = "__internal_in_memory_emulator")
))]
#[derive(Clone)]
struct TestEndpointProbeFn(super::routing::EndpointProbeFn);

#[cfg(all(
feature = "tokio",
any(test, feature = "__internal_in_memory_emulator")
))]
impl std::fmt::Debug for TestEndpointProbeFn {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str("TestEndpointProbeFn(..)")
}
}

/// Cosmos DB driver instance.
///
/// A driver represents a connection to a specific Cosmos DB account. It is
Expand All @@ -127,6 +147,16 @@ pub struct CosmosDriver {
transport: Arc<ArcSwap<CosmosTransport>>,
/// Shared operation routing state for multi-region failover.
location_state_store: Arc<LocationStateStore>,
/// Clone of the connectivity-probe closure that the background
/// endpoint-probe loop owns. Stored **only** for internal integration
/// tests, which use it to drive a single probe-and-failback iteration
/// deterministically instead of waiting for the 60-second loop interval.
/// Production code never reads this field.
#[cfg(all(
feature = "tokio",
any(test, feature = "__internal_in_memory_emulator")
))]
endpoint_probe_fn: TestEndpointProbeFn,
/// Cache for partition key range routing maps.
/// Used to pre-resolve partition key range IDs for PPAF/PPCB
/// before the first request attempt.
Expand Down Expand Up @@ -1232,11 +1262,11 @@ impl CosmosDriver {
// traffic routed to it, time out, and be re-marked unavailable — a
// sustained low-throughput loop (issue #4597).
#[cfg(feature = "tokio")]
{
let endpoint_probe_fn: EndpointProbeFn = {
let account_for_probe = account.clone();
let transport_for_probe = Arc::clone(&transport);
let user_agent_for_probe = Arc::clone(&user_agent);
let probe_fn: EndpointProbeFn = Arc::new(move |url: Url| {
Arc::new(move |url: Url| {
let account = account_for_probe.clone();
let transport_holder = Arc::clone(&transport_for_probe);
let user_agent = Arc::clone(&user_agent_for_probe);
Expand All @@ -1251,9 +1281,20 @@ impl CosmosDriver {
probe_endpoint_connectivity(&metadata_transport, &probe_account, &user_agent)
.await
}) as BoxFuture<'static, bool>
});
location_state_store.start_endpoint_probe_loop(probe_fn);
}
}) as EndpointProbeFn
};

// Keep a clone for the internal test hook before the loop takes
// ownership; integration tests live outside the crate and cannot
// rebuild this closure themselves.
#[cfg(all(
feature = "tokio",
any(test, feature = "__internal_in_memory_emulator")
))]
let endpoint_probe_fn_for_tests = Arc::clone(&endpoint_probe_fn);

#[cfg(feature = "tokio")]
location_state_store.start_endpoint_probe_loop(endpoint_probe_fn);

// Driver-level throughput-control registry.
//
Expand All @@ -1266,6 +1307,11 @@ impl CosmosDriver {
options,
transport,
location_state_store,
#[cfg(all(
feature = "tokio",
any(test, feature = "__internal_in_memory_emulator")
))]
endpoint_probe_fn: TestEndpointProbeFn(endpoint_probe_fn_for_tests),
pk_range_cache: PartitionKeyRangeCache::new(),
session_manager: SessionManager::new(),
initialized: AtomicBool::new(false),
Expand Down Expand Up @@ -1346,6 +1392,69 @@ impl CosmosDriver {
.per_partition_automatic_failover_enabled
}

/// **Internal test hook -- not part of the public API.**
///
/// Returns whether any account-level endpoint whose host matches `host`
/// currently carries an "unavailable" mark in the live routing snapshot.
/// Matching by host (rather than full URL) keeps integration tests robust
/// to scheme/path/trailing-slash normalization of the stored key.
///
/// Used to assert probe-gated failback transitions (marked -> still marked
/// after a failed probe -> cleared after a successful probe).
///
/// **Do not call from production code.** May change or be removed at any
/// time without a semver bump.
#[cfg(any(test, feature = "__internal_in_memory_emulator"))]
#[doc(hidden)]
pub fn is_endpoint_host_marked_unavailable_for_testing(&self, host: &str) -> bool {
self.location_state_store
.account_snapshot()
.unavailable_endpoints
.keys()
.any(|url| url.host_str() == Some(host))
}

/// **Internal test hook -- not part of the public API.**
///
/// Marks the regional endpoint for `region` unavailable (seeds the state
/// the probe loop later clears). Returns `false` if no endpoint for
/// `region` exists in the current routing snapshot.
///
/// **Do not call from production code.** May change or be removed at any
/// time without a semver bump.
#[cfg(any(test, feature = "__internal_in_memory_emulator"))]
#[doc(hidden)]
pub fn mark_region_endpoint_unavailable_for_testing(
&self,
region: &crate::options::Region,
) -> bool {
self.location_state_store
.mark_region_endpoint_unavailable_for_testing(region)
}

/// **Internal test hook -- not part of the public API.**
///
/// Runs exactly one iteration of the account-level endpoint
/// probe-and-failback sweep using the real connectivity-probe closure (the
/// same one the background loop owns). Lets integration tests drive
/// probe-gated failback deterministically instead of waiting for the
/// 60-second background interval. Endpoints are only probed once their
/// unavailability cooldown (`endpoint_unavailability_ttl`) has elapsed, so
/// tests configure a short TTL via [`DriverOptions`].
///
/// **Do not call from production code.** May change or be removed at any
/// time without a semver bump.
#[cfg(all(
feature = "tokio",
any(test, feature = "__internal_in_memory_emulator")
))]
#[doc(hidden)]
pub async fn run_endpoint_probe_once_for_testing(&self) {
self.location_state_store
.probe_and_failback_unavailable_endpoints(&self.endpoint_probe_fn.0)
.await;
}

/// Returns the current per-account transport.
///
/// Lock-free via ArcSwap::load_full() — returns a cloned Arc with no
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,38 @@ impl LocationStateStore {
Arc::new(current.clone())
}

/// **Internal test hook -- not part of the public API.**
///
/// Marks the regional endpoint for `region` unavailable, reusing the
/// endpoint object from the current snapshot so the probe loop later
/// targets the exact URL the router uses. Returns `false` if no endpoint
/// for `region` exists in the current snapshot.
///
/// Integration tests use this to seed the "unavailable" state directly
/// (operation-driven marking is covered elsewhere) and then exercise the
/// real connectivity-probe-gated failback path.
#[cfg(any(test, feature = "__internal_in_memory_emulator"))]
pub(crate) fn mark_region_endpoint_unavailable_for_testing(&self, region: &Region) -> bool {
let snapshot = self.account_snapshot();
let endpoint = snapshot
.preferred_read_endpoints
.iter()
.chain(snapshot.preferred_write_endpoints.iter())
.find(|e| e.region() == Some(region))
.cloned();
let Some(endpoint) = endpoint else {
return false;
};
self.apply_account(|current| {
mark_endpoint_unavailable(
current,
&endpoint,
crate::driver::routing::UnavailableReason::TransportError,
)
});
true
}

/// Applies location effects (endpoint unavailability and account refresh).
pub async fn apply(&self, effects: &[LocationEffect]) {
for effect in effects {
Expand Down
Loading