Skip to content
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#### Bugs Fixed
* Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243)
* Fixed bug where region names in `preferred_locations` and `excluded_locations` (client-level and per-request) were not matched tolerantly for differences in case, whitespace, hyphens, and underscores. See [PR 46937](https://github.com/Azure/azure-sdk-for-python/pull/46937)
* Fixed bug where a `ValueError("Ranges overlap")` or an `AssertionError("code bug: returned overlapping ranges ... is empty")` from the partition key range cache could escape to the caller when the `/pkranges` response contained a transiently inconsistent snapshot (overlap or gap). See [PR 47091](https://github.com/Azure/azure-sdk-for-python/pull/47091)

#### Other Changes
* Reduced per-client memory overhead when partition-level circuit breaker (PPCB) is enabled by sharing the partition key range routing map cache across CosmosClient instances connected to the same endpoint, and stripping unused fields from cached partition key ranges using compact PKRange namedtuples. See [PR 46297](https://github.com/Azure/azure-sdk-for-python/pull/46297)
Expand Down
22 changes: 17 additions & 5 deletions sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,14 @@ def _has_read_retryable_headers(request_headers):
return True
return False

def _is_read_retryable_request(request, request_params: Optional[RequestObject] = None):
    """Report whether a request may be retried as a read operation.

    The request headers are consulted first through
    ``_has_read_retryable_headers``. When they do not mark the request as a
    read, the operation type carried by ``request_params`` is used instead.

    :param request: The HTTP request whose ``headers`` are inspected; may be falsy.
    :param request_params: Optional request metadata exposing ``operation_type``.
    :type request_params: RequestObject or None
    :return: ``True`` when either check identifies a read, otherwise ``False``.
    :rtype: bool
    """
    marked_by_headers = request and _has_read_retryable_headers(request.headers)
    if marked_by_headers:
        return True
    # Fallback for flows where operation headers are absent but request metadata is available.
    return True if (
        request_params and _OperationType.IsReadOnlyOperation(request_params.operation_type)
    ) else False

def _has_database_account_header(request_headers):
if request_headers.get(HttpHeaders.ThinClientProxyResourceType) == ResourceType.DatabaseAccount:
return True
Expand All @@ -354,13 +362,17 @@ def _handle_service_request_retries(
raise exception

def _handle_service_response_retries(request, client, response_retry_policy, exception, *args):
if request and (_has_read_retryable_headers(request.headers) or (args and (is_write_retryable(args[0], client) or
client._global_endpoint_manager.is_per_partition_automatic_failover_applicable(args[0])))):
request_params = args[0] if args else None
if request and (_is_read_retryable_request(request, request_params) or (request_params is not None and (
is_write_retryable(request_params, client) or
client._global_endpoint_manager.is_per_partition_automatic_failover_applicable(request_params)))):
# we resolve the request endpoint to the next preferred region
# once we are out of preferred regions we stop retrying
retry_policy = response_retry_policy
if not retry_policy.ShouldRetry():
if args and args[0].should_clear_session_token_on_session_read_failure and client.session:
if (request_params is not None
and request_params.should_clear_session_token_on_session_read_failure
and client.session):
client.session.clear_session_token(client.last_response_headers)
raise exception
else:
Expand Down Expand Up @@ -444,7 +456,7 @@ def send(self, request):
except ServiceResponseError as err:
retry_error = err
# Only read operations can be safely retried with ServiceResponseError
if (not _has_read_retryable_headers(request.http_request.headers) or
if (not _is_read_retryable_request(request.http_request, request_params) or
_has_database_account_header(request.http_request.headers) or
request_params.healthy_tentative_location):
raise err
Expand All @@ -466,7 +478,7 @@ def send(self, request):
if (_has_database_account_header(request.http_request.headers) or
request_params.healthy_tentative_location):
raise err
if _has_read_retryable_headers(request.http_request.headers) and retry_settings['read'] > 0:
if _is_read_retryable_request(request.http_request, request_params) and retry_settings['read'] > 0:
_record_failure_if_request_not_cancelled(request_params, global_endpoint_manager, None)
retry_active = self.increment(retry_settings, response=request, error=err)
if retry_active:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,19 @@
"""

import logging
import random
from typing import Any, Dict, List, Optional, Tuple

from .. import _base, http_constants
from .collection_routing_map import CollectionRoutingMap, _build_routing_map_from_ranges
from ..exceptions import CosmosHttpResponseError
# Re-exported here so provider modules and tests import these from one place
# rather than reaching into ``collection_routing_map`` directly.
from .collection_routing_map import ( # pylint: disable=unused-import
CollectionRoutingMap,
_build_routing_map_from_ranges,
_OverlapDetected,
_GapDetected,
)
from . import routing_range
from .routing_range import (
PKRange,
Expand All @@ -44,6 +53,76 @@

PAGE_SIZE_CHANGE_FEED = "-1" # Return all available changes

# Retry budget for transient ``/pkranges`` snapshot inconsistencies (overlap
# or gap) before the caller surfaces a 503. Shared by sync and async providers.
_TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS = 3
# Initial backoff (seconds) before the next retry; doubles each attempt and
# is jittered uniformly in ``[0, upper_bound]``. With MAX_ATTEMPTS=3 the
Comment thread
dibahlfi marked this conversation as resolved.
Outdated
# worst-case cumulative sleep is 0.1 + 0.2 = 0.3s per surfaced 503 (the third failed attempt raises without sleeping).
_TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS = 0.1


def _jittered_backoff(backoff_seconds: float) -> float:
"""Return a uniformly-distributed sleep in ``[0, backoff_seconds]``.

:param float backoff_seconds: Non-negative upper bound for the backoff.
:return: A random sleep value in ``[0, backoff_seconds]``.
:rtype: float
"""
return random.uniform(0, backoff_seconds)


def _handle_transient_snapshot_retry_decision(
    *,
    retry_attempt_count: int,
    collection_link: str,
    logger: logging.Logger,  # pylint: disable=redefined-outer-name
) -> float:
    """Decide between another retry and giving up with an HTTP 503.

    Invoked after the routing-map builder reports a transient overlap or gap.
    While retry budget remains, a jittered backoff is computed, logged and
    returned; this function never sleeps itself, so the caller performs the
    wait. Once the attempt count reaches the budget, the failure is logged
    and raised as a 503.

    :keyword int retry_attempt_count: Attempts so far, including the failed
        one. Pass ``1`` after the first failure.
    :keyword str collection_link: Used in log messages and the 503 body.
    :keyword logging.Logger logger: Logger that receives the warning/error record.
    :return: Jittered backoff seconds drawn from ``[0, upper_bound]``, where
        the upper bound doubles with every attempt.
    :rtype: float
    :raises CosmosHttpResponseError: When the retry budget is exhausted.
    """
    budget_remaining = retry_attempt_count < _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS
    if budget_remaining:
        # The upper bound doubles per attempt: initial * 2^(attempt - 1).
        doublings = retry_attempt_count - 1
        backoff_ceiling = _TRANSIENT_SNAPSHOT_RETRY_INITIAL_BACKOFF_SECONDS * (2 ** doublings)
        sleep_seconds = _jittered_backoff(backoff_ceiling)
        logger.warning(
            "Routing-map fetch for collection '%s' returned overlapping or "
            "gapped ranges (attempt %d/%d). Sleeping %.2fs and retrying.",
            collection_link,
            retry_attempt_count,
            _TRANSIENT_SNAPSHOT_RETRY_MAX_ATTEMPTS,
            sleep_seconds,
        )
        return sleep_seconds

    # Budget exhausted: record the failure, then surface it as a 503.
    logger.error(
        "Routing-map fetch for collection '%s' returned overlapping or "
        "gapped ranges on %d attempt(s). Surfacing as HTTP 503.",
        collection_link,
        retry_attempt_count,
    )
    failure_text = (
        "Routing-map fetch for collection '{}' returned overlapping "
        "or gapped ranges on {} attempt(s)."
    ).format(collection_link, retry_attempt_count)
    raise CosmosHttpResponseError(
        status_code=http_constants.StatusCodes.SERVICE_UNAVAILABLE,
        message=failure_text,
    )




def is_cache_unchanged_since_previous(
collection_routing_map_by_item: Dict[str, CollectionRoutingMap],
Expand Down Expand Up @@ -149,7 +228,7 @@ def _resolve_endpoint(client: Any) -> str:


class _IncrementalMergeFailed(Exception):
"""Sentinel raised by :func:`process_fetched_ranges` when the
"""Private exception type raised by :func:`process_fetched_ranges` when the
incremental update cannot resolve all partition key ranges.

The caller decides how to recover: retry the incremental fetch
Expand All @@ -162,7 +241,7 @@ def process_fetched_ranges(
collection_id: str,
collection_link: str,
new_etag: Optional[str],
) -> Optional[CollectionRoutingMap]:
) -> CollectionRoutingMap:
"""Turn raw PK-range results into a :class:`CollectionRoutingMap`.

Handles both initial-load (when *previous_routing_map* is ``None``)
Expand All @@ -177,10 +256,8 @@ def process_fetched_ranges(
:param str collection_id: The ID of the collection.
:param str collection_link: The link to the collection.
:param str new_etag: The ETag from the change feed response, or ``None``.
:return: The new/updated routing map, or ``None`` when an
initial load yields no ranges.
:return: The new/updated routing map.
:rtype: ~azure.cosmos._routing.collection_routing_map.CollectionRoutingMap
or None
:raises _IncrementalMergeFailed: When the incremental path cannot
resolve all ranges. The caller catches this and either retries
the incremental fetch or falls back to a full refresh.
Expand Down Expand Up @@ -257,7 +334,21 @@ def process_fetched_ranges(

unresolved = next_unresolved

result = previous_routing_map.try_combine(range_tuples, effective_etag)
try:
result = previous_routing_map.try_combine(range_tuples, effective_etag)
except ValueError as overlap_error:
Comment thread
dibahlfi marked this conversation as resolved.
# Convert the overlap ``ValueError`` to ``_IncrementalMergeFailed`` so
# the caller retries and falls back to a full refresh. Narrow the
# match to the ``"Ranges overlap"`` prefix so any unrelated
# ``ValueError`` still surfaces as a real bug.
if not str(overlap_error).startswith("Ranges overlap"):
raise
logger.warning(
"Incremental merge for collection '%s' produced overlapping ranges: %s. "
"Falling back to a full refresh.",
collection_link, str(overlap_error),
)
raise _IncrementalMergeFailed() from overlap_error
if not result:
logger.warning(
"Incremental merge resulted in incomplete routing map for "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
determine_refresh_action,
get_smart_overlapping_ranges,
_IncrementalMergeFailed,
_OverlapDetected,
_GapDetected,
_handle_transient_snapshot_retry_decision,
)


Expand Down Expand Up @@ -103,6 +106,8 @@
# Number of extra incremental attempts after an incomplete incremental merge
# before falling back to a full routing-map refresh.
_INCOMPLETE_ROUTING_MAP_MAX_RETRIES = 1


class PartitionKeyRangeCache(object):
"""
PartitionKeyRangeCache provides list of effective partition key ranges for a
Expand Down Expand Up @@ -307,9 +312,12 @@ async def get_routing_map(
**kwargs
)

# Update the cache.
if new_routing_map:
self._collection_routing_map_by_item[collection_id] = new_routing_map
# ``_fetch_routing_map`` always returns a populated
# ``CollectionRoutingMap`` on success and raises otherwise, so no
# defensive None-check is needed: one would only mask a future
# regression by silently leaving the cache empty instead of
# surfacing the failure.
self._collection_routing_map_by_item[collection_id] = new_routing_map

return self._collection_routing_map_by_item.get(collection_id)

Expand All @@ -321,7 +329,7 @@ async def _fetch_routing_map(
previous_routing_map: Optional[CollectionRoutingMap],
feed_options: Optional[Dict[str, Any]],
**kwargs
) -> Optional[CollectionRoutingMap]:
) -> CollectionRoutingMap:
"""Fetches or updates the routing map using an incremental change feed.

This method handles both the initial loading of a collection's routing
Expand All @@ -331,18 +339,30 @@ async def _fetch_routing_map(
of inconsistencies during an incremental update, it automatically falls
back to a full refresh.

Always returns a populated :class:`CollectionRoutingMap` on success.
Failure modes raise an exception rather than returning ``None``:
``CosmosHttpResponseError`` for the underlying network call (including
the transient HTTP 503 raised once the snapshot-inconsistency retry
budget is exhausted), or the internal ``_IncrementalMergeFailed``
signal when the incremental-merge path cannot make progress and there
is no previous map to fall back on.

:param str collection_link: The link to the collection.
:param str collection_id: The ID of the collection.
:param previous_routing_map: The last known routing map for incremental updates.
:type previous_routing_map: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap or None
:param feed_options: Options for the change feed request.
:type feed_options: dict or None
:return: The updated or newly created CollectionRoutingMap, or None if the update fails.
:rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap or None
:raises CosmosHttpResponseError: If the underlying request to fetch ranges fails.
:return: The updated or newly created CollectionRoutingMap.
:rtype: azure.cosmos.routing.collection_routing_map.CollectionRoutingMap
:raises CosmosHttpResponseError: If the underlying ``/pkranges`` fetch
fails, or if the snapshot-inconsistency retry budget is exhausted
(surfaced as HTTP 503 so the upstream retry policy can take over).
"""
current_previous_map = previous_routing_map
incomplete_attempt_count = 0
inconsistency_attempt_count = 0
Comment thread
dibahlfi marked this conversation as resolved.

while True:
request_kwargs = dict(kwargs)
Expand Down Expand Up @@ -398,6 +418,18 @@ async def _fetch_routing_map(
continue

raise
except (_OverlapDetected, _GapDetected):
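# Count the failure and back off (or raise 503 once the budget is
# exhausted), then reset the previous map to ``None`` so the next
# attempt runs a full refresh instead of merging onto the same
# inconsistent base.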
inconsistency_attempt_count += 1
backoff = _handle_transient_snapshot_retry_decision(
retry_attempt_count=inconsistency_attempt_count,
collection_link=collection_link,
logger=logger,
)
await asyncio.sleep(backoff)
current_previous_map = None
continue

async def get_range_by_partition_key_range_id(
self,
Expand Down
Loading
Loading