From dbb163a7356dafe1fca102ee455840ff84468ac3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 17 Jun 2026 17:08:08 +0000 Subject: [PATCH 1/7] Initial plan From 34722b560512cef56f37a4ec4bfa3924cf312730 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 17 Jun 2026 17:23:42 +0000 Subject: [PATCH 2/7] feat(cosmos): enforce max fan-out limit in query pipeline Adds a configurable maximum fan-out limit for cross-partition queries. When a query would fan out to more physical partitions than the limit, plan_operation returns a CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED error (HTTP 400 / sub-status 20307). The default cap is 100 and can be raised via FeedOptions::with_max_fan_out / QueryOptions::with_max_fan_out. Fixes #4453 Co-authored-by: analogrelay <7574+analogrelay@users.noreply.github.com> --- sdk/cosmos/azure_data_cosmos/CHANGELOG.md | 1 + .../src/clients/container_client.rs | 1 + .../src/clients/cosmos_client.rs | 2 +- .../src/clients/database_client.rs | 2 +- .../azure_data_cosmos/src/options/feed.rs | 38 +++ .../azure_data_cosmos_driver/CHANGELOG.md | 2 + .../src/driver/cosmos_driver.rs | 20 +- .../integration_tests/query_resume.rs | 52 ++-- .../src/driver/dataflow/planner.rs | 275 ++++++++++++++---- .../src/error/cosmos_status.rs | 20 ++ 10 files changed, 329 insertions(+), 84 deletions(-) diff --git a/sdk/cosmos/azure_data_cosmos/CHANGELOG.md b/sdk/cosmos/azure_data_cosmos/CHANGELOG.md index f2bfe687018..2dccf908e18 100644 --- a/sdk/cosmos/azure_data_cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure_data_cosmos/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features Added +- Added `FeedOptions::with_max_fan_out` (and the matching `QueryOptions::with_max_fan_out` shortcut) to cap the number of physical partitions a cross-partition query may target. Queries that would fan out beyond this limit now return an error instead of silently degrading; the default cap is 100. ([#4453](https://github.com/Azure/azure-sdk-for-rust/issues/4453)) - Derived `SafeDebug` on `CosmosCredential`, `ItemResponse`, `ResourceResponse`, and `BatchResponse`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512)) - Added standard derives (`Clone`, `Copy`, `PartialEq`, `Eq`, `Hash`, `Serialize`, `Deserialize`) to `ConsistencyLevel` and `RoutingStrategy`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512)) - `Query::with_text` now accepts `impl Into`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512)) diff --git a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs index 9a9476d219a..d18f46c9763 100644 --- a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs +++ b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs @@ -844,6 +844,7 @@ impl ContainerClient { initial_operation, &options.operation, options.feed.continuation_token.as_ref(), + options.feed.max_fan_out, ) .await?; Ok(QueryItemIterator::new( diff --git a/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs b/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs index db25d62ae1a..118d3d7b1d1 100644 --- a/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs +++ b/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs @@ -146,7 +146,7 @@ impl CosmosClient { let plan = self .context .driver - .plan_operation(initial_operation, &operation_options, None) + .plan_operation(initial_operation, &operation_options, None, None) .await?; Ok(QueryItemIterator::new( diff --git a/sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs b/sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs index 42dea72dd2b..76243f61903 100644 --- a/sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs +++ b/sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs @@ -131,7 +131,7 @@ impl DatabaseClient { let plan = self .context .driver - .plan_operation(initial_operation, &operation_options, None) + .plan_operation(initial_operation, &operation_options, None, None) .await?; Ok(QueryItemIterator::new( diff --git a/sdk/cosmos/azure_data_cosmos/src/options/feed.rs b/sdk/cosmos/azure_data_cosmos/src/options/feed.rs index f8b05fb25a2..0f981698e7a 100644 --- a/sdk/cosmos/azure_data_cosmos/src/options/feed.rs +++ b/sdk/cosmos/azure_data_cosmos/src/options/feed.rs @@ -39,6 +39,20 @@ pub struct FeedOptions { /// /// See [`QueryPageIterator::to_continuation_token`](crate::feed::QueryPageIterator::to_continuation_token). pub continuation_token: Option, + + /// Maximum number of physical partitions a cross-partition query may fan + /// out to. + /// + /// When `None`, the SDK uses its built-in default + /// (`QueryOptions::DEFAULT_MAX_FAN_OUT` = 100). Setting this to a higher + /// value allows queries that span very large containers, though such + /// queries are typically expensive and should be avoided in + /// latency-sensitive paths. + /// + /// If a query would target more physical partitions than this limit, + /// [`plan_operation`](azure_data_cosmos_driver::driver::CosmosDriver::plan_operation) + /// returns an error with status 400 / sub-status 20307. + pub max_fan_out: Option, } impl FeedOptions { @@ -56,6 +70,16 @@ impl FeedOptions { self.continuation_token = Some(continuation_token); self } + + /// Sets the maximum number of physical partitions a cross-partition query + /// may fan out to. + /// + /// Overrides the built-in default (100). Pass a larger value only when + /// you understand the performance implications of a wide fan-out query. + pub fn with_max_fan_out(mut self, max_fan_out: usize) -> Self { + self.max_fan_out = Some(max_fan_out); + self + } } /// Options for query operations. @@ -145,4 +169,18 @@ impl QueryOptions { self.feed = self.feed.with_continuation_token(continuation_token); self } + + /// Sets the maximum number of physical partitions a cross-partition query + /// may fan out to. + /// + /// Delegates to [`FeedOptions::with_max_fan_out`] on the inner + /// [`feed`](Self::feed). Pass a larger value only when you understand the + /// performance implications of a wide fan-out query. + /// + /// The default is 100. Queries that target more physical partitions than + /// this limit fail with a 400 error. + pub fn with_max_fan_out(mut self, max_fan_out: usize) -> Self { + self.feed = self.feed.with_max_fan_out(max_fan_out); + self + } } diff --git a/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md b/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md index 3809f06e58b..38837d420c6 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md +++ b/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md @@ -4,6 +4,8 @@ ### Features Added +- Added `max_fan_out: Option` parameter to `CosmosDriver::plan_operation` and `planner::build_sequential_drain` to enforce a cap on cross-partition query fan-out. When the number of physical partitions targeted by a query exceeds the limit (default 100 when `None` is passed), `plan_operation` returns a `CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (HTTP 400 / sub-status 20307) error. ([#4453](https://github.com/Azure/azure-sdk-for-rust/issues/4453)) +- Added `SubStatusCode::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (20307) and `CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (HTTP 400 / sub-status 20307). ([#4453](https://github.com/Azure/azure-sdk-for-rust/issues/4453)) - Added support for using a native query planning library to generate query plans locally, avoiding a Gateway round-trip on cross-partition queries. Gated behind the `__internal_native_query_plan` feature flag. ([#4554](https://github.com/Azure/azure-sdk-for-rust/pull/4554)) - Restructured the client / runtime options layering on the driver. Two new nested option groups, a per-client overrides surface on `DriverOptionsBuilder`, and a single canonical `AZURE_COSMOS_PPCB_*` namespace for partition-failover environment variables. The driver now consumes partition-failover configuration once at construction (`CosmosDriver::new` no longer fabricates an `OperationOptionsView` outside any operation context) ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)): - Added new nested `OperationOptions::throughput_control` group (`ThroughputControlOptions` / `…Builder` / `…View`, mirroring the `ThrottlingRetryOptions` pattern). Exposes three layered fields ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)): diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs index 79451504a43..407a03bddc4 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs @@ -1759,7 +1759,7 @@ impl CosmosDriver { // We need to do some refactoring here to shrink the future size and avoid this heap allocation if possible. Box::pin(async { let container = operation.container().cloned(); - let mut plan = self.plan_operation(operation, &options, None).await?; + let mut plan = self.plan_operation(operation, &options, None, None).await?; self.execute_plan(&mut plan, container, options).await }) .await @@ -2059,11 +2059,16 @@ impl CosmosDriver { /// - Opaque server-issued tokens (no `c.` prefix) are accepted only /// for trivial operations; passing one to a cross-partition query /// returns a `Client`-shaped error. + /// + /// `max_fan_out` caps the number of physical partitions a cross-partition + /// query may target. Defaults to + /// [`DEFAULT_MAX_FAN_OUT`](planner::DEFAULT_MAX_FAN_OUT) when `None`. pub async fn plan_operation( &self, operation: CosmosOperation, options: &OperationOptions, continuation: Option<&ContinuationToken>, + max_fan_out: Option, ) -> crate::error::Result { if !self.initialized.load(Ordering::Acquire) { let endpoint = AccountEndpoint::from(self.options.account()); @@ -2146,9 +2151,14 @@ impl CosmosDriver { |container, continuation| self.fetch_pk_ranges_from_service(container, continuation), ); - let pipeline = - planner::build_sequential_drain(&query_plan, &mut topology, &operation, resume_state) - .await?; + let pipeline = planner::build_sequential_drain( + &query_plan, + &mut topology, + &operation, + resume_state, + max_fan_out, + ) + .await?; Ok(OperationPlan::new(pipeline, operation)) } @@ -3141,7 +3151,7 @@ mod tests { assert_send(driver.execute_operation(todo!(), todo!())); assert_send(driver.execute_singleton_operation(todo!(), todo!())); assert_send(driver.execute_plan(todo!(), todo!(), todo!())); - assert_send(driver.plan_operation(todo!(), todo!(), todo!())); + assert_send(driver.plan_operation(todo!(), todo!(), todo!(), todo!())); } // Account properties with two readable locations for regional fallback tests. diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/integration_tests/query_resume.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/integration_tests/query_resume.rs index 2b6214c33da..7ea808f8b9f 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/integration_tests/query_resume.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/integration_tests/query_resume.rs @@ -193,7 +193,7 @@ async fn single_partition_resume_roundtrips_cleanly() { let mut topology1 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-0")])]); let mut executor1 = MockRequestExecutor::new(vec![Ok(page_response(b"page-1", Some("ct-1")))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -207,9 +207,10 @@ async fn single_partition_resume_roundtrips_cleanly() { let mut topology2 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-0")])]); let mut executor2 = MockRequestExecutor::new(vec![Ok(page_response(b"page-2", None))]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state)) - .await - .unwrap(); + let mut pipeline2 = + build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None) + .await + .unwrap(); let pages2 = drain_all(&mut pipeline2, &mut executor2).await; assert_eq!(pages2, vec![b"page-2".to_vec()]); assert_eq!( @@ -237,7 +238,7 @@ async fn resume_after_split_forwards_continuation_to_every_surviving_leaf() { Some("ct-pre-split"), ))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -258,9 +259,10 @@ async fn resume_after_split_forwards_continuation_to_every_surviving_leaf() { Ok(page_response(b"page-right", None)), ]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state)) - .await - .unwrap(); + let mut pipeline2 = + build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None) + .await + .unwrap(); let pages2 = drain_all(&mut pipeline2, &mut executor2).await; // Every page exactly once, in EPK order. @@ -303,7 +305,7 @@ async fn resume_mid_fanout_preserves_every_sibling_state() { let mut executor1 = MockRequestExecutor::new(vec![Ok(page_response(b"left-page-1", Some("ct-left")))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -349,9 +351,10 @@ async fn resume_mid_fanout_preserves_every_sibling_state() { Ok(page_response(b"right-page-1", None)), ]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state)) - .await - .unwrap(); + let mut pipeline2 = + build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None) + .await + .unwrap(); let pages2 = drain_all(&mut pipeline2, &mut executor2).await; assert_eq!( pages2, @@ -397,7 +400,7 @@ async fn resume_mid_fanout_then_split_preserves_state_and_fans_out_continuation( let mut executor1 = MockRequestExecutor::new(vec![Ok(page_response(b"left-page-1", Some("ct-left")))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -420,9 +423,10 @@ async fn resume_mid_fanout_then_split_preserves_state_and_fans_out_continuation( Ok(page_response(b"right-page-1", None)), ]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state)) - .await - .unwrap(); + let mut pipeline2 = + build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None) + .await + .unwrap(); let pages2 = drain_all(&mut pipeline2, &mut executor2).await; assert_eq!( @@ -489,7 +493,7 @@ async fn resume_does_not_requery_already_drained_sibling_scope() { ])]); let mut executor = MockRequestExecutor::new(vec![Ok(page_response(b"right-page-1", None))]); - let mut pipeline = build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state)) + let mut pipeline = build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state), None) .await .unwrap(); let pages = drain_all(&mut pipeline, &mut executor).await; @@ -529,7 +533,7 @@ async fn resume_fails_loudly_when_saved_range_cannot_be_covered() { ])]); let err: Result = - build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state)).await; + build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state), None).await; let err = err.expect_err("expected unhonored-saved-range error"); let rendered = err.to_string(); assert!( @@ -572,7 +576,7 @@ async fn three_session_loop_propagates_presplit_token_through_two_snapshots() { let mut topology1 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-pre")])]); let mut executor1 = MockRequestExecutor::new(vec![Ok(page_response(b"page-1-pre", Some("T1")))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages_s1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -615,7 +619,7 @@ async fn three_session_loop_propagates_presplit_token_through_two_snapshots() { b"page-1-postsplit-left", Some("T2_a"), ))]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2)) + let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2), None) .await .unwrap(); let pages_s2 = drain_pages(&mut pipeline2, &mut executor2, 1).await; @@ -677,7 +681,7 @@ async fn three_session_loop_propagates_presplit_token_through_two_snapshots() { Ok(page_response(b"page-2-postsplit-left", None)), Ok(page_response(b"page-1-postsplit-right", None)), ]); - let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3)) + let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3), None) .await .unwrap(); let pages_s3 = drain_all(&mut pipeline3, &mut executor3).await; @@ -730,7 +734,7 @@ async fn cascading_split_propagates_back_sibling_token_to_every_grand_child() { let mut topology1 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-pre")])]); let mut executor1 = MockRequestExecutor::new(vec![Ok(page_response(b"page-1-pre", Some("T1")))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages_s1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -749,7 +753,7 @@ async fn cascading_split_propagates_back_sibling_token_to_every_grand_child() { ])]); let mut executor2 = MockRequestExecutor::new(vec![Ok(page_response(b"page-1-postsplit-left", None))]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2)) + let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2), None) .await .unwrap(); let pages_s2 = drain_pages(&mut pipeline2, &mut executor2, 1).await; @@ -787,7 +791,7 @@ async fn cascading_split_propagates_back_sibling_token_to_every_grand_child() { Ok(page_response(b"page-1-back-left", None)), Ok(page_response(b"page-1-back-right", None)), ]); - let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3)) + let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3), None) .await .unwrap(); let pages_s3 = drain_all(&mut pipeline3, &mut executor3).await; diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs index 7cb73a8a82f..9d2ced195b9 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs @@ -24,6 +24,15 @@ use super::{ Request, RequestTarget, SequentialDrain, TopologyProvider, }; +/// Default maximum number of physical partitions a cross-partition query may +/// fan out to. Exceeding this limit returns an error to prevent inadvertent +/// full-container scans on very large clusters. +/// +/// Users who truly need more can increase the limit via +/// [`FeedOptions::max_fan_out`](crate::options::FeedOptions::max_fan_out) / +/// `QueryOptions::with_max_fan_out`. +pub(crate) const DEFAULT_MAX_FAN_OUT: usize = 100; + /// Builds a single-node [`Pipeline`] for a trivial operation. /// /// Trivial operations are those that can be satisfied by a single request to @@ -147,6 +156,7 @@ pub(crate) async fn build_sequential_drain( topology_provider: &mut dyn TopologyProvider, operation: &Arc, resume: Option, + max_fan_out: Option, ) -> crate::error::Result { validate_query_plan(query_plan)?; @@ -179,7 +189,20 @@ pub(crate) async fn build_sequential_drain( plan_fresh(query_plan, topology_provider, operation).await? }; - // TODO: enforce max fan-out (default 100, configurable). See FEED_OPERATIONS_REQS.md §3. + let effective_max_fan_out = max_fan_out.unwrap_or(DEFAULT_MAX_FAN_OUT); + if request_nodes.len() > effective_max_fan_out { + return Err(crate::error::CosmosError::builder() + .with_status(crate::error::CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED) + .with_message(format!( + "cross-partition query would fan out to {} physical partitions, \ + which exceeds the maximum of {}; use \ + QueryOptions::with_max_fan_out() to raise the limit if this \ + level of fan-out is intentional", + request_nodes.len(), + effective_max_fan_out, + )) + .build()); + } if request_nodes.is_empty() { // Resumed past every range that still has work: the pipeline is @@ -845,7 +868,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = MockTopologyProvider::new(vec![Ok(vec![rr("", "FF", "pkrange-0")])]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests(pipeline, &[("", "FF", "pkrange-0")]); @@ -861,7 +884,7 @@ mod tests { rr("80", "FF", "pkrange-right"), ])]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests( @@ -880,7 +903,7 @@ mod tests { Ok(vec![rr("80", "FF", "pkrange-C")]), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests( @@ -900,7 +923,7 @@ mod tests { rr("80", "C0", "pkrange-3"), ])]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests( @@ -932,7 +955,7 @@ mod tests { ]), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests( @@ -953,7 +976,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = MockTopologyProvider::new(vec![Ok(vec![rr("", "FF", "pkrange-wide")])]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests_with_partitions(pipeline, &[("20", "80", "pkrange-wide", "", "FF")]); @@ -971,7 +994,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -993,7 +1016,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1017,7 +1040,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1039,7 +1062,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1061,7 +1084,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1087,7 +1110,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1103,7 +1126,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = MockTopologyProvider::new(vec![Ok(vec![rr("", "FF", "pkrange-0")])]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests(pipeline, &[("", "FF", "pkrange-0")]); @@ -1115,7 +1138,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1137,7 +1160,7 @@ mod tests { .with_message("topology resolution failed") .build())]); - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1214,6 +1237,7 @@ mod tests { &mut topology, &Arc::new(op), Some(PipelineNodeState::Drained), + None, ) .await .unwrap(); @@ -1242,9 +1266,10 @@ mod tests { ("AA", "FF", saved_request(None)), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests(pipeline, &[("55", "AA", "pk-b"), ("AA", "FF", "pk-c")]); } @@ -1269,9 +1294,10 @@ mod tests { ("AA", "FF", saved_request(None)), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1301,9 +1327,10 @@ mod tests { ("AA", "FF", saved_request(None)), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1330,9 +1357,10 @@ mod tests { ("C0", "FF", saved_request(None)), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1358,9 +1386,10 @@ mod tests { active_tokens: vec![], }; - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert!(matches!( pipeline.snapshot_state().unwrap(), PipelineNodeState::Drained @@ -1382,9 +1411,10 @@ mod tests { ("AA", "FF", saved_request(None)), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, @@ -1408,7 +1438,7 @@ mod tests { ("00", "55", saved_request(Some("tok-b"))), ]); - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .unwrap_err(); assert_eq!( @@ -1431,7 +1461,7 @@ mod tests { ("55", "FF", saved_request(Some("tok-b"))), ]); - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .unwrap_err(); assert_eq!( @@ -1452,7 +1482,7 @@ mod tests { let resume = saved_drain(vec![("55", "AA", saved_request(Some("server-token-xyz")))]); - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .unwrap_err(); assert_eq!( @@ -1491,9 +1521,10 @@ mod tests { }], }; - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1521,9 +1552,10 @@ mod tests { ("30", "60", saved_request(Some("tok-b"))), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1566,9 +1598,10 @@ mod tests { }], }; - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1594,7 +1627,7 @@ mod tests { }; let result = - build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(legacy)).await; + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(legacy), None).await; let err = result.expect_err("bare top-level Request shape must be rejected on resume"); assert_eq!( err.status(), @@ -1622,7 +1655,7 @@ mod tests { }], }; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .expect_err("zero-width active_tokens entry must be rejected"); assert_eq!( @@ -1657,7 +1690,7 @@ mod tests { }], }; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .expect_err("malformed min>max entry must be rejected by the validator"); assert_eq!( @@ -1694,7 +1727,7 @@ mod tests { ], }; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .expect_err("appended malformed min>max entry must still be rejected"); assert_eq!( @@ -1733,9 +1766,10 @@ mod tests { }], }; - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .expect("front-sibling cascading split must plan cleanly"); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .expect("front-sibling cascading split must plan cleanly"); // Walk the planned children via snapshot: the two front grand- // children must each carry T1; the back range must be a @@ -1772,4 +1806,139 @@ mod tests { ); } } + + // --- Fan-out limit enforcement tests --- + + #[tokio::test] + async fn rejects_when_fan_out_exceeds_default_limit() { + // Create a topology with DEFAULT_MAX_FAN_OUT + 1 partitions to exceed the default. + let n = DEFAULT_MAX_FAN_OUT + 1; + let plan = plan_with_ranges(vec![qr("", "FF")]); + let op = cross_partition_query_operation(); + // Build n evenly-spaced partition ranges across ["", "FF"). + // We use hex suffixes: each partition gets a 2-hex-digit boundary. + // For simplicity, generate n ranges where each boundary is i * (0xFF / n) as a hex string. + let boundaries: Vec = (0..=n) + .map(|i| { + if i == 0 { + String::new() + } else if i == n { + "FF".to_string() + } else { + format!("{:02X}", (i * 0xFF / n).clamp(1, 0xFE)) + } + }) + .collect(); + // Deduplicate adjacent boundaries (could happen with small n values). + let mut ranges: Vec = Vec::new(); + for i in 0..boundaries.len() - 1 { + if boundaries[i] != boundaries[i + 1] { + ranges.push(rr( + &boundaries[i], + &boundaries[i + 1], + &format!("pkrange-{i}"), + )); + } + } + + let mut topology = MockTopologyProvider::new(vec![Ok(ranges)]); + + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) + .await + .unwrap_err(); + assert_eq!( + err.status(), + crate::error::CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED, + "expected FAN_OUT_LIMIT_EXCEEDED status; got {err:?}", + ); + let msg = err.to_string(); + assert!( + msg.contains("QueryOptions::with_max_fan_out"), + "error should mention QueryOptions::with_max_fan_out; got: {msg}", + ); + } + + #[tokio::test] + async fn accepts_query_at_exactly_the_default_limit() { + // A topology with exactly DEFAULT_MAX_FAN_OUT partitions should succeed. + let n = DEFAULT_MAX_FAN_OUT; + let plan = plan_with_ranges(vec![qr("", "FF")]); + let op = cross_partition_query_operation(); + let ranges: Vec = (0..n) + .map(|i| { + let min = if i == 0 { + String::new() + } else { + format!("{:02X}", (i * 0xFF / n).clamp(1, 0xFE)) + }; + let max = if i == n - 1 { + "FF".to_string() + } else { + format!("{:02X}", ((i + 1) * 0xFF / n).clamp(1, 0xFE)) + }; + rr(&min, &max, &format!("pkrange-{i}")) + }) + .collect(); + let count = ranges.len(); + let mut topology = MockTopologyProvider::new(vec![Ok(ranges)]); + + let result = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None).await; + assert!( + result.is_ok(), + "exactly {} partitions should not exceed the default limit of {}; got: {:?}", + count, + DEFAULT_MAX_FAN_OUT, + result.unwrap_err(), + ); + } + + #[tokio::test] + async fn custom_max_fan_out_is_honored() { + // With a custom limit of 2, 3 partitions should fail. + let plan = plan_with_ranges(vec![qr("", "FF")]); + let op = cross_partition_query_operation(); + let mut topology = MockTopologyProvider::new(vec![Ok(vec![ + rr("", "40", "pkrange-0"), + rr("40", "80", "pkrange-1"), + rr("80", "FF", "pkrange-2"), + ])]); + + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, Some(2)) + .await + .unwrap_err(); + assert_eq!( + err.status(), + crate::error::CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED, + "expected FAN_OUT_LIMIT_EXCEEDED with custom limit 2; got {err:?}", + ); + let msg = err.to_string(); + assert!( + msg.contains("3") && msg.contains("2"), + "error should mention actual count (3) and limit (2); got: {msg}", + ); + } + + #[tokio::test] + async fn custom_max_fan_out_allows_more_partitions() { + // With a custom limit of 5, 3 partitions should succeed. + let plan = plan_with_ranges(vec![qr("", "FF")]); + let op = cross_partition_query_operation(); + let mut topology = MockTopologyProvider::new(vec![Ok(vec![ + rr("", "40", "pkrange-0"), + rr("40", "80", "pkrange-1"), + rr("80", "FF", "pkrange-2"), + ])]); + + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, Some(5)) + .await + .unwrap(); + assert_drain_requests( + pipeline, + &[ + ("", "40", "pkrange-0"), + ("40", "80", "pkrange-1"), + ("80", "FF", "pkrange-2"), + ], + ); + } } diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/error/cosmos_status.rs b/sdk/cosmos/azure_data_cosmos_driver/src/error/cosmos_status.rs index 7528e8f3cae..b6485e588bb 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/error/cosmos_status.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/error/cosmos_status.rs @@ -516,6 +516,7 @@ impl SubStatusCode { 20303 => Some("ServiceReturnedOfferWithoutId"), 20304 => Some("ClientThroughputPollerIncomplete"), 20305 => Some("ClientTopologyResolutionFailed"), + 20307 => Some("ClientQueryFanOutLimitExceeded"), // SDK Server-side codes (21xxx) - consistent across .NET and Java 21001 => Some("NameCacheIsStaleExceededRetryLimit"), @@ -1424,6 +1425,14 @@ impl SubStatusCode { /// has no routing information for the operation. Paired with HTTP /// 503 — an internal client-side condition, not a transport failure. pub const CLIENT_TOPOLOGY_RESOLUTION_FAILED: SubStatusCode = SubStatusCode(20305); + + /// A cross-partition query would fan out to more physical partitions + /// than the configured maximum (20307). Paired with HTTP 400 because + /// this is a client-side policy violation — the caller can raise the + /// limit via `FeedOptions::max_fan_out` / + /// `QueryOptions::with_max_fan_out` if they truly need this level of + /// fan-out. + pub const CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED: SubStatusCode = SubStatusCode(20307); } impl Default for SubStatusCode { @@ -2216,6 +2225,17 @@ impl CosmosStatus { status_code: StatusCode::InternalServerError, sub_status: Some(SubStatusCode::SERVICE_RETURNED_OBJECT_WITHOUT_RID), }; + + /// 400 / 20307 — cross-partition query fan-out limit exceeded. + /// + /// The query would require contacting more physical partitions than + /// the configured maximum. Raise the limit via + /// `FeedOptions::max_fan_out` / `QueryOptions::with_max_fan_out` if + /// this level of fan-out is intentional. + pub const CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED: CosmosStatus = CosmosStatus { + status_code: StatusCode::BadRequest, + sub_status: Some(SubStatusCode::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED), + }; } impl fmt::Debug for CosmosStatus { From 63743ec227ed2f9657a98eda053118c08d762fd2 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 17 Jun 2026 17:25:14 +0000 Subject: [PATCH 3/7] fix: address code review feedback - Rename effective_max_fan_out to fan_out_limit in planner.rs - Fix incorrect doc comment reference in feed.rs Co-authored-by: analogrelay <7574+analogrelay@users.noreply.github.com> --- sdk/cosmos/azure_data_cosmos/src/options/feed.rs | 4 ++-- .../azure_data_cosmos_driver/src/driver/dataflow/planner.rs | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/sdk/cosmos/azure_data_cosmos/src/options/feed.rs b/sdk/cosmos/azure_data_cosmos/src/options/feed.rs index 0f981698e7a..e341ee61ec1 100644 --- a/sdk/cosmos/azure_data_cosmos/src/options/feed.rs +++ b/sdk/cosmos/azure_data_cosmos/src/options/feed.rs @@ -43,8 +43,8 @@ pub struct FeedOptions { /// Maximum number of physical partitions a cross-partition query may fan /// out to. /// - /// When `None`, the SDK uses its built-in default - /// (`QueryOptions::DEFAULT_MAX_FAN_OUT` = 100). Setting this to a higher + /// When `None`, the SDK uses a built-in default of 100. Setting this to a + /// higher /// value allows queries that span very large containers, though such /// queries are typically expensive and should be avoided in /// latency-sensitive paths. diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs index 9d2ced195b9..0a050cf89f0 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs @@ -189,8 +189,8 @@ pub(crate) async fn build_sequential_drain( plan_fresh(query_plan, topology_provider, operation).await? }; - let effective_max_fan_out = max_fan_out.unwrap_or(DEFAULT_MAX_FAN_OUT); - if request_nodes.len() > effective_max_fan_out { + let fan_out_limit = max_fan_out.unwrap_or(DEFAULT_MAX_FAN_OUT); + if request_nodes.len() > fan_out_limit { return Err(crate::error::CosmosError::builder() .with_status(crate::error::CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED) .with_message(format!( @@ -199,7 +199,7 @@ pub(crate) async fn build_sequential_drain( QueryOptions::with_max_fan_out() to raise the limit if this \ level of fan-out is intentional", request_nodes.len(), - effective_max_fan_out, + fan_out_limit, )) .build()); } From e94091f4a861130e27760b4de3fb10707bfa7ff4 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 17 Jun 2026 20:52:25 +0000 Subject: [PATCH 4/7] refactor(cosmos): introduce PlanOptions for plan_operation Group continuation token and max_fan_out into a PlanOptions struct so plan_operation can accept Option as its final parameter. This avoids signature churn when adding new planning options in future. - Add PlanOptions struct (driver/plan_options.rs), exported from the driver crate - Change CosmosDriver::plan_operation to accept Option with unwrap_or_default() at the start - Update all SDK callers (container_client, cosmos_client, database_client) to construct PlanOptions from FeedOptions - Fix private intra-doc link on plan_operation (was referencing the private planner::DEFAULT_MAX_FAN_OUT constant, breaking cargo doc) - Update CHANGELOG entries to reference PR #4615 (not issue #4453) Co-authored-by: analogrelay <7574+analogrelay@users.noreply.github.com> --- sdk/cosmos/azure_data_cosmos/CHANGELOG.md | 2 +- .../src/clients/container_client.rs | 11 +++--- .../src/clients/cosmos_client.rs | 2 +- .../src/clients/database_client.rs | 2 +- .../azure_data_cosmos_driver/CHANGELOG.md | 4 +- .../src/driver/cosmos_driver.rs | 38 +++++++++---------- .../src/driver/mod.rs | 2 + .../src/driver/plan_options.rs | 28 ++++++++++++++ .../azure_data_cosmos_driver/src/lib.rs | 4 +- 9 files changed, 61 insertions(+), 32 deletions(-) create mode 100644 sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs diff --git a/sdk/cosmos/azure_data_cosmos/CHANGELOG.md b/sdk/cosmos/azure_data_cosmos/CHANGELOG.md index 2dccf908e18..e4501e266ad 100644 --- a/sdk/cosmos/azure_data_cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure_data_cosmos/CHANGELOG.md @@ -4,7 +4,7 @@ ### Features Added -- Added `FeedOptions::with_max_fan_out` (and the matching `QueryOptions::with_max_fan_out` shortcut) to cap the number of physical partitions a cross-partition query may target. Queries that would fan out beyond this limit now return an error instead of silently degrading; the default cap is 100. ([#4453](https://github.com/Azure/azure-sdk-for-rust/issues/4453)) +- Added `FeedOptions::with_max_fan_out` (and the matching `QueryOptions::with_max_fan_out` shortcut) to cap the number of physical partitions a cross-partition query may target. Queries that would fan out beyond this limit now return an error instead of silently degrading; the default cap is 100. ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615)) - Derived `SafeDebug` on `CosmosCredential`, `ItemResponse`, `ResourceResponse`, and `BatchResponse`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512)) - Added standard derives (`Clone`, `Copy`, `PartialEq`, `Eq`, `Hash`, `Serialize`, `Deserialize`) to `ConsistencyLevel` and `RoutingStrategy`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512)) - `Query::with_text` now accepts `impl Into`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512)) diff --git a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs index d18f46c9763..a9525c4f3de 100644 --- a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs +++ b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs @@ -837,15 +837,14 @@ impl ContainerClient { if let Some(hint) = options.feed.max_item_count { initial_operation = initial_operation.with_max_item_count(hint); } + let plan_options = azure_data_cosmos_driver::PlanOptions { + continuation: options.feed.continuation_token, + max_fan_out: options.feed.max_fan_out, + }; let plan = self .context .driver - .plan_operation( - initial_operation, - &options.operation, - options.feed.continuation_token.as_ref(), - options.feed.max_fan_out, - ) + .plan_operation(initial_operation, &options.operation, Some(plan_options)) .await?; Ok(QueryItemIterator::new( self.context.driver.clone(), diff --git a/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs b/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs index 118d3d7b1d1..db25d62ae1a 100644 --- a/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs +++ b/sdk/cosmos/azure_data_cosmos/src/clients/cosmos_client.rs @@ -146,7 +146,7 @@ impl CosmosClient { let plan = self .context .driver - .plan_operation(initial_operation, &operation_options, None, None) + .plan_operation(initial_operation, &operation_options, None) .await?; Ok(QueryItemIterator::new( diff --git a/sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs b/sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs index 76243f61903..42dea72dd2b 100644 --- a/sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs +++ b/sdk/cosmos/azure_data_cosmos/src/clients/database_client.rs @@ -131,7 +131,7 @@ impl DatabaseClient { let plan = self .context .driver - .plan_operation(initial_operation, &operation_options, None, None) + .plan_operation(initial_operation, &operation_options, None) .await?; Ok(QueryItemIterator::new( diff --git a/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md b/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md index 38837d420c6..3636a0c3379 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md +++ b/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md @@ -4,8 +4,8 @@ ### Features Added -- Added `max_fan_out: Option` parameter to `CosmosDriver::plan_operation` and `planner::build_sequential_drain` to enforce a cap on cross-partition query fan-out. When the number of physical partitions targeted by a query exceeds the limit (default 100 when `None` is passed), `plan_operation` returns a `CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (HTTP 400 / sub-status 20307) error. ([#4453](https://github.com/Azure/azure-sdk-for-rust/issues/4453)) -- Added `SubStatusCode::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (20307) and `CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (HTTP 400 / sub-status 20307). ([#4453](https://github.com/Azure/azure-sdk-for-rust/issues/4453)) +- Added `PlanOptions` struct to `CosmosDriver::plan_operation`, grouping continuation-token and fan-out cap into a single parameter, making the API extensible without future call-site churn. ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615)) +- Added `SubStatusCode::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (20307) and `CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (HTTP 400 / sub-status 20307). ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615)) - Added support for using a native query planning library to generate query plans locally, avoiding a Gateway round-trip on cross-partition queries. Gated behind the `__internal_native_query_plan` feature flag. ([#4554](https://github.com/Azure/azure-sdk-for-rust/pull/4554)) - Restructured the client / runtime options layering on the driver. Two new nested option groups, a per-client overrides surface on `DriverOptionsBuilder`, and a single canonical `AZURE_COSMOS_PPCB_*` namespace for partition-failover environment variables. The driver now consumes partition-failover configuration once at construction (`CosmosDriver::new` no longer fabricates an `OperationOptionsView` outside any operation context) ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)): - Added new nested `OperationOptions::throughput_control` group (`ThroughputControlOptions` / `…Builder` / `…View`, mirroring the `ThrottlingRetryOptions` pattern). Exposes three layered fields ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)): diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs index 407a03bddc4..6b8a9ab930b 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs @@ -24,8 +24,8 @@ use crate::{ }, models::{ effective_partition_key::EffectivePartitionKey, AccountEndpoint, AccountReference, - ContainerProperties, ContainerReference, ContinuationToken, CosmosOperation, - DatabaseReference, PartitionKey, ResolvedToken, ResourceType, UserAgent, + ContainerProperties, ContainerReference, CosmosOperation, DatabaseReference, PartitionKey, + ResolvedToken, ResourceType, UserAgent, }, options::{ ConnectionPoolOptions, DriverOptions, OperationOptions, OperationOptionsView, @@ -46,7 +46,7 @@ use super::{ cosmos_headers, cosmos_transport_client::HttpRequest, request_signing, AuthorizationContext, CosmosTransport, }, - CosmosDriverRuntime, + CosmosDriverRuntime, PlanOptions, }; struct DriverRequestExecutor<'a> { @@ -1759,7 +1759,7 @@ impl CosmosDriver { // We need to do some refactoring here to shrink the future size and avoid this heap allocation if possible. Box::pin(async { let container = operation.container().cloned(); - let mut plan = self.plan_operation(operation, &options, None, None).await?; + let mut plan = self.plan_operation(operation, &options, None).await?; self.execute_plan(&mut plan, container, options).await }) .await @@ -2051,25 +2051,23 @@ impl CosmosDriver { /// singleton pipeline immediately. For cross-partition queries, fetches a /// query plan from the backend and builds a fan-out pipeline. /// - /// `continuation` optionally provides resume state from a prior call. Two - /// kinds of tokens are accepted: + /// `plan_options` groups planning-specific settings. Pass `None` to use + /// all defaults. See [`PlanOptions`] for available fields: /// - /// - SDK-issued tokens (`c1.…`) carry a serialized snapshot of the - /// previous pipeline's state and can resume any operation. - /// - Opaque server-issued tokens (no `c.` prefix) are accepted only - /// for trivial operations; passing one to a cross-partition query - /// returns a `Client`-shaped error. - /// - /// `max_fan_out` caps the number of physical partitions a cross-partition - /// query may target. Defaults to - /// [`DEFAULT_MAX_FAN_OUT`](planner::DEFAULT_MAX_FAN_OUT) when `None`. + /// - [`continuation`](PlanOptions::continuation) optionally provides resume + /// state from a prior call. SDK-issued tokens (`c1.…`) can resume any + /// operation; opaque server-issued tokens are only accepted for trivial + /// (single-partition) operations. + /// - [`max_fan_out`](PlanOptions::max_fan_out) caps the number of physical + /// partitions a cross-partition query may target (default 100). pub async fn plan_operation( &self, operation: CosmosOperation, options: &OperationOptions, - continuation: Option<&ContinuationToken>, - max_fan_out: Option, + plan_options: Option, ) -> crate::error::Result { + let plan_options = plan_options.unwrap_or_default(); + if !self.initialized.load(Ordering::Acquire) { let endpoint = AccountEndpoint::from(self.options.account()); return Err(crate::error::CosmosError::builder() @@ -2090,7 +2088,7 @@ impl CosmosDriver { // Resolve the continuation token (if any) into a planner-ready resume // state. Server-issued tokens are only valid for trivial operations. - let resume_state = match continuation { + let resume_state = match plan_options.continuation.as_ref() { None => None, Some(token) => { match token.resolve()? { @@ -2156,7 +2154,7 @@ impl CosmosDriver { &mut topology, &operation, resume_state, - max_fan_out, + plan_options.max_fan_out, ) .await?; Ok(OperationPlan::new(pipeline, operation)) @@ -3151,7 +3149,7 @@ mod tests { assert_send(driver.execute_operation(todo!(), todo!())); assert_send(driver.execute_singleton_operation(todo!(), todo!())); assert_send(driver.execute_plan(todo!(), todo!(), todo!())); - assert_send(driver.plan_operation(todo!(), todo!(), todo!(), todo!())); + assert_send(driver.plan_operation(todo!(), todo!(), todo!())); } // Account properties with two readable locations for regional fallback tests. diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/mod.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/mod.rs index c870bf5fa1b..86ee5b8b93b 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/mod.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/mod.rs @@ -16,12 +16,14 @@ mod cosmos_driver; pub(crate) mod dataflow; pub(crate) mod jitter; pub(crate) mod pipeline; +mod plan_options; pub(crate) mod routing; mod runtime; pub(crate) mod transport; pub use cosmos_driver::CosmosDriver; pub use dataflow::OperationPlan; +pub use plan_options::PlanOptions; pub use runtime::{CosmosDriverRuntime, CosmosDriverRuntimeBuilder}; /// Walks an error's `.source()` chain and joins all distinct messages into a diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs new file mode 100644 index 00000000000..6b62247d0d4 --- /dev/null +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +//! Options for the planning phase of a Cosmos DB operation. + +use crate::models::ContinuationToken; + +/// Options passed to [`CosmosDriver::plan_operation`](crate::driver::CosmosDriver::plan_operation). +/// +/// Group both continuation-token resumption and the fan-out cap into a single +/// struct so the signature of `plan_operation` can grow without forcing a +/// change at every call site. Callers that need no special behaviour can pass +/// `None`; `plan_operation` treats `None` as `PlanOptions::default()`. +#[derive(Default)] +pub struct PlanOptions { + /// Continuation token to resume a previous query from where it left off. + /// + /// When `None`, the query starts from the beginning. + pub continuation: Option, + + /// Maximum number of physical partitions a cross-partition query may fan + /// out to. + /// + /// When `None`, a built-in default of 100 applies. Queries that would + /// target more partitions than this limit fail with HTTP 400 / + /// sub-status 20307. + pub max_fan_out: Option, +} diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/lib.rs b/sdk/cosmos/azure_data_cosmos_driver/src/lib.rs index cd94c0ef497..4a06974dbc4 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/lib.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/lib.rs @@ -61,7 +61,9 @@ pub mod testing; // Re-export key types at crate root pub use diagnostics::{DiagnosticsContext, ExecutionContext, RequestDiagnostics, RequestHandle}; -pub use driver::{CosmosDriver, CosmosDriverRuntime, CosmosDriverRuntimeBuilder, OperationPlan}; +pub use driver::{ + CosmosDriver, CosmosDriverRuntime, CosmosDriverRuntimeBuilder, OperationPlan, PlanOptions, +}; pub use error::{CosmosError, CosmosErrorBuilder, CosmosStatus, Result, SubStatusCode}; pub use models::{ActivityId, CosmosResponse, RequestCharge, ResponseBody}; pub use options::{DiagnosticsOptions, DiagnosticsVerbosity, DriverOptions}; From b76774526c5356d5b0a0feee7733a70be35fa193 Mon Sep 17 00:00:00 2001 From: Ashley Stanton-Nurse Date: Wed, 17 Jun 2026 15:21:46 -0700 Subject: [PATCH 5/7] Apply suggestions from code review Co-authored-by: Ashley Stanton-Nurse --- sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs index 6b62247d0d4..7f2cf2d5d87 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs @@ -9,7 +9,7 @@ use crate::models::ContinuationToken; /// /// Group both continuation-token resumption and the fan-out cap into a single /// struct so the signature of `plan_operation` can grow without forcing a -/// change at every call site. Callers that need no special behaviour can pass +/// change at every call site. Callers that need no special behavior can pass /// `None`; `plan_operation` treats `None` as `PlanOptions::default()`. #[derive(Default)] pub struct PlanOptions { From 2cf6368ac0d7dc6ea5e749bccd829ba9b28ad0e9 Mon Sep 17 00:00:00 2001 From: Ashley Stanton-Nurse Date: Thu, 18 Jun 2026 10:04:30 -0700 Subject: [PATCH 6/7] Potential fix for pull request finding Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs index a9525c4f3de..394f88590e2 100644 --- a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs +++ b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs @@ -840,6 +840,7 @@ impl ContainerClient { let plan_options = azure_data_cosmos_driver::PlanOptions { continuation: options.feed.continuation_token, max_fan_out: options.feed.max_fan_out, + ..Default::default() }; let plan = self .context From 75a71fb190d142f409c04a8affcbdc8ce5beb765 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Thu, 18 Jun 2026 18:07:35 +0000 Subject: [PATCH 7/7] refactor(cosmos): address PR review feedback - Derive Clone and Debug on PlanOptions - Move plan_operation signature change to breaking changes section - Remove FeedOptions reference from internal constant doc Co-authored-by: analogrelay <7574+analogrelay@users.noreply.github.com> --- sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md | 2 +- .../azure_data_cosmos_driver/src/driver/dataflow/planner.rs | 4 ---- .../azure_data_cosmos_driver/src/driver/plan_options.rs | 2 +- 3 files changed, 2 insertions(+), 6 deletions(-) diff --git a/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md b/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md index 3636a0c3379..ad0242bbc75 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md +++ b/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md @@ -4,7 +4,6 @@ ### Features Added -- Added `PlanOptions` struct to `CosmosDriver::plan_operation`, grouping continuation-token and fan-out cap into a single parameter, making the API extensible without future call-site churn. ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615)) - Added `SubStatusCode::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (20307) and `CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (HTTP 400 / sub-status 20307). ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615)) - Added support for using a native query planning library to generate query plans locally, avoiding a Gateway round-trip on cross-partition queries. Gated behind the `__internal_native_query_plan` feature flag. ([#4554](https://github.com/Azure/azure-sdk-for-rust/pull/4554)) - Restructured the client / runtime options layering on the driver. Two new nested option groups, a per-client overrides surface on `DriverOptionsBuilder`, and a single canonical `AZURE_COSMOS_PPCB_*` namespace for partition-failover environment variables. The driver now consumes partition-failover configuration once at construction (`CosmosDriver::new` no longer fabricates an `OperationOptionsView` outside any operation context) ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)): @@ -20,6 +19,7 @@ ### Breaking Changes +- `CosmosDriver::plan_operation` signature change: the final parameter changed from `Option<&ContinuationToken>` to `Option`. Direct driver consumers must wrap continuation tokens in `PlanOptions { continuation: Some(token), ..Default::default() }`, or pass `None` for queries starting from the beginning. ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615)) - Cross-partition query continuation tokens minted by `0.4.0` cannot be resumed against `0.5.0`. The on-wire token shape was reshaped to record per-range sibling state so that pausing a fan-out query mid-flight preserves information about siblings that hadn't been touched yet. Callers holding a `0.4.0`-minted token will receive a continuation-token error on resume and must re-issue the query. ([#4550](https://github.com/Azure/azure-sdk-for-rust/pull/4550)) - `azure_data_cosmos_driver::models::ETag` has been removed. Use `azure_core::http::Etag` directly. The previous `ETag::new(...)` is gone; construct via `Etag::from(&str)` / `Etag::from(String)`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512)) - Migration impact of the client / runtime options restructure. Per-runtime concerns (transport, cert validation, proxy, UA defaults) are configured once on the `CosmosDriverRuntime`; per-client concerns (operation defaults, FI rules, throughput-control groups, partition-failover tuning) move onto `DriverOptions` ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)): diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs index 0a050cf89f0..401d84fdca5 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs @@ -27,10 +27,6 @@ use super::{ /// Default maximum number of physical partitions a cross-partition query may /// fan out to. Exceeding this limit returns an error to prevent inadvertent /// full-container scans on very large clusters. -/// -/// Users who truly need more can increase the limit via -/// [`FeedOptions::max_fan_out`](crate::options::FeedOptions::max_fan_out) / -/// `QueryOptions::with_max_fan_out`. pub(crate) const DEFAULT_MAX_FAN_OUT: usize = 100; /// Builds a single-node [`Pipeline`] for a trivial operation. diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs index 7f2cf2d5d87..6f36bd28371 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs @@ -11,7 +11,7 @@ use crate::models::ContinuationToken; /// struct so the signature of `plan_operation` can grow without forcing a /// change at every call site. Callers that need no special behavior can pass /// `None`; `plan_operation` treats `None` as `PlanOptions::default()`. -#[derive(Default)] +#[derive(Clone, Debug, Default)] pub struct PlanOptions { /// Continuation token to resume a previous query from where it left off. ///