diff --git a/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md b/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md index 50707015d6..8dc5275392 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md +++ b/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md @@ -13,6 +13,7 @@ ### Bugs Fixed +- Fixed a cold account-metadata fetch preempting its own cross-region failover when the caller's future was dropped (Rust's cancellation mechanism) mid-flight against an unhealthy region. The on-demand account-metadata fetch (and its inline regional-failover chain) now runs on a detached, internally-bounded task, so dropping the caller no longer cancels the in-flight attempt before the failover to the next region can run. The detached work is bounded by a hard deadline (5 minutes) so it cannot leak indefinitely. Note: this introduces a bounded detached task for the (idempotent, GET-only) account-metadata path, an intentional exception to the structural-cancellation model used for hedging; container and partition-key-range metadata reads are not yet covered. ([#4253](https://github.com/Azure/azure-sdk-for-rust/issues/4253)) - Fixed duplicate items being returned on cross-partition query resume after a physical partition split. When a cross-partition query was paused, serialized to a continuation token, and resumed after the underlying partition had split, the resumed iterator could re-emit items the caller had already consumed on a prior page. The continuation token now records per-range sibling state and is correctly propagated to every surviving leaf after a split. 
([#4550](https://github.com/Azure/azure-sdk-for-rust/pull/4550))

 ### Other Changes
diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs
index aa89dc26d2..bde0ce75bf 100644
--- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs
+++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs
@@ -50,6 +50,82 @@ use super::{
     CosmosDriverRuntime,
 };

+/// Upper bound, in seconds, on how long a detached account-metadata fetch may
+/// run. Guarantees that a misbehaving region or transport cannot keep the
+/// background task alive forever. Mirrors the cross-SDK default for the same fix.
+const ACCOUNT_METADATA_DETACHED_HARD_DEADLINE_SECS: i64 = 5 * 60;
+
+/// Builds the error returned when a detached account-metadata fetch outlives its
+/// hard deadline of `secs` seconds. No wire response exists for this failure, so
+/// it reuses the transport-generated 503 status of other synthetic client errors.
+fn account_metadata_detached_deadline_error(secs: i64) -> crate::error::CosmosError {
+    let message =
+        format!("account metadata read exceeded its detached hard deadline of {secs} seconds");
+    crate::error::CosmosError::builder()
+        .with_status(crate::error::CosmosStatus::TRANSPORT_GENERATED_503)
+        .with_message(message)
+        .build()
+}
+
+/// Error surfaced when the detached account-metadata task ends without sending a
+/// result (it panicked, or the runtime is shutting down). The runtime's default
+/// panic hook surfaces the backtrace; here we degrade gracefully rather than
+/// propagating a panic into the caller's task.
+fn account_metadata_detached_task_failed() -> crate::error::CosmosError {
+    crate::error::CosmosError::builder()
+        .with_status(crate::error::CosmosStatus::TRANSPORT_GENERATED_503)
+        .with_message(
+            "detached account metadata task ended without producing a result \
+             (background task panicked or the runtime is shutting down)",
+        )
+        .build()
+}
+
+/// Runs `future` on a detached task bounded by `deadline`, returning its output.
+///
+/// The future is spawned via the runtime abstraction, whose handle *detaches*
+/// (rather than aborts) on drop. So if the caller drops the returned future, the
+/// spawned `future` keeps running to completion — this is what decouples a metadata
+/// read's cross-region failover from caller cancellation (issue #4253). The caller
+/// observes its own cancellation only on the response path (by dropping `rx`).
+///
+/// `deadline` bounds the detached work so a misbehaving region/transport can never
+/// leak the task indefinitely; on expiry `on_deadline` produces the result. Returns
+/// `Err(())` only if the task ended without sending (it panicked — the runtime's
+/// default panic hook surfaces the backtrace — or the runtime is shutting down).
+async fn spawn_detached_bounded<Fut, T, D>(
+    future: Fut,
+    deadline: azure_core::time::Duration,
+    on_deadline: D,
+) -> std::result::Result<T, ()>
+where
+    Fut: std::future::Future<Output = T> + Send + 'static,
+    T: Send + 'static,
+    D: FnOnce() -> T + Send + 'static,
+{
+    let (tx, rx) = futures::channel::oneshot::channel();
+
+    // Dropping this handle detaches (does not abort) the task, so `future`
+    // continues even after the caller's future is dropped.
+    let _detached = azure_core::async_runtime::get_async_runtime().spawn(Box::pin(async move {
+        futures::pin_mut!(future);
+        let result = match futures::future::select(
+            future,
+            azure_core::async_runtime::get_async_runtime().sleep(deadline),
+        )
+        .await
+        {
+            futures::future::Either::Left((result, _)) => result,
+            futures::future::Either::Right(((), _)) => on_deadline(),
+        };
+        // The receiver is gone when the caller dropped its future; the work still
+        // ran to completion (the point of detaching), so dropping the result is fine.
+        let _ = tx.send(result);
+    }));
+
+    rx.await.map_err(|_canceled| ())
+}
+
 struct DriverRequestExecutor<'a> {
     driver: &'a CosmosDriver,
     options: &'a OperationOptions,
@@ -613,7 +689,36 @@ impl CosmosDriver {
         &self,
         account: &AccountReference,
     ) -> crate::error::Result {
-        Self::refresh_account_properties(&self.runtime, account, &self.transport, None).await
+        // Decouple the metadata fetch + cross-region failover from caller
+        // cancellation (issue #4253). In Rust, cancellation is future-drop: if the
+        // caller's future is dropped while the first attempt against an unhealthy
+        // region is still in flight, the inline failover chain inside
+        // `refresh_account_properties` would be dropped with it, preempting the
+        // cross-region retry. Running the refresh on a detached task — whose handle
+        // detaches (rather than aborts) on drop — lets the failover run to completion
+        // even when the caller goes away; the caller observes cancellation only on the
+        // response path. The detached work is bounded by a hard deadline so it can
+        // never leak indefinitely.
+        let runtime = Arc::clone(&self.runtime);
+        let transport = Arc::clone(&self.transport);
+        let account = account.clone();
+
+        let deadline =
+            azure_core::time::Duration::seconds(ACCOUNT_METADATA_DETACHED_HARD_DEADLINE_SECS);
+        let refresh = async move {
+            Self::refresh_account_properties(&runtime, &account, &transport, None).await
+        };
+
+        match spawn_detached_bounded(refresh, deadline, || {
+            Err(account_metadata_detached_deadline_error(
+                ACCOUNT_METADATA_DETACHED_HARD_DEADLINE_SECS,
+            ))
+        })
+        .await
+        {
+            Ok(result) => result,
+            Err(()) => Err(account_metadata_detached_task_failed()),
+        }
     }

     /// Fetches account properties using the current per-account transport.
@@ -2050,6 +2155,73 @@ mod tests {
         options::ConnectionPoolOptions,
     };

+    use std::sync::atomic::{AtomicBool, Ordering};
+    use std::time::Duration as StdDuration;
+
+    /// Baseline: a future that finishes before the deadline hands its value to the caller.
+    #[tokio::test]
+    async fn spawn_detached_bounded_returns_value() {
+        let delivered = spawn_detached_bounded(
+            async { 42_i32 },
+            azure_core::time::Duration::seconds(5),
+            || -1,
+        )
+        .await;
+        assert_eq!(delivered, Ok(42));
+    }
+
+    /// Regression guard for #4253: the detached work has to outlive its caller. The
+    /// future below flags completion only after a delay; a short timeout drops the
+    /// caller while that delay is still pending, and the flag is checked afterwards
+    /// to prove the work kept running in the background.
+    #[tokio::test]
+    async fn spawn_detached_bounded_survives_caller_drop() {
+        let finished = Arc::new(AtomicBool::new(false));
+        let finished_in_task = Arc::clone(&finished);
+
+        let slow_work = async move {
+            // Outlasts the 50ms caller timeout applied below.
+            tokio::time::sleep(StdDuration::from_millis(200)).await;
+            finished_in_task.store(true, Ordering::SeqCst);
+            7_i32
+        };
+
+        // The caller abandons the call after 50ms, dropping the future it was awaiting.
+        let abandoned = tokio::time::timeout(
+            StdDuration::from_millis(50),
+            spawn_detached_bounded(slow_work, azure_core::time::Duration::seconds(5), || 0),
+        )
+        .await;
+        assert!(
+            abandoned.is_err(),
+            "the caller's timeout must fire and drop its future"
+        );
+
+        // Leave enough time for the detached task to finish after the caller is gone.
+        tokio::time::sleep(StdDuration::from_millis(400)).await;
+        assert!(
+            finished.load(Ordering::SeqCst),
+            "detached work must run to completion even though the caller was dropped"
+        );
+    }
+
+    /// Deadline enforcement: when the future cannot finish in time, the caller gets
+    /// the `on_deadline` value rather than waiting indefinitely.
+    #[tokio::test]
+    async fn spawn_detached_bounded_honors_deadline() {
+        let outcome = spawn_detached_bounded(
+            async {
+                // Sleeps far longer than the 50ms deadline passed below.
+                tokio::time::sleep(StdDuration::from_secs(30)).await;
+                1_i32
+            },
+            azure_core::time::Duration::milliseconds(50),
+            || -1,
+        )
+        .await;
+        assert_eq!(outcome, Ok(-1), "the deadline value must be surfaced");
+    }
+
     const ACCOUNT_PROPERTIES_PAYLOAD: &str = r#"{
         "_self": "",
         "id": "test",