1 change: 1 addition & 0 deletions sdk/cosmos/azure_data_cosmos/CHANGELOG.md
@@ -4,6 +4,7 @@

### Features Added

- Added `FeedOptions::with_max_fan_out` (and the matching `QueryOptions::with_max_fan_out` shortcut) to cap the number of physical partitions a cross-partition query may target. Queries that would fan out beyond this limit now return an error instead of silently degrading; the default cap is 100. ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615))
- Derived `SafeDebug` on `CosmosCredential`, `ItemResponse`, `ResourceResponse<T>`, and `BatchResponse`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512))
- Added standard derives (`Clone`, `Copy`, `PartialEq`, `Eq`, `Hash`, `Serialize`, `Deserialize`) to `ConsistencyLevel` and `RoutingStrategy`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512))
- `Query::with_text` now accepts `impl Into<String>`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512))
11 changes: 6 additions & 5 deletions sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs
@@ -837,14 +837,15 @@ impl ContainerClient {
if let Some(hint) = options.feed.max_item_count {
initial_operation = initial_operation.with_max_item_count(hint);
}
let plan_options = azure_data_cosmos_driver::PlanOptions {
continuation: options.feed.continuation_token,
max_fan_out: options.feed.max_fan_out,
..Default::default()
};
let plan = self
.context
.driver
.plan_operation(
initial_operation,
&options.operation,
options.feed.continuation_token.as_ref(),
)
.plan_operation(initial_operation, &options.operation, Some(plan_options))
.await?;
Ok(QueryItemIterator::new(
self.context.driver.clone(),
38 changes: 38 additions & 0 deletions sdk/cosmos/azure_data_cosmos/src/options/feed.rs
@@ -39,6 +39,20 @@ pub struct FeedOptions {
///
/// See [`QueryPageIterator::to_continuation_token`](crate::feed::QueryPageIterator::to_continuation_token).
pub continuation_token: Option<ContinuationToken>,

/// Maximum number of physical partitions a cross-partition query may fan
/// out to.
///
/// When `None`, the SDK uses a built-in default of 100. Setting this to a
/// higher value allows queries that span very large containers, though such
/// queries are typically expensive and should be avoided in
/// latency-sensitive paths.
///
/// If a query would target more physical partitions than this limit,
/// [`plan_operation`](azure_data_cosmos_driver::driver::CosmosDriver::plan_operation)
/// returns an error with status 400 / sub-status 20307.
pub max_fan_out: Option<usize>,
}

impl FeedOptions {
@@ -56,6 +70,16 @@ impl FeedOptions {
self.continuation_token = Some(continuation_token);
self
}

/// Sets the maximum number of physical partitions a cross-partition query
/// may fan out to.
///
/// Overrides the built-in default (100). Pass a larger value only when
/// you understand the performance implications of a wide fan-out query.
pub fn with_max_fan_out(mut self, max_fan_out: usize) -> Self {
self.max_fan_out = Some(max_fan_out);
self
}
}

/// Options for query operations.
@@ -145,4 +169,18 @@ impl QueryOptions {
self.feed = self.feed.with_continuation_token(continuation_token);
self
}

/// Sets the maximum number of physical partitions a cross-partition query
/// may fan out to.
///
/// Delegates to [`FeedOptions::with_max_fan_out`] on the inner
/// [`feed`](Self::feed). Pass a larger value only when you understand the
/// performance implications of a wide fan-out query.
///
/// The default is 100. Queries that target more physical partitions than
/// this limit fail with HTTP 400 / sub-status 20307.
pub fn with_max_fan_out(mut self, max_fan_out: usize) -> Self {
self.feed = self.feed.with_max_fan_out(max_fan_out);
self
}
}
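
A minimal, self-contained sketch of the delegation pattern in the two `with_max_fan_out` builders above. The structs are simplified stand-ins, not the SDK's real types, and `DEFAULT_MAX_FAN_OUT` and `effective_max_fan_out` are hypothetical names used only to show how an unset cap could resolve to the documented default of 100.

```rust
/// Simplified stand-in for the SDK's `FeedOptions` (illustration only).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct FeedOptions {
    pub max_fan_out: Option<usize>,
}

impl FeedOptions {
    /// Stores an explicit cap, replacing the implicit default.
    pub fn with_max_fan_out(mut self, max_fan_out: usize) -> Self {
        self.max_fan_out = Some(max_fan_out);
        self
    }
}

/// Simplified stand-in for the SDK's `QueryOptions` (illustration only).
#[derive(Debug, Default, Clone, PartialEq)]
pub struct QueryOptions {
    pub feed: FeedOptions,
}

impl QueryOptions {
    /// Shortcut that forwards to the inner `FeedOptions` builder.
    pub fn with_max_fan_out(mut self, max_fan_out: usize) -> Self {
        self.feed = self.feed.with_max_fan_out(max_fan_out);
        self
    }
}

/// Hypothetical constant mirroring the documented default cap.
pub const DEFAULT_MAX_FAN_OUT: usize = 100;

/// Hypothetical helper: resolves an unset cap to the documented default.
pub fn effective_max_fan_out(options: &QueryOptions) -> usize {
    options.feed.max_fan_out.unwrap_or(DEFAULT_MAX_FAN_OUT)
}
```

Under these assumptions, `QueryOptions::default()` resolves to a cap of 100, while `QueryOptions::default().with_max_fan_out(250)` resolves to 250. Keeping the field as `Option<usize>` lets the default live in one place instead of being baked into every options value.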
2 changes: 2 additions & 0 deletions sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md
@@ -4,6 +4,7 @@

### Features Added

- Added `SubStatusCode::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (20307) and `CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (HTTP 400 / sub-status 20307). ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615))
- Added support for using a native query planning library to generate query plans locally, avoiding a Gateway round-trip on cross-partition queries. Gated behind the `__internal_native_query_plan` feature flag. ([#4554](https://github.com/Azure/azure-sdk-for-rust/pull/4554))
- Restructured the client / runtime options layering on the driver. Two new nested option groups, a per-client overrides surface on `DriverOptionsBuilder`, and a single canonical `AZURE_COSMOS_PPCB_*` namespace for partition-failover environment variables. The driver now consumes partition-failover configuration once at construction (`CosmosDriver::new` no longer fabricates an `OperationOptionsView` outside any operation context) ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)):
- Added new nested `OperationOptions::throughput_control` group (`ThroughputControlOptions` / `…Builder` / `…View`, mirroring the `ThrottlingRetryOptions` pattern). Exposes three layered fields ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)):
@@ -18,6 +19,7 @@

### Breaking Changes

- `CosmosDriver::plan_operation` signature change: the final parameter changed from `Option<&ContinuationToken>` to `Option<PlanOptions>`. Direct driver consumers must wrap continuation tokens in `PlanOptions { continuation: Some(token), ..Default::default() }`, or pass `None` for queries starting from the beginning. ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615))
- Cross-partition query continuation tokens minted by `0.4.0` cannot be resumed against `0.5.0`. The on-wire token format now records per-range sibling state so that pausing a fan-out query mid-flight preserves information about siblings that hadn't been touched yet. Callers holding a `0.4.0`-minted token will receive a continuation-token error on resume and must re-issue the query. ([#4550](https://github.com/Azure/azure-sdk-for-rust/pull/4550))
- `azure_data_cosmos_driver::models::ETag` has been removed. Use `azure_core::http::Etag` directly. The previous `ETag::new(...)` is gone; construct via `Etag::from(&str)` / `Etag::from(String)`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512))
- Migration impact of the client / runtime options restructure. Per-runtime concerns (transport, cert validation, proxy, UA defaults) are configured once on the `CosmosDriverRuntime`; per-client concerns (operation defaults, FI rules, throughput-control groups, partition-failover tuning) move onto `DriverOptions` ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)):
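
For direct driver consumers, the `plan_operation` migration described in the breaking-change entry could look roughly like the sketch below. `ContinuationToken` and `PlanOptions` here are simplified stand-ins carrying only the two fields visible in this change, `to_plan_options` is a hypothetical adapter, and cloning the token is an assumption about the real type.

```rust
/// Stand-in for the driver's `ContinuationToken` (illustration only).
#[derive(Debug, Clone, PartialEq)]
pub struct ContinuationToken(pub String);

/// Stand-in for the driver's `PlanOptions`, limited to the fields shown
/// in this change.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct PlanOptions {
    pub continuation: Option<ContinuationToken>,
    pub max_fan_out: Option<usize>,
}

/// Hypothetical adapter from the old final argument
/// (`Option<&ContinuationToken>`) to the new one (`Option<PlanOptions>`).
///
/// `None` stays `None`, which the driver treats as "all defaults".
pub fn to_plan_options(continuation: Option<&ContinuationToken>) -> Option<PlanOptions> {
    continuation.map(|token| PlanOptions {
        // Assumption: the token can be cloned into an owned value.
        continuation: Some(token.clone()),
        // Leave every other field (such as `max_fan_out`) at its default.
        ..Default::default()
    })
}
```

The struct update syntax (`..Default::default()`) is what keeps call sites like this compiling if `PlanOptions` gains more fields later, provided the struct stays constructible by callers.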
38 changes: 23 additions & 15 deletions sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs
@@ -24,8 +24,8 @@ use crate::{
},
models::{
effective_partition_key::EffectivePartitionKey, AccountEndpoint, AccountReference,
ContainerProperties, ContainerReference, ContinuationToken, CosmosOperation,
DatabaseReference, PartitionKey, ResolvedToken, ResourceType, UserAgent,
ContainerProperties, ContainerReference, CosmosOperation, DatabaseReference, PartitionKey,
ResolvedToken, ResourceType, UserAgent,
},
options::{
ConnectionPoolOptions, DriverOptions, OperationOptions, OperationOptionsView,
@@ -46,7 +46,7 @@ use super::{
cosmos_headers, cosmos_transport_client::HttpRequest, request_signing,
AuthorizationContext, CosmosTransport,
},
CosmosDriverRuntime,
CosmosDriverRuntime, PlanOptions,
};

struct DriverRequestExecutor<'a> {
@@ -2051,20 +2051,23 @@ impl CosmosDriver {
/// singleton pipeline immediately. For cross-partition queries, fetches a
/// query plan from the backend and builds a fan-out pipeline.
///
/// `continuation` optionally provides resume state from a prior call. Two
/// kinds of tokens are accepted:
/// `plan_options` groups planning-specific settings. Pass `None` to use
/// all defaults. See [`PlanOptions`] for available fields:
///
/// - SDK-issued tokens (`c1.…`) carry a serialized snapshot of the
/// previous pipeline's state and can resume any operation.
/// - Opaque server-issued tokens (no `c<N>.` prefix) are accepted only
/// for trivial operations; passing one to a cross-partition query
/// returns a `Client`-shaped error.
/// - [`continuation`](PlanOptions::continuation) optionally provides resume
/// state from a prior call. SDK-issued tokens (`c1.…`) can resume any
/// operation; opaque server-issued tokens are only accepted for trivial
/// (single-partition) operations.
/// - [`max_fan_out`](PlanOptions::max_fan_out) caps the number of physical
/// partitions a cross-partition query may target (default 100).
pub async fn plan_operation(
&self,
operation: CosmosOperation,
options: &OperationOptions,
continuation: Option<&ContinuationToken>,
plan_options: Option<PlanOptions>,
) -> crate::error::Result<OperationPlan> {
let plan_options = plan_options.unwrap_or_default();

if !self.initialized.load(Ordering::Acquire) {
let endpoint = AccountEndpoint::from(self.options.account());
return Err(crate::error::CosmosError::builder()
@@ -2085,7 +2088,7 @@ impl CosmosDriver {

// Resolve the continuation token (if any) into a planner-ready resume
// state. Server-issued tokens are only valid for trivial operations.
let resume_state = match continuation {
let resume_state = match plan_options.continuation.as_ref() {
None => None,
Some(token) => {
match token.resolve()? {
@@ -2146,9 +2149,14 @@ impl CosmosDriver {
|container, continuation| self.fetch_pk_ranges_from_service(container, continuation),
);

let pipeline =
planner::build_sequential_drain(&query_plan, &mut topology, &operation, resume_state)
.await?;
let pipeline = planner::build_sequential_drain(
&query_plan,
&mut topology,
&operation,
resume_state,
plan_options.max_fan_out,
)
.await?;
Ok(OperationPlan::new(pipeline, operation))
}
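
The body of `planner::build_sequential_drain` is not part of this diff, so the following is only a sketch of how the new `max_fan_out` argument might be enforced. It assumes the check compares the number of targeted physical partitions against the cap and fails with the documented 400 / 20307 pair. `FanOutError`, `check_fan_out`, and the constants are hypothetical names, not the driver's API.

```rust
/// Hypothetical constants mirroring the documented status pair and default.
pub const HTTP_BAD_REQUEST: u16 = 400;
pub const SUB_STATUS_FAN_OUT_LIMIT_EXCEEDED: u32 = 20307;
pub const DEFAULT_MAX_FAN_OUT: usize = 100;

/// Hypothetical error shape for a rejected fan-out (illustration only).
#[derive(Debug, PartialEq)]
pub struct FanOutError {
    pub status: u16,
    pub sub_status: u32,
    pub targeted: usize,
    pub limit: usize,
}

/// Rejects a plan that targets more physical partitions than the cap.
///
/// `max_fan_out` of `None` falls back to the default of 100. A plan that
/// targets exactly `limit` partitions is still allowed.
pub fn check_fan_out(targeted: usize, max_fan_out: Option<usize>) -> Result<(), FanOutError> {
    let limit = max_fan_out.unwrap_or(DEFAULT_MAX_FAN_OUT);
    if targeted > limit {
        return Err(FanOutError {
            status: HTTP_BAD_REQUEST,
            sub_status: SUB_STATUS_FAN_OUT_LIMIT_EXCEEDED,
            targeted,
            limit,
        });
    }
    Ok(())
}
```

With these assumptions, `check_fan_out(100, None)` succeeds, `check_fan_out(101, None)` fails with status 400 and sub-status 20307, and `check_fan_out(101, Some(200))` succeeds because the caller raised the cap.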

@@ -193,7 +193,7 @@ async fn single_partition_resume_roundtrips_cleanly() {
let mut topology1 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-0")])]);
let mut executor1 = MockRequestExecutor::new(vec![Ok(page_response(b"page-1", Some("ct-1")))]);

let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None)
let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None)
.await
.unwrap();
let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await;
@@ -207,9 +207,10 @@ async fn single_partition_resume_roundtrips_cleanly() {
let mut topology2 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-0")])]);
let mut executor2 = MockRequestExecutor::new(vec![Ok(page_response(b"page-2", None))]);

let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state))
.await
.unwrap();
let mut pipeline2 =
build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None)
.await
.unwrap();
let pages2 = drain_all(&mut pipeline2, &mut executor2).await;
assert_eq!(pages2, vec![b"page-2".to_vec()]);
assert_eq!(
@@ -237,7 +238,7 @@ async fn resume_after_split_forwards_continuation_to_every_surviving_leaf() {
Some("ct-pre-split"),
))]);

let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None)
let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None)
.await
.unwrap();
let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await;
@@ -258,9 +259,10 @@ async fn resume_after_split_forwards_continuation_to_every_surviving_leaf() {
Ok(page_response(b"page-right", None)),
]);

let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state))
.await
.unwrap();
let mut pipeline2 =
build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None)
.await
.unwrap();
let pages2 = drain_all(&mut pipeline2, &mut executor2).await;

// Every page exactly once, in EPK order.
@@ -303,7 +305,7 @@ async fn resume_mid_fanout_preserves_every_sibling_state() {
let mut executor1 =
MockRequestExecutor::new(vec![Ok(page_response(b"left-page-1", Some("ct-left")))]);

let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None)
let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None)
.await
.unwrap();
let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await;
@@ -349,9 +351,10 @@ async fn resume_mid_fanout_preserves_every_sibling_state() {
Ok(page_response(b"right-page-1", None)),
]);

let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state))
.await
.unwrap();
let mut pipeline2 =
build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None)
.await
.unwrap();
let pages2 = drain_all(&mut pipeline2, &mut executor2).await;
assert_eq!(
pages2,
@@ -397,7 +400,7 @@ async fn resume_mid_fanout_then_split_preserves_state_and_fans_out_continuation(
let mut executor1 =
MockRequestExecutor::new(vec![Ok(page_response(b"left-page-1", Some("ct-left")))]);

let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None)
let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None)
.await
.unwrap();
let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await;
@@ -420,9 +423,10 @@ async fn resume_mid_fanout_then_split_preserves_state_and_fans_out_continuation(
Ok(page_response(b"right-page-1", None)),
]);

let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state))
.await
.unwrap();
let mut pipeline2 =
build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None)
.await
.unwrap();
let pages2 = drain_all(&mut pipeline2, &mut executor2).await;

assert_eq!(
@@ -489,7 +493,7 @@ async fn resume_does_not_requery_already_drained_sibling_scope() {
])]);
let mut executor = MockRequestExecutor::new(vec![Ok(page_response(b"right-page-1", None))]);

let mut pipeline = build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state))
let mut pipeline = build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state), None)
.await
.unwrap();
let pages = drain_all(&mut pipeline, &mut executor).await;
@@ -529,7 +533,7 @@ async fn resume_fails_loudly_when_saved_range_cannot_be_covered() {
])]);

let err: Result<Pipeline> =
build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state)).await;
build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state), None).await;
let err = err.expect_err("expected unhonored-saved-range error");
let rendered = err.to_string();
assert!(
@@ -572,7 +576,7 @@ async fn three_session_loop_propagates_presplit_token_through_two_snapshots() {
let mut topology1 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-pre")])]);
let mut executor1 =
MockRequestExecutor::new(vec![Ok(page_response(b"page-1-pre", Some("T1")))]);
let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None)
let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None)
.await
.unwrap();
let pages_s1 = drain_pages(&mut pipeline1, &mut executor1, 1).await;
@@ -615,7 +619,7 @@ async fn three_session_loop_propagates_presplit_token_through_two_snapshots() {
b"page-1-postsplit-left",
Some("T2_a"),
))]);
let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2))
let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2), None)
.await
.unwrap();
let pages_s2 = drain_pages(&mut pipeline2, &mut executor2, 1).await;
@@ -677,7 +681,7 @@ async fn three_session_loop_propagates_presplit_token_through_two_snapshots() {
Ok(page_response(b"page-2-postsplit-left", None)),
Ok(page_response(b"page-1-postsplit-right", None)),
]);
let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3))
let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3), None)
.await
.unwrap();
let pages_s3 = drain_all(&mut pipeline3, &mut executor3).await;
@@ -730,7 +734,7 @@ async fn cascading_split_propagates_back_sibling_token_to_every_grand_child() {
let mut topology1 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-pre")])]);
let mut executor1 =
MockRequestExecutor::new(vec![Ok(page_response(b"page-1-pre", Some("T1")))]);
let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None)
let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None)
.await
.unwrap();
let pages_s1 = drain_pages(&mut pipeline1, &mut executor1, 1).await;
@@ -749,7 +753,7 @@ async fn cascading_split_propagates_back_sibling_token_to_every_grand_child() {
])]);
let mut executor2 =
MockRequestExecutor::new(vec![Ok(page_response(b"page-1-postsplit-left", None))]);
let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2))
let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2), None)
.await
.unwrap();
let pages_s2 = drain_pages(&mut pipeline2, &mut executor2, 1).await;
@@ -787,7 +791,7 @@ async fn cascading_split_propagates_back_sibling_token_to_every_grand_child() {
Ok(page_response(b"page-1-back-left", None)),
Ok(page_response(b"page-1-back-right", None)),
]);
let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3))
let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3), None)
.await
.unwrap();
let pages_s3 = drain_all(&mut pipeline3, &mut executor3).await;