Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions sdk/cosmos/.cspell.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
"hostnames",
"hotfixes",
"IMDS",
"inclusivity",
"isquery",
"japaneast",
"japanwest",
Expand Down Expand Up @@ -96,12 +97,14 @@
"readfeed",
"replicaset",
"reqs",
"Replicaset",
"Retriable",
"retryable",
"rfind",
"RNTBD",
"roundtrips",
"rwcache",
"sess",
"southafricanorth",
"southafricawest",
"southcentralus",
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure_data_cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

- Added throughput control API: re-exported `ThroughputControlGroupOptions` and `PriorityLevel` from the driver. Users can register throughput control groups on `CosmosClientBuilder` via `with_throughput_control_group()` to configure priority-based execution and throughput bucket server features. ([#4078](https://github.com/Azure/azure-sdk-for-rust/pull/4078))
- Added `ThroughputPoller` type that implements `IntoFuture` and `Stream` for tracking asynchronous throughput replacement operations.
- Added `FeedRange` type with `ContainerClient::read_feed_ranges()` and `ContainerClient::feed_range_from_partition_key()` - supports hierarchical partition keys (MultiHash) including prefix partition keys that return multiple feed ranges. ([#4149](https://github.com/Azure/azure-sdk-for-rust/pull/4149))

### Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure_data_cosmos/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ async-lock.workspace = true
async-trait.workspace = true
azure_core = { workspace = true, default-features = false }
azure_data_cosmos_driver = { workspace = true }
base64.workspace = true
futures.workspace = true
pin-project.workspace = true
reqwest = { workspace = true, optional = true }
Expand Down
150 changes: 136 additions & 14 deletions sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

use crate::{
clients::{offers_client, ClientContext},
feed_range::FeedRange,
models::{
BatchResponse, ContainerProperties, CosmosResponse, ItemResponse, ResourceResponse,
ThroughputProperties,
},
options::{BatchOptions, QueryOptions, ReadContainerOptions},
options::{BatchOptions, QueryOptions, ReadContainerOptions, ReadFeedRangesOptions},
resource_context::{ResourceLink, ResourceType},
transactional_batch::TransactionalBatch,
DeleteContainerOptions, FeedItemIterator, ItemReadOptions, ItemWriteOptions, PartitionKey,
Expand All @@ -22,7 +23,10 @@ use crate::operation_context::OperationType;
use crate::routing::partition_key_range_cache::PartitionKeyRangeCache;
use azure_core::http::headers::AsHeaders;
use azure_core::http::Context;
use azure_data_cosmos_driver::models::{ContainerReference, CosmosOperation, ItemReference};
use azure_data_cosmos_driver::models::{
effective_partition_key::EffectivePartitionKey as DriverEpk, ContainerReference,
CosmosOperation, ItemReference, PartitionKeyKind,
};
use serde::{de::DeserializeOwned, Serialize};

/// A client for working with a specific container in a Cosmos DB account.
Expand All @@ -33,8 +37,6 @@ pub struct ContainerClient {
link: ResourceLink,
items_link: ResourceLink,
container_connection: Arc<ContainerConnection>,
#[expect(dead_code, reason = "will be used when tracing spans are re-added")]
container_id: String,
Comment thread
simorenoh marked this conversation as resolved.
container_ref: ContainerReference,
context: ClientContext,
}
Expand Down Expand Up @@ -78,7 +80,6 @@ impl ContainerClient {
link,
items_link,
container_connection,
container_id: container_id.to_string(),
container_ref,
context,
})
Expand Down Expand Up @@ -779,36 +780,157 @@ impl ContainerClient {
.await
.map(BatchResponse::new)
}

/// Gets the feed ranges for this container.
pub async fn read_feed_ranges(
&self,
options: Option<ReadFeedRangesOptions>,
) -> azure_core::Result<Vec<FeedRange>> {
let options = options.unwrap_or_default();
let routing_map = self
.container_connection
.resolve_routing_map(options.force_refresh())
.await?
.ok_or_else(|| {
azure_core::Error::with_message(
azure_core::error::ErrorKind::Other,
"failed to resolve routing map for container",
)
})?;
Ok(routing_map
.ordered_partition_key_ranges()
.iter()
.map(FeedRange::from_sdk_partition_key_range)
.collect())
}

/// Returns the [`FeedRange`]s covering the given partition key.
///
/// Full keys return a single-element `Vec`. Prefix keys on MultiHash
/// containers return one or more feed ranges.
pub async fn feed_range_from_partition_key(
&self,
partition_key: impl Into<PartitionKey>,
options: Option<ReadFeedRangesOptions>,
) -> azure_core::Result<Vec<FeedRange>> {
let partition_key = partition_key.into();
let driver_pk = partition_key.into_driver_partition_key();
let options = options.unwrap_or_default();
let pk_def = self.container_connection.partition_key_definition();
let values = driver_pk.values();

if values.is_empty() {
return Err(azure_core::Error::with_message(
azure_core::error::ErrorKind::Other,
"partition key must have at least one component",
));
}
if values.len() > pk_def.paths().len() {
return Err(azure_core::Error::with_message(
azure_core::error::ErrorKind::Other,
format!(
"partition key has {} components but container definition has {} paths",
values.len(),
pk_def.paths().len()
),
));
}

let is_prefix =
pk_def.kind() == PartitionKeyKind::MultiHash && values.len() < pk_def.paths().len();
if !is_prefix && values.len() != pk_def.paths().len() {
return Err(azure_core::Error::with_message(
azure_core::error::ErrorKind::Other,
"prefix partition keys are only supported for MultiHash (hierarchical) containers",
));
}

let routing_map = self
.container_connection
.resolve_routing_map(options.force_refresh())
.await?
.ok_or_else(|| {
azure_core::Error::with_message(
azure_core::error::ErrorKind::Other,
"failed to resolve routing map for container",
)
})?;

if is_prefix {
let epk_range = DriverEpk::compute_range(values, pk_def)?;
let query_range = crate::routing::range::Range::new(
epk_range.start.as_str().to_owned(),
epk_range.end.as_str().to_owned(),
true,
false,
);
let pkranges = routing_map.get_overlapping_ranges(&query_range);
if pkranges.is_empty() {
let refreshed = self
.container_connection
.resolve_routing_map(true)
.await?
.ok_or_else(|| {
azure_core::Error::with_message(
azure_core::error::ErrorKind::Other,
"failed to resolve routing map after refresh",
)
})?;
Ok(refreshed
.get_overlapping_ranges(&query_range)
.iter()
.map(FeedRange::from_sdk_partition_key_range)
.collect())
} else {
Ok(pkranges
.iter()
.map(FeedRange::from_sdk_partition_key_range)
.collect())
}
} else {
let epk = DriverEpk::compute(values, pk_def.kind(), pk_def.version());
match routing_map.get_range_by_effective_partition_key(epk.as_str()) {
Ok(pkr) => Ok(vec![FeedRange::from_sdk_partition_key_range(pkr)]),
Err(_) => {
let refreshed = self
.container_connection
.resolve_routing_map(true)
.await?
.ok_or_else(|| {
azure_core::Error::with_message(
azure_core::error::ErrorKind::Other,
"failed to resolve routing map after refresh",
)
})?;
let pkr = refreshed.get_range_by_effective_partition_key(epk.as_str())?;
Ok(vec![FeedRange::from_sdk_partition_key_range(pkr)])
}
}
}
}
}

#[cfg(test)]
mod tests {
use super::*;

/// Compile-time assertion that `ContainerClient` async method futures are `Send`.
///
/// This function is never called; it only needs to compile.
/// If any future is not `Send`, compilation will fail.
#[allow(dead_code, unreachable_code, unused_variables)]
fn _assert_futures_are_send() {
fn assert_send<T: Send>(_: T) {}
let client: &ContainerClient = todo!();

// Container operations
assert_send(client.read(todo!()));
assert_send(client.replace(todo!(), todo!()));
assert_send(client.read_throughput(todo!()));
assert_send(client.begin_replace_throughput(todo!(), todo!()));
assert_send(client.delete(todo!()));

// Item operations (use "" for partition_key to avoid never-type fallback issues)
assert_send(client.create_item::<serde_json::Value>("", todo!(), todo!()));
assert_send(client.replace_item::<serde_json::Value>("", todo!(), todo!(), todo!()));
assert_send(client.upsert_item::<serde_json::Value>("", todo!(), todo!()));
assert_send(client.read_item::<serde_json::Value>("", todo!(), todo!()));
assert_send(client.delete_item("", todo!(), todo!()));

// Batch operations
assert_send(client.execute_transactional_batch(todo!(), todo!()));
assert_send(client.read_feed_ranges(todo!()));
assert_send(client.feed_range_from_partition_key("", todo!()));
}
}
Loading
Loading