
Commit b01afa8

Authored by tvaron3, Copilot, and Tomas Varon
perf(cosmos): improve pkrange cache memory usage (#46297)
* perf(cosmos): share pk range cache + __slots__ + skip .upper()

  1. Share CollectionRoutingMap cache across clients per endpoint. Eliminates N-1 redundant copies when N clients target the same account.
  2. Add __slots__ to the Range class (64 bytes vs ~250 bytes per instance).
  3. Skip .upper() when the string is already uppercase.

  PPCB overhead (150 clients, tracemalloc): Original: 27.4 MB -> Patched: ~0 MB (-100%)
  At customer scale (200K partitions x 152 clients): ~2.1 GB -> ~14 MB

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* perf(cosmos): add PKRange namedtuple for compact partition key range storage

  Convert raw service response dicts to PKRange namedtuples in both the full-refresh (_build_routing_map_from_ranges) and incremental-update (process_fetched_ranges) paths. PKRange retains only 4 fields (id, minInclusive, maxExclusive, parents) and supports dict-style access for backward compatibility.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: resolve pylint, mypy, cspell errors in PKRange change

  - Import PKRange in _routing_map_provider_common.py (fixes all emulator tests)
  - Fix namedtuple name mismatch (_PKRangeBase, not PKRange) for mypy
  - Use the raise-from pattern in PKRange.__getitem__ (pylint W0707)
  - Move _locks_lock and _collection_locks init into __init__ (pylint W0201)
  - Add 'pkrange' to the cspell dictionary

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* perf(cosmos): add __slots__ to _PartitionHealthInfo + comments on Range __slots__

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: mypy type annotation + move cspell to cosmos package level

  - Widen the range_tuples type to List[Tuple[Any, Any]] for PKRange compatibility
  - Move the 'pkrange' word to sdk/cosmos/azure-cosmos/cspell.json (not .vscode)

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* test(cosmos): add integration + fault injection tests for shared cache

  Integration tests (7):
  - Multi-client shared cache for reads and queries
  - clear_cache() transparent repopulation and cross-client propagation
  - Different endpoints isolated
  - PKRange full CRUD lifecycle and change feed compatibility

  Fault injection tests (6 sync + 6 async):
  - 410 Gone triggers cache refresh
  - Partition split (410/1002) refreshes the routing map
  - Concurrent cache refresh with ThreadPoolExecutor/asyncio.gather
  - PKRange immutability (namedtuple guarantee)
  - Transient 503 during PKRange fetch with retry recovery
  - clear_cache during concurrent reads (no crash/corruption)

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(cosmos): address review - clear_cache identity, PKRange indexing, parents tuple

  Fixes from coding agent harness review iteration 1:

  - F1: Fix the async else branch in refresh_routing_map_provider to use clear_cache() instead of re-creating SmartRoutingMapProvider
  - F2: Use dict.clear() in clear_cache() to preserve all client references (was creating a new dict, orphaning other clients' references)
  - F3: Clear _collection_locks under _locks_lock instead of replacing
  - F4: Align async clear_cache() with sync (both use .clear())
  - F5: PKRange.__getitem__ supports integer indexing (int/slice → super())
  - F6: Convert parents to a tuple at construction for true immutability
  - F8: Fix tests to verify dict identity is preserved after clear_cache
  - F9: Cache the .upper() result to avoid a double call in the slow path
  - F11: Add a changelog entry

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* chore: remove harness artifacts from tracked files

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(cosmos): resolve test failures — PKRange dict equality, test updates

  - Add PKRange.__eq__ for dict comparison (existing tests compare against dicts)
  - Update partition split retry tests: assert clear_cache() instead of the SmartRoutingMapProvider constructor (sync + async)
  - Fix sync test .close() calls (sync CosmosClient uses a context manager, not .close())
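The PKRange idea described above (a compact namedtuple that still supports dict-style access, the raise-from pattern of W0707, and the int/slice indexing of F5) can be sketched as follows. This is an illustrative reconstruction from the commit text, not the SDK's exact implementation:

```python
from collections import namedtuple

# Compact base tuple: only the 4 routing-relevant fields survive caching.
_PKRangeBase = namedtuple("_PKRangeBase", ["id", "minInclusive", "maxExclusive", "parents"])


class PKRange(_PKRangeBase):
    __slots__ = ()  # no per-instance __dict__ on top of the tuple storage

    def __getitem__(self, key):
        # Integer/slice indexing falls through to normal tuple behaviour;
        # string keys emulate the legacy raw-dict access pattern.
        if isinstance(key, (int, slice)):
            return super().__getitem__(key)
        try:
            return getattr(self, key)
        except AttributeError as exc:
            raise KeyError(key) from exc  # raise-from keeps the cause chained


pkr = PKRange(id="0", minInclusive="", maxExclusive="FF", parents=())
print(pkr["id"], pkr[0], pkr.maxExclusive)  # dict-style, tuple, and attribute access
```

Callers written against the old raw service dicts (`pkr["id"]`) keep working, while each cached range drops from a full dict to a fixed-size tuple.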
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* chore: remove stale .temp artifact

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(cosmos): session token parents.copy(), shared cache test isolation, container limits

  - Fix _session.py: parents.copy() -> list(parents) for tuple compatibility
  - Add url_connection + tearDown to routing_map_provider tests (cache isolation)
  - Use existing test containers instead of creating new ones (25-container limit)

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* chore: remove .temp artifact

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(cosmos): test fixes — PKRange field assertions, remove looping fault tests

  - test_routing_map.py: only check id/minInclusive/maxExclusive (of PKRange's 4 fields)
  - Remove fault injection tests that loop infinitely (FaultInjectionTransport resets its counter after max_inner_count, causing a retry → re-fault → retry loop)
  - Keep: concurrent cache refresh, PKRange immutability, concurrent reads tests
  - Remove tearDown cache clearing (conflicts with setUpClass client refs)
  - Fix the clear_cache repopulation test (don't assert empty between clear and read)

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* test(cosmos): add async versions of all shared cache tests

  - test_shared_cache_integration_async.py: 7 async integration tests (multi-client reads/queries, clear_cache, endpoint isolation, CRUD, change feed)
  - test_shared_pk_range_cache_async.py: 5 async unit tests (cache sharing, isolation, clear_cache identity, cross-endpoint isolation)

  Total async test coverage: 7 integration + 5 unit + 3 fault injection = 15 async tests

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(cosmos): async tests — drop enable_cross_partition_query, use query for cache population

  - async query_items() doesn't accept enable_cross_partition_query (TypeError in aiohttp)
  - async point reads don't populate the PK range cache; use a cross-partition query to deterministically populate it before/after clear_cache()

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(cosmos): async tests — populate PK range cache via direct provider call

  Async query_items doesn't reliably populate _collection_routing_map_by_item the way sync cross-partition queries do. Add a _populate_cache() helper that calls provider.get_routing_map() directly to deterministically populate the cache for assertions.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(cosmos): address iter-2 review — shared locks, cache release, PKRange semantics

  - Per-endpoint shared collection_locks dict and locks_lock, eliminating fragile per-instance lock state when multiple PartitionKeyRangeCache instances target the same endpoint (sync + async).
  - Add reference counting (release/__del__) on the shared cache entries and wire release() into CosmosClient.__exit__ and __aexit__ so the shared cache is evicted when the last client for an endpoint closes.
  - Make async PartitionKeyRangeCache.clear_cache an async coroutine that acquires the per-endpoint asyncio.Lock under the threading meta-lock; update the two await sites in _cosmos_client_connection_async and the affected tests (await + AsyncMock).
  - _resolve_endpoint falls back to id(client) when url_connection is unavailable (e.g. MagicMock test clients) so isolation is preserved.
  - PKRange.__contains__: return False for missing fields or empty tuples to avoid spurious membership matches against unset parents.
  - PKRange.__eq__ dict branch: include parents in equality, normalizing both sides to tuple to handle service raw dicts with list/missing parents.
  - Restore the parents assertion in test_routing_map_provider with tuple normalization on both sides.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(cosmos): pylint docstrings on _resolve_endpoint + async CRUD test populate

  - Add :param/:returns/:rtype docstrings on the _resolve_endpoint helper (sync + async) to satisfy azure-pylint-guidelines-checker (C4739/41/42).
  - test_pkrange_survives_full_crud_lifecycle_async: drive a routing-aware query via _populate_cache before asserting the cache is populated. Async point reads/writes don't reliably populate _collection_routing_map_by_item the way sync does.

* chore: untrack .coding-harness/ harness artifacts

* ci: retrigger pipelines (flaky test_health_check_failure_startup_async on py39 dep-checks)

* doc(cosmos): document PKRange.__contains__ truthy-presence semantics

  Per iter-3 reviewer minor finding F3 — clarify that 'key in pkr' returns False for absent or empty fields so callers can use it as a single truthy presence check (matching the legacy raw-dict behaviour where the field was simply missing when empty).

* chore: untrack .coding-harness/ harness artifacts (proper gitignore)

* test(cosmos): bump test_timeout_for_read_items delay 2s → 3s

  The test reads items across multiple physical partitions through a transport that delays each request by N seconds, expecting the cumulative delay to exceed the 5s timeout. With the shared routing-map cache, the new delayed client inherits the routing map populated when the test container was created, eliminating one HTTP request from the timed path. With 2 physical partitions × 2s = 4s, the test no longer reaches the 5s timeout and the assertion fails on the circuit_breaker_MultiMaster job (which provisions exactly 2 partitions for offer_throughput=11000). Bumping the per-request delay to 3s (2 partitions × 3s = 6s) makes the test robust regardless of cache-warming state.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* chore(cosmos): address PR review comments

  - Remove unused imports across test files (patch, uuid, PartitionKey, PKRange, Range, sys, pytest_asyncio, FaultInjectionTransport, CosmosHttpResponseError, duplicate PartitionKeyRangeCache import).
  - Use CosmosClient as a context manager in tests so shared-cache refcounting is released deterministically instead of relying on GC (sync integration, sync fault-injection worker/reader helpers).
  - Clear the shared routing-map cache in tearDownClass / asyncTearDown so module-level state does not leak across test classes in the same process.
  - Use parents=() (an immutable tuple) instead of parents=[] to match the PKRange namedtuple contract and preserve deep immutability.
  - Update stale docstring/inline comments in refresh_routing_map_provider and a test docstring to reflect the in-place clear() of the shared cache instead of the old 'create a new provider instance' wording.
  - Drop the brittle sys.getsizeof(pkr) < 100 assertion from test_range_has_slots; the __slots__ contract is already verified via hasattr(__dict__).
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* docs(cosmos): explain shared routing-map cache module-level globals

  Add per-line comments above each of the five module-level globals in both the sync and async routing_map_provider.py describing:

  - _shared_routing_map_cache: the actual cached routing maps shared across every client for an endpoint
  - _shared_collection_locks: per-collection single-flight refresh lock
  - _shared_locks_locks: guards the creation of new collection-locks to preserve the single-flight invariant under races
  - _shared_cache_refcounts: ref-count of live clients per endpoint, used to GC the entry when the last client closes
  - _shared_cache_lock: process-wide threading.Lock guarding all four dicts; intentionally threading (not asyncio) so it can be shared between sync and async paths and across event loops

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix(cosmos): scope async pk-range locks per event loop, reset cache between tests

  Two related fixes flagged in deep review of the shared partition-key-range cache:

  F1 — async locks at module scope broke across event loops

  asyncio.Lock binds to the event loop on first acquire (CPython 3.10+) and raises 'RuntimeError: ... bound to a different event loop' if reused from another running loop. Both _shared_locks_locks (per-endpoint meta-lock) and _shared_collection_locks (per-collection refresh lock) held module-level asyncio.Lock instances, which fails for:

  * pytest-asyncio's default function-scoped event loop (the second async test against the same emulator endpoint hits the bug)
  * re-entrant asyncio.run() (uvicorn worker reload, jupyter kernel restart, multiprocessing fork)

  Fix:

  * _shared_locks_locks: asyncio.Lock -> threading.Lock. Its critical sections are pure dict reads/writes with no awaits, so a threading lock is identical in semantics and loop-agnostic.
  * _shared_collection_locks: keyed by (loop_id, collection_id) instead of just collection_id. _get_lock_for_collection now uses id(asyncio.get_running_loop()) so each loop owns its own asyncio.Lock and single-flighting is correctly scoped per loop.

  F3 — no autouse fixture clearing shared globals between tests

  Existing test base classes construct CosmosClient without 'with', leaving refcount entries pinned for the test process lifetime. The new shared-cache test files added their own cache-clear teardowns but only for _shared_routing_map_cache, missing _shared_collection_locks, _shared_locks_locks, and _shared_cache_refcounts; existing tests cleared nothing. Result: order-dependent failures and flakiness in any test that asserts on routing-map cache state or _ReadPartitionKeyRanges call counts.

  Fix: an autouse pytest fixture in tests/conftest.py that clears all four globals on both the sync and async modules after every test.

  (F2 — clear_cache stale-write race during in-flight refresh — deferred to a follow-up PR; needs a generation counter for a complete fix.)

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix shared-cache test fixture to preserve dict identity

  The autouse fixture cleared the _shared_routing_map_cache registry between tests, which orphaned the inner-dict references held by long-lived class-level CosmosClient fixtures (e.g. test_shared_cache_integration's self.client1). The next test that constructed a second client for the same endpoint got a brand-new inner dict, breaking the cache-sharing invariant the tests assert via assertIs.

  Now we only clear the *contents* of each per-endpoint cache dict (and per-endpoint locks dict). The registry mappings stay intact so existing clients continue to share the same inner objects, while the staleness between tests that motivated the fixture is still resolved.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* F2: preserve per-collection locks across clear_cache to keep single-flight

  clear_cache previously did self._collection_locks.clear() alongside the routing-map wipe. That opened a stale-write race:

  * An in-flight _fetch_routing_map holds a per-collection lock that was just removed from the dict.
  * It finishes its network call and writes into the (just-cleared) shared cache.
  * A concurrent arrival creates a brand-new lock for the same collection and races the in-flight refresher — both can write, last wins.

  Worst case: the in-flight result pre-dates the cause of clear_cache (e.g. a 410 split notification), so a stale routing map lives in the cache as fresh until the next force-refresh.

  Fix: do not touch self._collection_locks in clear_cache. The in-flight holder still owns its lock; the next arrival acquires the same lock and serialises behind the in-flight write, preserving the single-flight invariant. The locks dict is still cleaned up in release() when the endpoint refcount hits zero.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address xinlian12 review + fix test_multi_client_shared_cache_queries

  Fixes from the @sdkReviewAgent inline comments on PR #46297 plus the CI test failure introduced by the conftest reset fixture.

  C1 — TOCTOU on _released (sync + async release()): Move the check-and-set of self._released INSIDE the _shared_cache_lock block. Previously two concurrent callers (e.g. __exit__ racing __del__) could both pass the early-return guard before either set the flag, then both decrement the refcount. Added a threaded barrier-based regression test that demonstrates the fix.

  C2 — Sync CosmosClient.close(): Added close() to the sync CosmosClient, mirroring the async client's close(). Now that release() manages process-global refcounts, users that don't use 'with' need a deterministic teardown path. Delegates to __exit__.
  C3 — Comment correctness: Fixed a misleading comment on _shared_cache_lock claiming the sync and async modules share state — they don't, each module has its own globals. Also fixed the refcount comment that said clear_cache decrements (it does not — only release() does).

  C4 — _session.py:386 regression coverage: Added focused unit tests in test_session_token_unit.py for the list(pk_range[0].get('parents') or ()) migration: PKRange-tuple input, None parents, empty parents, tuple parents, and the parents-then-self walk semantics.

  C5 — release() lifecycle coverage: Added 8 sync + 4 async lifecycle tests in tests/routing/:
  - construct increments refcount
  - release decrements / multi-client decrement
  - release evicts all four globals at zero
  - release does not evict with other clients alive
  - release is idempotent (sequential double-call)
  - concurrent release does not double-decrement (TOCTOU regression)
  - __del__ fallback releases when client teardown was skipped
  - clear_cache does not change the refcount

  Test failure fix — test_multi_client_shared_cache_queries: Added a _populate_cache helper to the sync integration test that calls PartitionKeyRangeCache.get_routing_map directly (mirroring the async sibling test). The previous version asserted that query_items(... cross_partition=True) populated _collection_routing_map_by_item, which is an implementation detail. The autouse conftest fixture exposed this fragility — the test had been passing only by accident due to cache state left by earlier tests.

  Teardown completeness: Updated tearDown / tearDownClass in both routing/test_shared_pk_range_cache(_async).py and test_shared_cache_integration(_async).py to clear ALL FOUR shared-cache globals (_shared_routing_map_cache, _shared_collection_locks, _shared_locks_locks, _shared_cache_refcounts) rather than only the routing-map dict. Avoids order-dependent leaks and refcount drift.
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Fix Build Analyze: pylint C4732/C4739 + cspell TOCTOU

  - cosmos_client.py: disable specify-parameter-names-in-call on __exit__(None, None, None) — the sentinels are positional by Python convention.
  - routing_range.py: add :param/:returns/:rtype to the PKRange.__contains__ docstring.
  - cspell.json: add 'toctou' to ignoreWords (used in race-condition comments).

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address xinlian Apr 24 review: sync clear_cache + retain status/throughputFraction

  Address xinlian12's two latest review comments on PR #46297, plus retain two non-routing PKR fields based on bluebird-grounded review.

  clear_cache: async -> sync
  - aio/routing_map_provider.py: clear_cache is no longer async (no awaits inside; uses threading.Lock + dict.clear()). Mirrors the sync release() signature.
  - aio/_cosmos_client_connection_async.py: drop await from the 2 callers.
  - tests/test_partition_split_retry_unit_async.py: AsyncMock -> MagicMock.
  - tests/routing/test_shared_pk_range_cache_async.py, tests/test_shared_cache_fault_injection_async.py, tests/test_shared_cache_integration_async.py: drop await from clear_cache call sites in async tests.

  PKRange: retain status and throughputFraction
  - routing_range.py: add status and throughputFraction to the _PKRangeBase namedtuple with defaults=(None, None) for back-compat. Add Status and ThroughputFraction constants.
  - collection_routing_map.py + _routing_map_provider_common.py: propagate both fields when constructing PKRange from raw service dicts (full-load and incremental merge paths).

  Tests
  - test_shared_pk_range_cache.py: add test_pkrange_contains_truthy_presence_for_parents covering parents=() (the most common production case, a partition that has never split).
  - test_shared_pk_range_cache.py: add test_pkrange_status_and_throughput_fraction_fields_roundtrip covering default-None back-compat plus explicit values via dict-style access and the __contains__ truthy-presence semantic.

  All 143 routing/cache/split-retry tests pass locally against a live account.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Revert .gitignore changes — keep the PR diff scoped to the PKR cache work

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* Address xinlian review: dedupe _resolve_endpoint + PKRange construction

  Per xinlian's review (PR #46297), two duplications were called out:

  1. _resolve_endpoint() was identical in the sync and async modules. Moved to _routing_map_provider_common.py; both modules import the shared implementation. Prevents silent fallback-shape divergence that would fragment the per-endpoint shared cache.
  2. PKRange construction was duplicated in both code paths:
     - collection_routing_map._build_routing_map_from_ranges (full build)
     - _routing_map_provider_common.process_fetched_ranges (incremental merge)

     Added a PKRange.from_dict(raw) classmethod factory in routing_range.py; both call sites now use it. The field-mapping policy lives in exactly one place — adding/removing a field touches one line, not two.

  Net diff: 32 lines deduplicated across 3 files. No behavior change. All 143 existing tests in tests/routing/, tests/test_partition_split_retry_unit*, tests/test_shared_cache_integration*, tests/test_shared_cache_fault_injection_async still pass against tomasvaron-cdb.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

* fix: pylint C4740 — restore the type annotation on _resolve_endpoint

  The Build Analyze pylint job (build 6228688) flagged C4740(docstring-missing-type) on _resolve_endpoint after it was moved to _routing_map_provider_common.py in eab73eb. The original sync/async versions had `client: Any` and `-> str` annotations plus the matching `:type client: Any` docstring line — those were dropped during the move. Restored the function signature to `def _resolve_endpoint(client: Any) -> str:` (matching the originals) and added the missing `:type client: Any` docstring entry.

  Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

---------

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: tvaron3 <tvaron3@users.noreply.github.com>
Co-authored-by: Tomas Varon <tvaron@microsoft.com>
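The from_dict factory and the defaulted status/throughputFraction fields described in the last few commits can be sketched together. This is a reconstruction from the commit text (the classmethod body is an assumption, not the SDK's exact code); the raw dict includes an extra field to show how non-retained service fields are stripped at conversion time:

```python
from collections import namedtuple

# defaults apply to the rightmost fields: status and throughputFraction are
# optional, preserving back-compat with 4-field PKRange construction.
_PKRangeBase = namedtuple(
    "_PKRangeBase",
    ["id", "minInclusive", "maxExclusive", "parents", "status", "throughputFraction"],
    defaults=(None, None),
)


class PKRange(_PKRangeBase):
    __slots__ = ()

    @classmethod
    def from_dict(cls, raw: dict) -> "PKRange":
        # The single place that maps raw service dicts -> compact tuples;
        # adding or removing a retained field touches only this method.
        return cls(
            id=raw["id"],
            minInclusive=raw["minInclusive"],
            maxExclusive=raw["maxExclusive"],
            parents=tuple(raw.get("parents") or ()),  # tuple for deep immutability
            status=raw.get("status"),
            throughputFraction=raw.get("throughputFraction"),
        )


raw = {"id": "1", "minInclusive": "", "maxExclusive": "FF",
       "parents": ["0"], "ridPrefix": 7}  # ridPrefix is dropped on conversion
pkr = PKRange.from_dict(raw)
```

Both construction paths (the full routing-map build and the incremental merge) calling one factory is what makes the memory-trimming policy a one-line change.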
1 parent 44cfa46 commit b01afa8

28 files changed

Lines changed: 1773 additions & 79 deletions

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -10,6 +10,7 @@
 * Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243)

 #### Other Changes
+* Reduced per-client memory overhead when partition-level circuit breaker (PPCB) is enabled by sharing the partition key range routing map cache across CosmosClient instances connected to the same endpoint, and stripping unused fields from cached partition key ranges using compact PKRange namedtuples. See [PR 46297](https://github.com/Azure/azure-sdk-for-python/pull/46297)

 ### 4.16.0b2 (2026-04-04)

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 7 additions & 4 deletions
@@ -3591,7 +3591,8 @@ def refresh_routing_map_provider(

         If collection_link is provided, refreshes only that collection.
         When previous_routing_map is provided this is incremental; otherwise this is a collection-scoped repopulation.
-        Without collection_link, it creates a new provider instance for a full refresh.
+        Without collection_link, it clears the shared routing-map cache in place
+        so the next request for any collection re-fetches from the service.

         :param str collection_link: The collection link.
         :param object previous_routing_map: The routing map that is considered stale.
@@ -3634,12 +3635,14 @@ def refresh_routing_map_provider(
                     status_code,
                 )
             else:
-                # Full refresh - create a new provider instance. This clears all cached routing maps.
-                self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)
+                # Full refresh - clear the shared routing-map cache in place so all
+                # clients sharing this endpoint re-fetch on next use. The provider
+                # instance itself is preserved (shared cache design).
+                self._routing_map_provider.clear_cache()
             return

         # Fallback to full refresh when targeted refresh fails transiently.
-        self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)
+        self._routing_map_provider.clear_cache()

     def _refresh_container_properties_cache(self, container_link: str):
         # If container properties cache is stale, refresh it by reading the container.
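The reason the hunk above clears the cache in place instead of rebinding the provider can be shown with a two-line sketch (variable names are illustrative): other clients hold references to the same shared dict, and rebinding would orphan them while `dict.clear()` is observed by every holder:

```python
# One shared per-endpoint cache entry, referenced by multiple clients.
shared_cache = {"collection-rid-1": "stale-routing-map"}
client_a_view = shared_cache   # a second client sharing the endpoint entry
client_b_view = shared_cache

# Rebinding (the old behaviour) would NOT be seen by the other views:
#   shared_cache = {}   ->  client_a_view still holds the stale map

shared_cache.clear()           # in-place wipe: every holder observes it

assert client_a_view == {} and client_a_view is shared_cache
assert client_b_view is shared_cache   # identity preserved, sharing intact
```

This identity-preservation property is exactly what the F2/F8 review fixes in the commit message assert on.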

sdk/cosmos/azure-cosmos/azure/cosmos/_partition_health_tracker.py

Lines changed: 11 additions & 0 deletions
@@ -50,6 +50,17 @@ class _PartitionHealthInfo(object):
     """
     This internal class keeps the health and statistics for a partition.
     """
+    # __slots__ reduces per-instance memory by using a fixed-size C array
+    # instead of a per-instance __dict__. Significant when tracking many partitions.
+    __slots__ = (
+        'write_failure_count',
+        'read_failure_count',
+        'write_success_count',
+        'read_success_count',
+        'read_consecutive_failure_count',
+        'write_consecutive_failure_count',
+        'unavailability_info',
+    )

     def __init__(self) -> None:
         self.write_failure_count: int = 0
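The saving claimed in the diff above comes from eliminating the per-instance `__dict__`: a slotted class stores attributes at fixed offsets in the object itself. A rough illustration (class names are made up for the demo; exact byte counts vary by Python version, so none are asserted):

```python
import sys


class HealthInfoDict:
    def __init__(self):
        self.write_failure_count = 0
        self.read_failure_count = 0


class HealthInfoSlots:
    __slots__ = ("write_failure_count", "read_failure_count")

    def __init__(self):
        self.write_failure_count = 0
        self.read_failure_count = 0


plain, slotted = HealthInfoDict(), HealthInfoSlots()
assert hasattr(plain, "__dict__")
assert not hasattr(slotted, "__dict__")  # no growable dict per instance

# The per-instance dict is the dominant per-object cost the change removes:
print(sys.getsizeof(plain.__dict__), sys.getsizeof(slotted))
```

A side effect worth noting: slotted instances reject attributes outside `__slots__` with an AttributeError, which is why the tests verify the contract via `hasattr(obj, "__dict__")`.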

sdk/cosmos/azure-cosmos/azure/cosmos/_routing/_routing_map_provider_common.py

Lines changed: 28 additions & 2 deletions
@@ -34,6 +34,7 @@
 from .collection_routing_map import CollectionRoutingMap, _build_routing_map_from_ranges
 from . import routing_range
 from .routing_range import (
+    PKRange,
     PartitionKeyRange,
     _is_sorted_and_non_overlapping,
     _subtract_range,
@@ -122,6 +123,31 @@ def prepare_fetch_options_and_headers(


+
+def _resolve_endpoint(client: Any) -> str:
+    """Return a cache key for ``client``'s endpoint.
+
+    Falls back to ``__unknown_<id>__`` when ``client`` has no ``url_connection``
+    so unknown/mocked clients are isolated rather than collapsed into a single
+    shared cache entry.
+
+    Centralized here so the sync (``routing_map_provider``) and async
+    (``aio.routing_map_provider``) modules use exactly the same fallback shape
+    — a divergence here would silently fragment the per-endpoint shared cache.
+
+    :param client: The CosmosClient (or compatible) instance whose endpoint
+        will be used as the shared-cache key.
+    :type client: Any
+    :returns: The endpoint URL string, or a per-instance fallback key when the
+        client does not expose ``url_connection``.
+    :rtype: str
+    """
+    try:
+        return client.url_connection
+    except AttributeError:
+        return f"__unknown_{id(client)}__"
+
+
 class _NeedFullRefresh(Exception):
     """Sentinel raised by :func:`process_fetched_ranges` when the
     incremental update cannot be completed and a full refresh is needed."""
@@ -186,7 +212,7 @@ def process_fetched_ranges(
     # Incremental update -- merge deltas into the existing map.
     # Resolve parent chains transitively within this single delta so cascading
     # splits (A->B+C and B->D+E in one payload) can be merged incrementally.
-    range_tuples: List[Tuple[Dict[str, Any], Any]] = []
+    range_tuples: List[Tuple[Any, Any]] = []
     known_range_info_by_id = {
         pkr_id: pkr_tuple[1]
         for pkr_id, pkr_tuple in previous_routing_map._rangeById.items()  # pylint: disable=protected-access
@@ -209,7 +235,7 @@ def process_fetched_ranges(
             next_unresolved.append(r)
             continue

-        range_tuples.append((r, range_info))
+        range_tuples.append((PKRange.from_dict(r), range_info))
         known_range_info_by_id[r[PartitionKeyRange.Id]] = range_info
         progress_made = True

sdk/cosmos/azure-cosmos/azure/cosmos/_routing/aio/routing_map_provider.py

Lines changed: 142 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,14 @@
2424
"""
2525
import asyncio # pylint: disable=do-not-import-asyncio
2626
import logging
27+
import threading
2728
from typing import Dict, Any, Optional, List, TYPE_CHECKING
2829
from azure.core.utils import CaseInsensitiveDict
2930
from ... import _base, http_constants
3031
from ..collection_routing_map import CollectionRoutingMap
3132
from ...exceptions import CosmosHttpResponseError
3233
from .._routing_map_provider_common import (
34+
_resolve_endpoint,
3335
prepare_fetch_options_and_headers,
3436
process_fetched_ranges,
3537
is_cache_unchanged_since_previous,
@@ -41,6 +43,60 @@
4143

4244
if TYPE_CHECKING:
4345
from ...aio._cosmos_client_connection_async import CosmosClientConnection
46+
47+
# Module-level shared state, keyed by endpoint URL. All four dicts and the
48+
# refcount are mutated only while holding ``_shared_cache_lock``. Sharing across
49+
+# every async CosmosClient that targets the same endpoint is what eliminates
+# the per-client duplicate copies of the routing map (the memory win driving
+# this change), and what lets concurrent readers single-flight a single
+# refresh.
+
+# endpoint -> { collection_id -> CollectionRoutingMap }. The actual cached
+# routing maps. The inner dict is shared by every client for that endpoint, so
+# a routing map populated by one client is immediately visible to all others.
+_shared_routing_map_cache: dict = {}
+
+# endpoint -> { (loop_id, collection_id) -> asyncio.Lock }. Per-collection
+# refresh lock, scoped to the asyncio event loop that owns it. We key by loop
+# id (``id(asyncio.get_running_loop())``) because ``asyncio.Lock`` instances
+# bind to the loop on first ``acquire()`` (CPython 3.10+) and raise
+# ``RuntimeError: ... bound to a different event loop`` if reused from a
+# different running loop. Single-flighting only needs to be per-loop in
+# practice — coroutines on different loops have different connection pools
+# and are effectively independent clients.
+_shared_collection_locks: Dict[str, Dict[tuple, asyncio.Lock]] = {}
+
+# endpoint -> threading.Lock. Guards the creation of new entries in the inner
+# dict of ``_shared_collection_locks``. Was an ``asyncio.Lock`` previously,
+# but its critical sections are pure dict reads/writes (no await), so a
+# ``threading.Lock`` works identically and avoids the same loop-binding
+# hazard described above. Without this guard, two coroutines racing on a
+# brand-new (loop, collection_id) could each create a different Lock object
+# and defeat the single-flight invariant.
+_shared_locks_locks: Dict[str, threading.Lock] = {}
+
+# endpoint -> int. Number of live async ``PartitionKeyRangeCache`` instances
+# using this endpoint. Incremented on construction and decremented in
+# ``release`` (called from ``CosmosClient.__aexit__`` / ``close`` / ``__del__``).
+# When the count hits zero we drop the entry from all four dicts so an idle
+# endpoint does not pin memory forever. ``clear_cache`` does NOT touch this
+# count — it only wipes routing-map contents.
+_shared_cache_refcounts: Dict[str, int] = {}
+
+# Process-wide lock guarding the four dicts above for *this* (async) module.
+# Note: the sync module ``_routing/routing_map_provider.py`` defines its own
+# independent set of module-level dicts and its own ``_shared_cache_lock`` —
+# state is NOT shared between the sync and async modules. A sync and an async
+# ``CosmosClient`` targeting the same endpoint maintain separate routing-map
+# caches. Using a ``threading.Lock`` (not an ``asyncio.Lock``) is also
+# essential for correctness across multiple event loops in the same process:
+# an ``asyncio.Lock`` binds to the loop that first acquires it. The critical
+# sections this lock guards are pure dict reads/writes — never await, never
+# network I/O — so a brief threading-lock acquisition from a coroutine is
+# safe and does not block the event loop in any meaningful way.
+_shared_cache_lock = threading.Lock()
+
 # pylint: disable=protected-access
 
 logger = logging.getLogger(__name__)
@@ -64,25 +120,99 @@ def __init__(self, client: Any):
         """
 
         self._document_client = client
+        self._endpoint = _resolve_endpoint(client)
+        self._released = False
+
+        # Share routing map cache, per-collection asyncio locks, and the
+        # per-endpoint meta-lock that guards the per-collection-lock dict
+        # across all clients with the same endpoint. Refcount lets us evict
+        # the entry when the last sharing client releases it (see ``release``).
+        with _shared_cache_lock:
+            if self._endpoint not in _shared_routing_map_cache:
+                _shared_routing_map_cache[self._endpoint] = {}
+                _shared_collection_locks[self._endpoint] = {}
+                _shared_locks_locks[self._endpoint] = threading.Lock()
+                _shared_cache_refcounts[self._endpoint] = 0
+            _shared_cache_refcounts[self._endpoint] += 1
+            self._collection_routing_map_by_item = _shared_routing_map_cache[self._endpoint]
+            self._collection_locks: Dict[tuple, asyncio.Lock] = _shared_collection_locks[self._endpoint]
+            self._locks_lock: threading.Lock = _shared_locks_locks[self._endpoint]
+
+    def clear_cache(self):
+        """Clear the shared routing map cache for this endpoint.
+
+        Uses in-place ``.clear()`` on the routing-map dict to preserve all
+        client references to the same dict object, so concurrent clients
+        sharing the endpoint continue to share a single cache instance.
+
+        The per-collection locks dict is intentionally **not** cleared here:
+        an in-flight ``_fetch_routing_map`` caller holds one of those locks
+        and will write its result into the (now-empty) shared cache when it
+        completes. Keeping the lock in place ensures that any concurrent
+        arrival serialises behind the in-flight refresh (single-flight
+        invariant) instead of racing it with a fresh lock. The locks dict
+        is evicted in ``release()`` once the endpoint refcount hits zero.
+        """
+        with _shared_cache_lock:
+            if self._endpoint in _shared_routing_map_cache:
+                _shared_routing_map_cache[self._endpoint].clear()
+
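Why the in-place ``.clear()`` matters can be shown with plain dicts. This is an illustrative sketch, not SDK code: clearing preserves object identity, while rebinding would silently fork the cache.

```python
shared_cache = {"coll1": "routing-map"}

# A second client holds a reference to the *same* dict object.
other_client_view = shared_cache

# In-place clear: identity survives, so every holder sees the wipe.
shared_cache.clear()
assert other_client_view is shared_cache
assert other_client_view == {}

# Rebinding instead would fork the cache: the other client would keep
# reading and writing the now-orphaned dict.
shared_cache = {}
other_client_view["coll1"] = "stale-entry"
assert "coll1" not in shared_cache
```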
+    def release(self) -> None:
+        """Decrement the per-endpoint refcount and evict shared state at zero.
-
-        # keeps the cached collection routing map by collection id
-        self._collection_routing_map_by_item: Dict[str, CollectionRoutingMap] = {}
-        # A lock to control access to the locks dictionary itself
-        self._locks_lock = asyncio.Lock()
-        # A dictionary to hold a lock for each collection ID
-        self._collection_locks: Dict[str, asyncio.Lock] = {}
+
+        Safe to call multiple times concurrently. Best-effort: never raises.
+
+        The ``_released`` check-and-set is performed *inside* the shared
+        cache lock to close the TOCTOU window between two concurrent callers
+        (e.g. ``CosmosClient.__aexit__`` racing the GC's ``__del__``).
+        Without the lock, both callers could pass the early-return guard
+        before either set the flag, then both would decrement the refcount.
+        """
+        endpoint = self._endpoint
+        try:
+            with _shared_cache_lock:
+                if self._released:
+                    return
+                self._released = True
+                count = _shared_cache_refcounts.get(endpoint, 0) - 1
+                if count <= 0:
+                    _shared_cache_refcounts.pop(endpoint, None)
+                    _shared_routing_map_cache.pop(endpoint, None)
+                    _shared_collection_locks.pop(endpoint, None)
+                    _shared_locks_locks.pop(endpoint, None)
+                else:
+                    _shared_cache_refcounts[endpoint] = count
+        except Exception:  # pylint: disable=broad-except
+            # release() may be called from __del__ during interpreter shutdown
+            # where module globals may already be torn down.
+            pass
+
+    def __del__(self):
+        # Defensive fallback in case the owning client teardown path didn't
+        # call release(). Must never raise.
+        try:
+            self.release()
+        except Exception:  # pylint: disable=broad-except
+            pass

     async def _get_lock_for_collection(self, collection_id: str) -> asyncio.Lock:
-        """Safely gets or creates a lock for a given collection ID.
+        """Safely gets or creates a lock for a given (loop, collection) pair.
+
+        Scoped to the running event loop so the returned ``asyncio.Lock`` is
+        always bound to the loop that will await it — see the comment on
+        ``_shared_collection_locks`` for the loop-binding rationale.
 
         :param str collection_id: The ID of the collection.
-        :return: An asyncio.Lock specific to the collection ID.
+        :return: An asyncio.Lock specific to the (loop, collection) pair.
         :rtype: asyncio.Lock
         """
-        async with self._locks_lock:
-            if collection_id not in self._collection_locks:
-                self._collection_locks[collection_id] = asyncio.Lock()
-            return self._collection_locks[collection_id]
+        key = (id(asyncio.get_running_loop()), collection_id)
+        with self._locks_lock:
+            lock = self._collection_locks.get(key)
+            if lock is None:
+                lock = asyncio.Lock()
+                self._collection_locks[key] = lock
+            return lock
 
     def _is_cache_stale(
         self,
sdk/cosmos/azure-cosmos/azure/cosmos/_routing/collection_routing_map.py

Lines changed: 5 additions & 2 deletions

@@ -27,7 +27,7 @@
 from typing import Optional, Union
 
 from azure.cosmos._routing import routing_range
-from azure.cosmos._routing.routing_range import PartitionKeyRange
+from azure.cosmos._routing.routing_range import PartitionKeyRange, PKRange
 
 # pylint: disable=line-too-long
 class CollectionRoutingMap(object):
@@ -288,7 +288,10 @@ def _build_routing_map_from_ranges(
             if PartitionKeyRange.Parents in r and r[PartitionKeyRange.Parents]:
                 gone_range_ids.update(r[PartitionKeyRange.Parents])
 
-        filtered_ranges = [r for r in ranges if r[PartitionKeyRange.Id] not in gone_range_ids]
+        filtered_ranges = [
+            PKRange.from_dict(r)
+            for r in ranges if r[PartitionKeyRange.Id] not in gone_range_ids
+        ]
         range_tuples = [(r, True) for r in filtered_ranges]
 
         routing_map = CollectionRoutingMap.CompleteRoutingMap(
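The ``PKRange`` record imported here lives in ``routing_range.py``, which is not part of this hunk. Based only on the commit notes (four retained fields, ``_PKRangeBase`` namedtuple, dict-style access, raise-from in ``__getitem__``), a hedged reconstruction of its shape might look like:

```python
from collections import namedtuple

# Field names follow the Cosmos partition key range payload; this is a
# sketch inferred from the commit message, not the SDK's actual source.
_PKRangeBase = namedtuple("_PKRangeBase", ["id", "minInclusive", "maxExclusive", "parents"])

class PKRange(_PKRangeBase):
    """Compact partition key range record (reconstruction, not SDK code)."""
    __slots__ = ()  # no per-instance __dict__: tuple storage only

    @classmethod
    def from_dict(cls, raw: dict) -> "PKRange":
        # Retain only the four fields the routing map needs, dropping the
        # rest of the service response payload.
        return cls(raw["id"], raw["minInclusive"], raw["maxExclusive"], raw.get("parents"))

    def __getitem__(self, key):
        # Dict-style access (r["id"]) for callers that still index by name;
        # integer indexing keeps normal tuple behaviour.
        if isinstance(key, str):
            try:
                return getattr(self, key)
            except AttributeError as exc:
                raise KeyError(key) from exc  # raise-from per pylint W0707
        return super().__getitem__(key)

r = PKRange.from_dict(
    {"id": "0", "minInclusive": "", "maxExclusive": "FF", "parents": [], "extra": 1}
)
assert r["id"] == "0" and r.maxExclusive == "FF"  # extra payload fields dropped
```

Keeping dict-style access is what lets ``range_tuples = [(r, True) for r in filtered_ranges]`` and downstream consumers treat the namedtuples exactly like the raw response dicts they replace.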
