diff --git a/sdk/cosmos/azure_data_cosmos/CHANGELOG.md b/sdk/cosmos/azure_data_cosmos/CHANGELOG.md index f2bfe68701..e4501e266a 100644 --- a/sdk/cosmos/azure_data_cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure_data_cosmos/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features Added +- Added `FeedOptions::with_max_fan_out` (and the matching `QueryOptions::with_max_fan_out` shortcut) to cap the number of physical partitions a cross-partition query may target. Queries that would fan out beyond this limit now return an error instead of silently degrading; the default cap is 100. ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615)) - Derived `SafeDebug` on `CosmosCredential`, `ItemResponse`, `ResourceResponse`, and `BatchResponse`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512)) - Added standard derives (`Clone`, `Copy`, `PartialEq`, `Eq`, `Hash`, `Serialize`, `Deserialize`) to `ConsistencyLevel` and `RoutingStrategy`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512)) - `Query::with_text` now accepts `impl Into`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512)) diff --git a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs index 9a9476d219..394f88590e 100644 --- a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs +++ b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs @@ -837,14 +837,15 @@ impl ContainerClient { if let Some(hint) = options.feed.max_item_count { initial_operation = initial_operation.with_max_item_count(hint); } + let plan_options = azure_data_cosmos_driver::PlanOptions { + continuation: options.feed.continuation_token, + max_fan_out: options.feed.max_fan_out, + ..Default::default() + }; let plan = self .context .driver - .plan_operation( - initial_operation, - &options.operation, - options.feed.continuation_token.as_ref(), - ) + .plan_operation(initial_operation, &options.operation, Some(plan_options)) .await?; Ok(QueryItemIterator::new( self.context.driver.clone(), diff --git a/sdk/cosmos/azure_data_cosmos/src/options/feed.rs b/sdk/cosmos/azure_data_cosmos/src/options/feed.rs index f8b05fb25a..e341ee61ec 100644 --- a/sdk/cosmos/azure_data_cosmos/src/options/feed.rs +++ b/sdk/cosmos/azure_data_cosmos/src/options/feed.rs @@ -39,6 +39,20 @@ pub struct FeedOptions { /// /// See [`QueryPageIterator::to_continuation_token`](crate::feed::QueryPageIterator::to_continuation_token). pub continuation_token: Option, + + /// Maximum number of physical partitions a cross-partition query may fan + /// out to. + /// + /// When `None`, the SDK uses a built-in default of 100. Setting this to a + /// higher + /// value allows queries that span very large containers, though such + /// queries are typically expensive and should be avoided in + /// latency-sensitive paths. + /// + /// If a query would target more physical partitions than this limit, + /// [`plan_operation`](azure_data_cosmos_driver::driver::CosmosDriver::plan_operation) + /// returns an error with status 400 / sub-status 20307. + pub max_fan_out: Option, } impl FeedOptions { @@ -56,6 +70,16 @@ impl FeedOptions { self.continuation_token = Some(continuation_token); self } + + /// Sets the maximum number of physical partitions a cross-partition query + /// may fan out to. + /// + /// Overrides the built-in default (100). Pass a larger value only when + /// you understand the performance implications of a wide fan-out query. + pub fn with_max_fan_out(mut self, max_fan_out: usize) -> Self { + self.max_fan_out = Some(max_fan_out); + self + } } /// Options for query operations. @@ -145,4 +169,18 @@ impl QueryOptions { self.feed = self.feed.with_continuation_token(continuation_token); self } + + /// Sets the maximum number of physical partitions a cross-partition query + /// may fan out to. + /// + /// Delegates to [`FeedOptions::with_max_fan_out`] on the inner + /// [`feed`](Self::feed). Pass a larger value only when you understand the + /// performance implications of a wide fan-out query. + /// + /// The default is 100. Queries that target more physical partitions than + /// this limit fail with a 400 error. + pub fn with_max_fan_out(mut self, max_fan_out: usize) -> Self { + self.feed = self.feed.with_max_fan_out(max_fan_out); + self + } } diff --git a/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md b/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md index 3809f06e58..ad0242bbc7 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md +++ b/sdk/cosmos/azure_data_cosmos_driver/CHANGELOG.md @@ -4,6 +4,7 @@ ### Features Added +- Added `SubStatusCode::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (20307) and `CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED` (HTTP 400 / sub-status 20307). ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615)) - Added support for using a native query planning library to generate query plans locally, avoiding a Gateway round-trip on cross-partition queries. Gated behind the `__internal_native_query_plan` feature flag. ([#4554](https://github.com/Azure/azure-sdk-for-rust/pull/4554)) - Restructured the client / runtime options layering on the driver. Two new nested option groups, a per-client overrides surface on `DriverOptionsBuilder`, and a single canonical `AZURE_COSMOS_PPCB_*` namespace for partition-failover environment variables. The driver now consumes partition-failover configuration once at construction (`CosmosDriver::new` no longer fabricates an `OperationOptionsView` outside any operation context) ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)): - Added new nested `OperationOptions::throughput_control` group (`ThroughputControlOptions` / `…Builder` / `…View`, mirroring the `ThrottlingRetryOptions` pattern). Exposes three layered fields ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)): @@ -18,6 +19,7 @@ ### Breaking Changes +- `CosmosDriver::plan_operation` signature change: the final parameter changed from `Option<&ContinuationToken>` to `Option`. Direct driver consumers must wrap continuation tokens in `PlanOptions { continuation: Some(token), ..Default::default() }`, or pass `None` for queries starting from the beginning. ([#4615](https://github.com/Azure/azure-sdk-for-rust/pull/4615)) - Cross-partition query continuation tokens minted by `0.4.0` cannot be resumed against `0.5.0`. The on-wire token shape was reshaped to record per-range sibling state so that pausing a fan-out query mid-flight preserves information about siblings that hadn't been touched yet. Callers holding a `0.4.0`-minted token will receive a continuation-token error on resume and must re-issue the query. ([#4550](https://github.com/Azure/azure-sdk-for-rust/pull/4550)) - `azure_data_cosmos_driver::models::ETag` has been removed. Use `azure_core::http::Etag` directly. The previous `ETag::new(...)` is gone; construct via `Etag::from(&str)` / `Etag::from(String)`. ([#4512](https://github.com/Azure/azure-sdk-for-rust/pull/4512)) - Migration impact of the client / runtime options restructure. Per-runtime concerns (transport, cert validation, proxy, UA defaults) are configured once on the `CosmosDriverRuntime`; per-client concerns (operation defaults, FI rules, throughput-control groups, partition-failover tuning) move onto `DriverOptions` ([#4588](https://github.com/Azure/azure-sdk-for-rust/pull/4588)): diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs index 79451504a4..6b8a9ab930 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/cosmos_driver.rs @@ -24,8 +24,8 @@ use crate::{ }, models::{ effective_partition_key::EffectivePartitionKey, AccountEndpoint, AccountReference, - ContainerProperties, ContainerReference, ContinuationToken, CosmosOperation, - DatabaseReference, PartitionKey, ResolvedToken, ResourceType, UserAgent, + ContainerProperties, ContainerReference, CosmosOperation, DatabaseReference, PartitionKey, + ResolvedToken, ResourceType, UserAgent, }, options::{ ConnectionPoolOptions, DriverOptions, OperationOptions, OperationOptionsView, @@ -46,7 +46,7 @@ use super::{ cosmos_headers, cosmos_transport_client::HttpRequest, request_signing, AuthorizationContext, CosmosTransport, }, - CosmosDriverRuntime, + CosmosDriverRuntime, PlanOptions, }; struct DriverRequestExecutor<'a> { @@ -2051,20 +2051,23 @@ impl CosmosDriver { /// singleton pipeline immediately. For cross-partition queries, fetches a /// query plan from the backend and builds a fan-out pipeline. /// - /// `continuation` optionally provides resume state from a prior call. Two - /// kinds of tokens are accepted: + /// `plan_options` groups planning-specific settings. Pass `None` to use + /// all defaults. See [`PlanOptions`] for available fields: /// - /// - SDK-issued tokens (`c1.…`) carry a serialized snapshot of the - /// previous pipeline's state and can resume any operation. - /// - Opaque server-issued tokens (no `c.` prefix) are accepted only - /// for trivial operations; passing one to a cross-partition query - /// returns a `Client`-shaped error. + /// - [`continuation`](PlanOptions::continuation) optionally provides resume + /// state from a prior call. SDK-issued tokens (`c1.…`) can resume any + /// operation; opaque server-issued tokens are only accepted for trivial + /// (single-partition) operations. + /// - [`max_fan_out`](PlanOptions::max_fan_out) caps the number of physical + /// partitions a cross-partition query may target (default 100). pub async fn plan_operation( &self, operation: CosmosOperation, options: &OperationOptions, - continuation: Option<&ContinuationToken>, + plan_options: Option, ) -> crate::error::Result { + let plan_options = plan_options.unwrap_or_default(); + if !self.initialized.load(Ordering::Acquire) { let endpoint = AccountEndpoint::from(self.options.account()); return Err(crate::error::CosmosError::builder() @@ -2085,7 +2088,7 @@ impl CosmosDriver { // Resolve the continuation token (if any) into a planner-ready resume // state. Server-issued tokens are only valid for trivial operations. - let resume_state = match continuation { + let resume_state = match plan_options.continuation.as_ref() { None => None, Some(token) => { match token.resolve()? { @@ -2146,9 +2149,14 @@ impl CosmosDriver { |container, continuation| self.fetch_pk_ranges_from_service(container, continuation), ); - let pipeline = - planner::build_sequential_drain(&query_plan, &mut topology, &operation, resume_state) - .await?; + let pipeline = planner::build_sequential_drain( + &query_plan, + &mut topology, + &operation, + resume_state, + plan_options.max_fan_out, + ) + .await?; Ok(OperationPlan::new(pipeline, operation)) } diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/integration_tests/query_resume.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/integration_tests/query_resume.rs index 2b6214c33d..7ea808f8b9 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/integration_tests/query_resume.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/integration_tests/query_resume.rs @@ -193,7 +193,7 @@ async fn single_partition_resume_roundtrips_cleanly() { let mut topology1 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-0")])]); let mut executor1 = MockRequestExecutor::new(vec![Ok(page_response(b"page-1", Some("ct-1")))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -207,9 +207,10 @@ async fn single_partition_resume_roundtrips_cleanly() { let mut topology2 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-0")])]); let mut executor2 = MockRequestExecutor::new(vec![Ok(page_response(b"page-2", None))]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state)) - .await - .unwrap(); + let mut pipeline2 = + build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None) + .await + .unwrap(); let pages2 = drain_all(&mut pipeline2, &mut executor2).await; assert_eq!(pages2, vec![b"page-2".to_vec()]); assert_eq!( @@ -237,7 +238,7 @@ async fn resume_after_split_forwards_continuation_to_every_surviving_leaf() { Some("ct-pre-split"), ))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -258,9 +259,10 @@ async fn resume_after_split_forwards_continuation_to_every_surviving_leaf() { Ok(page_response(b"page-right", None)), ]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state)) - .await - .unwrap(); + let mut pipeline2 = + build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None) + .await + .unwrap(); let pages2 = drain_all(&mut pipeline2, &mut executor2).await; // Every page exactly once, in EPK order. @@ -303,7 +305,7 @@ async fn resume_mid_fanout_preserves_every_sibling_state() { let mut executor1 = MockRequestExecutor::new(vec![Ok(page_response(b"left-page-1", Some("ct-left")))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -349,9 +351,10 @@ async fn resume_mid_fanout_preserves_every_sibling_state() { Ok(page_response(b"right-page-1", None)), ]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state)) - .await - .unwrap(); + let mut pipeline2 = + build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None) + .await + .unwrap(); let pages2 = drain_all(&mut pipeline2, &mut executor2).await; assert_eq!( pages2, @@ -397,7 +400,7 @@ async fn resume_mid_fanout_then_split_preserves_state_and_fans_out_continuation( let mut executor1 = MockRequestExecutor::new(vec![Ok(page_response(b"left-page-1", Some("ct-left")))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -420,9 +423,10 @@ async fn resume_mid_fanout_then_split_preserves_state_and_fans_out_continuation( Ok(page_response(b"right-page-1", None)), ]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state)) - .await - .unwrap(); + let mut pipeline2 = + build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_state), None) + .await + .unwrap(); let pages2 = drain_all(&mut pipeline2, &mut executor2).await; assert_eq!( @@ -489,7 +493,7 @@ async fn resume_does_not_requery_already_drained_sibling_scope() { ])]); let mut executor = MockRequestExecutor::new(vec![Ok(page_response(b"right-page-1", None))]); - let mut pipeline = build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state)) + let mut pipeline = build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state), None) .await .unwrap(); let pages = drain_all(&mut pipeline, &mut executor).await; @@ -529,7 +533,7 @@ async fn resume_fails_loudly_when_saved_range_cannot_be_covered() { ])]); let err: Result = - build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state)).await; + build_sequential_drain(&plan, &mut topology, &op, Some(resumed_state), None).await; let err = err.expect_err("expected unhonored-saved-range error"); let rendered = err.to_string(); assert!( @@ -572,7 +576,7 @@ async fn three_session_loop_propagates_presplit_token_through_two_snapshots() { let mut topology1 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-pre")])]); let mut executor1 = MockRequestExecutor::new(vec![Ok(page_response(b"page-1-pre", Some("T1")))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages_s1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -615,7 +619,7 @@ async fn three_session_loop_propagates_presplit_token_through_two_snapshots() { b"page-1-postsplit-left", Some("T2_a"), ))]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2)) + let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2), None) .await .unwrap(); let pages_s2 = drain_pages(&mut pipeline2, &mut executor2, 1).await; @@ -677,7 +681,7 @@ async fn three_session_loop_propagates_presplit_token_through_two_snapshots() { Ok(page_response(b"page-2-postsplit-left", None)), Ok(page_response(b"page-1-postsplit-right", None)), ]); - let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3)) + let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3), None) .await .unwrap(); let pages_s3 = drain_all(&mut pipeline3, &mut executor3).await; @@ -730,7 +734,7 @@ async fn cascading_split_propagates_back_sibling_token_to_every_grand_child() { let mut topology1 = MockTopologyProvider::new(vec![Ok(vec![resolved("", "FF", "pk-pre")])]); let mut executor1 = MockRequestExecutor::new(vec![Ok(page_response(b"page-1-pre", Some("T1")))]); - let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None) + let mut pipeline1 = build_sequential_drain(&plan, &mut topology1, &op, None, None) .await .unwrap(); let pages_s1 = drain_pages(&mut pipeline1, &mut executor1, 1).await; @@ -749,7 +753,7 @@ async fn cascading_split_propagates_back_sibling_token_to_every_grand_child() { ])]); let mut executor2 = MockRequestExecutor::new(vec![Ok(page_response(b"page-1-postsplit-left", None))]); - let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2)) + let mut pipeline2 = build_sequential_drain(&plan, &mut topology2, &op, Some(resumed_s2), None) .await .unwrap(); let pages_s2 = drain_pages(&mut pipeline2, &mut executor2, 1).await; @@ -787,7 +791,7 @@ async fn cascading_split_propagates_back_sibling_token_to_every_grand_child() { Ok(page_response(b"page-1-back-left", None)), Ok(page_response(b"page-1-back-right", None)), ]); - let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3)) + let mut pipeline3 = build_sequential_drain(&plan, &mut topology3, &op, Some(resumed_s3), None) .await .unwrap(); let pages_s3 = drain_all(&mut pipeline3, &mut executor3).await; diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs index 7cb73a8a82..401d84fdca 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/dataflow/planner.rs @@ -24,6 +24,11 @@ use super::{ Request, RequestTarget, SequentialDrain, TopologyProvider, }; +/// Default maximum number of physical partitions a cross-partition query may +/// fan out to. Exceeding this limit returns an error to prevent inadvertent +/// full-container scans on very large clusters. +pub(crate) const DEFAULT_MAX_FAN_OUT: usize = 100; + /// Builds a single-node [`Pipeline`] for a trivial operation. /// /// Trivial operations are those that can be satisfied by a single request to @@ -147,6 +152,7 @@ pub(crate) async fn build_sequential_drain( topology_provider: &mut dyn TopologyProvider, operation: &Arc, resume: Option, + max_fan_out: Option, ) -> crate::error::Result { validate_query_plan(query_plan)?; @@ -179,7 +185,20 @@ pub(crate) async fn build_sequential_drain( plan_fresh(query_plan, topology_provider, operation).await? }; - // TODO: enforce max fan-out (default 100, configurable). See FEED_OPERATIONS_REQS.md §3. + let fan_out_limit = max_fan_out.unwrap_or(DEFAULT_MAX_FAN_OUT); + if request_nodes.len() > fan_out_limit { + return Err(crate::error::CosmosError::builder() + .with_status(crate::error::CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED) + .with_message(format!( + "cross-partition query would fan out to {} physical partitions, \ + which exceeds the maximum of {}; use \ + QueryOptions::with_max_fan_out() to raise the limit if this \ + level of fan-out is intentional", + request_nodes.len(), + fan_out_limit, + )) + .build()); + } if request_nodes.is_empty() { // Resumed past every range that still has work: the pipeline is @@ -845,7 +864,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = MockTopologyProvider::new(vec![Ok(vec![rr("", "FF", "pkrange-0")])]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests(pipeline, &[("", "FF", "pkrange-0")]); @@ -861,7 +880,7 @@ mod tests { rr("80", "FF", "pkrange-right"), ])]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests( @@ -880,7 +899,7 @@ mod tests { Ok(vec![rr("80", "FF", "pkrange-C")]), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests( @@ -900,7 +919,7 @@ mod tests { rr("80", "C0", "pkrange-3"), ])]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests( @@ -932,7 +951,7 @@ mod tests { ]), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests( @@ -953,7 +972,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = MockTopologyProvider::new(vec![Ok(vec![rr("", "FF", "pkrange-wide")])]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests_with_partitions(pipeline, &[("20", "80", "pkrange-wide", "", "FF")]); @@ -971,7 +990,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -993,7 +1012,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1017,7 +1036,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1039,7 +1058,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1061,7 +1080,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1087,7 +1106,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1103,7 +1122,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = MockTopologyProvider::new(vec![Ok(vec![rr("", "FF", "pkrange-0")])]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap(); assert_drain_requests(pipeline, &[("", "FF", "pkrange-0")]); @@ -1115,7 +1134,7 @@ mod tests { let op = cross_partition_query_operation(); let mut topology = NoopTopologyProvider; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1137,7 +1156,7 @@ mod tests { .with_message("topology resolution failed") .build())]); - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) .await .unwrap_err(); let rendered = err.to_string(); @@ -1214,6 +1233,7 @@ mod tests { &mut topology, &Arc::new(op), Some(PipelineNodeState::Drained), + None, ) .await .unwrap(); @@ -1242,9 +1262,10 @@ mod tests { ("AA", "FF", saved_request(None)), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests(pipeline, &[("55", "AA", "pk-b"), ("AA", "FF", "pk-c")]); } @@ -1269,9 +1290,10 @@ mod tests { ("AA", "FF", saved_request(None)), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1301,9 +1323,10 @@ mod tests { ("AA", "FF", saved_request(None)), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1330,9 +1353,10 @@ mod tests { ("C0", "FF", saved_request(None)), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1358,9 +1382,10 @@ mod tests { active_tokens: vec![], }; - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert!(matches!( pipeline.snapshot_state().unwrap(), PipelineNodeState::Drained @@ -1382,9 +1407,10 @@ mod tests { ("AA", "FF", saved_request(None)), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, @@ -1408,7 +1434,7 @@ mod tests { ("00", "55", saved_request(Some("tok-b"))), ]); - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .unwrap_err(); assert_eq!( @@ -1431,7 +1457,7 @@ mod tests { ("55", "FF", saved_request(Some("tok-b"))), ]); - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .unwrap_err(); assert_eq!( @@ -1452,7 +1478,7 @@ mod tests { let resume = saved_drain(vec![("55", "AA", saved_request(Some("server-token-xyz")))]); - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .unwrap_err(); assert_eq!( @@ -1491,9 +1517,10 @@ mod tests { }], }; - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1521,9 +1548,10 @@ mod tests { ("30", "60", saved_request(Some("tok-b"))), ]); - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1566,9 +1594,10 @@ mod tests { }], }; - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .unwrap(); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .unwrap(); assert_drain_requests_with_partitions_and_continuation( pipeline, &[ @@ -1594,7 +1623,7 @@ mod tests { }; let result = - build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(legacy)).await; + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(legacy), None).await; let err = result.expect_err("bare top-level Request shape must be rejected on resume"); assert_eq!( err.status(), @@ -1622,7 +1651,7 @@ mod tests { }], }; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .expect_err("zero-width active_tokens entry must be rejected"); assert_eq!( @@ -1657,7 +1686,7 @@ mod tests { }], }; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .expect_err("malformed min>max entry must be rejected by the validator"); assert_eq!( @@ -1694,7 +1723,7 @@ mod tests { ], }; - let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) .await .expect_err("appended malformed min>max entry must still be rejected"); assert_eq!( @@ -1733,9 +1762,10 @@ mod tests { }], }; - let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume)) - .await - .expect("front-sibling cascading split must plan cleanly"); + let pipeline = + build_sequential_drain(&plan, &mut topology, &Arc::new(op), Some(resume), None) + .await + .expect("front-sibling cascading split must plan cleanly"); // Walk the planned children via snapshot: the two front grand- // children must each carry T1; the back range must be a @@ -1772,4 +1802,139 @@ mod tests { ); } } + + // --- Fan-out limit enforcement tests --- + + #[tokio::test] + async fn rejects_when_fan_out_exceeds_default_limit() { + // Create a topology with DEFAULT_MAX_FAN_OUT + 1 partitions to exceed the default. + let n = DEFAULT_MAX_FAN_OUT + 1; + let plan = plan_with_ranges(vec![qr("", "FF")]); + let op = cross_partition_query_operation(); + // Build n evenly-spaced partition ranges across ["", "FF"). + // We use hex suffixes: each partition gets a 2-hex-digit boundary. + // For simplicity, generate n ranges where each boundary is i * (0xFF / n) as a hex string. + let boundaries: Vec = (0..=n) + .map(|i| { + if i == 0 { + String::new() + } else if i == n { + "FF".to_string() + } else { + format!("{:02X}", (i * 0xFF / n).clamp(1, 0xFE)) + } + }) + .collect(); + // Deduplicate adjacent boundaries (could happen with small n values). + let mut ranges: Vec = Vec::new(); + for i in 0..boundaries.len() - 1 { + if boundaries[i] != boundaries[i + 1] { + ranges.push(rr( + &boundaries[i], + &boundaries[i + 1], + &format!("pkrange-{i}"), + )); + } + } + + let mut topology = MockTopologyProvider::new(vec![Ok(ranges)]); + + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None) + .await + .unwrap_err(); + assert_eq!( + err.status(), + crate::error::CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED, + "expected FAN_OUT_LIMIT_EXCEEDED status; got {err:?}", + ); + let msg = err.to_string(); + assert!( + msg.contains("QueryOptions::with_max_fan_out"), + "error should mention QueryOptions::with_max_fan_out; got: {msg}", + ); + } + + #[tokio::test] + async fn accepts_query_at_exactly_the_default_limit() { + // A topology with exactly DEFAULT_MAX_FAN_OUT partitions should succeed. + let n = DEFAULT_MAX_FAN_OUT; + let plan = plan_with_ranges(vec![qr("", "FF")]); + let op = cross_partition_query_operation(); + let ranges: Vec = (0..n) + .map(|i| { + let min = if i == 0 { + String::new() + } else { + format!("{:02X}", (i * 0xFF / n).clamp(1, 0xFE)) + }; + let max = if i == n - 1 { + "FF".to_string() + } else { + format!("{:02X}", ((i + 1) * 0xFF / n).clamp(1, 0xFE)) + }; + rr(&min, &max, &format!("pkrange-{i}")) + }) + .collect(); + let count = ranges.len(); + let mut topology = MockTopologyProvider::new(vec![Ok(ranges)]); + + let result = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, None).await; + assert!( + result.is_ok(), + "exactly {} partitions should not exceed the default limit of {}; got: {:?}", + count, + DEFAULT_MAX_FAN_OUT, + result.unwrap_err(), + ); + } + + #[tokio::test] + async fn custom_max_fan_out_is_honored() { + // With a custom limit of 2, 3 partitions should fail. + let plan = plan_with_ranges(vec![qr("", "FF")]); + let op = cross_partition_query_operation(); + let mut topology = MockTopologyProvider::new(vec![Ok(vec![ + rr("", "40", "pkrange-0"), + rr("40", "80", "pkrange-1"), + rr("80", "FF", "pkrange-2"), + ])]); + + let err = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, Some(2)) + .await + .unwrap_err(); + assert_eq!( + err.status(), + crate::error::CosmosStatus::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED, + "expected FAN_OUT_LIMIT_EXCEEDED with custom limit 2; got {err:?}", + ); + let msg = err.to_string(); + assert!( + msg.contains("3") && msg.contains("2"), + "error should mention actual count (3) and limit (2); got: {msg}", + ); + } + + #[tokio::test] + async fn custom_max_fan_out_allows_more_partitions() { + // With a custom limit of 5, 3 partitions should succeed. + let plan = plan_with_ranges(vec![qr("", "FF")]); + let op = cross_partition_query_operation(); + let mut topology = MockTopologyProvider::new(vec![Ok(vec![ + rr("", "40", "pkrange-0"), + rr("40", "80", "pkrange-1"), + rr("80", "FF", "pkrange-2"), + ])]); + + let pipeline = build_sequential_drain(&plan, &mut topology, &Arc::new(op), None, Some(5)) + .await + .unwrap(); + assert_drain_requests( + pipeline, + &[ + ("", "40", "pkrange-0"), + ("40", "80", "pkrange-1"), + ("80", "FF", "pkrange-2"), + ], + ); + } } diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/mod.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/mod.rs index c870bf5fa1..86ee5b8b93 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/driver/mod.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/mod.rs @@ -16,12 +16,14 @@ mod cosmos_driver; pub(crate) mod dataflow; pub(crate) mod jitter; pub(crate) mod pipeline; +mod plan_options; pub(crate) mod routing; mod runtime; pub(crate) mod transport; pub use cosmos_driver::CosmosDriver; pub use dataflow::OperationPlan; +pub use plan_options::PlanOptions; pub use runtime::{CosmosDriverRuntime, CosmosDriverRuntimeBuilder}; /// Walks an error's `.source()` chain and joins all distinct messages into a diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs b/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs new file mode 100644 index 0000000000..6f36bd2837 --- /dev/null +++ b/sdk/cosmos/azure_data_cosmos_driver/src/driver/plan_options.rs @@ -0,0 +1,28 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +//! Options for the planning phase of a Cosmos DB operation. + +use crate::models::ContinuationToken; + +/// Options passed to [`CosmosDriver::plan_operation`](crate::driver::CosmosDriver::plan_operation). +/// +/// Group both continuation-token resumption and the fan-out cap into a single +/// struct so the signature of `plan_operation` can grow without forcing a +/// change at every call site. Callers that need no special behavior can pass +/// `None`; `plan_operation` treats `None` as `PlanOptions::default()`. +#[derive(Clone, Debug, Default)] +pub struct PlanOptions { + /// Continuation token to resume a previous query from where it left off. + /// + /// When `None`, the query starts from the beginning. + pub continuation: Option, + + /// Maximum number of physical partitions a cross-partition query may fan + /// out to. + /// + /// When `None`, a built-in default of 100 applies. Queries that would + /// target more partitions than this limit fail with HTTP 400 / + /// sub-status 20307. + pub max_fan_out: Option, +} diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/error/cosmos_status.rs b/sdk/cosmos/azure_data_cosmos_driver/src/error/cosmos_status.rs index 7528e8f3ca..b6485e588b 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/error/cosmos_status.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/error/cosmos_status.rs @@ -516,6 +516,7 @@ impl SubStatusCode { 20303 => Some("ServiceReturnedOfferWithoutId"), 20304 => Some("ClientThroughputPollerIncomplete"), 20305 => Some("ClientTopologyResolutionFailed"), + 20307 => Some("ClientQueryFanOutLimitExceeded"), // SDK Server-side codes (21xxx) - consistent across .NET and Java 21001 => Some("NameCacheIsStaleExceededRetryLimit"), @@ -1424,6 +1425,14 @@ impl SubStatusCode { /// has no routing information for the operation. Paired with HTTP /// 503 — an internal client-side condition, not a transport failure. pub const CLIENT_TOPOLOGY_RESOLUTION_FAILED: SubStatusCode = SubStatusCode(20305); + + /// A cross-partition query would fan out to more physical partitions + /// than the configured maximum (20307). Paired with HTTP 400 because + /// this is a client-side policy violation — the caller can raise the + /// limit via `FeedOptions::max_fan_out` / + /// `QueryOptions::with_max_fan_out` if they truly need this level of + /// fan-out. + pub const CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED: SubStatusCode = SubStatusCode(20307); } impl Default for SubStatusCode { @@ -2216,6 +2225,17 @@ impl CosmosStatus { status_code: StatusCode::InternalServerError, sub_status: Some(SubStatusCode::SERVICE_RETURNED_OBJECT_WITHOUT_RID), }; + + /// 400 / 20307 — cross-partition query fan-out limit exceeded. + /// + /// The query would require contacting more physical partitions than + /// the configured maximum. Raise the limit via + /// `FeedOptions::max_fan_out` / `QueryOptions::with_max_fan_out` if + /// this level of fan-out is intentional. + pub const CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED: CosmosStatus = CosmosStatus { + status_code: StatusCode::BadRequest, + sub_status: Some(SubStatusCode::CLIENT_QUERY_FAN_OUT_LIMIT_EXCEEDED), + }; } impl fmt::Debug for CosmosStatus { diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/lib.rs b/sdk/cosmos/azure_data_cosmos_driver/src/lib.rs index cd94c0ef49..4a06974dbc 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/lib.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/lib.rs @@ -61,7 +61,9 @@ pub mod testing; // Re-export key types at crate root pub use diagnostics::{DiagnosticsContext, ExecutionContext, RequestDiagnostics, RequestHandle}; -pub use driver::{CosmosDriver, CosmosDriverRuntime, CosmosDriverRuntimeBuilder, OperationPlan}; +pub use driver::{ + CosmosDriver, CosmosDriverRuntime, CosmosDriverRuntimeBuilder, OperationPlan, PlanOptions, +}; pub use error::{CosmosError, CosmosErrorBuilder, CosmosStatus, Result, SubStatusCode}; pub use models::{ActivityId, CosmosResponse, RequestCharge, ResponseBody}; pub use options::{DiagnosticsOptions, DiagnosticsVerbosity, DriverOptions};