diff --git a/Cargo.lock b/Cargo.lock index 7c21d3d02e7..839a7a0d88c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -375,6 +375,7 @@ dependencies = [ "azure_core_test", "azure_data_cosmos_driver", "azure_identity", + "base64 0.22.1", "clap", "futures", "pin-project", diff --git a/sdk/cosmos/.cspell.json b/sdk/cosmos/.cspell.json index a207d75b477..739b681446e 100644 --- a/sdk/cosmos/.cspell.json +++ b/sdk/cosmos/.cspell.json @@ -53,6 +53,7 @@ "hostnames", "hotfixes", "IMDS", + "inclusivity", "isquery", "japaneast", "japanwest", @@ -96,12 +97,14 @@ "readfeed", "replicaset", "reqs", + "Replicaset", "Retriable", "retryable", "rfind", "RNTBD", "roundtrips", "rwcache", + "sess", "southafricanorth", "southafricawest", "southcentralus", diff --git a/sdk/cosmos/azure_data_cosmos/CHANGELOG.md b/sdk/cosmos/azure_data_cosmos/CHANGELOG.md index e2db9496dea..3b7fd94fbf1 100644 --- a/sdk/cosmos/azure_data_cosmos/CHANGELOG.md +++ b/sdk/cosmos/azure_data_cosmos/CHANGELOG.md @@ -6,6 +6,7 @@ - Added throughput control API: re-exported `ThroughputControlGroupOptions` and `PriorityLevel` from the driver. Users can register throughput control groups on `CosmosClientBuilder` via `with_throughput_control_group()` to configure priority-based execution and throughput bucket server features. ([#4078](https://github.com/Azure/azure-sdk-for-rust/pull/4078)) - Added `ThroughputPoller` type that implements `IntoFuture` and `Stream` for tracking asynchronous throughput replacement operations. +- Added `FeedRange` type with `ContainerClient::read_feed_ranges()` and `ContainerClient::feed_range_from_partition_key()` - supports hierarchical partition keys (MultiHash) including prefix partition keys that return multiple feed ranges. ([#4149](https://github.com/Azure/azure-sdk-for-rust/pull/4149)) ### Breaking Changes diff --git a/sdk/cosmos/azure_data_cosmos/Cargo.toml b/sdk/cosmos/azure_data_cosmos/Cargo.toml index a28156494ee..60a9b9dc9d1 100644 --- a/sdk/cosmos/azure_data_cosmos/Cargo.toml +++ b/sdk/cosmos/azure_data_cosmos/Cargo.toml @@ -18,6 +18,7 @@ async-lock.workspace = true async-trait.workspace = true azure_core = { workspace = true, default-features = false } azure_data_cosmos_driver = { workspace = true } +base64.workspace = true futures.workspace = true pin-project.workspace = true reqwest = { workspace = true, optional = true } diff --git a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs index e18a27c5c1a..edd035f6c56 100644 --- a/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs +++ b/sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs @@ -3,11 +3,12 @@ use crate::{ clients::{offers_client, ClientContext}, + feed_range::FeedRange, models::{ BatchResponse, ContainerProperties, CosmosResponse, ItemResponse, ResourceResponse, ThroughputProperties, }, - options::{BatchOptions, QueryOptions, ReadContainerOptions}, + options::{BatchOptions, QueryOptions, ReadContainerOptions, ReadFeedRangesOptions}, resource_context::{ResourceLink, ResourceType}, transactional_batch::TransactionalBatch, DeleteContainerOptions, FeedItemIterator, ItemReadOptions, ItemWriteOptions, PartitionKey, @@ -22,7 +23,10 @@ use crate::operation_context::OperationType; use crate::routing::partition_key_range_cache::PartitionKeyRangeCache; use azure_core::http::headers::AsHeaders; use azure_core::http::Context; -use azure_data_cosmos_driver::models::{ContainerReference, CosmosOperation, ItemReference}; +use azure_data_cosmos_driver::models::{ + effective_partition_key::EffectivePartitionKey as DriverEpk, ContainerReference, + CosmosOperation, ItemReference, PartitionKeyKind, +}; use serde::{de::DeserializeOwned, Serialize}; /// A client for working with a specific container in a Cosmos DB account. @@ -33,8 +37,6 @@ pub struct ContainerClient { link: ResourceLink, items_link: ResourceLink, container_connection: Arc, - #[expect(dead_code, reason = "will be used when tracing spans are re-added")] - container_id: String, container_ref: ContainerReference, context: ClientContext, } @@ -78,7 +80,6 @@ impl ContainerClient { link, items_link, container_connection, - container_id: container_id.to_string(), container_ref, context, }) @@ -779,36 +780,157 @@ impl ContainerClient { .await .map(BatchResponse::new) } + + /// Gets the feed ranges for this container. + pub async fn read_feed_ranges( + &self, + options: Option, + ) -> azure_core::Result> { + let options = options.unwrap_or_default(); + let routing_map = self + .container_connection + .resolve_routing_map(options.force_refresh()) + .await? + .ok_or_else(|| { + azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + "failed to resolve routing map for container", + ) + })?; + Ok(routing_map + .ordered_partition_key_ranges() + .iter() + .map(FeedRange::from_sdk_partition_key_range) + .collect()) + } + + /// Returns the [`FeedRange`]s covering the given partition key. + /// + /// Full keys return a single-element `Vec`. Prefix keys on MultiHash + /// containers return one or more feed ranges. + pub async fn feed_range_from_partition_key( + &self, + partition_key: impl Into, + options: Option, + ) -> azure_core::Result> { + let partition_key = partition_key.into(); + let driver_pk = partition_key.into_driver_partition_key(); + let options = options.unwrap_or_default(); + let pk_def = self.container_connection.partition_key_definition(); + let values = driver_pk.values(); + + if values.is_empty() { + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + "partition key must have at least one component", + )); + } + if values.len() > pk_def.paths().len() { + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + format!( + "partition key has {} components but container definition has {} paths", + values.len(), + pk_def.paths().len() + ), + )); + } + + let is_prefix = + pk_def.kind() == PartitionKeyKind::MultiHash && values.len() < pk_def.paths().len(); + if !is_prefix && values.len() != pk_def.paths().len() { + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + "prefix partition keys are only supported for MultiHash (hierarchical) containers", + )); + } + + let routing_map = self + .container_connection + .resolve_routing_map(options.force_refresh()) + .await? + .ok_or_else(|| { + azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + "failed to resolve routing map for container", + ) + })?; + + if is_prefix { + let epk_range = DriverEpk::compute_range(values, pk_def)?; + let query_range = crate::routing::range::Range::new( + epk_range.start.as_str().to_owned(), + epk_range.end.as_str().to_owned(), + true, + false, + ); + let pkranges = routing_map.get_overlapping_ranges(&query_range); + if pkranges.is_empty() { + let refreshed = self + .container_connection + .resolve_routing_map(true) + .await? + .ok_or_else(|| { + azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + "failed to resolve routing map after refresh", + ) + })?; + Ok(refreshed + .get_overlapping_ranges(&query_range) + .iter() + .map(FeedRange::from_sdk_partition_key_range) + .collect()) + } else { + Ok(pkranges + .iter() + .map(FeedRange::from_sdk_partition_key_range) + .collect()) + } + } else { + let epk = DriverEpk::compute(values, pk_def.kind(), pk_def.version()); + match routing_map.get_range_by_effective_partition_key(epk.as_str()) { + Ok(pkr) => Ok(vec![FeedRange::from_sdk_partition_key_range(pkr)]), + Err(_) => { + let refreshed = self + .container_connection + .resolve_routing_map(true) + .await? + .ok_or_else(|| { + azure_core::Error::with_message( + azure_core::error::ErrorKind::Other, + "failed to resolve routing map after refresh", + ) + })?; + let pkr = refreshed.get_range_by_effective_partition_key(epk.as_str())?; + Ok(vec![FeedRange::from_sdk_partition_key_range(pkr)]) + } + } + } + } } #[cfg(test)] mod tests { use super::*; - /// Compile-time assertion that `ContainerClient` async method futures are `Send`. - /// - /// This function is never called; it only needs to compile. - /// If any future is not `Send`, compilation will fail. #[allow(dead_code, unreachable_code, unused_variables)] fn _assert_futures_are_send() { fn assert_send(_: T) {} let client: &ContainerClient = todo!(); - // Container operations assert_send(client.read(todo!())); assert_send(client.replace(todo!(), todo!())); assert_send(client.read_throughput(todo!())); assert_send(client.begin_replace_throughput(todo!(), todo!())); assert_send(client.delete(todo!())); - - // Item operations (use "" for partition_key to avoid never-type fallback issues) assert_send(client.create_item::("", todo!(), todo!())); assert_send(client.replace_item::("", todo!(), todo!(), todo!())); assert_send(client.upsert_item::("", todo!(), todo!())); assert_send(client.read_item::("", todo!(), todo!())); assert_send(client.delete_item("", todo!(), todo!())); - - // Batch operations assert_send(client.execute_transactional_batch(todo!(), todo!())); + assert_send(client.read_feed_ranges(todo!())); + assert_send(client.feed_range_from_partition_key("", todo!())); } } diff --git a/sdk/cosmos/azure_data_cosmos/src/feed_range.rs b/sdk/cosmos/azure_data_cosmos/src/feed_range.rs new file mode 100644 index 00000000000..be70d838ac3 --- /dev/null +++ b/sdk/cosmos/azure_data_cosmos/src/feed_range.rs @@ -0,0 +1,534 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. + +//! Types for working with feed ranges in Azure Cosmos DB. +//! +//! A [`FeedRange`] represents a contiguous range of partitions in a Cosmos DB container, +//! defined by effective partition key (EPK) boundaries. Feed ranges enable: +//! +//! - Parallel query processing by distributing ranges across workers +//! - Scoped change feed consumption for specific partitions +//! - Workload distribution across multiple consumers +//! +//! # Examples +//! +//! ```rust,no_run +//! # use azure_data_cosmos::clients::ContainerClient; +//! # async fn example(container: ContainerClient) -> azure_core::Result<()> { +//! // Get physical partition feed ranges +//! let ranges = container.read_feed_ranges(None).await?; +//! println!("Container has {} physical partitions", ranges.len()); +//! +//! // Check if one range contains another +//! let pk_ranges = container.feed_range_from_partition_key("my_partition_key", None).await?; +//! for range in &ranges { +//! if range.contains(&pk_ranges[0]) { +//! println!("Partition key falls within this feed range"); +//! } +//! } +//! +//! // Serialize/deserialize for storage or transfer +//! let serialized = ranges[0].to_string(); +//! let restored: azure_data_cosmos::FeedRange = serialized.parse()?; +//! assert_eq!(ranges[0], restored); +//! # Ok(()) +//! # } +//! ``` + +use azure_core::fmt::SafeDebug; +use base64::Engine; +use serde::{Deserialize, Serialize}; +use std::fmt; +use std::str::FromStr; + +use azure_data_cosmos_driver::models::partition_key_range::PartitionKeyRange; + +use crate::hash::EffectivePartitionKey; +use crate::hash::{MAX_EXCLUSIVE_EFFECTIVE_PARTITION_KEY, MIN_INCLUSIVE_EFFECTIVE_PARTITION_KEY}; +use crate::routing::range::Range; + +/// An opaque representation of a contiguous range of partitions in a Cosmos DB container. +/// +/// Feed ranges are defined by effective partition key (EPK) boundaries and map to one or more +/// physical partitions. They are obtained from [`ContainerClient::read_feed_ranges()`](crate::clients::ContainerClient::read_feed_ranges) +/// or [`ContainerClient::feed_range_from_partition_key()`](crate::clients::ContainerClient::feed_range_from_partition_key). +/// +/// Feed ranges can be serialized to strings (via [`std::fmt::Display`]/[`std::str::FromStr`]) for storage or transfer +/// between processes. The serialization format is base64-encoded JSON, compatible with other +/// Azure Cosmos DB SDKs. +/// +/// # Serialization Formats +/// +/// `FeedRange` supports two distinct serialization formats: +/// +/// - **[`Display`](std::fmt::Display)/[`FromStr`]** — base64-encoded JSON, intended for string storage and cross-SDK transfer. +/// - **[`Serialize`]/[`Deserialize`]** — structured JSON (`{"Range": {...}}`), intended for embedding in JSON documents. +/// +/// These formats are **not interchangeable**: a value serialized with one cannot be deserialized with the other. +/// +/// # Comparison Methods +/// +/// Feed ranges support containment and overlap checks: +/// - [`contains()`](FeedRange::contains) — checks if another feed range is entirely within this one +/// - [`overlaps()`](FeedRange::overlaps) — checks if two feed ranges share any portion of the EPK space +#[derive(Clone, SafeDebug, PartialEq, Eq, Hash)] +#[non_exhaustive] +pub struct FeedRange { + pub(crate) min_inclusive: EffectivePartitionKey, + pub(crate) max_exclusive: EffectivePartitionKey, +} + +/// JSON wire format matching the cross-SDK feed range representation. +/// +/// Example: +/// ```json +/// {"Range": {"min": "", "max": "FF", "isMinInclusive": true, "isMaxInclusive": false}} +/// ``` +#[derive(Serialize, Deserialize)] +struct FeedRangeJson { + #[serde(rename = "Range")] + range: RangeJson, +} + +#[derive(Serialize, Deserialize)] +struct RangeJson { + min: String, + max: String, + #[serde(rename = "isMinInclusive")] + is_min_inclusive: bool, + #[serde(rename = "isMaxInclusive")] + is_max_inclusive: bool, +} + +impl FeedRange { + /// Creates a feed range covering the entire partition key space. + /// + /// This range spans from the minimum to maximum effective partition key values, + /// encompassing all partitions in a container. + pub fn full() -> Self { + Self { + min_inclusive: EffectivePartitionKey::from(MIN_INCLUSIVE_EFFECTIVE_PARTITION_KEY), + max_exclusive: EffectivePartitionKey::from(MAX_EXCLUSIVE_EFFECTIVE_PARTITION_KEY), + } + } + + /// Returns `true` if `other` is entirely contained within this feed range. + /// + /// A feed range A contains feed range B when A's minimum is less than or equal to B's minimum + /// and A's maximum is greater than or equal to B's maximum. + /// + /// # Examples + /// + /// ``` + /// # use azure_data_cosmos::FeedRange; + /// let full = FeedRange::full(); + /// let sub: FeedRange = "eyJSYW5nZSI6eyJtaW4iOiIiLCJtYXgiOiIzRkZGRkZGRkZGRkYiLCJpc01pbkluY2x1c2l2ZSI6dHJ1ZSwiaXNNYXhJbmNsdXNpdmUiOmZhbHNlfX0=".parse().unwrap(); + /// assert!(full.contains(&sub)); + /// ``` + pub fn contains(&self, other: &FeedRange) -> bool { + self.min_inclusive <= other.min_inclusive && self.max_exclusive >= other.max_exclusive + } + + /// Returns `true` if this feed range and `other` share any portion of the EPK space. + /// + /// Two feed ranges overlap when one starts before the other ends and vice versa. + pub fn overlaps(&self, other: &FeedRange) -> bool { + self.min_inclusive < other.max_exclusive && other.min_inclusive < self.max_exclusive + } + + /// Creates a `FeedRange` from an internal `Range`. + /// + /// The source range must have `[min, max)` semantics (min inclusive, max exclusive), + /// which is the invariant for all partition key ranges from the service. + #[allow( + dead_code, + reason = "will be used when query/change-feed gain FeedRange support" + )] + pub(crate) fn from_range(range: &Range) -> azure_core::Result { + if !range.is_min_inclusive || range.is_max_inclusive { + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::DataConversion, + "FeedRange requires [min, max) semantics (isMinInclusive=true, isMaxInclusive=false)", + )); + } + Ok(Self { + min_inclusive: EffectivePartitionKey::from(range.min.as_str()), + max_exclusive: EffectivePartitionKey::from(range.max.as_str()), + }) + } + + /// Converts this `FeedRange` to an internal `Range`. + #[allow( + dead_code, + reason = "will be used when query/change-feed gain FeedRange support" + )] + pub(crate) fn to_range(&self) -> Range { + Range::new( + self.min_inclusive.as_str().to_owned(), + self.max_exclusive.as_str().to_owned(), + true, + false, + ) + } + + /// Creates a `FeedRange` from a driver `PartitionKeyRange`. + /// + /// Partition key ranges from the service always use `[min, max)` semantics + /// (min inclusive, max exclusive). Returns an error if the range is inverted. + #[allow( + dead_code, + reason = "will be used when feed range methods route through the driver's routing map" + )] + pub(crate) fn from_partition_key_range(pkr: &PartitionKeyRange) -> azure_core::Result { + if pkr.min_inclusive > pkr.max_exclusive { + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::DataConversion, + "partition key range min_inclusive must be <= max_exclusive", + )); + } + Ok(Self { + min_inclusive: EffectivePartitionKey::from(pkr.min_inclusive.as_str()), + max_exclusive: EffectivePartitionKey::from(pkr.max_exclusive.as_str()), + }) + } + + /// Creates a `FeedRange` from the SDK's internal `PartitionKeyRange`. + /// + /// This uses the SDK-side routing map type (with `String` EPK fields). + pub(crate) fn from_sdk_partition_key_range( + pkr: &crate::routing::partition_key_range::PartitionKeyRange, + ) -> Self { + debug_assert!( + pkr.min_inclusive.as_str() <= pkr.max_exclusive.as_str(), + "partition key range min_inclusive must be <= max_exclusive" + ); + Self { + min_inclusive: EffectivePartitionKey::from(pkr.min_inclusive.as_str()), + max_exclusive: EffectivePartitionKey::from(pkr.max_exclusive.as_str()), + } + } + + /// Builds the JSON wire-format representation for serialization. + fn to_json(&self) -> FeedRangeJson { + FeedRangeJson { + range: RangeJson { + min: self.min_inclusive.as_str().to_owned(), + max: self.max_exclusive.as_str().to_owned(), + is_min_inclusive: true, + is_max_inclusive: false, + }, + } + } + + /// Validates and constructs a `FeedRange` from deserialized JSON fields. + /// + /// Checks inclusivity flags and min ≤ max ordering. + fn from_json(json: FeedRangeJson) -> azure_core::Result { + if !json.range.is_min_inclusive || json.range.is_max_inclusive { + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::DataConversion, + "feed range must have [min, max) semantics (isMinInclusive=true, isMaxInclusive=false)", + )); + } + + let min = EffectivePartitionKey::from(json.range.min); + let max = EffectivePartitionKey::from(json.range.max); + + if min > max { + return Err(azure_core::Error::with_message( + azure_core::error::ErrorKind::DataConversion, + "feed range min must be less than or equal to max", + )); + } + + Ok(Self { + min_inclusive: min, + max_exclusive: max, + }) + } +} + +impl fmt::Display for FeedRange { + /// Formats this feed range as a base64-encoded JSON string. + /// + /// The output is compatible with other Azure Cosmos DB SDKs and can be + /// parsed back using [`std::str::FromStr`]. + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let json_str = serde_json::to_string(&self.to_json()).map_err(|_| fmt::Error)?; + let encoded = base64::engine::general_purpose::STANDARD.encode(json_str.as_bytes()); + f.write_str(&encoded) + } +} + +impl FromStr for FeedRange { + type Err = azure_core::Error; + + /// Parses a feed range from a base64-encoded JSON string. + /// + /// The input should be a string produced by [`std::fmt::Display`] or by another Azure Cosmos DB SDK. + fn from_str(s: &str) -> Result { + let decoded_bytes = base64::engine::general_purpose::STANDARD + .decode(s) + .map_err(|e| azure_core::Error::new(azure_core::error::ErrorKind::DataConversion, e))?; + + let json: FeedRangeJson = serde_json::from_slice(&decoded_bytes) + .map_err(|e| azure_core::Error::new(azure_core::error::ErrorKind::DataConversion, e))?; + + Self::from_json(json) + } +} + +impl Serialize for FeedRange { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + self.to_json().serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for FeedRange { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let json = FeedRangeJson::deserialize(deserializer)?; + Self::from_json(json).map_err(|e| serde::de::Error::custom(e.to_string())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn full_range() { + let full = FeedRange::full(); + assert_eq!(full.min_inclusive.as_str(), ""); + assert_eq!(full.max_exclusive.as_str(), "FF"); + } + + #[test] + fn contains_full_contains_sub() { + let full = FeedRange::full(); + let sub = FeedRange { + min_inclusive: EffectivePartitionKey::from("00"), + max_exclusive: EffectivePartitionKey::from("80"), + }; + assert!(full.contains(&sub)); + assert!(!sub.contains(&full)); + } + + #[test] + fn contains_self() { + let range = FeedRange { + min_inclusive: EffectivePartitionKey::from("20"), + max_exclusive: EffectivePartitionKey::from("80"), + }; + assert!(range.contains(&range)); + } + + #[test] + fn overlaps_basic() { + let a = FeedRange { + min_inclusive: EffectivePartitionKey::from("00"), + max_exclusive: EffectivePartitionKey::from("50"), + }; + let b = FeedRange { + min_inclusive: EffectivePartitionKey::from("30"), + max_exclusive: EffectivePartitionKey::from("80"), + }; + assert!(a.overlaps(&b)); + assert!(b.overlaps(&a)); + } + + #[test] + fn overlaps_adjacent_no_overlap() { + let a = FeedRange { + min_inclusive: EffectivePartitionKey::from("00"), + max_exclusive: EffectivePartitionKey::from("50"), + }; + let b = FeedRange { + min_inclusive: EffectivePartitionKey::from("50"), + max_exclusive: EffectivePartitionKey::from("FF"), + }; + // Adjacent ranges (a's max == b's min) do NOT overlap because max is exclusive + assert!(!a.overlaps(&b)); + assert!(!b.overlaps(&a)); + } + + #[test] + fn overlaps_disjoint() { + let a = FeedRange { + min_inclusive: EffectivePartitionKey::from("00"), + max_exclusive: EffectivePartitionKey::from("30"), + }; + let b = FeedRange { + min_inclusive: EffectivePartitionKey::from("50"), + max_exclusive: EffectivePartitionKey::from("FF"), + }; + assert!(!a.overlaps(&b)); + assert!(!b.overlaps(&a)); + } + + #[test] + fn display_produces_expected_base64_full_range() { + let range = FeedRange { + min_inclusive: EffectivePartitionKey::from(""), + max_exclusive: EffectivePartitionKey::from("FF"), + }; + assert_eq!( + range.to_string(), + "eyJSYW5nZSI6eyJtaW4iOiIiLCJtYXgiOiJGRiIsImlzTWluSW5jbHVzaXZlIjp0cnVlLCJpc01heEluY2x1c2l2ZSI6ZmFsc2V9fQ==" + ); + } + + #[test] + fn display_produces_expected_base64_sub_range() { + let range = FeedRange { + min_inclusive: EffectivePartitionKey::from("3FFFFFFFFFFF"), + max_exclusive: EffectivePartitionKey::from("7FFFFFFFFFFF"), + }; + assert_eq!( + range.to_string(), + "eyJSYW5nZSI6eyJtaW4iOiIzRkZGRkZGRkZGRkYiLCJtYXgiOiI3RkZGRkZGRkZGRkYiLCJpc01pbkluY2x1c2l2ZSI6dHJ1ZSwiaXNNYXhJbmNsdXNpdmUiOmZhbHNlfX0=" + ); + } + + #[test] + fn from_str_parses_full_range() { + let input = "eyJSYW5nZSI6eyJtaW4iOiIiLCJtYXgiOiJGRiIsImlzTWluSW5jbHVzaXZlIjp0cnVlLCJpc01heEluY2x1c2l2ZSI6ZmFsc2V9fQ=="; + let range: FeedRange = input.parse().unwrap(); + assert_eq!(range.min_inclusive.as_str(), ""); + assert_eq!(range.max_exclusive.as_str(), "FF"); + } + + #[test] + fn from_str_parses_sub_range() { + let input = "eyJSYW5nZSI6eyJtaW4iOiIzRkZGRkZGRkZGRkYiLCJtYXgiOiI3RkZGRkZGRkZGRkYiLCJpc01pbkluY2x1c2l2ZSI6dHJ1ZSwiaXNNYXhJbmNsdXNpdmUiOmZhbHNlfX0="; + let range: FeedRange = input.parse().unwrap(); + assert_eq!(range.min_inclusive.as_str(), "3FFFFFFFFFFF"); + assert_eq!(range.max_exclusive.as_str(), "7FFFFFFFFFFF"); + } + + #[test] + fn serde_json_serializes_to_cross_sdk_format() { + let range = FeedRange { + min_inclusive: EffectivePartitionKey::from(""), + max_exclusive: EffectivePartitionKey::from("FF"), + }; + let json = serde_json::to_string(&range).unwrap(); + + let value: serde_json::Value = serde_json::from_str(&json).unwrap(); + let inner = value.get("Range").expect("expected 'Range' key"); + assert_eq!(inner.get("min").unwrap().as_str().unwrap(), ""); + assert_eq!(inner.get("max").unwrap().as_str().unwrap(), "FF"); + assert!(inner.get("isMinInclusive").unwrap().as_bool().unwrap()); + assert!(!inner.get("isMaxInclusive").unwrap().as_bool().unwrap()); + } + + #[test] + fn serde_json_deserializes_cross_sdk_format() { + let json = + r#"{"Range":{"min":"","max":"FF","isMinInclusive":true,"isMaxInclusive":false}}"#; + let range: FeedRange = serde_json::from_str(json).unwrap(); + assert_eq!(range.min_inclusive.as_str(), ""); + assert_eq!(range.max_exclusive.as_str(), "FF"); + } + + #[test] + fn from_str_invalid_base64() { + let result = "not-valid-base64!!!".parse::(); + assert!(result.is_err()); + } + + #[test] + fn from_str_invalid_json() { + let encoded = base64::engine::general_purpose::STANDARD.encode(b"not json"); + let result = encoded.parse::(); + assert!(result.is_err()); + } + + #[test] + fn from_partition_key_range() { + let pkr = PartitionKeyRange::new("0".to_string(), "".to_string(), "FF".to_string()); + let feed_range = FeedRange::from_partition_key_range(&pkr).unwrap(); + assert_eq!(feed_range.min_inclusive.as_str(), ""); + assert_eq!(feed_range.max_exclusive.as_str(), "FF"); + } + + #[test] + fn to_range_produces_expected_fields() { + let feed_range = FeedRange { + min_inclusive: EffectivePartitionKey::from("20"), + max_exclusive: EffectivePartitionKey::from("80"), + }; + let range = feed_range.to_range(); + assert_eq!(range.min, "20"); + assert_eq!(range.max, "80"); + assert!(range.is_min_inclusive); + assert!(!range.is_max_inclusive); + } + + #[test] + fn from_range_parses_expected_fields() { + let range = Range::new("20".to_owned(), "80".to_owned(), true, false); + let feed_range = FeedRange::from_range(&range).unwrap(); + assert_eq!(feed_range.min_inclusive.as_str(), "20"); + assert_eq!(feed_range.max_exclusive.as_str(), "80"); + } + + #[test] + fn cross_sdk_compatibility() { + // Verify that the full range serializes to the same base64 string regardless of platform + let full = FeedRange::full(); + let serialized = full.to_string(); + + // Decode and verify the JSON structure + let decoded = base64::engine::general_purpose::STANDARD + .decode(&serialized) + .unwrap(); + let json: serde_json::Value = serde_json::from_slice(&decoded).unwrap(); + + let range = json.get("Range").unwrap(); + assert_eq!(range.get("min").unwrap().as_str().unwrap(), ""); + assert_eq!(range.get("max").unwrap().as_str().unwrap(), "FF"); + assert!(range.get("isMinInclusive").unwrap().as_bool().unwrap()); + assert!(!range.get("isMaxInclusive").unwrap().as_bool().unwrap()); + } + + #[test] + fn from_str_rejects_max_inclusive() { + let json = r#"{"Range":{"min":"","max":"FF","isMinInclusive":true,"isMaxInclusive":true}}"#; + let encoded = base64::engine::general_purpose::STANDARD.encode(json.as_bytes()); + assert!(encoded.parse::().is_err()); + } + + #[test] + fn serde_rejects_min_not_inclusive() { + let json = + r#"{"Range":{"min":"","max":"FF","isMinInclusive":false,"isMaxInclusive":false}}"#; + assert!(serde_json::from_str::(json).is_err()); + } + + #[test] + fn from_str_rejects_inverted_range() { + let json = + r#"{"Range":{"min":"FF","max":"","isMinInclusive":true,"isMaxInclusive":false}}"#; + let encoded = base64::engine::general_purpose::STANDARD.encode(json.as_bytes()); + assert!(encoded.parse::().is_err()); + } + + #[test] + fn serde_rejects_inverted_range() { + let json = + r#"{"Range":{"min":"FF","max":"","isMinInclusive":true,"isMaxInclusive":false}}"#; + assert!(serde_json::from_str::(json).is_err()); + } + + #[test] + fn from_range_rejects_wrong_inclusivity() { + let range = Range::new("".to_string(), "FF".to_string(), false, true); + assert!(FeedRange::from_range(&range).is_err()); + } +} diff --git a/sdk/cosmos/azure_data_cosmos/src/handler/container_connection.rs b/sdk/cosmos/azure_data_cosmos/src/handler/container_connection.rs index fc68b2e2588..d95b106740c 100644 --- a/sdk/cosmos/azure_data_cosmos/src/handler/container_connection.rs +++ b/sdk/cosmos/azure_data_cosmos/src/handler/container_connection.rs @@ -41,6 +41,40 @@ impl ContainerConnection { } } + /// Returns the partition key definition from the eagerly-resolved container reference. + pub(crate) fn partition_key_definition( + &self, + ) -> &azure_data_cosmos_driver::models::PartitionKeyDefinition { + self.container_ref.partition_key_definition() + } + + /// Resolves the routing map for this container. + pub(crate) async fn resolve_routing_map( + &self, + force_refresh: bool, + ) -> Result< + Option, + azure_core::Error, + > { + let collection_rid = self.container_ref.rid(); + let collection_name = self.container_ref.name(); + let routing_map = self + .pk_range_cache + .try_lookup(collection_name, collection_rid, None) + .await?; + + if force_refresh { + if let Some(previous) = routing_map { + return self + .pk_range_cache + .try_lookup(collection_name, collection_rid, Some(previous)) + .await; + } + } + + Ok(routing_map) + } + pub async fn send( &self, mut cosmos_request: CosmosRequest, diff --git a/sdk/cosmos/azure_data_cosmos/src/hash.rs b/sdk/cosmos/azure_data_cosmos/src/hash.rs index 42d696b534f..5a0bce076a6 100644 --- a/sdk/cosmos/azure_data_cosmos/src/hash.rs +++ b/sdk/cosmos/azure_data_cosmos/src/hash.rs @@ -6,14 +6,19 @@ use crate::murmur_hash::{murmurhash3_128, murmurhash3_32}; use std::fmt::Write; const MAX_STRING_BYTES_TO_APPEND: usize = 100; -const MIN_INCLUSIVE_EFFECTIVE_PARTITION_KEY: &str = ""; -const MAX_EXCLUSIVE_EFFECTIVE_PARTITION_KEY: &str = "FF"; +pub(crate) const MIN_INCLUSIVE_EFFECTIVE_PARTITION_KEY: &str = ""; +pub(crate) const MAX_EXCLUSIVE_EFFECTIVE_PARTITION_KEY: &str = "FF"; /// A strongly-typed wrapper around the hex-encoded effective partition key string. /// /// Use [`AsRef`] to obtain the underlying string when passing to APIs /// that accept `&str`. -#[derive(Debug, Clone, PartialEq, Eq)] +/// +/// Ordering is lexicographic on the underlying hex string. This is correct because: +/// - All actual EPK hash values are uppercase hex strings of consistent length +/// - The sentinel MAX ("FF") sorts after all real hashes by the Cosmos DB EPK space design +/// - The sentinel MIN ("") sorts before everything +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct EffectivePartitionKey(String); impl EffectivePartitionKey { @@ -29,6 +34,18 @@ impl AsRef for EffectivePartitionKey { } } +impl From for EffectivePartitionKey { + fn from(s: String) -> Self { + Self(s) + } +} + +impl From<&str> for EffectivePartitionKey { + fn from(s: &str) -> Self { + Self(s.to_owned()) + } +} + /// Contains all allowed markers for component marker types. mod component { pub const UNDEFINED: u8 = 0x00; diff --git a/sdk/cosmos/azure_data_cosmos/src/lib.rs b/sdk/cosmos/azure_data_cosmos/src/lib.rs index e0f69e7312a..1691bea16ad 100644 --- a/sdk/cosmos/azure_data_cosmos/src/lib.rs +++ b/sdk/cosmos/azure_data_cosmos/src/lib.rs @@ -11,6 +11,7 @@ mod connection_string; pub mod constants; mod credential; mod feed; +mod feed_range; pub mod options; mod partition_key; pub(crate) mod pipeline; @@ -42,6 +43,7 @@ pub use transactional_batch::{ }; pub use feed::{FeedItemIterator, FeedPage, FeedPageIterator, QueryFeedPage}; +pub use feed_range::FeedRange; mod background_task_manager; mod cosmos_request; mod driver_bridge; diff --git a/sdk/cosmos/azure_data_cosmos/src/options/mod.rs b/sdk/cosmos/azure_data_cosmos/src/options/mod.rs index 2547b14eefd..93f13b158f0 100644 --- a/sdk/cosmos/azure_data_cosmos/src/options/mod.rs +++ b/sdk/cosmos/azure_data_cosmos/src/options/mod.rs @@ -400,6 +400,26 @@ pub struct ReadDatabaseOptions; #[non_exhaustive] pub struct ThroughputOptions; +/// Options for [`ContainerClient::read_feed_ranges()`](crate::clients::ContainerClient::read_feed_ranges) +/// and [`ContainerClient::feed_range_from_partition_key()`](crate::clients::ContainerClient::feed_range_from_partition_key). +#[derive(Clone, Default, Debug)] +#[non_exhaustive] +pub struct ReadFeedRangesOptions { + force_refresh: bool, +} + +impl ReadFeedRangesOptions { + /// When `true`, discards any cached routing map and fetches a fresh copy from the service. + pub fn with_force_refresh(mut self, force_refresh: bool) -> Self { + self.force_refresh = force_refresh; + self + } + + pub(crate) fn force_refresh(&self) -> bool { + self.force_refresh + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/sdk/cosmos/azure_data_cosmos/tests/emulator_tests/cosmos_feed_ranges.rs b/sdk/cosmos/azure_data_cosmos/tests/emulator_tests/cosmos_feed_ranges.rs new file mode 100644 index 00000000000..9e25e7939f5 --- /dev/null +++ b/sdk/cosmos/azure_data_cosmos/tests/emulator_tests/cosmos_feed_ranges.rs @@ -0,0 +1,305 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +#![cfg(feature = "key_auth")] + +use super::framework; + +use std::error::Error; + +use azure_data_cosmos::{ + models::{ContainerProperties, ThroughputProperties}, + CreateContainerOptions, FeedRange, +}; +use base64::Engine; + +use framework::TestClient; + +#[tokio::test] +#[cfg_attr( + not(test_category = "emulator"), + ignore = "requires test_category 'emulator'" +)] +pub async fn read_feed_ranges_returns_physical_partitions() -> Result<(), Box> { + TestClient::run_with_unique_db( + async |run_context, db_client| { + let properties = ContainerProperties::new("FeedRangeContainer", "/pk".into()); + + // Use 11000 RU/s to ensure at least 2 physical partitions (10000 RU/s per partition). + let throughput = ThroughputProperties::manual(11000); + let options = CreateContainerOptions::default().with_throughput(throughput); + + let container_client = run_context + .create_container(db_client, properties, Some(options)) + .await?; + + let ranges = container_client.read_feed_ranges(None).await?; + + // With 11000 RU/s the service should create at least 2 physical partitions. + assert!( + ranges.len() >= 2, + "expected at least 2 feed ranges with 11000 RU/s, got {}", + ranges.len() + ); + + // All ranges should be contained within the full EPK space. + let full = FeedRange::full(); + for range in &ranges { + assert!( + full.contains(range), + "full range should contain every partition range" + ); + } + + // No two ranges should overlap. + for i in 0..ranges.len() { + for j in (i + 1)..ranges.len() { + assert!( + !ranges[i].overlaps(&ranges[j]), + "ranges {i} and {j} should not overlap" + ); + } + } + + // Each range should be serializable via Display and parseable via FromStr. + for range in &ranges { + let serialized = range.to_string(); + // Verify the serialized string is valid base64-encoded JSON + // with the expected cross-SDK structure. + let decoded = base64::engine::general_purpose::STANDARD + .decode(&serialized) + .expect("feed range Display should produce valid base64"); + let json: serde_json::Value = + serde_json::from_slice(&decoded).expect("decoded base64 should be valid JSON"); + let inner = json.get("Range").expect("expected 'Range' key"); + assert!(inner.get("min").is_some(), "expected 'min' field"); + assert!(inner.get("max").is_some(), "expected 'max' field"); + assert!( + inner.get("isMinInclusive").unwrap().as_bool().unwrap(), + "isMinInclusive should be true" + ); + assert!( + !inner.get("isMaxInclusive").unwrap().as_bool().unwrap(), + "isMaxInclusive should be false" + ); + + // Verify FromStr can parse the serialized string and produces + // a range contained within the full EPK space. + let parsed: FeedRange = serialized + .parse() + .expect("feed range should be parseable from Display output"); + assert!( + full.contains(&parsed), + "parsed feed range should be within full EPK space" + ); + } + + Ok(()) + }, + None, + ) + .await +} + +#[tokio::test] +#[cfg_attr( + not(test_category = "emulator"), + ignore = "requires test_category 'emulator'" +)] +pub async fn feed_range_from_partition_key_maps_correctly() -> Result<(), Box> { + TestClient::run_with_unique_db( + async |run_context, db_client| { + let properties = ContainerProperties::new("FeedRangeFromPK", "/pk".into()); + + // Use 11000 RU/s to ensure at least 2 physical partitions. + let throughput = ThroughputProperties::manual(11000); + let options = CreateContainerOptions::default().with_throughput(throughput); + + let container_client = run_context + .create_container(db_client, properties, Some(options)) + .await?; + + // Get the physical partition ranges. + let physical_ranges = container_client.read_feed_ranges(None).await?; + + // Get the feed range for a specific partition key. + let pk_ranges = container_client + .feed_range_from_partition_key("test_partition_key", None) + .await?; + + // Full key should return exactly one feed range. + assert_eq!( + pk_ranges.len(), + 1, + "full partition key should map to exactly one feed range" + ); + let pk_range = &pk_ranges[0]; + + // The returned range must match one of the physical partitions. + let matches_physical = physical_ranges.iter().any(|pr| pr == pk_range); + assert!( + matches_physical, + "feed_range_from_partition_key should return one of the physical partition ranges" + ); + + // The same partition key should always map to the same range (deterministic). + let pk_ranges_again = container_client + .feed_range_from_partition_key("test_partition_key", None) + .await?; + assert_eq!( + pk_ranges, pk_ranges_again, + "same PK should map to same range" + ); + + Ok(()) + }, + None, + ) + .await +} + +/// Validates that `feed_range_from_partition_key` returns exactly one feed range +/// for a full hierarchical partition key (all components provided). +#[tokio::test] +#[cfg_attr( + not(test_category = "emulator"), + ignore = "requires test_category 'emulator'" +)] +pub async fn feed_range_from_full_hpk_returns_single_range() -> Result<(), Box> { + TestClient::run_with_unique_db( + async |run_context, db_client| { + let properties = ContainerProperties::new( + "FeedRangeHPKFull", + ("/tenant", "/user", "/session").into(), + ); + + let container_client = run_context + .create_container(db_client, properties, None) + .await?; + + // Full key: all 3 components provided. + let pk = azure_data_cosmos::PartitionKey::from(("tenantA", "user1", "sess1")); + let ranges = container_client + .feed_range_from_partition_key(pk, None) + .await?; + + assert_eq!( + ranges.len(), + 1, + "full HPK should map to exactly one feed range" + ); + + // The range should be within the full EPK space. + let full = FeedRange::full(); + assert!( + full.contains(&ranges[0]), + "feed range should be within the full EPK space" + ); + + Ok(()) + }, + None, + ) + .await +} + +/// Validates that `feed_range_from_partition_key` returns one or more feed ranges +/// for a prefix hierarchical partition key (fewer components than paths). +#[tokio::test] +#[cfg_attr( + not(test_category = "emulator"), + ignore = "requires test_category 'emulator'" +)] +pub async fn feed_range_from_prefix_hpk_returns_ranges() -> Result<(), Box> { + TestClient::run_with_unique_db( + async |run_context, db_client| { + let properties = ContainerProperties::new( + "FeedRangeHPKPrefix", + ("/tenant", "/user", "/session").into(), + ); + + let container_client = run_context + .create_container(db_client, properties, None) + .await?; + + // Prefix key: only 1 of 3 components. + let pk = azure_data_cosmos::PartitionKey::from("tenantA"); + let ranges = container_client + .feed_range_from_partition_key(pk, None) + .await?; + + // Should return at least one feed range. + assert!( + !ranges.is_empty(), + "prefix HPK should return at least one feed range" + ); + + // All returned ranges should be within the full EPK space. + let full = FeedRange::full(); + for range in &ranges { + assert!( + full.contains(range), + "each feed range should be within the full EPK space" + ); + } + + // No two returned ranges should overlap. + for i in 0..ranges.len() { + for j in (i + 1)..ranges.len() { + assert!( + !ranges[i].overlaps(&ranges[j]), + "returned feed ranges should not overlap" + ); + } + } + + // Prefix with 2 of 3 components. + let pk2 = azure_data_cosmos::PartitionKey::from(("tenantA", "user1")); + let ranges2 = container_client + .feed_range_from_partition_key(pk2, None) + .await?; + + assert!( + !ranges2.is_empty(), + "prefix HPK (2-of-3) should return at least one feed range" + ); + + Ok(()) + }, + None, + ) + .await +} + +/// Validates that `feed_range_from_partition_key` works correctly for +/// a full key on a single-hash container. +#[tokio::test] +#[cfg_attr( + not(test_category = "emulator"), + ignore = "requires test_category 'emulator'" +)] +pub async fn feed_range_from_partition_key_single_hash_full_key() -> Result<(), Box> { + TestClient::run_with_unique_db( + async |run_context, db_client| { + let properties = ContainerProperties::new("FeedRangeSingleHash", "/pk".into()); + let container_client = run_context + .create_container(db_client, properties, None) + .await?; + + let result = container_client + .feed_range_from_partition_key("valid_key", None) + .await; + assert!(result.is_ok(), "full key on single-hash should succeed"); + + let ranges = result.unwrap(); + assert_eq!( + ranges.len(), + 1, + "full key should return exactly one feed range" + ); + + Ok(()) + }, + None, + ) + .await +} diff --git a/sdk/cosmos/azure_data_cosmos/tests/emulator_tests/mod.rs b/sdk/cosmos/azure_data_cosmos/tests/emulator_tests/mod.rs index de2df3a0d93..02b9779b1b3 100644 --- a/sdk/cosmos/azure_data_cosmos/tests/emulator_tests/mod.rs +++ b/sdk/cosmos/azure_data_cosmos/tests/emulator_tests/mod.rs @@ -5,6 +5,7 @@ mod cosmos_batch; mod cosmos_containers; mod cosmos_databases; mod cosmos_fault_injection; +mod cosmos_feed_ranges; mod cosmos_items; mod cosmos_offers; mod cosmos_proxy; diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/models/effective_partition_key.rs b/sdk/cosmos/azure_data_cosmos_driver/src/models/effective_partition_key.rs index d3cd12d13d8..d806db7ec3a 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/models/effective_partition_key.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/models/effective_partition_key.rs @@ -23,7 +23,7 @@ use std::fmt::Write; /// where an EPK is expected. #[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] #[serde(transparent)] -pub(crate) struct EffectivePartitionKey(String); +pub struct EffectivePartitionKey(String); impl EffectivePartitionKey { /// Returns the minimum EPK (empty string), representing the start of the EPK space. diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/models/mod.rs b/sdk/cosmos/azure_data_cosmos_driver/src/models/mod.rs index 7a91fd678d1..a4888a6b7a3 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/models/mod.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/models/mod.rs @@ -29,11 +29,11 @@ mod user_agent; pub(crate) mod vector_session_token; pub(crate) use cosmos_headers::request_header_names; #[allow(dead_code)] -pub(crate) mod effective_partition_key; +pub mod effective_partition_key; #[allow(dead_code)] mod murmur_hash; #[allow(dead_code)] -pub(crate) mod partition_key_range; +pub mod partition_key_range; #[allow(dead_code)] pub(crate) mod range; diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/models/partition_key.rs b/sdk/cosmos/azure_data_cosmos_driver/src/models/partition_key.rs index 0b140c12170..f2c76e58d7b 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/models/partition_key.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/models/partition_key.rs @@ -318,8 +318,8 @@ impl PartitionKey { self.0.len() } - /// Returns the partition key components for use by the EPK hashing logic. - pub(crate) fn values(&self) -> &[PartitionKeyValue] { + /// Returns the partition key components. + pub fn values(&self) -> &[PartitionKeyValue] { &self.0 } } diff --git a/sdk/cosmos/azure_data_cosmos_driver/src/models/partition_key_range.rs b/sdk/cosmos/azure_data_cosmos_driver/src/models/partition_key_range.rs index 2b107f1ea59..73908705d26 100644 --- a/sdk/cosmos/azure_data_cosmos_driver/src/models/partition_key_range.rs +++ b/sdk/cosmos/azure_data_cosmos_driver/src/models/partition_key_range.rs @@ -10,7 +10,7 @@ use std::hash::{Hash, Hasher}; /// Represents a partition key range in the Azure Cosmos DB service. #[derive(Debug, Clone, Serialize, Deserialize)] -pub(crate) struct PartitionKeyRange { +pub struct PartitionKeyRange { /// Gets or sets the Id of the resource #[serde(rename = "id")] pub id: String, @@ -51,9 +51,11 @@ pub(crate) struct PartitionKeyRange { #[serde(rename = "targetThroughput", skip_serializing_if = "Option::is_none")] pub target_throughput: Option, - /// Status of the partition key range + /// Status of the partition key range. + /// + /// Not part of the public API surface; uses a crate-internal enum type. #[serde(rename = "status", default)] - pub status: PartitionKeyRangeStatus, + pub(crate) status: PartitionKeyRangeStatus, /// Log Sequence Number #[serde(rename = "_lsn", default)] @@ -116,7 +118,7 @@ impl PartitionKeyRange { } /// Returns a view of this partition key range as an `EpkRange<&EffectivePartitionKey>`. - pub fn as_range(&self) -> EpkRange<&EffectivePartitionKey> { + pub(crate) fn as_range(&self) -> EpkRange<&EffectivePartitionKey> { EpkRange { min: &self.min_inclusive, max: &self.max_exclusive,