Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

#### Bugs Fixed
* Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243)
* Fixed a bug in `query_items(feed_range=...)` where pagination could return incorrect results after a partition split caused the supplied feed range to overlap multiple physical partitions. See [PR 46506](https://github.com/Azure/azure-sdk-for-python/pull/46506)
Comment thread
dibahlfi marked this conversation as resolved.
* Fixed `SELECT VALUE` aggregation classification across partitions: booleans are no longer treated as numeric aggregates, non-aggregate numeric projections are no longer merged, and `MIN`/`MAX` detection is now correct. See [PR 46506](https://github.com/Azure/azure-sdk-for-python/pull/46506)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Recommendation · Docs: Missing behavioral change in CHANGELOG

SELECT VALUE AVG(...) cross-partition now raises ValueError — not documented

The CHANGELOG entry correctly calls out the boolean exclusion and non-aggregate numeric fix, but it omits a user-facing behavioral change: SELECT VALUE AVG(c.x) FROM c across multiple partitions now raises ValueError("VALUE AVG aggregate merge across partitions is not supported client-side.") instead of silently returning incorrect results (the old code summed per-partition averages).

This is the right design choice (loud failure > silent data corruption), but users who previously depended on the (incorrect) sum-of-averages behavior will encounter an unexpected exception on upgrade. The CHANGELOG should call this out explicitly so users can prepare — e.g.:

SELECT VALUE AVG(...) queries spanning multiple partitions now raise ValueError instead of returning an incorrect sum of per-partition averages. Use SELECT VALUE {"sum": SUM(c.x), "count": COUNT(c.x)} FROM c and compute the average client-side.

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.


#### Other Changes

Expand Down
96 changes: 54 additions & 42 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@
from . import documents
from . import http_constants
from . import _runtime_constants
from ._query_aggregate_utils import (
_AggregatePartialClassification,
_classify_aggregate_partial,
_get_select_value_aggregate_function,
)
from ._constants import _Constants as Constants
from .auth import _get_authorization_header
from .offer import ThroughputProperties
Expand Down Expand Up @@ -129,6 +134,7 @@ def build_options(kwargs: dict[str, Any]) -> dict[str, Any]:
options['accessCondition'] = {'type': 'IfNoneMatch', 'condition': if_none_match}
return options


def _merge_query_results(
results: dict[str, Any],
partial_result: dict[str, Any],
Expand Down Expand Up @@ -168,22 +174,13 @@ def _merge_query_results(

results_docs = results.get("Documents")

# Check if both results are aggregate queries
is_partial_agg = (
isinstance(partial_docs, list)
and len(partial_docs) == 1
and isinstance(partial_docs[0], dict)
and partial_docs[0].get("_aggregate") is not None
)
is_results_agg = (
results_docs
and isinstance(results_docs, list)
and len(results_docs) == 1
and isinstance(results_docs[0], dict)
and results_docs[0].get("_aggregate") is not None
)
partial_aggregate_class = _classify_aggregate_partial(partial_docs, query)
results_aggregate_class = _classify_aggregate_partial(results_docs, query)

if is_partial_agg and is_results_agg:
if (
partial_aggregate_class == _AggregatePartialClassification.OBJECT
and results_aggregate_class == _AggregatePartialClassification.OBJECT
):
agg_results = results_docs[0]["_aggregate"] # type: ignore[index]
agg_partial = partial_docs[0]["_aggregate"]
for key in agg_partial:
Expand All @@ -201,33 +198,26 @@ def _merge_query_results(
agg_results[key] += agg_partial[key]
return results

# Check if both are VALUE aggregate queries
is_partial_value_agg = (
isinstance(partial_docs, list)
and len(partial_docs) == 1
and isinstance(partial_docs[0], (int, float))
)
is_results_value_agg = (
results_docs
and isinstance(results_docs, list)
and len(results_docs) == 1
and isinstance(results_docs[0], (int, float))
)

if is_partial_value_agg and is_results_value_agg:
query_text = query.get("query") if isinstance(query, dict) else query
if query_text:
query_upper = query_text.upper()
# For MIN/MAX, we find the min/max of the partial results.
# For COUNT/SUM, we sum the partial results.
# Without robust query parsing, we can't distinguish them reliably.
# Defaulting to sum for COUNT/SUM. MIN/MAX VALUE queries are not fully supported client-side.
if " SELECT VALUE MIN" in query_upper:
results_docs[0] = min(results_docs[0], partial_docs[0]) # type: ignore[index]
elif " SELECT VALUE MAX" in query_upper:
results_docs[0] = max(results_docs[0], partial_docs[0]) # type: ignore[index]
else: # For COUNT/SUM, we sum the partial results
results_docs[0] += partial_docs[0] # type: ignore[index]
if (
partial_aggregate_class == _AggregatePartialClassification.VALUE
and results_aggregate_class == _AggregatePartialClassification.VALUE
):
aggregate_fn = _get_select_value_aggregate_function(query)
Comment thread
dibahlfi marked this conversation as resolved.
if aggregate_fn is None:
raise ValueError(
"Invariant violation: VALUE aggregate classification requires a recognized aggregate function."
)
if aggregate_fn == "MIN":
Comment thread
tvaron3 marked this conversation as resolved.
results_docs[0] = min(results_docs[0], partial_docs[0]) # type: ignore[index]
elif aggregate_fn == "MAX":
results_docs[0] = max(results_docs[0], partial_docs[0]) # type: ignore[index]
elif aggregate_fn == "AVG":
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Recommendation · Design: SQL text scanning duplicates aggregate detection the backend already provides

The SDK already has comprehensive, backend-driven aggregate detection that doesn't require SQL parsing:

  • _PartitionedQueryExecutionInfo.has_aggregates() / get_aggregates() — wraps the queryInfo.aggregates array from the backend query plan (returns ["Count"], ["Average"], etc.)
  • aggregators.py — full aggregator classes including _AverageAggregator that correctly tracks sum/count decomposition
  • _QueryExecutionAggregateEndpointComponent — pipeline component that uses queryInfo.aggregates for correct cross-partition aggregation

This new SQL scanner creates a parallel, less reliable detection mechanism:

  1. AVG regression: _AverageAggregator in the existing pipeline handles AVG correctly (sum/count decomposition). This new code raises ValueError for SELECT VALUE AVG(...), making AVG queries that work through the normal pipeline fail on the feed-range path.

  2. Line comments: _strip_sql_block_comments handles /* */ but not --. A -- comment before an aggregate function name could cause misclassification.

  3. Evolution burden: Every new Cosmos SQL feature (new aggregate functions, query syntax) requires scanner updates.

Preferred alternative: Fetch or cache the query plan at the feed-range entry point and pass queryInfo.aggregates to _merge_query_results. This eliminates the scanner entirely, restores AVG support, handles all current+future aggregate functions without maintenance, and aligns with how the rest of the SDK works.

If there's a latency constraint preventing a query-plan round-trip on the feed-range path, this trade-off should be documented explicitly, and the AVG limitation called out in the CHANGELOG or docstring.

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

raise ValueError(
"VALUE AVG aggregate merge across partitions is not supported client-side."
)
else:
# COUNT/SUM are additive.
results_docs[0] += partial_docs[0] # type: ignore[index]
return results

# Standard query, append documents
Expand All @@ -239,6 +229,28 @@ def _merge_query_results(
return results


def _raise_query_merge_value_error(merge_error: ValueError) -> None:
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Recommendation · Type Safety: Return type should be NoReturn, not None

_raise_query_merge_value_error unconditionally raises — it never returns. The -> None annotation is semantically incorrect and prevents static type checkers (mypy, pyright) from treating the call site as a guaranteed raise. This means:

  1. Code after base._raise_query_merge_value_error(merge_error) in the except ValueError block won't generate an "unreachable code" warning
  2. If someone later adds a code path that doesn't raise, it would silently return None with no type error
from typing import NoReturn

def _raise_query_merge_value_error(merge_error: ValueError) -> NoReturn:

Minor but cheap to fix, and it strengthens the type contract for a function on a correctness-critical error path.

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

"""Raise a clearer user-facing error for unsupported VALUE aggregate merges.

``SELECT VALUE AVG(...)`` partials cannot be merged correctly client-side
across multiple partition/range responses. We fail loudly instead of
falling back to list concatenation (which would silently produce
mathematically incorrect results).

:param merge_error: ValueError raised while merging partial query results.
:type merge_error: ValueError
:raises ValueError: Always re-raises, potentially with a clearer message.
"""
merge_message = str(merge_error)
if "VALUE AVG aggregate merge across partitions is not supported client-side." in merge_message:
raise ValueError(
"Unsupported query shape for range-scoped pagination: "
"SELECT VALUE AVG(...) cannot be merged client-side when the query "
"scope spans multiple physical partitions."
) from merge_error
raise merge_error


def GetHeaders( # pylint: disable=too-many-statements,too-many-branches
cosmos_client_connection: Union["CosmosClientConnection", "AsyncClientConnection"],
default_headers: Mapping[str, Any],
Expand Down
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class _Constants:
AAD_DEFAULT_SCOPE: str = "https://cosmos.azure.com/.default"
INFERENCE_SERVICE_DEFAULT_SCOPE = "https://dbinference.azure.com/.default"
SEMANTIC_RERANKER_INFERENCE_ENDPOINT: str = "AZURE_COSMOS_SEMANTIC_RERANKER_INFERENCE_ENDPOINT"
EMIT_STRUCTURED_CONTINUATION_PK_CONFIG: str = "AZURE_COSMOS_EMIT_STRUCTURED_CONTINUATION_PK"

# Health Check Retry Policy constants
AZURE_COSMOS_HEALTH_CHECK_MAX_RETRIES: str = "AZURE_COSMOS_HEALTH_CHECK_MAX_RETRIES"
Expand Down
Loading
Loading