Skip to content

Commit e4c4b39

Browse files
committed
feed range query fix - initial commit
1 parent 13e7deb commit e4c4b39

6 files changed

Lines changed: 2566 additions & 100 deletions

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 194 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
import urllib.parse
3030
import uuid
3131
from concurrent.futures.thread import ThreadPoolExecutor
32-
from typing import Callable, Any, Iterable, Mapping, Optional, Sequence, Tuple, Union, cast
32+
from typing import Callable, Any, Dict, Iterable, Mapping, Optional, Sequence, Tuple, Union, cast
3333
from typing_extensions import TypedDict
3434
from urllib3.util.retry import Retry
3535

@@ -73,7 +73,18 @@
7373
from ._read_items_helper import ReadItemsHelperSync
7474
from ._request_object import RequestObject
7575
from ._retry_utility import ConnectionRetryPolicy
76-
from ._routing import routing_map_provider, routing_range
76+
from ._routing import routing_map_provider
77+
from ._routing.feed_range_continuation import (
78+
_apply_feedrange_request_headers,
79+
_build_scope_from_overlaps,
80+
_decode_token,
81+
_derive_initial_feedranges,
82+
_explode_feedrange_on_multi_overlap,
83+
_extract_resume_state,
84+
_normalize_max_item_count,
85+
_set_outbound_continuation,
86+
_validate_token_identity,
87+
)
7788
from ._query_advisor import get_query_advice_info
7889
from ._inference_service import _InferenceService
7990
from .documents import ConnectionPolicy, DatabaseAccount
@@ -3240,7 +3251,17 @@ def __QueryFeed( # pylint: disable=too-many-locals, too-many-statements, too-ma
32403251
if timeout is not None:
32413252
kwargs.setdefault("timeout", timeout)
32423253

3243-
internal_headers_capture = kwargs.pop("_internal_response_headers_capture", None)
3254+
internal_headers_capture: Optional[Dict[str, Any]] = kwargs.pop(
3255+
"_internal_response_headers_capture", None
3256+
)
3257+
3258+
def _capture_internal_headers(headers: Mapping[str, Any]) -> None:
3259+
# Local helper so flow analysis can narrow Optional[Dict] once
3260+
# and every call site stays a single line.
3261+
if internal_headers_capture is None:
3262+
return
3263+
internal_headers_capture.clear()
3264+
internal_headers_capture.update(headers)
32443265

32453266
if query:
32463267
__GetBodiesFromQueryResult = result_fn
@@ -3293,8 +3314,7 @@ def __GetBodiesFromQueryResult(result: dict[str, Any]) -> list[dict[str, Any]]:
32933314
result, last_response_headers = self.__Get(path, request_params, headers, **kwargs)
32943315
self.last_response_headers = last_response_headers
32953316
if internal_headers_capture is not None:
3296-
internal_headers_capture.clear()
3297-
internal_headers_capture.update(last_response_headers)
3317+
_capture_internal_headers(last_response_headers)
32983318
if response_headers_list is not None:
32993319
response_headers_list.append(last_response_headers.copy())
33003320
if response_hook:
@@ -3348,64 +3368,187 @@ def __GetBodiesFromQueryResult(result: dict[str, Any]) -> list[dict[str, Any]]:
33483368

33493369
# If feed_range_epk exists, query with that range
33503370
if feed_range_epk is not None:
3351-
over_lapping_ranges = self._routing_map_provider.get_overlapping_ranges(resource_id, [feed_range_epk],
3352-
options)
3353-
# It is possible to get more than one over lapping range. We need to get the query results for each one
3371+
# (a) Look at the continuation the caller passed in.
3372+
# - Empty or from a pre-fix SDK: start fresh.
3373+
# - One of our v=1 envelopes: check the collection, query, and
3374+
# feed_range still match before resuming from it.
3375+
inbound = _decode_token(options.get("continuation"))
3376+
if inbound is not None:
3377+
_validate_token_identity(inbound, resource_id, query, feed_range_epk)
3378+
current_feedrange, remaining_feedranges, next_backend_cont = _extract_resume_state(inbound)
3379+
else:
3380+
# First call (or legacy passthrough). Ask the routing map which
3381+
# partitions the input feed_range overlaps right now and turn
3382+
# each overlap into a feedrange (intersection of that partition
3383+
# and the input feed_range).
3384+
first_overlaps = self._routing_map_provider.get_overlapping_ranges(
3385+
resource_id, [feed_range_epk], options
3386+
)
3387+
all_feedranges = _derive_initial_feedranges(feed_range_epk, first_overlaps)
3388+
if not all_feedranges:
3389+
# The input feed_range does not overlap any current
3390+
# physical partition. Mirror the async path: stamp
3391+
# an empty headers object onto self.last_response_headers
3392+
# so callers reading it after a no-op call see a
3393+
# consistent (empty) value instead of whatever the
3394+
# previous request left behind.
3395+
empty_headers: CaseInsensitiveDict = CaseInsensitiveDict()
3396+
self.last_response_headers = empty_headers
3397+
return [], empty_headers
3398+
current_feedrange, remaining_feedranges = all_feedranges[0], all_feedranges[1:]
3399+
next_backend_cont = None
3400+
3401+
# (b) max_item_count is the cap for the page we hand back to the
3402+
# caller. We may need several backend POSTs to fill it (one per
3403+
# feedrange we have to query). Once the cap is reached we stop;
3404+
# any feedranges we have not started yet go into the outbound
3405+
# token so the next call picks up from there.
3406+
#
3407+
# Non-positive or non-numeric caps are normalized to "unbounded"
3408+
# (None) by _normalize_max_item_count: a cap of 0 / -1 would
3409+
# otherwise make the loop short-circuit before issuing any POST
3410+
# and emit a continuation token whose current_feedrange is
3411+
# unchanged - the caller would then pull empty pages forever
3412+
# without making progress.
3413+
remaining_budget = _normalize_max_item_count(options.get("maxItemCount"))
3414+
33543415
results: dict[str, Any] = {}
3355-
# For each over lapping range we will take a sub range of the feed range EPK that overlaps with the over
3356-
# lapping physical partition. The EPK sub range will be one of four:
3357-
# 1) Will have a range min equal to the feed range EPK min, and a range max equal to the over lapping
3358-
# partition
3359-
# 2) Will have a range min equal to the over lapping partition range min, and a range max equal to the
3360-
# feed range EPK range max.
3361-
# 3) will match exactly with the current over lapping physical partition, so we just return the over lapping
3362-
# physical partition's partition key id.
3363-
# 4) Will equal the feed range EPK since it is a sub range of a single physical partition
3364-
for over_lapping_range in over_lapping_ranges:
3365-
single_range = routing_range.Range.PartitionKeyRangeToRange(over_lapping_range)
3366-
# Since the range min and max are all Upper Cased string Hex Values,
3367-
# we can compare the values lexicographically
3368-
EPK_sub_range = routing_range.Range(range_min=max(single_range.min, feed_range_epk.min),
3369-
range_max=min(single_range.max, feed_range_epk.max),
3370-
isMinInclusive=True, isMaxInclusive=False)
3371-
3372-
# set the session token for this specific partition to avoid sending compound token for all partitions
3373-
base.set_session_token_header(self, req_headers, path, request_params, options,
3374-
over_lapping_range["id"])
3375-
if single_range.min == EPK_sub_range.min and EPK_sub_range.max == single_range.max:
3376-
# The Epk Sub Range spans exactly one physical partition
3377-
# In this case we can route to the physical pk range id
3378-
req_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_range["id"]
3416+
last_response_headers: CaseInsensitiveDict = CaseInsensitiveDict()
3417+
3418+
while True:
3419+
if remaining_budget is not None and remaining_budget <= 0:
3420+
break # cap reached; carry the unfinished feedranges forward
3421+
3422+
# Look up the live routing map for the current feedrange.
3423+
# Doing this every iteration is what makes the token
3424+
# split-safe.
3425+
overlapping = self._routing_map_provider.get_overlapping_ranges(
3426+
resource_id, [current_feedrange], options
3427+
)
3428+
overlapping, partition_scope = _build_scope_from_overlaps(
3429+
overlapping, current_feedrange
3430+
)
3431+
3432+
# Handle the case where Cosmos split a partition between the
3433+
# previous run and this one. Example: the saved
3434+
# current_feedrange used to live inside one partition X, but X
3435+
# has since been split into children X1 and X2. The routing
3436+
# map now returns two partitions for the same feedrange. If
3437+
# we sent one POST to X1 with X's full range as the EPK
3438+
# filter, the backend would filter in-partition only and
3439+
# silently drop every row living on X2 (that is how a resume
3440+
# after a split came back 19 ids short of ground truth in
3441+
# test_post_split_resume_async).
3442+
#
3443+
# So when the lookup returns more than one partition, slice
3444+
# the saved feedrange into one sub-feedrange per child
3445+
# (intersection with the saved feedrange, ordered by EPK
3446+
# min), make the first sub-feedrange the new current one,
3447+
# put the rest in front of the remaining list, and clear the
3448+
# saved backend continuation - it was issued by the old
3449+
# parent partition and the children won't accept it. The next
3450+
# loop iteration sees a single overlap and falls through to
3451+
# the normal single-partition POST below.
3452+
#
3453+
# Note: if the caller had already pulled some rows from X
3454+
# before the split, those rows show up again on this resume.
3455+
# The customer dedupes by document id.
3456+
current_feedrange, remaining_feedranges, did_explode = _explode_feedrange_on_multi_overlap(
3457+
current_feedrange,
3458+
overlapping,
3459+
remaining_feedranges,
3460+
)
3461+
if did_explode:
3462+
next_backend_cont = None
3463+
overlapping = self._routing_map_provider.get_overlapping_ranges(
3464+
resource_id, [current_feedrange], options
3465+
)
3466+
overlapping, partition_scope = _build_scope_from_overlaps(
3467+
overlapping, current_feedrange
3468+
)
3469+
3470+
sub_options = dict(options)
3471+
if remaining_budget is not None:
3472+
sub_options["maxItemCount"] = remaining_budget
3473+
if next_backend_cont is not None:
3474+
sub_options["continuation"] = next_backend_cont
33793475
else:
3380-
# The Epk Sub Range spans less than a single physical partition
3381-
# In this case we route to the physical partition and
3382-
# pass the epk sub range to the headers to filter within partition
3383-
req_headers[http_constants.HttpHeaders.PartitionKeyRangeID] = over_lapping_range["id"]
3384-
req_headers[http_constants.HttpHeaders.StartEpkString] = EPK_sub_range.min
3385-
req_headers[http_constants.HttpHeaders.EndEpkString] = EPK_sub_range.max
3386-
req_headers[http_constants.HttpHeaders.ReadFeedKeyType] = "EffectivePartitionKeyRange"
3387-
partial_result, last_response_headers = self.__Post(
3476+
sub_options.pop("continuation", None)
3477+
3478+
# Populate request headers for this single backend POST.
3479+
# The shared helper handles partition routing (PKR id +
3480+
# optional EPK filter), page-size cap, and continuation
3481+
# set/clear so the same rules apply to sync and async.
3482+
_apply_feedrange_request_headers(
3483+
req_headers,
3484+
overlapping,
3485+
partition_scope,
3486+
current_feedrange,
3487+
remaining_budget,
3488+
sub_options.get("continuation"),
3489+
)
3490+
# Use the session token for this specific partition so we don't
3491+
# send a compound token covering all partitions.
3492+
base.set_session_token_header(
3493+
self, req_headers, path, request_params, sub_options, overlapping[0]["id"]
3494+
)
3495+
3496+
partial_result, sub_response_headers = self.__Post(
33883497
path, request_params, query, req_headers, **kwargs
33893498
)
3499+
last_response_headers = sub_response_headers
33903500
self.last_response_headers = last_response_headers
33913501
if internal_headers_capture is not None:
3392-
internal_headers_capture.clear()
3393-
internal_headers_capture.update(last_response_headers)
3394-
self._UpdateSessionIfRequired(req_headers, partial_result, last_response_headers)
3395-
# Introducing a temporary complex function into a critical path to handle aggregated queries
3396-
# during splits, as a precaution falling back to the original logic if anything goes wrong
3502+
_capture_internal_headers(sub_response_headers)
3503+
self._UpdateSessionIfRequired(req_headers, partial_result, sub_response_headers)
3504+
3505+
# Merge results, falling back to a plain extend if the
3506+
# aggregating merge raises (it can on aggregated queries
3507+
# during splits).
33973508
try:
33983509
results = base._merge_query_results(results, partial_result, query)
3399-
except Exception: # pylint: disable=broad-exception-caught
3400-
# If the new merge logic fails, fall back to the original logic.
3510+
except Exception: # pylint: disable=broad-exception-caught
34013511
if results:
34023512
results["Documents"].extend(partial_result["Documents"])
34033513
else:
34043514
results = partial_result
3515+
3516+
items_returned = len(partial_result.get("Documents", []))
3517+
if remaining_budget is not None:
3518+
remaining_budget -= items_returned
34053519
if response_headers_list is not None:
3406-
response_headers_list.append(last_response_headers.copy())
3520+
response_headers_list.append(sub_response_headers.copy())
34073521
if response_hook:
3408-
response_hook(last_response_headers, partial_result)
3522+
response_hook(sub_response_headers, partial_result)
3523+
3524+
next_backend_cont = sub_response_headers.get(http_constants.HttpHeaders.Continuation)
3525+
if next_backend_cont:
3526+
# Current feedrange has more to give. Stay on it; the
3527+
# budget check at the top of the next iteration decides
3528+
# whether to issue another POST.
3529+
continue
3530+
3531+
# Current feedrange is drained. Move to the next one if there
3532+
# is one; otherwise we are done.
3533+
if not remaining_feedranges:
3534+
current_feedrange = None
3535+
break
3536+
current_feedrange = remaining_feedranges.pop(0)
3537+
next_backend_cont = None
3538+
3539+
# (c) Build the outbound token. Clear the continuation header if
3540+
# there is no work left at all.
3541+
_set_outbound_continuation(
3542+
last_response_headers,
3543+
resource_id,
3544+
query,
3545+
feed_range_epk,
3546+
current_feedrange,
3547+
remaining_feedranges,
3548+
next_backend_cont,
3549+
)
3550+
self.last_response_headers = last_response_headers
3551+
34093552
# If the prefix partition query has results, let's return them
34103553
if results:
34113554
if last_response_headers.get(http_constants.HttpHeaders.IndexUtilization) is not None:
@@ -3417,12 +3560,12 @@ def __GetBodiesFromQueryResult(result: dict[str, Any]) -> list[dict[str, Any]]:
34173560
last_response_headers[http_constants.HttpHeaders.QueryAdvice] = (
34183561
get_query_advice_info(query_advice_raw))
34193562
return __GetBodiesFromQueryResult(results), last_response_headers
3563+
return [], last_response_headers
34203564

34213565
result, last_response_headers = self.__Post(path, request_params, query, req_headers, **kwargs)
34223566
self.last_response_headers = last_response_headers
34233567
if internal_headers_capture is not None:
3424-
internal_headers_capture.clear()
3425-
internal_headers_capture.update(last_response_headers)
3568+
_capture_internal_headers(last_response_headers)
34263569
self._UpdateSessionIfRequired(req_headers, result, last_response_headers)
34273570
if last_response_headers.get(http_constants.HttpHeaders.IndexUtilization) is not None:
34283571
INDEX_METRICS_HEADER = http_constants.HttpHeaders.IndexUtilization

0 commit comments

Comments
 (0)