Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
32 commits
Select commit Hold shift + click to select a range
c0e63f2
Add feed operations spec for Cosmos driver
analogrelay Apr 22, 2026
2956b34
Revise feed operations spec design
analogrelay Apr 22, 2026
beaeb5f
Replace OpenTelemetry section with diagnostics hierarchy
analogrelay Apr 22, 2026
561d771
Refine feed operations spec from review
analogrelay Apr 22, 2026
fbe9a0a
Remove backend query plan caching section
analogrelay Apr 22, 2026
4349142
Handle splits within Fetch steps, keeping them focused on the same EP…
analogrelay Apr 23, 2026
0b506b6
Apply suggestion from @Copilot
analogrelay Apr 23, 2026
41a4353
spec refinements
analogrelay Apr 29, 2026
2ebf07a
pr feedback
analogrelay Apr 30, 2026
a1e6add
Merge branch 'release/azure_data_cosmos-previews' into ashleyst/feed-…
analogrelay May 1, 2026
36ce91b
rename ResponseBody::Single to ResponseBody::Bytes
analogrelay May 1, 2026
3d32bbb
Plan implementation of feed operations spec
analogrelay May 5, 2026
682f428
Restructure to a simpler requirements document instead of a detailed
analogrelay May 7, 2026
3bf13bd
Initial dataflow pipeline
analogrelay May 7, 2026
3c3f800
Add OperationTarget and driver FeedRange
analogrelay May 7, 2026
9c38dab
Flow routing headers down into operation pipeline via
analogrelay May 7, 2026
3b1b897
Add an explicit "Plan" API to set up a pipeline
analogrelay May 7, 2026
067538b
Add SequentialDrain, split recovery, topology
analogrelay May 7, 2026
ad0969b
Add query plan models and plan_operation API
analogrelay May 8, 2026
f84bf28
Separate planning and execution of operations
analogrelay May 8, 2026
25f7eed
Integrate 2-stage planning for non-point operations
analogrelay May 8, 2026
b01ad3c
Wire everything up for queries
analogrelay May 8, 2026
2a08af9
Rebase updates
analogrelay May 8, 2026
7748c48
Consolidate FeedRange types
analogrelay May 11, 2026
61dc449
Move EPK and hashing from SDK to driver
analogrelay May 11, 2026
4080a01
Move PartitionKey and PartitionKeyValue to driver
analogrelay May 11, 2026
d457403
Add QueryScope parameter to query_items
analogrelay May 11, 2026
ffd491a
Fix cross-partition test
analogrelay May 11, 2026
73f08ad
Continuation tokens
analogrelay May 11, 2026
417c7a2
Dataflow cleanup: remove ChildNodes, split mod.rs, drop StubTopologyP…
analogrelay May 11, 2026
60f89d3
Merge release branch
analogrelay May 12, 2026
cc744a5
fix doc issues
analogrelay May 12, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure_data_cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
- Removed the `request_url()` accessor (gated on the `fault_injection` feature) from `ItemResponse`/`ResourceResponse`/`BatchResponse`. Driver-routed operations never populated it, so it always returned `None` in current usage.

