Merged
37 commits
8b03fa2
perf(cosmos): share pk range cache + __slots__ + skip .upper()
tvaron3 Apr 14, 2026
3ec8f5e
perf(cosmos): add PKRange namedtuple for compact partition key range …
tvaron3 Apr 14, 2026
2cd31c6
fix: resolve pylint, mypy, cspell errors in PKRange change
tvaron3 Apr 14, 2026
5448e75
perf(cosmos): add __slots__ to _PartitionHealthInfo + comments on Ran…
tvaron3 Apr 14, 2026
a63db88
fix: mypy type annotation + move cspell to cosmos package level
tvaron3 Apr 14, 2026
5407306
merge: resolve cspell.json conflict with upstream/main
tvaron3 Apr 14, 2026
5a0992f
test(cosmos): add integration + fault injection tests for shared cache
tvaron3 Apr 15, 2026
e1d4152
fix(cosmos): address review - clear_cache identity, PKRange indexing,…
tvaron3 Apr 15, 2026
7f04560
chore: remove harness artifacts from tracked files
tvaron3 Apr 15, 2026
44e87b0
fix(cosmos): resolve test failures — PKRange dict equality, test updates
tvaron3 Apr 16, 2026
a64fe14
chore: remove stale .temp artifact
tvaron3 Apr 16, 2026
e429f92
fix(cosmos): session token parents.copy(), shared cache test isolatio…
tvaron3 Apr 16, 2026
fafee80
chore: remove .temp artifact
tvaron3 Apr 16, 2026
dd32caf
fix(cosmos): test fixes — PKRange field assertions, remove looping fa…
tvaron3 Apr 16, 2026
770c5b1
test(cosmos): add async versions of all shared cache tests
tvaron3 Apr 17, 2026
bd830a0
fix(cosmos): async tests — drop enable_cross_partition_query, use que…
tvaron3 Apr 20, 2026
b0780c6
fix(cosmos): async tests — populate PK range cache via direct provide…
tvaron3 Apr 20, 2026
506d3fe
fix(cosmos): address iter-2 review — shared locks, cache release, PKR…
tvaron3 Apr 20, 2026
2320aae
fix(cosmos): pylint docstrings on _resolve_endpoint + async CRUD test…
tvaron3 Apr 20, 2026
dffcc7b
chore: untrack .coding-harness/ harness artifacts
tvaron3 Apr 20, 2026
4aaa556
ci: retrigger pipelines (flaky test_health_check_failure_startup_asyn…
tvaron3 Apr 20, 2026
ce63b96
doc(cosmos): document PKRange.__contains__ truthy-presence semantics
tvaron3 Apr 20, 2026
19e046e
chore: untrack .coding-harness/ harness artifacts (proper gitignore)
tvaron3 Apr 20, 2026
c474821
test(cosmos): bump test_timeout_for_read_items delay 2s→3s
tvaron3 Apr 20, 2026
96e1323
Merge remote-tracking branch 'upstream/main' into fix/shared-pk-range…
tvaron3 Apr 21, 2026
014dc89
chore(cosmos): address PR review comments
tvaron3 Apr 21, 2026
7a0730c
docs(cosmos): explain shared routing-map cache module-level globals
tvaron3 Apr 22, 2026
220fcf0
fix(cosmos): scope async pk-range locks per event loop, reset cache b…
Copilot Apr 23, 2026
335f6f5
Fix shared-cache test fixture to preserve dict identity
tvaron3 Apr 23, 2026
e2c698c
F2: preserve per-collection locks across clear_cache to keep single-f…
tvaron3 Apr 23, 2026
db156df
Address xinlian12 review + fix test_multi_client_shared_cache_queries
tvaron3 Apr 23, 2026
28fe446
Merge branch 'main' into fix/strip-pk-range-fields
tvaron3 Apr 23, 2026
2f70bbf
Fix Build Analyze: pylint C4732/C4739 + cspell TOCTOU
Apr 23, 2026
2c7a318
Address xinlian Apr 24 review: sync clear_cache + retain status/throu…
tvaron3 Apr 27, 2026
1abb340
Revert .gitignore changes — keep PR diff scoped to PKR cache work
tvaron3 Apr 27, 2026
eab73eb
Address xinlian review: dedupe _resolve_endpoint + PKRange construction
tvaron3 Apr 29, 2026
ea36af0
fix: pylint C4740 — restore type annotation on _resolve_endpoint
tvaron3 Apr 29, 2026
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -10,6 +10,7 @@
* Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243)

#### Other Changes
* Reduced per-client memory overhead when partition-level circuit breaker (PPCB) is enabled by sharing the partition key range routing map cache across CosmosClient instances connected to the same endpoint, and stripping unused fields from cached partition key ranges using compact PKRange namedtuples. See [PR 46297](https://github.com/Azure/azure-sdk-for-python/pull/46297)

### 4.16.0b2 (2026-04-04)

@@ -3591,7 +3591,8 @@ def refresh_routing_map_provider(

If collection_link is provided, refreshes only that collection.
When previous_routing_map is provided this is incremental; otherwise this is a collection-scoped repopulation.
- Without collection_link, it creates a new provider instance for a full refresh.
+ Without collection_link, it clears the shared routing-map cache in place
+ so the next request for any collection re-fetches from the service.

:param str collection_link: The collection link.
:param object previous_routing_map: The routing map that is considered stale.
@@ -3634,12 +3635,14 @@ def refresh_routing_map_provider(
status_code,
)
else:
- # Full refresh - create a new provider instance. This clears all cached routing maps.
- self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)
+ # Full refresh - clear the shared routing-map cache in place so all
+ # clients sharing this endpoint re-fetch on next use. The provider
+ # instance itself is preserved (shared cache design).
+ self._routing_map_provider.clear_cache()
return

# Fallback to full refresh when targeted refresh fails transiently.
- self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)
+ self._routing_map_provider.clear_cache()

def _refresh_container_properties_cache(self, container_link: str):
# If container properties cache is stale, refresh it by reading the container.
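Reader's note on the hunk above: replacing the provider with a call to `clear_cache()` matters because several clients hold a reference to the same inner dict. A minimal sketch of the difference, using plain dicts and hypothetical names (`shared`, `client_a_view`, `client_b_view`) rather than the SDK's classes:

```python
# Hypothetical stand-ins: one shared dict, two "clients" holding references to it.
shared = {"coll1": "routing-map-1"}
client_a_view = shared
client_b_view = shared

# In-place clear: every holder sees the empty cache and keeps sharing it.
client_a_view.clear()
assert client_b_view == {}
assert client_a_view is client_b_view

# Rebinding instead: only the rebinding holder gets the fresh dict,
# so the two views silently diverge.
client_b_view["coll1"] = "routing-map-2"
client_a_view = {}
assert client_b_view == {"coll1": "routing-map-2"}
assert client_a_view is not client_b_view
```

The second half shows the failure mode that an in-place clear avoids: after a rebind, one holder would keep serving entries the other believes were dropped.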
@@ -50,6 +50,17 @@ class _PartitionHealthInfo(object):
"""
This internal class keeps the health and statistics for a partition.
"""
# __slots__ reduces per-instance memory by using a fixed-size C array
# instead of a per-instance __dict__. Significant when tracking many partitions.
__slots__ = (
'write_failure_count',
'read_failure_count',
'write_success_count',
'read_success_count',
'read_consecutive_failure_count',
'write_consecutive_failure_count',
'unavailability_info',
)

def __init__(self) -> None:
self.write_failure_count: int = 0
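Reader's note on the `__slots__` change: the effect can be seen on a toy class. This is a sketch with hypothetical class names (`WithDict`, `WithSlots`) and only two of the counters, not the SDK's `_PartitionHealthInfo`:

```python
class WithDict:
    """Ordinary class: each instance carries its own __dict__."""

    def __init__(self) -> None:
        self.read_failure_count = 0
        self.write_failure_count = 0


class WithSlots:
    """Slotted class: attributes live in fixed slots, no per-instance __dict__."""

    __slots__ = ("read_failure_count", "write_failure_count")

    def __init__(self) -> None:
        self.read_failure_count = 0
        self.write_failure_count = 0


plain = WithDict()
slotted = WithSlots()

# The slotted instance has no __dict__ at all.
plain_has_dict = hasattr(plain, "__dict__")
slotted_has_dict = hasattr(slotted, "__dict__")

# Attributes that are not listed in __slots__ are rejected.
try:
    slotted.unexpected = 1
    rejected = False
except AttributeError:
    rejected = True
```

The trade-off is that every attribute assigned anywhere on the instance must be listed in `__slots__`; a name missing from the tuple fails with `AttributeError` at assignment time.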
@@ -34,6 +34,7 @@
from .collection_routing_map import CollectionRoutingMap, _build_routing_map_from_ranges
from . import routing_range
from .routing_range import (
PKRange,
PartitionKeyRange,
_is_sorted_and_non_overlapping,
_subtract_range,
@@ -122,6 +123,31 @@ def prepare_fetch_options_and_headers(




def _resolve_endpoint(client: Any) -> str:
"""Return a cache key for ``client``'s endpoint.

Falls back to ``__unknown_<id>__`` when ``client`` has no ``url_connection``
so unknown/mocked clients are isolated rather than collapsed into a single
shared cache entry.

Centralized here so the sync (``routing_map_provider``) and async
(``aio.routing_map_provider``) modules use exactly the same fallback shape
— a divergence here would silently fragment the per-endpoint shared cache.

:param client: The CosmosClient (or compatible) instance whose endpoint
will be used as the shared-cache key.
:type client: Any
:returns: The endpoint URL string, or a per-instance fallback key when the
client does not expose ``url_connection``.
:rtype: str
"""
try:
return client.url_connection
except AttributeError:
return f"__unknown_{id(client)}__"


class _NeedFullRefresh(Exception):
"""Sentinel raised by :func:`process_fetched_ranges` when the
incremental update cannot be completed and a full refresh is needed."""
@@ -186,7 +212,7 @@ def process_fetched_ranges(
# Incremental update -- merge deltas into the existing map.
# Resolve parent chains transitively within this single delta so cascading
# splits (A->B+C and B->D+E in one payload) can be merged incrementally.
- range_tuples: List[Tuple[Dict[str, Any], Any]] = []
+ range_tuples: List[Tuple[Any, Any]] = []
known_range_info_by_id = {
pkr_id: pkr_tuple[1]
for pkr_id, pkr_tuple in previous_routing_map._rangeById.items() # pylint: disable=protected-access
@@ -209,7 +235,7 @@ def process_fetched_ranges(
next_unresolved.append(r)
continue

- range_tuples.append((r, range_info))
+ range_tuples.append((PKRange.from_dict(r), range_info))
known_range_info_by_id[r[PartitionKeyRange.Id]] = range_info
progress_made = True

@@ -24,12 +24,14 @@
"""
import asyncio # pylint: disable=do-not-import-asyncio
import logging
import threading
from typing import Dict, Any, Optional, List, TYPE_CHECKING
from azure.core.utils import CaseInsensitiveDict
from ... import _base, http_constants
from ..collection_routing_map import CollectionRoutingMap
from ...exceptions import CosmosHttpResponseError
from .._routing_map_provider_common import (
_resolve_endpoint,
prepare_fetch_options_and_headers,
process_fetched_ranges,
is_cache_unchanged_since_previous,
@@ -41,6 +43,60 @@

if TYPE_CHECKING:
from ...aio._cosmos_client_connection_async import CosmosClientConnection

# Module-level shared state, keyed by endpoint URL. All four dicts and the
# refcount are mutated only while holding ``_shared_cache_lock``. Sharing across
# every async CosmosClient that targets the same endpoint is what eliminates
# the per-client duplicate copies of the routing map (the memory win driving
# this change), and what lets concurrent readers single-flight a single
# refresh.

# endpoint -> { collection_id -> CollectionRoutingMap }. The actual cached
# routing maps. The inner dict is shared by every client for that endpoint, so
# a routing-map populated by one client is immediately visible to all others.
_shared_routing_map_cache: dict = {}

# endpoint -> { (loop_id, collection_id) -> asyncio.Lock }. Per-collection
# refresh lock, scoped to the asyncio event loop that owns it. We key by loop
# id (``id(asyncio.get_running_loop())``) because ``asyncio.Lock`` instances
# bind to the loop on first ``acquire()`` (CPython 3.10+) and raise
# ``RuntimeError: ... bound to a different event loop`` if reused from a
# different running loop. Single-flighting only needs to be per-loop in
# practice — coroutines on different loops have different connection pools
# and are effectively independent clients.
_shared_collection_locks: Dict[str, Dict[tuple, asyncio.Lock]] = {}

# endpoint -> threading.Lock. Guards the creation of new entries in the inner
# dict of ``_shared_collection_locks``. Was an ``asyncio.Lock`` previously,
# but its critical sections are pure dict reads/writes (no await), so a
# ``threading.Lock`` works identically and avoids the same loop-binding
# hazard described above. Without this guard, two coroutines racing on a
# brand-new (loop, collection_id) could each create a different Lock object
# and defeat the single-flight invariant.
_shared_locks_locks: Dict[str, threading.Lock] = {}

# endpoint -> int. Number of live async ``PartitionKeyRangeCache`` instances
# using this endpoint. Incremented on construction and decremented in
# ``release`` (called from ``CosmosClient.__aexit__`` / ``close`` / ``__del__``).
# When the count hits zero we drop the entry from all four dicts so an idle
# endpoint does not pin memory forever. ``clear_cache`` does NOT touch this
# count — it only wipes routing-map contents.
_shared_cache_refcounts: Dict[str, int] = {}

# Process-wide lock guarding the four dicts above for *this* (async) module.
# Note: the sync module ``_routing/routing_map_provider.py`` defines its own
# independent set of module-level dicts and its own ``_shared_cache_lock`` —
# state is NOT shared between the sync and async modules. A sync and an async
# ``CosmosClient`` targeting the same endpoint maintain separate routing-map
# caches. Using a ``threading.Lock`` (not an ``asyncio.Lock``) is also
# essential for correctness across multiple event loops in the same process:
# an ``asyncio.Lock`` binds to the loop that first acquires it. The critical
# sections this lock guards are pure dict reads/writes — never await, never
# network I/O — so a brief threading-lock acquisition from a coroutine is
# safe and does not block the event loop in any meaningful way.
_shared_cache_lock = threading.Lock()


# pylint: disable=protected-access

logger = logging.getLogger(__name__)
@@ -64,25 +120,99 @@ def __init__(self, client: Any):
"""

self._document_client = client
self._endpoint = _resolve_endpoint(client)
self._released = False

# Share routing map cache, per-collection asyncio locks, and the
# per-endpoint meta-lock that guards the per-collection-lock dict
# across all clients with the same endpoint. Refcount lets us evict
# the entry when the last sharing client releases it (see ``release``).
with _shared_cache_lock:
if self._endpoint not in _shared_routing_map_cache:
_shared_routing_map_cache[self._endpoint] = {}
_shared_collection_locks[self._endpoint] = {}
_shared_locks_locks[self._endpoint] = threading.Lock()
_shared_cache_refcounts[self._endpoint] = 0
_shared_cache_refcounts[self._endpoint] += 1
self._collection_routing_map_by_item = _shared_routing_map_cache[self._endpoint]
self._collection_locks: Dict[tuple, asyncio.Lock] = _shared_collection_locks[self._endpoint]
self._locks_lock: threading.Lock = _shared_locks_locks[self._endpoint]

def clear_cache(self):
"""Clear the shared routing map cache for this endpoint.

Uses in-place ``.clear()`` on the routing-map dict to preserve all
client references to the same dict object, so concurrent clients
sharing the endpoint continue to share a single cache instance.

The per-collection locks dict is intentionally **not** cleared here:
an in-flight ``_fetch_routing_map`` caller holds one of those locks
and will write its result into the (now-empty) shared cache when it
completes. Keeping the lock in place ensures that any concurrent
arrival serialises behind the in-flight refresh (single-flight
invariant) instead of racing it with a fresh lock. The locks dict
is evicted in ``release()`` once the endpoint refcount hits zero.
"""
with _shared_cache_lock:
if self._endpoint in _shared_routing_map_cache:
_shared_routing_map_cache[self._endpoint].clear()

def release(self) -> None:
"""Decrement the per-endpoint refcount and evict shared state at zero.

- # keeps the cached collection routing map by collection id
- self._collection_routing_map_by_item: Dict[str, CollectionRoutingMap] = {}
- # A lock to control access to the locks dictionary itself
- self._locks_lock = asyncio.Lock()
- # A dictionary to hold a lock for each collection ID
- self._collection_locks: Dict[str, asyncio.Lock] = {}
Safe to call multiple times concurrently. Best-effort: never raises.

The ``_released`` check-and-set is performed *inside* the shared
cache lock to close the TOCTOU window between two concurrent callers
(e.g. ``CosmosClient.__aexit__`` racing the GC's ``__del__``).
Without the lock, both callers could pass the early-return guard
before either set the flag, then both would decrement the refcount.
"""
endpoint = self._endpoint
try:
with _shared_cache_lock:
if self._released:
return
self._released = True
count = _shared_cache_refcounts.get(endpoint, 0) - 1
if count <= 0:
_shared_cache_refcounts.pop(endpoint, None)
_shared_routing_map_cache.pop(endpoint, None)
_shared_collection_locks.pop(endpoint, None)
_shared_locks_locks.pop(endpoint, None)
else:
_shared_cache_refcounts[endpoint] = count
except Exception: # pylint: disable=broad-except
# release() may be called from __del__ during interpreter shutdown
# where module globals may already be torn down.
pass

def __del__(self):
# Defensive fallback in case the owning client teardown path didn't
# call release(). Must never raise.
try:
self.release()
except Exception: # pylint: disable=broad-except
pass

async def _get_lock_for_collection(self, collection_id: str) -> asyncio.Lock:
- """Safely gets or creates a lock for a given collection ID.
+ """Safely gets or creates a lock for a given (loop, collection) pair.

Scoped to the running event loop so the returned ``asyncio.Lock`` is
always bound to the loop that will await it — see the comment on
``_shared_collection_locks`` for the loop-binding rationale.

:param str collection_id: The ID of the collection.
- :return: An asyncio.Lock specific to the collection ID.
+ :return: An asyncio.Lock specific to the (loop, collection) pair.
:rtype: asyncio.Lock
"""
- async with self._locks_lock:
- if collection_id not in self._collection_locks:
- self._collection_locks[collection_id] = asyncio.Lock()
- return self._collection_locks[collection_id]
+ key = (id(asyncio.get_running_loop()), collection_id)
+ with self._locks_lock:
+ lock = self._collection_locks.get(key)
+ if lock is None:
+ lock = asyncio.Lock()
+ self._collection_locks[key] = lock
+ return lock

def _is_cache_stale(
self,
@@ -27,7 +27,7 @@
from typing import Optional, Union

from azure.cosmos._routing import routing_range
- from azure.cosmos._routing.routing_range import PartitionKeyRange
+ from azure.cosmos._routing.routing_range import PartitionKeyRange, PKRange

# pylint: disable=line-too-long
class CollectionRoutingMap(object):
@@ -288,7 +288,10 @@ def _build_routing_map_from_ranges(
if PartitionKeyRange.Parents in r and r[PartitionKeyRange.Parents]:
gone_range_ids.update(r[PartitionKeyRange.Parents])

- filtered_ranges = [r for r in ranges if r[PartitionKeyRange.Id] not in gone_range_ids]
+ filtered_ranges = [
+ PKRange.from_dict(r)
+ for r in ranges if r[PartitionKeyRange.Id] not in gone_range_ids
+ ]
range_tuples = [(r, True) for r in filtered_ranges]

routing_map = CollectionRoutingMap.CompleteRoutingMap(
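Reader's note on `PKRange.from_dict`: the definition of `PKRange` is not part of the hunks shown here, so the following is only a sketch of the general technique (converting a service dict into a compact named tuple that keeps selected fields). The class name `CompactRange`, its field set, the dict keys, and the sample values are all assumptions made for illustration.

```python
from typing import Any, Dict, NamedTuple, Tuple


class CompactRange(NamedTuple):
    """Hypothetical compact record; the real PKRange field set may differ."""

    id: str
    min_inclusive: str
    max_exclusive: str
    parents: Tuple[str, ...]

    @classmethod
    def from_dict(cls, raw: Dict[str, Any]) -> "CompactRange":
        # Keep only the fields needed for routing; everything else is dropped.
        return cls(
            id=raw["id"],
            min_inclusive=raw["minInclusive"],
            max_exclusive=raw["maxExclusive"],
            parents=tuple(raw.get("parents") or ()),
        )


# Illustrative payload; the extra underscore-prefixed keys are placeholders.
raw = {
    "id": "2",
    "minInclusive": "05C1DFFFFFFFFC",
    "maxExclusive": "FF",
    "parents": ["0"],
    "_rid": "hypothetical-rid",
    "_etag": "hypothetical-etag",
}
compact = CompactRange.from_dict(raw)
```

A named tuple stores its values in a fixed-length tuple rather than a per-object dict, which is where the memory saving in this kind of design comes from.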