Closed
1 change: 1 addition & 0 deletions sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md
@@ -13,6 +13,7 @@

### Bugs Fixed

+- Fixed a cold account-metadata fetch preempting its own cross-region failover when the caller's future was dropped (Rust's cancellation mechanism) mid-flight against an unhealthy region. The on-demand account-metadata fetch (and its inline regional-failover chain) now runs on a detached, internally-bounded task, so dropping the caller no longer cancels the in-flight attempt before the failover to the next region can run. The detached work is bounded by a hard deadline (5 minutes) so it cannot leak indefinitely. Note: this introduces a bounded detached task for the (idempotent, GET-only) account-metadata path, an intentional exception to the structural-cancellation model used for hedging; container and partition-key-range metadata reads are not yet covered. ([#4253](https://github.com/Azure/azure-sdk-for-rust/issues/4253))
- Fixed duplicate items being returned on cross-partition query resume after a physical partition split. When a cross-partition query was paused, serialized to a continuation token, and resumed after the underlying partition had split, the resumed iterator could re-emit items the caller had already consumed on a prior page. The continuation token now records per-range sibling state and is correctly propagated to every surviving leaf after a split. ([#4550](https://github.com/Azure/azure-sdk-for-rust/pull/4550))
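
The #4253 entry above depends on the fact that dropping a Rust future cancels it: code after a pending `.await` never runs once the future is dropped. The following is a minimal std-only sketch of that behavior, not the driver's code. `NoopWake`, `YieldOnce`, and `failover_ran_after_drop` are hypothetical names introduced for illustration; the "first attempt" and "failover" steps are stand-ins for the real network calls.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; sufficient for polling a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Future that is `Pending` on its first poll and `Ready` afterwards.
/// It stands in for a request that is still in flight (hypothetical).
struct YieldOnce(bool);

impl Future for YieldOnce {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
        if self.0 {
            Poll::Ready(())
        } else {
            self.0 = true;
            Poll::Pending
        }
    }
}

/// Polls a two-step future once, drops it, and reports whether the second
/// step (the stand-in for the failover) ever ran.
pub fn failover_ran_after_drop() -> bool {
    let failover_ran = Arc::new(AtomicBool::new(false));
    let flag = Arc::clone(&failover_ran);

    let mut fut = Box::pin(async move {
        YieldOnce(false).await; // first attempt, still in flight
        flag.store(true, Ordering::SeqCst); // failover step
    });

    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    assert!(fut.as_mut().poll(&mut cx).is_pending());

    // The caller goes away: nothing after the await point executes.
    drop(fut);
    failover_ran.load(Ordering::SeqCst)
}
```

Under these assumptions `failover_ran_after_drop()` returns `false`, which is the preemption the entry describes: the failover step is part of the same future as the first attempt, so it is discarded with it.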

### Other Changes
174 changes: 173 additions & 1 deletion sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs
@@ -50,6 +50,82 @@ use super::{
     CosmosDriverRuntime,
 };

+/// Hard deadline (seconds) bounding a detached account-metadata fetch so a
+/// misbehaving region or transport can never leak the background task
+/// indefinitely. Mirrors the cross-SDK default for the same fix.
+const ACCOUNT_METADATA_DETACHED_HARD_DEADLINE_SECS: i64 = 300;
+
+/// Error surfaced when a detached account-metadata fetch exceeds its hard
+/// deadline. Synthetic (no wire response), so it carries the transport-generated
+/// 503 status used for other client-side synthetic failures.
+fn account_metadata_detached_deadline_error(secs: i64) -> crate::error::CosmosError {
+    crate::error::CosmosError::builder()
+        .with_status(crate::error::CosmosStatus::TRANSPORT_GENERATED_503)
+        .with_message(format!(
+            "account metadata read exceeded its detached hard deadline of {secs} seconds"
+        ))
+        .build()
+}

+/// Error surfaced when the detached account-metadata task ends without sending a
+/// result (it panicked, or the runtime is shutting down). The runtime's default
+/// panic hook surfaces the backtrace; here we degrade gracefully rather than
+/// propagating a panic into the caller's task.
+fn account_metadata_detached_task_failed() -> crate::error::CosmosError {
+    crate::error::CosmosError::builder()
+        .with_status(crate::error::CosmosStatus::TRANSPORT_GENERATED_503)
+        .with_message(
+            "detached account metadata task ended without producing a result \
+             (background task panicked or the runtime is shutting down)",
+        )
+        .build()
+}

+/// Runs `future` on a detached task bounded by `deadline`, returning its output.
+///
+/// The future is spawned via the runtime abstraction, whose handle *detaches*
+/// (rather than aborts) on drop. So if the caller drops the returned future, the
+/// spawned `future` keeps running to completion. This is what decouples a metadata
+/// read's cross-region failover from caller cancellation (issue #4253). The caller
+/// observes its own cancellation only on the response path (by dropping `rx`).
+///
+/// `deadline` bounds the detached work so a misbehaving region/transport can never
+/// leak the task indefinitely; on expiry the unfinished `future` is dropped and
+/// `on_deadline` produces the result. Returns `Err(())` only if the task ended
+/// without sending (it panicked, in which case the runtime's default panic hook
+/// surfaces the backtrace, or the runtime is shutting down).
+async fn spawn_detached_bounded<Fut, T, D>(
+    future: Fut,
+    deadline: azure_core::time::Duration,
+    on_deadline: D,
+) -> std::result::Result<T, ()>
+where
+    Fut: std::future::Future<Output = T> + Send + 'static,
+    T: Send + 'static,
+    D: FnOnce() -> T + Send + 'static,
+{
+    let (tx, rx) = futures::channel::oneshot::channel();
+
+    // Dropping this handle detaches (does not abort) the task, so `future`
+    // continues even after the caller's future is dropped.
+    let _detached = azure_core::async_runtime::get_async_runtime().spawn(Box::pin(async move {
+        futures::pin_mut!(future);
+        let result = match futures::future::select(
+            future,
+            azure_core::async_runtime::get_async_runtime().sleep(deadline),
+        )
+        .await
+        {
+            futures::future::Either::Left((result, _)) => result,
+            futures::future::Either::Right(((), _)) => on_deadline(),
+        };
+        // The receiver is gone when the caller dropped its future; the work still
+        // ran to completion (the point of detaching), so dropping the result is fine.
+        let _ = tx.send(result);
+    }));
+
+    rx.await.map_err(|_canceled| ())
+}
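
The shape of `spawn_detached_bounded` (detached worker, deadline race, result handed back over a channel the caller may drop) can be sketched with the standard library alone. The version below is a thread-based analog under stated assumptions, not the driver's implementation: it uses OS threads and `std::sync::mpsc` in place of the async runtime and `futures::channel::oneshot`, and the name `spawn_detached_bounded_threads` is hypothetical. One difference worth noting is that a thread cannot be cancelled at the deadline, so the worker here keeps running after `on_deadline` fires, whereas the async version drops the unfinished future.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Thread-based analog (hypothetical): runs `work` on a detached thread bounded
/// by `deadline` and returns the receiving end on which the result arrives.
pub fn spawn_detached_bounded_threads<T, W, D>(
    work: W,
    deadline: Duration,
    on_deadline: D,
) -> mpsc::Receiver<T>
where
    T: Send + 'static,
    W: FnOnce() -> T + Send + 'static,
    D: FnOnce() -> T + Send + 'static,
{
    let (caller_tx, caller_rx) = mpsc::channel();

    // Supervisor thread. Its JoinHandle is dropped immediately, which detaches
    // the thread rather than stopping it.
    thread::spawn(move || {
        let (work_tx, work_rx) = mpsc::channel();
        thread::spawn(move || {
            // The supervisor may already have given up; ignore a closed channel.
            let _ = work_tx.send(work());
        });

        // Race the work against the deadline.
        match work_rx.recv_timeout(deadline) {
            Ok(value) => {
                // The caller may have dropped its receiver; that is fine.
                let _ = caller_tx.send(value);
            }
            Err(mpsc::RecvTimeoutError::Timeout) => {
                let _ = caller_tx.send(on_deadline());
            }
            // The worker panicked before sending: send nothing, so the caller
            // observes a closed channel (the analog of `Err(())`).
            Err(mpsc::RecvTimeoutError::Disconnected) => {}
        }
    });

    caller_rx
}
```

In this sketch, dropping the returned receiver plays the role of dropping the caller's future: the supervisor and worker threads are unaffected and the work still runs to completion.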

 struct DriverRequestExecutor<'a> {
     driver: &'a CosmosDriver,
     options: &'a OperationOptions,
@@ -613,7 +689,36 @@ impl CosmosDriver {
         &self,
         account: &AccountReference,
     ) -> crate::error::Result<super::cache::AccountProperties> {
-        Self::refresh_account_properties(&self.runtime, account, &self.transport, None).await
+        // Decouple the metadata fetch + cross-region failover from caller
+        // cancellation (issue #4253). In Rust, cancellation is future-drop: if the
+        // caller's future is dropped while the first attempt against an unhealthy
+        // region is still in flight, the inline failover chain inside
+        // `refresh_account_properties` would be dropped with it, preempting the
+        // cross-region retry. Running the refresh on a detached task, whose handle
+        // detaches (rather than aborts) on drop, lets the failover run to completion
+        // even when the caller goes away; the caller observes cancellation only on the
+        // response path. The detached work is bounded by a hard deadline so it can
+        // never leak indefinitely.
+        let runtime = Arc::clone(&self.runtime);
+        let transport = Arc::clone(&self.transport);
+        let account = account.clone();
+
+        let deadline =
+            azure_core::time::Duration::seconds(ACCOUNT_METADATA_DETACHED_HARD_DEADLINE_SECS);
+        let refresh = async move {
+            Self::refresh_account_properties(&runtime, &account, &transport, None).await
+        };
+
+        match spawn_detached_bounded(refresh, deadline, || {
+            Err(account_metadata_detached_deadline_error(
+                ACCOUNT_METADATA_DETACHED_HARD_DEADLINE_SECS,
+            ))
+        })
+        .await
+        {
+            Ok(result) => result,
+            Err(()) => Err(account_metadata_detached_task_failed()),
+        }
     }
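
The `match` that closes this method flattens a nested result: the outer `Result<_, ()>` reports whether the detached task delivered anything at all, and the inner result is whatever the refresh (or the deadline closure) produced. A minimal sketch of that flattening, using a hypothetical `FetchError` enum and `flatten_detached` helper in place of the crate's real error type and builder calls, might look like this:

```rust
/// Hypothetical stand-in for the driver's error type (illustration only).
#[derive(Debug, PartialEq)]
pub enum FetchError {
    /// The refresh itself failed, for example every region was unavailable.
    Service(String),
    /// The detached work hit its hard deadline.
    DeadlineExceeded { secs: i64 },
    /// The detached task ended without sending a result.
    TaskFailed,
}

/// Collapses the outer "did the task deliver?" layer into the inner result.
pub fn flatten_detached<T>(
    outcome: Result<Result<T, FetchError>, ()>,
) -> Result<T, FetchError> {
    match outcome {
        // The task delivered: pass through success, service errors, and the
        // deadline error produced by the `on_deadline` closure.
        Ok(result) => result,
        // Nothing was delivered: the task panicked or the runtime shut down.
        Err(()) => Err(FetchError::TaskFailed),
    }
}
```

Under this reading the deadline case arrives as `Ok(Err(..))` rather than `Err(())`, because the deadline closure runs inside the detached task and its value is sent like any other result.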

     /// Fetches account properties using the current per-account transport.
@@ -2050,6 +2155,73 @@ mod tests {
         options::ConnectionPoolOptions,
     };

+    use std::sync::atomic::{AtomicBool, Ordering};
+    use std::time::Duration as StdDuration;
+
+    /// Happy path: the detached future's value is delivered back to the caller.
+    #[tokio::test]
+    async fn spawn_detached_bounded_returns_value() {
+        let out = spawn_detached_bounded(
+            async { 42_i32 },
+            azure_core::time::Duration::seconds(5),
+            || -1,
+        )
+        .await;
+        assert_eq!(out, Ok(42));
+    }
+
+    /// Core regression for #4253: dropping the caller's future MUST NOT cancel the
+    /// detached work. We start a future that records completion after a delay, drop
+    /// the caller (via a short timeout) while it is still running, and confirm the
+    /// work nonetheless ran to completion in the background.
+    #[tokio::test]
+    async fn spawn_detached_bounded_survives_caller_drop() {
+        let completed = Arc::new(AtomicBool::new(false));
+        let completed_in_task = completed.clone();
+
+        let work = async move {
+            // Still in flight when the caller's timeout below fires.
+            tokio::time::sleep(StdDuration::from_millis(200)).await;
+            completed_in_task.store(true, Ordering::SeqCst);
+            7_i32
+        };
+
+        // Caller gives up after 50ms, dropping its view of the detached work.
+        let caller = tokio::time::timeout(
+            StdDuration::from_millis(50),
+            spawn_detached_bounded(work, azure_core::time::Duration::seconds(5), || 0),
+        )
+        .await;
+        assert!(
+            caller.is_err(),
+            "the caller's timeout must fire and drop its future"
+        );
+
+        // The detached task keeps running past the caller's drop.
+        tokio::time::sleep(StdDuration::from_millis(400)).await;
+        assert!(
+            completed.load(Ordering::SeqCst),
+            "detached work must run to completion even though the caller was dropped"
+        );
+    }
+
+    /// The hard deadline bounds the detached work: a future that never completes
+    /// yields the `on_deadline` value instead of hanging forever.
+    #[tokio::test]
+    async fn spawn_detached_bounded_honors_deadline() {
+        let out = spawn_detached_bounded(
+            async {
+                // Never completes within the deadline below.
+                tokio::time::sleep(StdDuration::from_secs(30)).await;
+                1_i32
+            },
+            azure_core::time::Duration::milliseconds(50),
+            || -1,
+        )
+        .await;
+        assert_eq!(out, Ok(-1), "the deadline value must be surfaced");
+    }
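
The caller-drop test above waits a fixed 400 ms before checking the completion flag. One possible alternative, offered only as a sketch, is to poll the flag up to an upper bound so the test returns as soon as the work finishes and tolerates a slow machine. The helper below, `wait_until_set`, is a hypothetical name and uses blocking `std::thread::sleep` to stay dependency-free; inside an async test the same loop would presumably use the runtime's own sleep instead.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant};

/// Polls `flag` until it is set or `timeout` elapses.
/// Returns `true` if the flag was observed set, `false` on timeout.
pub fn wait_until_set(flag: &AtomicBool, timeout: Duration) -> bool {
    let deadline = Instant::now() + timeout;
    loop {
        if flag.load(Ordering::SeqCst) {
            return true;
        }
        if Instant::now() >= deadline {
            return false;
        }
        // Short pause between checks to avoid spinning.
        thread::sleep(Duration::from_millis(5));
    }
}
```

With such a helper the final assertion would check the helper's return value against a generous bound rather than relying on 400 ms always being long enough.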

const ACCOUNT_PROPERTIES_PAYLOAD: &str = r#"{
"_self": "",
"id": "test",