-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Add Cosmos DB dedicated gateway bypass cache and shard key support #44846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 8 commits
4c3e9d1
65f4491
87148ce
470329a
1a304ef
d404df8
7d04430
59941d3
cf89cc9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,7 +39,7 @@ | |
| from .. import _utils as utils | ||
| from .._availability_strategy_config import _validate_hedging_config | ||
| from .._base import (_build_properties_cache, _deserialize_throughput, _replace_throughput, | ||
| build_options as _build_options, GenerateGuidId, validate_cache_staleness_value) | ||
| build_options as _build_options, GenerateGuidId, validate_cache_staleness_value, validate_shard_key_value) | ||
| from .._change_feed.feed_range_internal import FeedRangeInternalEpk | ||
|
|
||
| from .._cosmos_responses import CosmosDict, CosmosList | ||
|
|
@@ -321,6 +321,8 @@ async def read_item( | |
| session_token: Optional[str] = None, | ||
| initial_headers: Optional[dict[str, str]] = None, | ||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||
| bypass_integrated_cache: Optional[bool] = None, | ||
| dedicated_gateway_shard_key: Optional[str] = None, | ||
| priority: Optional[Literal["High", "Low"]] = None, | ||
| throughput_bucket: Optional[int] = None, | ||
| availability_strategy_config: Optional[dict[str, Any]] = _Unset, | ||
|
|
@@ -342,6 +344,13 @@ async def read_item( | |
| :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in | ||
| milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, | ||
| responses are guaranteed to be no staler than this value. | ||
| :keyword bool bypass_integrated_cache: If set to True, the read will be served by the backend and won't be | ||
| cached in the dedicated gateway. If set to False or not provided, the integrated cache will be used if | ||
| configured. This option only applies to accounts configured with dedicated gateway. | ||
| :keyword str dedicated_gateway_shard_key: The shard key to use for the dedicated gateway request. Specifying | ||
| the shard key will help route the request to an instance that has cached data for this shard or bypass | ||
| the SQLx cache if the correlated instance is down. If not specified, the request will fall back to | ||
| randomly selecting an instance. This option only applies to accounts configured with dedicated gateway. | ||
| :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each | ||
| request. Once the user has reached their provisioned throughput, low priority requests are throttled | ||
| before high priority requests start getting throttled. Feature must first be enabled at the account level. | ||
|
|
@@ -386,6 +395,11 @@ async def read_item( | |
| if max_integrated_cache_staleness_in_ms is not None: | ||
| validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) | ||
| request_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms | ||
| if bypass_integrated_cache is not None: | ||
| request_options["bypassIntegratedCache"] = bypass_integrated_cache | ||
| if dedicated_gateway_shard_key is not None: | ||
| validate_shard_key_value(dedicated_gateway_shard_key) | ||
| request_options["dedicatedGatewayShardKey"] = dedicated_gateway_shard_key | ||
| await self._get_properties_with_options(request_options) | ||
| request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"] | ||
|
|
||
|
|
@@ -399,6 +413,8 @@ def read_all_items( | |
| session_token: Optional[str] = None, | ||
| initial_headers: Optional[dict[str, str]] = None, | ||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||
| bypass_integrated_cache: Optional[bool] = None, | ||
| dedicated_gateway_shard_key: Optional[str] = None, | ||
|
Comment on lines
413
to
+417
|
||
| priority: Optional[Literal["High", "Low"]] = None, | ||
| throughput_bucket: Optional[int] = None, | ||
| availability_strategy_config: Optional[dict[str, Any]] = _Unset, | ||
|
|
@@ -442,9 +458,14 @@ def read_all_items( | |
| feed_options = _build_options(kwargs) | ||
| if max_item_count is not None: | ||
| feed_options["maxItemCount"] = max_item_count | ||
| if max_integrated_cache_staleness_in_ms: | ||
| if max_integrated_cache_staleness_in_ms is not None: | ||
| validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) | ||
| feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms | ||
| if bypass_integrated_cache is not None: | ||
| feed_options["bypassIntegratedCache"] = bypass_integrated_cache | ||
| if dedicated_gateway_shard_key is not None: | ||
| validate_shard_key_value(dedicated_gateway_shard_key) | ||
| feed_options["dedicatedGatewayShardKey"] = dedicated_gateway_shard_key | ||
| response_hook = kwargs.pop("response_hook", None) | ||
| if response_hook and hasattr(response_hook, "clear"): | ||
| response_hook.clear() | ||
|
|
@@ -537,6 +558,8 @@ def query_items( | |
| enable_scan_in_query: Optional[bool] = None, | ||
| initial_headers: Optional[dict[str, str]] = None, | ||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||
| bypass_integrated_cache: Optional[bool] = None, | ||
| dedicated_gateway_shard_key: Optional[str] = None, | ||
| max_item_count: Optional[int] = None, | ||
| parameters: Optional[list[dict[str, object]]] = None, | ||
| partition_key: PartitionKeyType, | ||
|
|
@@ -624,6 +647,8 @@ def query_items( | |
| feed_range: dict[str, Any], | ||
| initial_headers: Optional[dict[str, str]] = None, | ||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||
| bypass_integrated_cache: Optional[bool] = None, | ||
| dedicated_gateway_shard_key: Optional[str] = None, | ||
| max_item_count: Optional[int] = None, | ||
| parameters: Optional[list[dict[str, object]]] = None, | ||
| populate_index_metrics: Optional[bool] = None, | ||
|
|
@@ -706,6 +731,8 @@ def query_items( | |
| enable_scan_in_query: Optional[bool] = None, | ||
| initial_headers: Optional[dict[str, str]] = None, | ||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||
| bypass_integrated_cache: Optional[bool] = None, | ||
| dedicated_gateway_shard_key: Optional[str] = None, | ||
| max_item_count: Optional[int] = None, | ||
| parameters: Optional[list[dict[str, object]]] = None, | ||
| populate_index_metrics: Optional[bool] = None, | ||
|
|
@@ -868,6 +895,13 @@ def query_items( | |
| max_integrated_cache_staleness_in_ms = kwargs.pop("max_integrated_cache_staleness_in_ms") | ||
| validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) | ||
| feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms | ||
| if utils.valid_key_value_exist(kwargs, "bypass_integrated_cache"): | ||
| bypass_integrated_cache = kwargs.pop("bypass_integrated_cache") | ||
| feed_options["bypassIntegratedCache"] = bypass_integrated_cache | ||
| if utils.valid_key_value_exist(kwargs, "dedicated_gateway_shard_key"): | ||
| dedicated_gateway_shard_key = kwargs.pop("dedicated_gateway_shard_key") | ||
| validate_shard_key_value(dedicated_gateway_shard_key) | ||
| feed_options["dedicatedGatewayShardKey"] = dedicated_gateway_shard_key | ||
|
Comment on lines
+905
to
+911
|
||
| if utils.valid_key_value_exist(kwargs, "continuation_token_limit"): | ||
| feed_options["responseContinuationTokenLimitInKb"] = kwargs.pop("continuation_token_limit") | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -35,7 +35,7 @@ | |||||||||||||||||||||
| from . import _utils as utils | ||||||||||||||||||||||
| from ._availability_strategy_config import _validate_hedging_config | ||||||||||||||||||||||
| from ._base import (_build_properties_cache, _deserialize_throughput, _replace_throughput, build_options, | ||||||||||||||||||||||
| GenerateGuidId, validate_cache_staleness_value) | ||||||||||||||||||||||
| GenerateGuidId, validate_cache_staleness_value, validate_shard_key_value) | ||||||||||||||||||||||
| from ._change_feed.feed_range_internal import FeedRangeInternalEpk | ||||||||||||||||||||||
| from ._constants import _Constants as Constants, TimeoutScope | ||||||||||||||||||||||
| from ._cosmos_client_connection import CosmosClientConnection | ||||||||||||||||||||||
|
|
@@ -223,6 +223,8 @@ def read_item( # pylint:disable=docstring-missing-param | |||||||||||||||||||||
| session_token: Optional[str] = None, | ||||||||||||||||||||||
| initial_headers: Optional[dict[str, str]] = None, | ||||||||||||||||||||||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||||||||||||||||||||||
| bypass_integrated_cache: Optional[bool] = None, | ||||||||||||||||||||||
| dedicated_gateway_shard_key: Optional[str] = None, | ||||||||||||||||||||||
| priority: Optional[Literal["High", "Low"]] = None, | ||||||||||||||||||||||
| throughput_bucket: Optional[int] = None, | ||||||||||||||||||||||
| availability_strategy_config: Optional[dict[str, Any]] = _Unset, | ||||||||||||||||||||||
|
|
@@ -245,6 +247,13 @@ def read_item( # pylint:disable=docstring-missing-param | |||||||||||||||||||||
| :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in | ||||||||||||||||||||||
| milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, | ||||||||||||||||||||||
| responses are guaranteed to be no staler than this value. | ||||||||||||||||||||||
| :keyword bool bypass_integrated_cache: If set to True, the read will be served by the backend and won't be | ||||||||||||||||||||||
| cached in the dedicated gateway. If set to False or not provided, the integrated cache will be used if | ||||||||||||||||||||||
| configured. This option only applies to accounts configured with dedicated gateway. | ||||||||||||||||||||||
| :keyword str dedicated_gateway_shard_key: The shard key to use for the dedicated gateway request. Specifying | ||||||||||||||||||||||
| the shard key will help route the request to an instance that has cached data for this shard or bypass | ||||||||||||||||||||||
| the SQLx cache if the correlated instance is down. If not specified, the request will fall back to | ||||||||||||||||||||||
| randomly selecting an instance. This option only applies to accounts configured with dedicated gateway. | ||||||||||||||||||||||
| :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each | ||||||||||||||||||||||
| request. Once the user has reached their provisioned throughput, low priority requests are throttled | ||||||||||||||||||||||
| before high priority requests start getting throttled. Feature must first be enabled at the account level. | ||||||||||||||||||||||
|
|
@@ -295,6 +304,11 @@ def read_item( # pylint:disable=docstring-missing-param | |||||||||||||||||||||
| if max_integrated_cache_staleness_in_ms is not None: | ||||||||||||||||||||||
| validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) | ||||||||||||||||||||||
| request_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms | ||||||||||||||||||||||
| if bypass_integrated_cache is not None: | ||||||||||||||||||||||
| request_options["bypassIntegratedCache"] = bypass_integrated_cache | ||||||||||||||||||||||
| if dedicated_gateway_shard_key is not None: | ||||||||||||||||||||||
| validate_shard_key_value(dedicated_gateway_shard_key) | ||||||||||||||||||||||
| request_options["dedicatedGatewayShardKey"] = dedicated_gateway_shard_key | ||||||||||||||||||||||
| self._get_properties_with_options(request_options) | ||||||||||||||||||||||
| request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"] | ||||||||||||||||||||||
| return self.client_connection.ReadItem(document_link=doc_link, options=request_options, **kwargs) | ||||||||||||||||||||||
|
|
@@ -385,6 +399,8 @@ def read_all_items( # pylint:disable=docstring-missing-param | |||||||||||||||||||||
| session_token: Optional[str] = None, | ||||||||||||||||||||||
| initial_headers: Optional[dict[str, str]] = None, | ||||||||||||||||||||||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||||||||||||||||||||||
| bypass_integrated_cache: Optional[bool] = None, | ||||||||||||||||||||||
| dedicated_gateway_shard_key: Optional[str] = None, | ||||||||||||||||||||||
| priority: Optional[Literal["High", "Low"]] = None, | ||||||||||||||||||||||
| throughput_bucket: Optional[int] = None, | ||||||||||||||||||||||
| availability_strategy_config: Optional[dict[str, Any]] = _Unset, | ||||||||||||||||||||||
|
|
@@ -401,6 +417,13 @@ def read_all_items( # pylint:disable=docstring-missing-param | |||||||||||||||||||||
| :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in | ||||||||||||||||||||||
| milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, | ||||||||||||||||||||||
| responses are guaranteed to be no staler than this value. | ||||||||||||||||||||||
| :keyword bool bypass_integrated_cache: If set to True, the read will be served by the backend and won't be | ||||||||||||||||||||||
| cached in the dedicated gateway. If set to False or not provided, the integrated cache will be used if | ||||||||||||||||||||||
| configured. This option only applies to accounts configured with dedicated gateway. | ||||||||||||||||||||||
| :keyword str dedicated_gateway_shard_key: The shard key to use for the dedicated gateway request. Specifying | ||||||||||||||||||||||
| the shard key will help route the request to an instance that has cached data for this shard or bypass | ||||||||||||||||||||||
| the SQLx cache if the correlated instance is down. If not specified, the request will fall back to | ||||||||||||||||||||||
| randomly selecting an instance. This option only applies to accounts configured with dedicated gateway. | ||||||||||||||||||||||
| :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each | ||||||||||||||||||||||
| request. Once the user has reached their provisioned throughput, low priority requests are throttled | ||||||||||||||||||||||
| before high priority requests start getting throttled. Feature must first be enabled at the account level. | ||||||||||||||||||||||
|
|
@@ -436,9 +459,14 @@ def read_all_items( # pylint:disable=docstring-missing-param | |||||||||||||||||||||
| DeprecationWarning, | ||||||||||||||||||||||
| ) | ||||||||||||||||||||||
| feed_options["populateQueryMetrics"] = populate_query_metrics | ||||||||||||||||||||||
| if max_integrated_cache_staleness_in_ms: | ||||||||||||||||||||||
| if max_integrated_cache_staleness_in_ms is not None: | ||||||||||||||||||||||
| validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) | ||||||||||||||||||||||
| feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms | ||||||||||||||||||||||
| if bypass_integrated_cache is not None: | ||||||||||||||||||||||
| feed_options["bypassIntegratedCache"] = bypass_integrated_cache | ||||||||||||||||||||||
| if dedicated_gateway_shard_key is not None: | ||||||||||||||||||||||
| validate_shard_key_value(dedicated_gateway_shard_key) | ||||||||||||||||||||||
| feed_options["dedicatedGatewayShardKey"] = dedicated_gateway_shard_key | ||||||||||||||||||||||
|
Comment on lines
+465
to
+469
|
||||||||||||||||||||||
| if response_hook and hasattr(response_hook, "clear"): | ||||||||||||||||||||||
| response_hook.clear() | ||||||||||||||||||||||
|
|
||||||||||||||||||||||
|
|
@@ -729,6 +757,8 @@ def query_items( | |||||||||||||||||||||
| continuation_token_limit: Optional[int] = None, | ||||||||||||||||||||||
| initial_headers: Optional[dict[str, str]] = None, | ||||||||||||||||||||||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||||||||||||||||||||||
| bypass_integrated_cache: Optional[bool] = None, | ||||||||||||||||||||||
| dedicated_gateway_shard_key: Optional[str] = None, | ||||||||||||||||||||||
| populate_index_metrics: Optional[bool] = None, | ||||||||||||||||||||||
| priority: Optional[Literal["High", "Low"]] = None, | ||||||||||||||||||||||
| response_hook: Optional[Callable[[Mapping[str, str], dict[str, Any]], None]] = None, | ||||||||||||||||||||||
|
|
@@ -774,6 +804,13 @@ def query_items( | |||||||||||||||||||||
| :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in | ||||||||||||||||||||||
| milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, | ||||||||||||||||||||||
| responses are guaranteed to be no staler than this value. | ||||||||||||||||||||||
| :keyword bool bypass_integrated_cache: If set to True, the read will be served by the backend and won't be | ||||||||||||||||||||||
| cached in the dedicated gateway. If set to False or not provided, the integrated cache will be used if | ||||||||||||||||||||||
| configured. This option only applies to accounts configured with dedicated gateway. | ||||||||||||||||||||||
| :keyword str dedicated_gateway_shard_key: The shard key to use for the dedicated gateway request. Specifying | ||||||||||||||||||||||
| the shard key will help route the request to an instance that has cached data for this shard or bypass | ||||||||||||||||||||||
| the SQLx cache if the correlated instance is down. If not specified, the request will fall back to | ||||||||||||||||||||||
| randomly selecting an instance. This option only applies to accounts configured with dedicated gateway. | ||||||||||||||||||||||
|
Comment on lines
804
to
+813
|
||||||||||||||||||||||
| :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in | |
| milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, | |
| responses are guaranteed to be no staler than this value. | |
| :keyword bool bypass_integrated_cache: If set to True, the read will be served by the backend and won't be | |
| cached in the dedicated gateway. If set to False or not provided, the integrated cache will be used if | |
| configured. This option only applies to accounts configured with dedicated gateway. | |
| :keyword str dedicated_gateway_shard_key: The shard key to use for the dedicated gateway request. Specifying | |
| the shard key will help route the request to an instance that has cached data for this shard or bypass | |
| the SQLx cache if the correlated instance is down. If not specified, the request will fall back to | |
| randomly selecting an instance. This option only applies to accounts configured with dedicated gateway. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
`GetHeaders` only sets `x-ms-dedicatedgateway-max-age` when `maxIntegratedCacheStaleness` is truthy, so a valid value of `0` (allowed by `validate_cache_staleness_value`) will silently omit the header. Consider checking for key presence / `is not None` instead so `0` is transmitted correctly.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@copilot apply changes based on this feedback
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed to use a key-presence check (`if "maxIntegratedCacheStaleness" in options`) instead of a truthiness check. This ensures a value of `0` is correctly transmitted as a header. Commit: cf89cc9