-
Notifications
You must be signed in to change notification settings - Fork 3.3k
feed range query fix #46506
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feed range query fix #46506
Changes from all commits
e4c4b39
fd62436
34ba12a
f99623b
f4668ab
e35c0b1
aa4b1e2
3474442
2a4538c
9a1e9df
b98cd8a
4a1b709
8b22307
7d22830
ff9eb03
ea50e6b
eb50d76
aebfe70
b19dbac
1007da5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -8,6 +8,8 @@ | |
|
|
||
| #### Bugs Fixed | ||
| * Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243) | ||
| * Fixed a bug in `query_items(feed_range=...)` where pagination could return incorrect results after a partition split caused the supplied feed range to overlap multiple physical partitions. See [PR 46506](https://github.com/Azure/azure-sdk-for-python/pull/46506) | ||
| * Fixed `SELECT VALUE` aggregation classification across partitions: booleans are no longer treated as numeric aggregates, non-aggregate numeric projections are no longer merged, and `MIN`/`MAX` detection is now correct. See [PR 46506](https://github.com/Azure/azure-sdk-for-python/pull/46506) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 Recommendation · Docs: Missing behavioral change in CHANGELOG
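The CHANGELOG entry correctly calls out the boolean exclusion and non-aggregate numeric fix, but it omits a user-facing behavioral change: `SELECT VALUE AVG(...)` queries whose partial results have to be merged across multiple physical partitions now raise a `ValueError` instead of being merged client-side. This is the right design choice (loud failure > silent data corruption), but users who previously depended on the (incorrect) sum-of-averages behavior will encounter an unexpected exception on upgrade. The CHANGELOG should call this out explicitly so users can prepare — e.g.:
* `SELECT VALUE AVG(...)` queries that span multiple physical partitions now raise a `ValueError` instead of returning an incorrectly merged result.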
|
||
|
|
||
| #### Other Changes | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,6 +39,11 @@ | |
| from . import documents | ||
| from . import http_constants | ||
| from . import _runtime_constants | ||
| from ._query_aggregate_utils import ( | ||
| _AggregatePartialClassification, | ||
| _classify_aggregate_partial, | ||
| _get_select_value_aggregate_function, | ||
| ) | ||
| from ._constants import _Constants as Constants | ||
| from .auth import _get_authorization_header | ||
| from .offer import ThroughputProperties | ||
|
|
@@ -129,6 +134,7 @@ def build_options(kwargs: dict[str, Any]) -> dict[str, Any]: | |
| options['accessCondition'] = {'type': 'IfNoneMatch', 'condition': if_none_match} | ||
| return options | ||
|
|
||
|
|
||
| def _merge_query_results( | ||
| results: dict[str, Any], | ||
| partial_result: dict[str, Any], | ||
|
|
@@ -168,22 +174,13 @@ def _merge_query_results( | |
|
|
||
| results_docs = results.get("Documents") | ||
|
|
||
| # Check if both results are aggregate queries | ||
| is_partial_agg = ( | ||
| isinstance(partial_docs, list) | ||
| and len(partial_docs) == 1 | ||
| and isinstance(partial_docs[0], dict) | ||
| and partial_docs[0].get("_aggregate") is not None | ||
| ) | ||
| is_results_agg = ( | ||
| results_docs | ||
| and isinstance(results_docs, list) | ||
| and len(results_docs) == 1 | ||
| and isinstance(results_docs[0], dict) | ||
| and results_docs[0].get("_aggregate") is not None | ||
| ) | ||
| partial_aggregate_class = _classify_aggregate_partial(partial_docs, query) | ||
| results_aggregate_class = _classify_aggregate_partial(results_docs, query) | ||
|
|
||
| if is_partial_agg and is_results_agg: | ||
| if ( | ||
| partial_aggregate_class == _AggregatePartialClassification.OBJECT | ||
| and results_aggregate_class == _AggregatePartialClassification.OBJECT | ||
| ): | ||
| agg_results = results_docs[0]["_aggregate"] # type: ignore[index] | ||
| agg_partial = partial_docs[0]["_aggregate"] | ||
| for key in agg_partial: | ||
|
|
@@ -201,33 +198,26 @@ def _merge_query_results( | |
| agg_results[key] += agg_partial[key] | ||
| return results | ||
|
|
||
| # Check if both are VALUE aggregate queries | ||
| is_partial_value_agg = ( | ||
| isinstance(partial_docs, list) | ||
| and len(partial_docs) == 1 | ||
| and isinstance(partial_docs[0], (int, float)) | ||
| ) | ||
| is_results_value_agg = ( | ||
| results_docs | ||
| and isinstance(results_docs, list) | ||
| and len(results_docs) == 1 | ||
| and isinstance(results_docs[0], (int, float)) | ||
| ) | ||
|
|
||
| if is_partial_value_agg and is_results_value_agg: | ||
| query_text = query.get("query") if isinstance(query, dict) else query | ||
| if query_text: | ||
| query_upper = query_text.upper() | ||
| # For MIN/MAX, we find the min/max of the partial results. | ||
| # For COUNT/SUM, we sum the partial results. | ||
| # Without robust query parsing, we can't distinguish them reliably. | ||
| # Defaulting to sum for COUNT/SUM. MIN/MAX VALUE queries are not fully supported client-side. | ||
| if " SELECT VALUE MIN" in query_upper: | ||
| results_docs[0] = min(results_docs[0], partial_docs[0]) # type: ignore[index] | ||
| elif " SELECT VALUE MAX" in query_upper: | ||
| results_docs[0] = max(results_docs[0], partial_docs[0]) # type: ignore[index] | ||
| else: # For COUNT/SUM, we sum the partial results | ||
| results_docs[0] += partial_docs[0] # type: ignore[index] | ||
| if ( | ||
| partial_aggregate_class == _AggregatePartialClassification.VALUE | ||
| and results_aggregate_class == _AggregatePartialClassification.VALUE | ||
| ): | ||
| aggregate_fn = _get_select_value_aggregate_function(query) | ||
|
dibahlfi marked this conversation as resolved.
|
||
| if aggregate_fn is None: | ||
| raise ValueError( | ||
| "Invariant violation: VALUE aggregate classification requires a recognized aggregate function." | ||
| ) | ||
| if aggregate_fn == "MIN": | ||
|
tvaron3 marked this conversation as resolved.
|
||
| results_docs[0] = min(results_docs[0], partial_docs[0]) # type: ignore[index] | ||
| elif aggregate_fn == "MAX": | ||
| results_docs[0] = max(results_docs[0], partial_docs[0]) # type: ignore[index] | ||
| elif aggregate_fn == "AVG": | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 Recommendation · Design: SQL text scanning duplicates aggregate detection the backend already provides. The SDK already has comprehensive, backend-driven aggregate detection that doesn't require SQL parsing:
This new SQL scanner creates a parallel, less reliable detection mechanism:
Preferred alternative: Fetch or cache the query plan at the feed-range entry point and pass its aggregate information into the merge step. If there's a latency constraint preventing a query-plan round-trip on the feed-range path, this trade-off should be documented explicitly, and the AVG limitation called out in the CHANGELOG or docstring. |
||
| raise ValueError( | ||
| "VALUE AVG aggregate merge across partitions is not supported client-side." | ||
| ) | ||
| else: | ||
| # COUNT/SUM are additive. | ||
| results_docs[0] += partial_docs[0] # type: ignore[index] | ||
| return results | ||
|
|
||
| # Standard query, append documents | ||
|
|
@@ -239,6 +229,28 @@ def _merge_query_results( | |
| return results | ||
|
|
||
|
|
||
| def _raise_query_merge_value_error(merge_error: ValueError) -> None: | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 🟡 Recommendation · Type Safety: Return type should be `NoReturn`
from typing import NoReturn
def _raise_query_merge_value_error(merge_error: ValueError) -> NoReturn:
Minor but cheap to fix, and it strengthens the type contract for a function on a correctness-critical error path. |
||
| """Raise a clearer user-facing error for unsupported VALUE aggregate merges. | ||
|
|
||
| ``SELECT VALUE AVG(...)`` partials cannot be merged correctly client-side | ||
| across multiple partition/range responses. We fail loudly instead of | ||
| falling back to list concatenation (which would silently produce | ||
| mathematically incorrect results). | ||
|
|
||
| :param merge_error: ValueError raised while merging partial query results. | ||
| :type merge_error: ValueError | ||
| :raises ValueError: Always re-raises, potentially with a clearer message. | ||
| """ | ||
| merge_message = str(merge_error) | ||
| if "VALUE AVG aggregate merge across partitions is not supported client-side." in merge_message: | ||
| raise ValueError( | ||
| "Unsupported query shape for range-scoped pagination: " | ||
| "SELECT VALUE AVG(...) cannot be merged client-side when the query " | ||
| "scope spans multiple physical partitions." | ||
| ) from merge_error | ||
| raise merge_error | ||
|
|
||
|
|
||
| def GetHeaders( # pylint: disable=too-many-statements,too-many-branches | ||
| cosmos_client_connection: Union["CosmosClientConnection", "AsyncClientConnection"], | ||
| default_headers: Mapping[str, Any], | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.