- `CosmosClientBuilder::with_user_agent_suffix` (and `CosmosClientOptions::with_user_agent_suffix`) now take `UserAgentSuffix` instead of `impl Into<String>`. Callers passing a `&str` or `String` must construct the value explicitly via `UserAgentSuffix::new` (panics on invalid input) or `UserAgentSuffix::try_new` (returns `Option`). Validation rules (max 25 characters, HTTP-header-safe) are now enforced at the construction site instead of being applied silently inside the builder. ([#4368](https://github.com/Azure/azure-sdk-for-rust/pull/4368))
- `ContainerClient::query_items()` now takes a `QueryScope` (`QueryScope::partition(...)`, `QueryScope::feed_range(...)`, or `QueryScope::full_container()`) instead of a partition key where `()` represented cross-partition queries.

- Replaced `CosmosDiagnostics` with `CosmosDiagnosticsContext` (a re-export of `azure_data_cosmos_driver::diagnostics::DiagnosticsContext`). All response types now return `Arc<CosmosDiagnosticsContext>` from `diagnostics()` (the returned `Arc` derefs transparently to `CosmosDiagnosticsContext` for read-only inspection, and can be retained alongside a consumed response body). The previous `activity_id() -> Option<&str>` and `server_duration_ms() -> Option<f64>` accessors on `CosmosDiagnostics` are replaced by `CosmosDiagnosticsContext::activity_id() -> &ActivityId` and per-request server timing via `CosmosDiagnosticsContext::requests()[i].server_duration_ms()`.

Expand Down
17 changes: 9 additions & 8 deletions sdk/cosmos/azure_data_cosmos/examples/cosmos/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::error::Error;

use azure_data_cosmos::{CosmosClient, PartitionKey};
use azure_data_cosmos::{query::QueryScope, CosmosClient};
use clap::{Args, Subcommand};
use futures::TryStreamExt;

Expand Down Expand Up @@ -55,13 +55,14 @@ impl QueryCommand {
let db_client = client.database_client(&database);
let container_client = db_client.container_client(&container).await?;

let pk = match partition_key {
Some(pk) => PartitionKey::from(pk),
None => PartitionKey::EMPTY,
let scope = match partition_key {
Some(pk) => QueryScope::partition(pk),
None => QueryScope::full_container(),
};

let mut items =
container_client.query_items::<serde_json::Value>(&query, pk, None)?;
let mut items = container_client
.query_items::<serde_json::Value>(&query, scope, None)
.await?;

println!("Items:");
while let Some(item) = items.try_next().await? {
Expand All @@ -70,7 +71,7 @@ impl QueryCommand {
Ok(())
}
Subcommands::Databases { query } => {
let mut dbs = client.query_databases(query, None)?;
let mut dbs = client.query_databases(query, None).await?;

println!("Databases:");
while let Some(item) = dbs.try_next().await? {
Expand All @@ -80,7 +81,7 @@ impl QueryCommand {
}
Subcommands::Containers { database, query } => {
let db_client = client.database_client(&database);
let mut dbs = db_client.query_containers(query, None)?;
let mut dbs = db_client.query_containers(query, None).await?;

println!("Containers:");
while let Some(item) = dbs.try_next().await? {
Expand Down
106 changes: 57 additions & 49 deletions sdk/cosmos/azure_data_cosmos/src/clients/container_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,17 @@

use crate::{
clients::{offers_client, ClientContext},
feed_range::FeedRange,
models::{
BatchResponse, ContainerProperties, ItemResponse, ResourceResponse, ThroughputProperties,
},
options::{
BatchOptions, Precondition, QueryOptions, ReadContainerOptions, ReadFeedRangesOptions,
SessionToken,
},
query::QueryScope,
transactional_batch::TransactionalBatch,
DeleteContainerOptions, FeedItemIterator, ItemReadOptions, ItemWriteOptions, PartitionKey,
Query, ReplaceContainerOptions, ThroughputOptions,
DeleteContainerOptions, FeedItemIterator, FeedRange, ItemReadOptions, ItemWriteOptions,
PartitionKey, Query, ReplaceContainerOptions, ThroughputOptions,
};

use super::ThroughputPoller;
Expand Down Expand Up @@ -84,7 +84,7 @@ impl ContainerClient {
let driver_response = self
.context
.driver
.execute_operation(operation, OperationOptions::default())
.execute_point_operation(operation, OperationOptions::default())
.await?;

Ok(ResourceResponse::new(
Expand Down Expand Up @@ -138,7 +138,7 @@ impl ContainerClient {
let driver_response = self
.context
.driver
.execute_operation(operation, operation_options)
.execute_point_operation(operation, operation_options)
.await?;

Ok(ResourceResponse::new(
Expand Down Expand Up @@ -230,7 +230,7 @@ impl ContainerClient {
let driver_response = self
.context
.driver
.execute_operation(operation, OperationOptions::default())
.execute_point_operation(operation, OperationOptions::default())
.await?;

Ok(ResourceResponse::new(
Expand Down Expand Up @@ -316,7 +316,7 @@ impl ContainerClient {
// Build the driver's item reference from our stored container metadata.
let item_ref = ItemReference::from_name(
&self.container_ref,
partition_key.into().into_driver_partition_key(),
partition_key.into(),
item_id.to_owned(),
);

Expand All @@ -328,7 +328,7 @@ impl ContainerClient {
let driver_response = self
.context
.driver
.execute_operation(operation, options.operation)
.execute_point_operation(operation, options.operation)
.await?;

// Bridge the driver response to the SDK response type.
Expand Down Expand Up @@ -414,7 +414,7 @@ impl ContainerClient {
// Build the driver's item reference from our stored container metadata.
let item_ref = ItemReference::from_name(
&self.container_ref,
partition_key.into().into_driver_partition_key(),
partition_key.into(),
item_id.to_owned(),
);

Expand All @@ -426,7 +426,7 @@ impl ContainerClient {
let driver_response = self
.context
.driver
.execute_operation(operation, options.operation)
.execute_point_operation(operation, options.operation)
.await?;

// Bridge the driver response to the SDK response type.
Expand Down Expand Up @@ -516,7 +516,7 @@ impl ContainerClient {
// Build the driver's item reference from our stored container metadata.
let item_ref = ItemReference::from_name(
&self.container_ref,
partition_key.into().into_driver_partition_key(),
partition_key.into(),
item_id.to_owned(),
);

Expand All @@ -528,7 +528,7 @@ impl ContainerClient {
let driver_response = self
.context
.driver
.execute_operation(operation, options.operation)
.execute_point_operation(operation, options.operation)
.await?;

// Bridge the driver response to the SDK response type.
Expand Down Expand Up @@ -576,7 +576,7 @@ impl ContainerClient {
// Build the driver's item reference from our stored container metadata.
let item_ref = ItemReference::from_name(
&self.container_ref,
partition_key.into().into_driver_partition_key(),
partition_key.into(),
item_id.to_owned(),
);

Expand All @@ -588,7 +588,7 @@ impl ContainerClient {
let driver_response = self
.context
.driver
.execute_operation(operation, options.operation)
.execute_point_operation(operation, options.operation)
.await?;

// Bridge the driver response to the SDK response type.
Expand Down Expand Up @@ -628,7 +628,7 @@ impl ContainerClient {
// Build the driver's item reference from our stored container metadata.
let item_ref = ItemReference::from_name(
&self.container_ref,
partition_key.into().into_driver_partition_key(),
partition_key.into(),
item_id.to_owned(),
);

Expand All @@ -640,7 +640,7 @@ impl ContainerClient {
let driver_response = self
.context
.driver
.execute_operation(operation, options.operation)
.execute_point_operation(operation, options.operation)
.await?;

// Bridge the driver response to the SDK response type.
Expand All @@ -661,7 +661,7 @@ impl ContainerClient {
/// # Arguments
///
/// * `query` - The query to execute.
/// * `partition_key` - The partition key to scope the query on, or specify an empty key (`()`) to perform a cross-partition query.
/// * `scope` - The [`QueryScope`] specifying the scope of the query.
/// * `options` - Optional parameters for the request.
///
/// # Cross Partition Queries
Expand All @@ -672,11 +672,12 @@ impl ContainerClient {
///
/// # Examples
///
/// The `query` and `partition_key` parameters accept anything that can be transformed [`Into`] their relevant types.
/// The `query` parameter accepts anything that can be transformed [`Into`] a [`Query`], and `scope` controls partition targeting.
/// This allows simple queries without parameters to be expressed easily:
///
/// ```rust,no_run
/// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
/// # use azure_data_cosmos::query::QueryScope;
/// # let container_client: azure_data_cosmos::clients::ContainerClient = panic!("this is a non-running example");
/// #[derive(serde::Deserialize)]
/// struct Customer {
Expand All @@ -685,16 +686,17 @@ impl ContainerClient {
/// }
/// let items = container_client.query_items::<Customer>(
/// "SELECT * FROM c",
/// "some_partition_key",
/// None)?;
/// QueryScope::partition("some_partition_key"),
/// None,
/// ).await?;
/// # }
/// ```
///
/// You can specify parameters by using [`Query::from()`] and [`Query::with_parameter()`]:
///
/// ```rust,no_run
/// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
/// use azure_data_cosmos::Query;
/// use azure_data_cosmos::{query::QueryScope, Query};
/// # let container_client: azure_data_cosmos::clients::ContainerClient = panic!("this is a non-running example");
/// #[derive(serde::Deserialize)]
/// struct Customer {
Expand All @@ -703,34 +705,49 @@ impl ContainerClient {
/// }
/// let query = Query::from("SELECT COUNT(*) FROM c WHERE c.customer_id = @customer_id")
/// .with_parameter("@customer_id", 42)?;
/// let items = container_client.query_items::<Customer>(query, "some_partition_key", None)?;
/// let items = container_client
/// .query_items::<Customer>(query, QueryScope::partition("some_partition_key"), None).await?;
/// # }
/// ```
///
/// See [`PartitionKey`](crate::PartitionKey) for more information on how to specify a partition key, and [`Query`] for more information on how to specify a query.
pub fn query_items<T: DeserializeOwned + Send + 'static>(
pub async fn query_items<T: DeserializeOwned + Send + 'static>(
&self,
query: impl Into<Query>,
partition_key: impl Into<PartitionKey>,
scope: QueryScope,
options: Option<QueryOptions>,
) -> azure_core::Result<FeedItemIterator<T>> {
let options = options.unwrap_or_default();
let partition_key: PartitionKey = partition_key.into();
let query = query.into();

let driver_pk = partition_key.into_driver_partition_key();
let container_ref = self.container_ref.clone();
let factory =
move || CosmosOperation::query_items(container_ref.clone(), driver_pk.clone());

crate::query::executor::QueryExecutor::new(
// The first operation to execute in the query items flow.
// This holds the session token provided by the user, if any.
let mut initial_operation =
CosmosOperation::query_items(container_ref.clone(), scope.into())
.with_body(serde_json::to_vec(&query)?);
if let Some(token) = options.session_token {
initial_operation = initial_operation.with_session_token(token);
}
if let Some(max_item_count) = options.max_item_count {
initial_operation = initial_operation.with_max_item_count(max_item_count);
}
let plan = self
.context
.driver
.plan_operation(
initial_operation,
&options.operation,
options.continuation_token.as_ref(),
)
.await?;
Ok(FeedItemIterator::new(
self.context.driver.clone(),
factory,
query,
Some(self.container_ref.clone()),
plan,
options.operation,
options.session_token,
)
.into_stream()
))
}

/// Executes a transactional batch of operations.
Expand Down Expand Up @@ -781,7 +798,7 @@ impl ContainerClient {
) -> azure_core::Result<BatchResponse> {
let options = options.unwrap_or_default();
let body = serde_json::to_vec(batch.operations())?;
let driver_pk = batch.partition_key().clone().into_driver_partition_key();
let driver_pk = batch.partition_key().clone();

let operation =
CosmosOperation::batch(self.container_ref.clone(), driver_pk).with_body(body);
Expand All @@ -790,7 +807,7 @@ impl ContainerClient {
let driver_response = self
.context
.driver
.execute_operation(operation, options.operation)
.execute_point_operation(operation, options.operation)
.await?;

Ok(BatchResponse::new(
Expand Down Expand Up @@ -840,10 +857,7 @@ impl ContainerClient {
));
}

ranges
.iter()
.map(FeedRange::from_partition_key_range)
.collect()
ranges.iter().map(FeedRange::try_from).collect()
}

/// Returns the [`FeedRange`]s covering the given partition key.
Expand All @@ -856,7 +870,7 @@ impl ContainerClient {
options: Option<ReadFeedRangesOptions>,
) -> azure_core::Result<Vec<FeedRange>> {
let partition_key = partition_key.into();
let driver_pk = partition_key.into_driver_partition_key();
let driver_pk = partition_key;
let options = options.unwrap_or_default();
let pk_def = self.container_ref.partition_key_definition();
let values = driver_pk.values();
Expand Down Expand Up @@ -925,15 +939,9 @@ impl ContainerClient {
));
}

ranges
.iter()
.map(FeedRange::from_partition_key_range)
.collect()
ranges.iter().map(FeedRange::try_from).collect()
} else {
ranges
.iter()
.map(FeedRange::from_partition_key_range)
.collect()
ranges.iter().map(FeedRange::try_from).collect()
}
}

Expand Down
Loading
Loading