Merged
Changes from 25 commits
37 commits
8b03fa2
perf(cosmos): share pk range cache + __slots__ + skip .upper()
tvaron3 Apr 14, 2026
3ec8f5e
perf(cosmos): add PKRange namedtuple for compact partition key range …
tvaron3 Apr 14, 2026
2cd31c6
fix: resolve pylint, mypy, cspell errors in PKRange change
tvaron3 Apr 14, 2026
5448e75
perf(cosmos): add __slots__ to _PartitionHealthInfo + comments on Ran…
tvaron3 Apr 14, 2026
a63db88
fix: mypy type annotation + move cspell to cosmos package level
tvaron3 Apr 14, 2026
5407306
merge: resolve cspell.json conflict with upstream/main
tvaron3 Apr 14, 2026
5a0992f
test(cosmos): add integration + fault injection tests for shared cache
tvaron3 Apr 15, 2026
e1d4152
fix(cosmos): address review - clear_cache identity, PKRange indexing,…
tvaron3 Apr 15, 2026
7f04560
chore: remove harness artifacts from tracked files
tvaron3 Apr 15, 2026
44e87b0
fix(cosmos): resolve test failures — PKRange dict equality, test updates
tvaron3 Apr 16, 2026
a64fe14
chore: remove stale .temp artifact
tvaron3 Apr 16, 2026
e429f92
fix(cosmos): session token parents.copy(), shared cache test isolatio…
tvaron3 Apr 16, 2026
fafee80
chore: remove .temp artifact
tvaron3 Apr 16, 2026
dd32caf
fix(cosmos): test fixes — PKRange field assertions, remove looping fa…
tvaron3 Apr 16, 2026
770c5b1
test(cosmos): add async versions of all shared cache tests
tvaron3 Apr 17, 2026
bd830a0
fix(cosmos): async tests — drop enable_cross_partition_query, use que…
tvaron3 Apr 20, 2026
b0780c6
fix(cosmos): async tests — populate PK range cache via direct provide…
tvaron3 Apr 20, 2026
506d3fe
fix(cosmos): address iter-2 review — shared locks, cache release, PKR…
tvaron3 Apr 20, 2026
2320aae
fix(cosmos): pylint docstrings on _resolve_endpoint + async CRUD test…
tvaron3 Apr 20, 2026
dffcc7b
chore: untrack .coding-harness/ harness artifacts
tvaron3 Apr 20, 2026
4aaa556
ci: retrigger pipelines (flaky test_health_check_failure_startup_asyn…
tvaron3 Apr 20, 2026
ce63b96
doc(cosmos): document PKRange.__contains__ truthy-presence semantics
tvaron3 Apr 20, 2026
19e046e
chore: untrack .coding-harness/ harness artifacts (proper gitignore)
tvaron3 Apr 20, 2026
c474821
test(cosmos): bump test_timeout_for_read_items delay 2s→3s
tvaron3 Apr 20, 2026
96e1323
Merge remote-tracking branch 'upstream/main' into fix/shared-pk-range…
tvaron3 Apr 21, 2026
014dc89
chore(cosmos): address PR review comments
tvaron3 Apr 21, 2026
7a0730c
docs(cosmos): explain shared routing-map cache module-level globals
tvaron3 Apr 22, 2026
220fcf0
fix(cosmos): scope async pk-range locks per event loop, reset cache b…
Copilot Apr 23, 2026
335f6f5
Fix shared-cache test fixture to preserve dict identity
tvaron3 Apr 23, 2026
e2c698c
F2: preserve per-collection locks across clear_cache to keep single-f…
tvaron3 Apr 23, 2026
db156df
Address xinlian12 review + fix test_multi_client_shared_cache_queries
tvaron3 Apr 23, 2026
28fe446
Merge branch 'main' into fix/strip-pk-range-fields
tvaron3 Apr 23, 2026
2f70bbf
Fix Build Analyze: pylint C4732/C4739 + cspell TOCTOU
Apr 23, 2026
2c7a318
Address xinlian Apr 24 review: sync clear_cache + retain status/throu…
tvaron3 Apr 27, 2026
1abb340
Revert .gitignore changes — keep PR diff scoped to PKR cache work
tvaron3 Apr 27, 2026
eab73eb
Address xinlian review: dedupe _resolve_endpoint + PKRange construction
tvaron3 Apr 29, 2026
ea36af0
fix: pylint C4740 — restore type annotation on _resolve_endpoint
tvaron3 Apr 29, 2026
3 changes: 2 additions & 1 deletion .gitignore
@@ -179,4 +179,5 @@ component-detection-pip-report.json
 uv.lock

 # Sphinx generated documentation
-website/
+website/
+.coding-harness/
1 change: 1 addition & 0 deletions sdk/cosmos/azure-cosmos/CHANGELOG.md
@@ -10,6 +10,7 @@
 * Fixed bug where `CosmosClient` construction with AAD credentials would crash at startup if the semantic reranking inference endpoint environment variable was not set, even when semantic reranking was not being used. The inference service is now lazily initialized on first use. See [PR 46243](https://github.com/Azure/azure-sdk-for-python/pull/46243)

 #### Other Changes
+* Reduced per-client memory overhead when partition-level circuit breaker (PPCB) is enabled by sharing the partition key range routing map cache across CosmosClient instances connected to the same endpoint, and stripping unused fields from cached partition key ranges using compact PKRange namedtuples. See [PR 46297](https://github.com/Azure/azure-sdk-for-python/pull/46297)

 ### 4.16.0b2 (2026-04-04)

@@ -3635,11 +3635,11 @@ def refresh_routing_map_provider(
             )
         else:
             # Full refresh - create a new provider instance. This clears all cached routing maps.
-            self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)
+            self._routing_map_provider.clear_cache()
             return

         # Fallback to full refresh when targeted refresh fails transiently.
-        self._routing_map_provider = routing_map_provider.SmartRoutingMapProvider(self)
+        self._routing_map_provider.clear_cache()

     def _refresh_container_properties_cache(self, container_link: str):
         # If container properties cache is stale, refresh it by reading the container.
@@ -50,6 +50,17 @@ class _PartitionHealthInfo(object):
     """
     This internal class keeps the health and statistics for a partition.
     """
+    # __slots__ reduces per-instance memory by using a fixed-size C array
+    # instead of a per-instance __dict__. Significant when tracking many partitions.
+    __slots__ = (
+        'write_failure_count',
+        'read_failure_count',
+        'write_success_count',
+        'read_success_count',
+        'read_consecutive_failure_count',
+        'write_consecutive_failure_count',
+        'unavailability_info',
+    )

     def __init__(self) -> None:
         self.write_failure_count: int = 0
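The effect of the `__slots__` declaration in the hunk above can be seen in a small standalone sketch. The class names `HealthWithDict` and `HealthWithSlots` and the helper `try_set_unknown` are hypothetical and only illustrate the language feature; they are not part of the SDK.

```python
class HealthWithDict:
    """Hypothetical class without __slots__: each instance carries a __dict__."""

    def __init__(self) -> None:
        self.read_failure_count = 0
        self.write_failure_count = 0


class HealthWithSlots:
    """Hypothetical class with __slots__: attributes live in fixed slots."""

    __slots__ = ("read_failure_count", "write_failure_count")

    def __init__(self) -> None:
        self.read_failure_count = 0
        self.write_failure_count = 0


def try_set_unknown(obj: object) -> bool:
    """Return True if an undeclared attribute can be attached to obj."""
    try:
        obj.unexpected = 1  # type: ignore[attr-defined]
        return True
    except AttributeError:
        return False


plain = HealthWithDict()
slotted = HealthWithSlots()
```

A slotted instance has no per-instance `__dict__`, which is where the memory saving comes from, and assigning an attribute that is not listed in `__slots__` raises `AttributeError`. That is why every attribute set in `__init__` has to appear in the tuple.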
@@ -34,6 +34,7 @@
 from .collection_routing_map import CollectionRoutingMap, _build_routing_map_from_ranges
 from . import routing_range
 from .routing_range import (
+    PKRange,
     PartitionKeyRange,
     _is_sorted_and_non_overlapping,
     _subtract_range,
@@ -186,7 +187,7 @@ def process_fetched_ranges(
         # Incremental update -- merge deltas into the existing map.
         # Resolve parent chains transitively within this single delta so cascading
         # splits (A->B+C and B->D+E in one payload) can be merged incrementally.
-        range_tuples: List[Tuple[Dict[str, Any], Any]] = []
+        range_tuples: List[Tuple[Any, Any]] = []
         known_range_info_by_id = {
             pkr_id: pkr_tuple[1]
             for pkr_id, pkr_tuple in previous_routing_map._rangeById.items()  # pylint: disable=protected-access
@@ -209,7 +210,11 @@ def process_fetched_ranges(
                     next_unresolved.append(r)
                     continue

-                range_tuples.append((r, range_info))
+                range_tuples.append((PKRange(
+                    id=r[PartitionKeyRange.Id],
+                    minInclusive=r[PartitionKeyRange.MinInclusive],
+                    maxExclusive=r[PartitionKeyRange.MaxExclusive],
+                    parents=tuple(r.get(PartitionKeyRange.Parents) or ())), range_info))
                 known_range_info_by_id[r[PartitionKeyRange.Id]] = range_info
                 progress_made = True
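The dict-to-`PKRange` conversion in the hunk above can be sketched in isolation. This is a minimal sketch, not the SDK's definition: `PKRangeSketch` and `to_compact` are hypothetical names, the four fields mirror the keyword arguments used in the diff, and the string keys and sample payload are assumptions made for illustration.

```python
from collections import namedtuple
from typing import Any, Dict

# Hypothetical stand-in for the PR's PKRange; field names follow the diff.
PKRangeSketch = namedtuple(
    "PKRangeSketch", ["id", "minInclusive", "maxExclusive", "parents"]
)


def to_compact(raw: Dict[str, Any]) -> PKRangeSketch:
    """Keep only the four routing fields and drop everything else."""
    return PKRangeSketch(
        id=raw["id"],
        minInclusive=raw["minInclusive"],
        maxExclusive=raw["maxExclusive"],
        # A missing or None "parents" entry becomes an empty tuple.
        parents=tuple(raw.get("parents") or ()),
    )


# Made-up payload; "_etag" stands in for any field the cache does not need.
raw_range = {
    "id": "2",
    "minInclusive": "",
    "maxExclusive": "7F",
    "parents": ["0"],
    "_etag": "illustrative-only",
}
compact = to_compact(raw_range)
no_parents = to_compact({"id": "0", "minInclusive": "", "maxExclusive": "FF", "parents": None})
```

Storing `parents` as a tuple keeps the cached entry immutable, and `r.get(...) or ()` treats an absent key and an explicit `None` the same way.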

@@ -24,6 +24,7 @@
 """
 import asyncio  # pylint: disable=do-not-import-asyncio
 import logging
+import threading
 from typing import Dict, Any, Optional, List, TYPE_CHECKING
 from azure.core.utils import CaseInsensitiveDict
 from ... import _base, http_constants
@@ -41,6 +42,39 @@

 if TYPE_CHECKING:
     from ...aio._cosmos_client_connection_async import CosmosClientConnection
+
+# Shared routing map cache across all clients targeting the same endpoint.
+# All four module-level dicts are keyed by endpoint and protected by
+# ``_shared_cache_lock`` for mutation. Per-collection refresh serialization is
+# handled by the per-endpoint asyncio.Locks in ``_shared_collection_locks`` so
+# that all clients sharing an endpoint single-flight refreshes through the
+# same lock.
+_shared_routing_map_cache: dict = {}
+_shared_collection_locks: Dict[str, Dict[str, asyncio.Lock]] = {}
+_shared_locks_locks: Dict[str, asyncio.Lock] = {}
+_shared_cache_refcounts: Dict[str, int] = {}
+_shared_cache_lock = threading.Lock()
+
+
+def _resolve_endpoint(client: Any) -> str:
+    """Return a cache key for ``client``'s endpoint.
+
+    Falls back to ``__unknown_<id>__`` when ``client`` has no ``url_connection``
+    so unknown/mocked clients are isolated rather than collapsed into a single
+    shared cache entry.
+
+    :param client: The CosmosClient (or compatible) instance whose endpoint
+        will be used as the shared-cache key.
+    :type client: Any
+    :returns: The endpoint URL string, or a per-instance fallback key when the
+        client does not expose ``url_connection``.
+    :rtype: str
+    """
+    try:
+        return client.url_connection
+    except AttributeError:
+        return f"__unknown_{id(client)}__"
+
 # pylint: disable=protected-access

 logger = logging.getLogger(__name__)
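The fallback behaviour described in the `_resolve_endpoint` docstring above can be exercised with stand-in objects. In this sketch `FakeClient`, `BareMock`, and `resolve_endpoint_key` are hypothetical names and the URL is a placeholder; only the `url_connection` attribute and the `__unknown_<id>__` key format come from the diff.

```python
class FakeClient:
    """Hypothetical client exposing the attribute the diff reads."""

    def __init__(self, url: str) -> None:
        self.url_connection = url


class BareMock:
    """Hypothetical object with no url_connection attribute."""


def resolve_endpoint_key(client: object) -> str:
    """Mirror of the diff's logic: endpoint URL, else a per-instance key."""
    try:
        return client.url_connection  # type: ignore[attr-defined]
    except AttributeError:
        return f"__unknown_{id(client)}__"


client_a = FakeClient("https://example.invalid:443/")
client_b = FakeClient("https://example.invalid:443/")
mock_1, mock_2 = BareMock(), BareMock()

key_a = resolve_endpoint_key(client_a)
key_b = resolve_endpoint_key(client_b)
key_m1 = resolve_endpoint_key(mock_1)
key_m2 = resolve_endpoint_key(mock_2)
```

Two clients with the same URL map to the same key and therefore share one cache entry, while each object without `url_connection` gets its own key for as long as it is alive. `id()` values can be reused after an object is garbage collected, so the fallback key is only unique among live objects.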
@@ -64,13 +98,68 @@ def __init__(self, client: Any):
         """

         self._document_client = client
+        self._endpoint = _resolve_endpoint(client)
+        self._released = False
+
+        # Share routing map cache, per-collection asyncio locks, and the
+        # per-endpoint meta-lock that guards the per-collection-lock dict
+        # across all clients with the same endpoint. Refcount lets us evict
+        # the entry when the last sharing client releases it (see ``release``).
+        with _shared_cache_lock:
+            if self._endpoint not in _shared_routing_map_cache:
+                _shared_routing_map_cache[self._endpoint] = {}
+                _shared_collection_locks[self._endpoint] = {}
+                _shared_locks_locks[self._endpoint] = asyncio.Lock()
+                _shared_cache_refcounts[self._endpoint] = 0
+            _shared_cache_refcounts[self._endpoint] += 1
+            self._collection_routing_map_by_item = _shared_routing_map_cache[self._endpoint]
+            self._collection_locks: Dict[str, asyncio.Lock] = _shared_collection_locks[self._endpoint]
+            self._locks_lock: asyncio.Lock = _shared_locks_locks[self._endpoint]
+
+    async def clear_cache(self):
+        """Clear the shared routing map cache for this endpoint.
+
+        Uses in-place ``.clear()`` to preserve all client references to the
+        same dict and the same per-collection lock dict, so concurrent clients
+        sharing the endpoint continue to single-flight through the same locks.
+        """
+        async with self._locks_lock:
+            with _shared_cache_lock:
+                if self._endpoint in _shared_routing_map_cache:
+                    _shared_routing_map_cache[self._endpoint].clear()
+                self._collection_locks.clear()

-        # keeps the cached collection routing map by collection id
-        self._collection_routing_map_by_item: Dict[str, CollectionRoutingMap] = {}
-        # A lock to control access to the locks dictionary itself
-        self._locks_lock = asyncio.Lock()
-        # A dictionary to hold a lock for each collection ID
-        self._collection_locks: Dict[str, asyncio.Lock] = {}
+    def release(self) -> None:
+        """Decrement the per-endpoint refcount and evict shared state at zero.
+
+        Safe to call multiple times. Best-effort: never raises.
+        """
+        if self._released:
+            return
+        self._released = True
+        endpoint = self._endpoint
+        try:
+            with _shared_cache_lock:
+                count = _shared_cache_refcounts.get(endpoint, 0) - 1
+                if count <= 0:
+                    _shared_cache_refcounts.pop(endpoint, None)
+                    _shared_routing_map_cache.pop(endpoint, None)
+                    _shared_collection_locks.pop(endpoint, None)
+                    _shared_locks_locks.pop(endpoint, None)
+                else:
+                    _shared_cache_refcounts[endpoint] = count
+        except Exception:  # pylint: disable=broad-except
+            # release() may be called from __del__ during interpreter shutdown
+            # where module globals may already be torn down.
+            pass
+
+    def __del__(self):
+        # Defensive fallback in case the owning client teardown path didn't
+        # call release(). Must never raise.
+        try:
+            self.release()
+        except Exception:  # pylint: disable=broad-except
+            pass

     async def _get_lock_for_collection(self, collection_id: str) -> asyncio.Lock:
         """Safely gets or creates a lock for a given collection ID.
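The `clear_cache` docstring above relies on the difference between clearing a dict in place and rebinding a name to a fresh dict. A minimal sketch of that difference follows; `shared`, `holder_a`, and `holder_b` are hypothetical names standing in for two providers that reference the same per-endpoint cache.

```python
from typing import Dict

shared: Dict[str, str] = {"coll-1": "routing-map"}

# Two holders referencing the same dict, as two providers on one endpoint would.
holder_a = shared
holder_b = shared

# In-place clear: both holders still reference the same, now empty, dict.
holder_a.clear()
same_after_clear = holder_a is holder_b
emptied_for_both = len(holder_b) == 0

# Rebinding: holder_a points at a fresh dict and stops sharing with holder_b.
holder_b["coll-1"] = "routing-map"
holder_a = {}
diverged = holder_a is not holder_b
```

After the rebind, an entry written through `holder_b` is invisible to `holder_a`. This is the situation the in-place `.clear()` avoids, and it matches the replacement of `SmartRoutingMapProvider(self)` with `clear_cache()` in the `refresh_routing_map_provider` hunk.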
@@ -27,7 +27,7 @@
 from typing import Optional, Union

 from azure.cosmos._routing import routing_range
-from azure.cosmos._routing.routing_range import PartitionKeyRange
+from azure.cosmos._routing.routing_range import PartitionKeyRange, PKRange

 # pylint: disable=line-too-long
 class CollectionRoutingMap(object):
@@ -288,7 +288,13 @@ def _build_routing_map_from_ranges(
         if PartitionKeyRange.Parents in r and r[PartitionKeyRange.Parents]:
             gone_range_ids.update(r[PartitionKeyRange.Parents])

-    filtered_ranges = [r for r in ranges if r[PartitionKeyRange.Id] not in gone_range_ids]
+    filtered_ranges = [
+        PKRange(id=r[PartitionKeyRange.Id],
+                minInclusive=r[PartitionKeyRange.MinInclusive],
+                maxExclusive=r[PartitionKeyRange.MaxExclusive],
+                parents=tuple(r.get(PartitionKeyRange.Parents) or ()))
+        for r in ranges if r[PartitionKeyRange.Id] not in gone_range_ids
+    ]
     range_tuples = [(r, True) for r in filtered_ranges]

     routing_map = CollectionRoutingMap.CompleteRoutingMap(
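The parent-filtering step in the hunk above (collect every id listed as a parent, then drop ranges with those ids) can be sketched on plain dicts. `drop_split_parents` is a hypothetical helper, and the string keys and sample ranges are assumptions for illustration, not SDK data.

```python
from typing import Any, Dict, List


def drop_split_parents(ranges: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Remove ranges that appear as a parent of any other range."""
    gone_range_ids = set()
    for r in ranges:
        if r.get("parents"):
            gone_range_ids.update(r["parents"])
    return [r for r in ranges if r["id"] not in gone_range_ids]


# Made-up payload: range "0" has split into "1" and "2".
sample_ranges = [
    {"id": "0", "minInclusive": "", "maxExclusive": "FF"},
    {"id": "1", "minInclusive": "", "maxExclusive": "7F", "parents": ["0"]},
    {"id": "2", "minInclusive": "7F", "maxExclusive": "FF", "parents": ["0"]},
]
live_ids = [r["id"] for r in drop_split_parents(sample_ranges)]
```

Only the children survive the filter, so the remaining ranges cover the key space once rather than overlapping with the range they replaced.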
@@ -40,6 +40,38 @@

 if TYPE_CHECKING:
     from .._cosmos_client_connection import CosmosClientConnection
+
+# Shared routing map cache across all clients targeting the same endpoint.
+# All four module-level dicts are keyed by endpoint and protected by
+# ``_shared_cache_lock`` for mutation. Per-collection refresh serialization is
+# handled by the per-endpoint locks in ``_shared_collection_locks`` so that all
+# clients sharing an endpoint single-flight refreshes through the same lock.
+_shared_routing_map_cache: dict = {}
+_shared_collection_locks: Dict[str, Dict[str, threading.Lock]] = {}
+_shared_locks_locks: Dict[str, threading.Lock] = {}
+_shared_cache_refcounts: Dict[str, int] = {}
+_shared_cache_lock = threading.Lock()
+
+
+def _resolve_endpoint(client: Any) -> str:
+    """Return a cache key for ``client``'s endpoint.
+
+    Falls back to ``__unknown_<id>__`` when ``client`` has no ``url_connection``
+    so unknown/mocked clients are isolated rather than collapsed into a single
+    shared cache entry.
+
+    :param client: The CosmosClient (or compatible) instance whose endpoint
+        will be used as the shared-cache key.
+    :type client: Any
+    :returns: The endpoint URL string, or a per-instance fallback key when the
+        client does not expose ``url_connection``.
+    :rtype: str
+    """
+    try:
+        return client.url_connection
+    except AttributeError:
+        return f"__unknown_{id(client)}__"
+
 # pylint: disable=protected-access, line-too-long


@@ -63,13 +95,68 @@ def __init__(self, client: Any):
         """

         self._document_client = client
+        self._endpoint = _resolve_endpoint(client)
+        self._released = False
+
+        # Share routing map cache, per-collection locks, and the meta-lock that
+        # guards the per-collection-lock dict across all clients with the same
+        # endpoint. Refcount lets us evict the entry when the last sharing
+        # client releases it (see ``release``).
+        with _shared_cache_lock:
+            if self._endpoint not in _shared_routing_map_cache:
+                _shared_routing_map_cache[self._endpoint] = {}
+                _shared_collection_locks[self._endpoint] = {}
+                _shared_locks_locks[self._endpoint] = threading.Lock()
+                _shared_cache_refcounts[self._endpoint] = 0
+            _shared_cache_refcounts[self._endpoint] += 1
+            self._collection_routing_map_by_item = _shared_routing_map_cache[self._endpoint]
+            self._collection_locks: Dict[str, threading.Lock] = _shared_collection_locks[self._endpoint]
+            self._locks_lock: threading.Lock = _shared_locks_locks[self._endpoint]
+
+    def clear_cache(self):
+        """Clear the shared routing map cache for this endpoint.
+
+        Uses in-place ``.clear()`` to preserve all client references to the
+        same dict and the same per-collection lock dict, so concurrent clients
+        sharing the endpoint continue to single-flight through the same locks.
+        """
+        with self._locks_lock:
+            with _shared_cache_lock:
+                if self._endpoint in _shared_routing_map_cache:
+                    _shared_routing_map_cache[self._endpoint].clear()
+                self._collection_locks.clear()

-        # keeps the cached collection routing map by collection id
-        self._collection_routing_map_by_item: Dict[str, CollectionRoutingMap] = {}
-        # A lock to control access to the locks dictionary itself
-        self._locks_lock = threading.Lock()
-        # A dictionary to hold a lock for each collection ID
-        self._collection_locks: Dict[str, threading.Lock] = {}
+    def release(self) -> None:
+        """Decrement the per-endpoint refcount and evict shared state at zero.
+
+        Safe to call multiple times. Best-effort: never raises.
+        """
+        if self._released:
+            return
+        self._released = True
+        endpoint = self._endpoint
+        try:
+            with _shared_cache_lock:
+                count = _shared_cache_refcounts.get(endpoint, 0) - 1
+                if count <= 0:
+                    _shared_cache_refcounts.pop(endpoint, None)
+                    _shared_routing_map_cache.pop(endpoint, None)
+                    _shared_collection_locks.pop(endpoint, None)
+                    _shared_locks_locks.pop(endpoint, None)
+                else:
+                    _shared_cache_refcounts[endpoint] = count
+        except Exception:  # pylint: disable=broad-except
+            # release() may be called from __del__ during interpreter shutdown
+            # where module globals may already be torn down.
+            pass
+
+    def __del__(self):
+        # Defensive fallback in case the owning client teardown path didn't
+        # call release(). Must never raise.
+        try:
+            self.release()
+        except Exception:  # pylint: disable=broad-except
+            pass

     def _get_lock_for_collection(self, collection_id: str) -> threading.Lock:
