Skip to content

Commit b50187b

Browse files
dibahlfiCopilot
andcommitted
Fix metadata timeout propagation
- propagate per-call read_timeout, connection_timeout, and timeout (operation deadline) options across query setup metadata calls (container read, query plan, /pkranges) in sync and async paths - extend test coverage for timeout propagation Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 30b7dfe commit b50187b

14 files changed

Lines changed: 846 additions & 60 deletions

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
#### Breaking Changes
88

99
#### Bugs Fixed
10+
* Fixed per-call `read_timeout`, `connection_timeout`, and `timeout` (operation deadline) being dropped on the metadata calls a query makes before its first page.
1011

1112
#### Other Changes
1213

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,11 @@ def build_options(kwargs: dict[str, Any]) -> dict[str, Any]:
124124
options[Constants.Kwargs.READ_TIMEOUT] = kwargs[Constants.Kwargs.READ_TIMEOUT]
125125
if Constants.Kwargs.TIMEOUT in kwargs:
126126
options[Constants.Kwargs.TIMEOUT] = kwargs[Constants.Kwargs.TIMEOUT]
127+
# Copy (not pop) so connection_timeout stays in kwargs for the page fetch
128+
# and is also placed in options, where the container read, partition-key
129+
# ranges, and query plan calls read it.
130+
if Constants.Kwargs.CONNECTION_TIMEOUT in kwargs:
131+
options[Constants.Kwargs.CONNECTION_TIMEOUT] = kwargs[Constants.Kwargs.CONNECTION_TIMEOUT]
127132

128133

129134
options[Constants.OperationStartTime] = time.time()
@@ -1082,6 +1087,70 @@ def _build_properties_cache(properties: dict[str, Any], container_link: str) ->
10821087
"partitionKey": properties.get("partitionKey", None), "container_link": container_link
10831088
}
10841089

