|
28 | 28 | """ |
29 | 29 |
|
30 | 30 | import logging |
| 31 | +import random |
31 | 32 | from typing import Any, Dict, List, Optional, Tuple |
32 | 33 |
|
33 | 34 | from .. import _base, http_constants |
34 | | -from .collection_routing_map import CollectionRoutingMap, _build_routing_map_from_ranges |
| 35 | +from ..exceptions import CosmosHttpResponseError |
| 36 | +from .collection_routing_map import ( |
| 37 | + CollectionRoutingMap, |
| 38 | + _build_routing_map_from_ranges, |
| 39 | + _OverlapDetected, # noqa: F401 # re-exported for sync/async provider modules and tests |
| 40 | +) |
35 | 41 | from . import routing_range |
36 | 42 | from .routing_range import ( |
37 | 43 | PKRange, |
|
44 | 50 |
|
45 | 51 | PAGE_SIZE_CHANGE_FEED = "-1" # Return all available changes |
46 | 52 |
|
| 53 | +# Number of times the full-load path will re-fetch ``/pkranges`` when the |
| 54 | +# builder reports an overlap (``_OverlapDetected``). Overlap on the full-load |
| 55 | +# path is treated as a transient gateway inconsistency, so a small fixed |
| 56 | +# retry budget with backoff is preferred over surfacing immediately. After |
| 57 | +# this many attempts the caller surfaces a transient HTTP 503 so the |
| 58 | +# upstream retry policy can take over. |
| 59 | +# |
| 60 | +# Defined here (rather than in each provider module) so the sync and async |
| 61 | +# providers cannot drift on the retry budget — both import the same constant. |
| 62 | +_OVERLAP_RETRY_MAX_ATTEMPTS = 3 |
| 63 | +# Initial backoff between overlap retries; doubles each attempt. Worst-case |
| 64 | +# total sleep under the budget above is ~3.5s (0.5 + 1.0 + 2.0). |
| 65 | +_OVERLAP_RETRY_INITIAL_BACKOFF_SECONDS = 0.5 |
| 66 | + |
| 67 | + |
| 68 | +def _jittered_backoff(backoff_seconds: float) -> float: |
| 69 | + """Return a uniformly-jittered backoff in the range ``[0, backoff_seconds]``. |
| 70 | +
|
| 71 | + Implements the "full jitter" strategy: the actual sleep is drawn uniformly |
| 72 | + from zero to the full deterministic backoff. This decorrelates concurrent |
| 73 | + retriers (for example, multiple Cosmos clients running inside a single |
| 74 | + PySpark process that all hit the same gateway node on the same bad |
| 75 | + ``/pkranges`` snapshot at the same instant) so they do not retry in |
| 76 | + lockstep and re-collide on the same gateway node. |
| 77 | +
|
| 78 | + The worst-case sleep per attempt is unchanged (still bounded by the |
| 79 | + deterministic backoff), so the documented retry-budget contract still |
| 80 | + holds; the expected per-attempt sleep is half of it. |
| 81 | + """ |
| 82 | + return random.uniform(0, backoff_seconds) |
| 83 | + |
| 84 | + |
| 85 | +def _handle_overlap_retry_decision( |
| 86 | + *, |
| 87 | + overlap_attempt_count: int, |
| 88 | + collection_link: str, |
| 89 | + logger: logging.Logger, # pylint: disable=redefined-outer-name |
| 90 | +) -> float: |
| 91 | + """Decide what to do after the full-load builder reported an overlap. |
| 92 | +
|
| 93 | + Centralises the sync/async-identical retry policy. Returns the number of |
| 94 | + seconds the caller should sleep before the next attempt. Raises |
| 95 | + :class:`CosmosHttpResponseError` (HTTP 503) when the attempt budget has |
| 96 | + been exhausted; the caller's existing retry policy then handles it as |
| 97 | + a transient error. |
| 98 | +
|
| 99 | + The returned sleep duration is jittered (see :func:`_jittered_backoff`) |
| 100 | + so concurrent retriers do not retry in lockstep. The deterministic |
| 101 | + backoff schedule (0.5s -> 1.0s -> 2.0s, doubling) defines the *upper |
| 102 | + bound* of each attempt's sleep; the actual sleep is drawn uniformly |
| 103 | + from ``[0, that upper bound]``. |
| 104 | +
|
| 105 | + The caller is responsible for the actual sleep (sync ``time.sleep`` or |
| 106 | + ``await asyncio.sleep``). Keeping the sleep at the call site is what |
| 107 | + lets this helper stay free of concurrency-runtime assumptions — the |
| 108 | + only line that has to differ between the sync and async providers. |
| 109 | +
|
| 110 | + :param int overlap_attempt_count: Number of overlap attempts made so far, |
| 111 | + including the one that just failed. Pass ``1`` after the first failure, |
| 112 | + ``2`` after the second, etc. |
| 113 | + :param str collection_link: Used in log messages and the 503 error body |
| 114 | + so the caller knows which collection ran out of budget. |
| 115 | + :param logging.Logger logger: Caller's module-level logger, so messages |
| 116 | + appear under the right ``azure.cosmos._routing.*`` namespace. |
| 117 | + :return: Jittered backoff seconds to sleep before retrying. Guaranteed |
| 118 | + to be in ``[0, deterministic_backoff_for_attempt]``. |
| 119 | + :rtype: float |
| 120 | + :raises CosmosHttpResponseError: When ``overlap_attempt_count`` has reached |
| 121 | + ``_OVERLAP_RETRY_MAX_ATTEMPTS``. Status code is 503 so the upstream |
| 122 | + retry policy classifies it as transient. |
| 123 | + """ |
| 124 | + if overlap_attempt_count >= _OVERLAP_RETRY_MAX_ATTEMPTS: |
| 125 | + logger.error( |
| 126 | + "Full-load routing-map fetch for collection '%s' detected " |
| 127 | + "overlapping partition key ranges on every one of %d attempt(s). " |
| 128 | + "Surfacing as transient HTTP 503 so the caller's retry policy " |
| 129 | + "can take over.", |
| 130 | + collection_link, |
| 131 | + overlap_attempt_count, |
| 132 | + ) |
| 133 | + raise CosmosHttpResponseError( |
| 134 | + status_code=http_constants.StatusCodes.SERVICE_UNAVAILABLE, |
| 135 | + message=( |
| 136 | + "Failed to build routing map for collection '{}': " |
| 137 | + "overlapping partition key ranges persisted across {} " |
| 138 | + "full-load attempt(s). Surfaced as a retryable transient " |
| 139 | + "error so the upstream retry policy can take over, rather " |
| 140 | + "than allowing the underlying ValueError to escape as a " |
| 141 | + "fatal crash." |
| 142 | + ).format(collection_link, overlap_attempt_count), |
| 143 | + ) |
| 144 | + |
| 145 | + deterministic_backoff = ( |
| 146 | + _OVERLAP_RETRY_INITIAL_BACKOFF_SECONDS * (2 ** (overlap_attempt_count - 1)) |
| 147 | + ) |
| 148 | + jittered_backoff = _jittered_backoff(deterministic_backoff) |
| 149 | + logger.warning( |
| 150 | + "Full-load routing-map fetch for collection '%s' detected overlapping " |
| 151 | + "partition key ranges (attempt %d/%d). Sleeping %.2fs (jittered from " |
| 152 | + "upper bound %.2fs) and retrying.", |
| 153 | + collection_link, |
| 154 | + overlap_attempt_count, |
| 155 | + _OVERLAP_RETRY_MAX_ATTEMPTS, |
| 156 | + jittered_backoff, |
| 157 | + deterministic_backoff, |
| 158 | + ) |
| 159 | + return jittered_backoff |
| 160 | + |
47 | 161 |
|
48 | 162 | def is_cache_unchanged_since_previous( |
49 | 163 | collection_routing_map_by_item: Dict[str, CollectionRoutingMap], |
@@ -257,7 +371,32 @@ def process_fetched_ranges( |
257 | 371 |
|
258 | 372 | unresolved = next_unresolved |
259 | 373 |
|
260 | | - result = previous_routing_map.try_combine(range_tuples, effective_etag) |
| 374 | + try: |
| 375 | + result = previous_routing_map.try_combine(range_tuples, effective_etag) |
| 376 | + except ValueError as overlap_error: |
| 377 | + # ``try_combine`` validates the merged map via |
| 378 | + # ``CollectionRoutingMap.is_complete_set_of_range`` and raises |
| 379 | + # ``ValueError("Ranges overlap: ...")`` if the merge produces a |
| 380 | + # self-contradictory tiling. This can happen during the incremental |
| 381 | + # path when the delta contains a range whose key span overlaps an |
| 382 | + # existing cached range without either side declaring the other a |
| 383 | + # parent. |
| 384 | + # |
| 385 | + # We must NOT let this ``ValueError`` escape: the cache layer above |
| 386 | + # treats a ``None`` routing map as "no ranges" and would convert |
| 387 | + # the bare exception into a silent empty-result return at |
| 388 | + # ``get_overlapping_ranges``. Convert to ``_IncrementalMergeFailed`` |
| 389 | + # so the caller's existing retry loop retries the incremental fetch |
| 390 | + # once and then falls back to the full-load path, which has its own |
| 391 | + # ``_OverlapDetected`` handler with retry+backoff and surfaces a |
| 392 | + # transient HTTP 503 if the inconsistency persists. |
| 393 | + logger.warning( |
| 394 | + "Incremental merge for collection '%s' produced overlapping ranges: %s. " |
| 395 | + "Converting to _IncrementalMergeFailed so the caller retries / " |
| 396 | + "falls back to a full refresh.", |
| 397 | + collection_link, str(overlap_error), |
| 398 | + ) |
| 399 | + raise _IncrementalMergeFailed() from overlap_error |
261 | 400 | if not result: |
262 | 401 | logger.warning( |
263 | 402 | "Incremental merge resulted in incomplete routing map for " |
|
0 commit comments