1090+
# The per-call timeout keys a caller can set on a single request. Listed once
1091+
# here so format_pk_range_options and the hybrid-search fetch forward the same
1092+
# set.
1093+
_PER_CALL_TIMEOUT_OPTION_KEYS: Tuple[str, ...] = (
1094+
Constants.Kwargs.READ_TIMEOUT,
1095+
Constants.Kwargs.CONNECTION_TIMEOUT,
1096+
Constants.Kwargs.TIMEOUT,
1097+
)
1098+
1099+
# The operation deadline is checked as elapsed = now - OperationStartTime, and
1100+
# OperationStartTime defaults to the current time when it is missing. So timeout
1101+
# and OperationStartTime must be carried together onto the metadata setup calls;
1102+
# otherwise a setup call measures the deadline from its own start instead of the
1103+
# operation's start. This adds OperationStartTime to the three timeout keys above.
1104+
_PER_CALL_DEADLINE_OPTION_KEYS: Tuple[str, ...] = _PER_CALL_TIMEOUT_OPTION_KEYS + (
1105+
Constants.OperationStartTime,
1106+
)
1107+
1108+
1109+
def _carry_per_call_timeout_options(source: Mapping[str, Any], destination: dict[str, Any]) -> None:
1110+
"""Copy the per-call timeouts and the operation start time from source into destination.
1111+
1112+
Copies read_timeout, connection_timeout, timeout, and OperationStartTime. Only
1113+
keys present in source are copied, so a timeout the caller did not set stays
1114+
absent and the request uses the client default instead of None.
1115+
1116+
:param source: The request options to read the timeouts from.
1117+
:type source: ~collections.abc.Mapping[str, typing.Any]
1118+
:param destination: The options dict to copy the timeouts into.
1119+
:type destination: dict[str, typing.Any]
1120+
:return: None
1121+
:rtype: None
1122+
"""
1123+
for key in _PER_CALL_DEADLINE_OPTION_KEYS:
1124+
if key in source:
1125+
destination[key] = source[key]
1126+
1127+
1128+
def _copy_per_call_timeouts_to_kwargs(
1129+
options: Optional[Mapping[str, Any]],
1130+
kwargs: dict[str, Any]
1131+
) -> None:
1132+
"""Copy the per-call timeouts and the operation start time from options into kwargs.
1133+
1134+
Moves read_timeout, connection_timeout, timeout, and OperationStartTime from
1135+
the request options into the kwargs the request layer reads. A value is copied
1136+
only when it is set (not None), so an unset timeout falls back to the client
1137+
default instead of None; setdefault keeps any value already in kwargs.
1138+
1139+
:param options: The request options to read the timeouts from (may be None or empty).
1140+
:type options: ~collections.abc.Mapping[str, typing.Any] or None
1141+
:param kwargs: The kwargs dict to copy the timeouts into; mutated in place.
1142+
:type kwargs: dict[str, typing.Any]
1143+
:return: None
1144+
:rtype: None
1145+
"""
1146+
if not options:
1147+
return
1148+
for key in _PER_CALL_DEADLINE_OPTION_KEYS:
1149+
value = options.get(key)
1150+
if value is not None:
1151+
kwargs.setdefault(key, value)
1152+
1153+
10851154
def format_pk_range_options(query_options: Mapping[str, Any]) -> dict[str, Any]:
10861155
"""Formats the partition key range options to be used internally from the query ones.
10871156
:param dict query_options: The query options being used.
@@ -1094,4 +1163,7 @@ def format_pk_range_options(query_options: Mapping[str, Any]) -> dict[str, Any]:
10941163
pk_range_options[Constants.ContainerRID] = query_options[Constants.ContainerRID]
10951164
if "excludedLocations" in query_options:
10961165
pk_range_options["excludedLocations"] = query_options["excludedLocations"]
1166+
# Keep the per-call timeouts so the partition-key ranges fetch uses them
1167+
# instead of the client default.
1168+
_carry_per_call_timeout_options(query_options, pk_range_options)
10971169
return pk_range_options

sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -116,15 +116,17 @@ class _Constants:
116116
class Kwargs:
117117
"""Keyword arguments used in the azure-cosmos package"""
118118

119+
# Whether to retry write operations if they fail. Used either at client level or request level.
119120
RETRY_WRITE: Literal["retry_write"] = "retry_write"
120-
"""Whether to retry write operations if they fail. Used either at client level or request level."""
121121
EXCLUDED_LOCATIONS: Literal["excludedLocations"] = "excludedLocations"
122+
# Availability strategy config. Used either at client level or request level.
122123
AVAILABILITY_STRATEGY: Literal["availabilityStrategy"] = "availabilityStrategy"
123-
"""Availability strategy config. Used either at client level or request level"""
124+
# Socket read timeout in seconds. Used either at client level or request level.
124125
READ_TIMEOUT: Literal["read_timeout"] = "read_timeout"
125-
"""Socket read timeout in seconds. Used either at client level or request level."""
126+
# Absolute timeout in seconds for the combined HTTP request and response processing.
126127
TIMEOUT: Literal["timeout"] = "timeout"
127-
"""Absolute timeout in seconds for the combined HTTP request and response processing."""
128+
# Socket connect (handshake) timeout in seconds. Used either at client level or request level.
129+
CONNECTION_TIMEOUT: Literal["connection_timeout"] = "connection_timeout"
128130

129131
class UserAgentFeatureFlags(IntEnum):
130132
"""

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 5 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -3244,18 +3244,11 @@ def __QueryFeed( # pylint: disable=too-many-locals, too-many-statements, too-ma
32443244
"""
32453245
if options is None:
32463246
options = {}
3247-
read_timeout = options.get("read_timeout")
3248-
if read_timeout is not None:
3249-
# we currently have a gap where kwargs are not getting passed correctly down the pipeline. In order to make
3250-
# absolute time out work, we are passing read_timeout via kwargs as a temporary fix
3251-
kwargs.setdefault("read_timeout", read_timeout)
3252-
3253-
operation_start_time = options.get(Constants.OperationStartTime)
3254-
if operation_start_time is not None:
3255-
kwargs.setdefault(Constants.OperationStartTime, operation_start_time)
3256-
timeout = options.get("timeout")
3257-
if timeout is not None:
3258-
kwargs.setdefault("timeout", timeout)
3247+
# Copy the per-call timeouts and the operation start time out of options into
3248+
# kwargs, where _Request reads them. A value is copied only when set, so
3249+
# an unset timeout falls back to the client/policy default instead of
3250+
# None; setdefault keeps any explicit kwarg the caller already placed.
3251+
base._copy_per_call_timeouts_to_kwargs(options, kwargs)
32593252

32603253
# Execution context injects this via request options; keep kwargs fallback
32613254
# for compatibility with call paths that still thread internal values there.

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/execution_dispatcher.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
from azure.cosmos.exceptions import CosmosHttpResponseError
3636
from azure.cosmos.http_constants import StatusCodes
3737
from ..._constants import _Constants as Constants
38+
from ... import _base
3839

3940
# pylint: disable=protected-access
4041

@@ -67,11 +68,17 @@ def __init__(self, client, resource_link, query, options, fetch_function,
6768
async def _create_execution_context_with_query_plan(self):
6869
self._fetched_query_plan = True
6970
query_to_use = self._query if self._query is not None else "Select * from root r"
71+
# read_timeout is forwarded as-is (None when the caller did not set it) to
72+
# keep its existing behavior. It is set before the helper, so the helper's
73+
# setdefault leaves it unchanged and only adds connection_timeout, timeout,
74+
# and OperationStartTime when the caller set them.
75+
query_plan_kwargs = {"read_timeout": self._options.get('read_timeout')}
76+
_base._copy_per_call_timeouts_to_kwargs(self._options, query_plan_kwargs)
7077
query_plan = await self._client._GetQueryPlanThroughGateway(
7178
query_to_use,
7279
self._resource_link,
7380
self._options.get('excludedLocations'),
74-
read_timeout=self._options.get('read_timeout')
81+
**query_plan_kwargs
7582
)
7683
query_execution_info = _PartitionedQueryExecutionInfo(query_plan)
7784
qe_info = getattr(query_execution_info, "_query_execution_info", None)

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/aio/hybrid_search_aggregator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
_FULL_TEXT_SCORE_SCOPE_KEY, _FULL_TEXT_SCORE_SCOPE_LOCAL, _FULL_TEXT_SCORE_SCOPE_DEFAULT
1111
from azure.cosmos._routing import routing_range
1212
from azure.cosmos import exceptions
13+
from azure.cosmos import _base
1314
from ..._constants import _Constants as Constants
1415

1516
# pylint: disable=protected-access
@@ -297,6 +298,9 @@ async def _get_target_partition_key_range(self, target_all_ranges):
297298
feed_options = {}
298299
if Constants.ContainerRID in self._options:
299300
feed_options[Constants.ContainerRID] = self._options[Constants.ContainerRID]
301+
# This path calls _ReadPartitionKeyRanges directly and skips
302+
# format_pk_range_options, so copy the per-call timeouts here too.
303+
_base._carry_per_call_timeout_options(self._options, feed_options)
300304
return [item async for item in self._client._ReadPartitionKeyRanges(
301305
collection_link=self._resource_link, feed_options=feed_options)]
302306
query_ranges = self._partitioned_query_ex_info.get_query_ranges()

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/execution_dispatcher.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
from azure.cosmos.documents import _DistinctType
3535
from azure.cosmos.http_constants import StatusCodes, SubStatusCodes
3636
from .._constants import _Constants as Constants
37+
from .. import _base
3738

3839
# pylint: disable=protected-access
3940

@@ -97,11 +98,17 @@ def __init__(self, client, resource_link, query, options, fetch_function, respon
9798
def _create_execution_context_with_query_plan(self):
9899
self._fetched_query_plan = True
99100
query_to_use = self._query if self._query is not None else "Select * from root r"
101+
# read_timeout is forwarded as-is (None when the caller did not set it) to
102+
# keep its existing behavior. It is set before the helper, so the helper's
103+
# setdefault leaves it unchanged and only adds connection_timeout, timeout,
104+
# and OperationStartTime when the caller set them.
105+
query_plan_kwargs = {"read_timeout": self._options.get('read_timeout')}
106+
_base._copy_per_call_timeouts_to_kwargs(self._options, query_plan_kwargs)
100107
query_plan = self._client._GetQueryPlanThroughGateway(
101108
query_to_use,
102109
self._resource_link,
103110
self._options.get('excludedLocations'),
104-
read_timeout=self._options.get('read_timeout')
111+
**query_plan_kwargs
105112
)
106113
query_execution_info = _PartitionedQueryExecutionInfo(query_plan)
107114
qe_info = getattr(query_execution_info, "_query_execution_info", None)

sdk/cosmos/azure-cosmos/azure/cosmos/_execution_context/hybrid_search_aggregator.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from azure.cosmos._execution_context import document_producer
99
from azure.cosmos._routing import routing_range
1010
from azure.cosmos import exceptions
11+
from azure.cosmos import _base
1112
from .._constants import _Constants as Constants
1213

1314
# pylint: disable=protected-access
@@ -454,6 +455,9 @@ def _get_target_partition_key_range(self, target_all_ranges):
454455
feed_options = {}
455456
if Constants.ContainerRID in self._options:
456457
feed_options[Constants.ContainerRID] = self._options[Constants.ContainerRID]
458+
# This path calls _ReadPartitionKeyRanges directly and skips
459+
# format_pk_range_options, so copy the per-call timeouts here too.
460+
_base._carry_per_call_timeout_options(self._options, feed_options)
457461
return list(self._client._ReadPartitionKeyRanges(
458462
collection_link=self._resource_link, feed_options=feed_options))
459463
query_ranges = self._partitioned_query_ex_info.get_query_ranges()

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_container.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,8 @@
3939
from .. import _utils as utils
4040
from .._availability_strategy_config import _validate_request_hedging_strategy
4141
from .._base import (_build_properties_cache, _deserialize_throughput, _replace_throughput,
42-
build_options as _build_options, GenerateGuidId, validate_cache_staleness_value)
42+
build_options as _build_options, _copy_per_call_timeouts_to_kwargs,
43+
GenerateGuidId, validate_cache_staleness_value)
4344
from .._change_feed.feed_range_internal import FeedRangeInternalEpk
4445

4546
from .._cosmos_responses import CosmosDict, CosmosList, CosmosAsyncItemPaged
@@ -102,13 +103,9 @@ async def _get_properties_with_options(self, options: Optional[dict[str, Any]] =
102103
if options:
103104
if "excludedLocations" in options:
104105
kwargs['excluded_locations'] = options['excludedLocations']
105-
if Constants.OperationStartTime in options:
106-
kwargs[Constants.OperationStartTime] = options[Constants.OperationStartTime]
107-
if Constants.Kwargs.TIMEOUT in options:
108-
kwargs[Constants.Kwargs.TIMEOUT] = options[Constants.Kwargs.TIMEOUT]
109-
if Constants.Kwargs.READ_TIMEOUT in options:
110-
kwargs[Constants.Kwargs.READ_TIMEOUT] = options[Constants.Kwargs.READ_TIMEOUT]
111-
106+
# Forward the per-call timeouts and the operation start time so the
107+
# container read honors them instead of the client/policy default.
108+
_copy_per_call_timeouts_to_kwargs(options, kwargs)
112109
return await self._get_properties(**kwargs)
113110

114111
async def _get_properties(self, **kwargs: Any) -> dict[str, Any]:

sdk/cosmos/azure-cosmos/azure/cosmos/aio/_cosmos_client_connection_async.py

Lines changed: 5 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3038,20 +3038,11 @@ async def __QueryFeed( # pylint: disable=too-many-branches,too-many-statements,
30383038
if options is None:
30393039
options = {}
30403040

3041-
read_timeout = options.get("read_timeout")
3042-
if read_timeout is not None:
3043-
# we currently have a gap where kwargs are not getting passed correctly down the pipeline. In order to make
3044-
# absolute time out work, we are passing read_timeout via kwargs as a temporary fix
3045-
kwargs.setdefault("read_timeout", read_timeout)
3046-
3047-
operation_start_time = options.get(Constants.OperationStartTime)
3048-
if operation_start_time is not None:
3049-
# we need to set operation_state in kwargs as thats where it is looked at while sending the request
3050-
kwargs.setdefault(Constants.OperationStartTime, operation_start_time)
3051-
timeout = options.get("timeout")
3052-
if timeout is not None:
3053-
# we need to set operation_state in kwargs as that's where it is looked at while sending the request
3054-
kwargs.setdefault("timeout", timeout)
3041+
# Copy the per-call timeouts and the operation start time out of options into
3042+
# kwargs, where _Request reads them. A value is copied only when set, so
3043+
# an unset timeout falls back to the client/policy default instead of
3044+
# None; setdefault keeps any explicit kwarg the caller already placed.
3045+
base._copy_per_call_timeouts_to_kwargs(options, kwargs)
30553046

30563047
# The capture dict can arrive via two upstream paths:
30573048
# 1. The query execution context puts it into ``options`` (the

0 commit comments

Comments
 (0)