-
Notifications
You must be signed in to change notification settings - Fork 3.3k
Add Cosmos DB dedicated gateway bypass cache and shard key support #44846
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
4c3e9d1
65f4491
87148ce
470329a
1a304ef
d404df8
7d04430
59941d3
cf89cc9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -39,7 +39,7 @@ | |
| from .. import _utils as utils | ||
| from .._availability_strategy_config import _validate_hedging_config | ||
| from .._base import (_build_properties_cache, _deserialize_throughput, _replace_throughput, | ||
| build_options as _build_options, GenerateGuidId, validate_cache_staleness_value) | ||
| build_options as _build_options, GenerateGuidId, validate_cache_staleness_value, validate_shard_key_value) | ||
| from .._change_feed.feed_range_internal import FeedRangeInternalEpk | ||
|
|
||
| from .._cosmos_responses import CosmosDict, CosmosList | ||
|
|
@@ -321,6 +321,8 @@ async def read_item( | |
| session_token: Optional[str] = None, | ||
| initial_headers: Optional[dict[str, str]] = None, | ||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||
| bypass_integrated_cache: Optional[bool] = None, | ||
| dedicated_gateway_shard_key: Optional[str] = None, | ||
| priority: Optional[Literal["High", "Low"]] = None, | ||
| throughput_bucket: Optional[int] = None, | ||
| availability_strategy_config: Optional[dict[str, Any]] = _Unset, | ||
|
|
@@ -342,6 +344,13 @@ async def read_item( | |
| :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in | ||
| milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, | ||
| responses are guaranteed to be no staler than this value. | ||
| :keyword bool bypass_integrated_cache: If set to True, the read will be served by the backend and won't be | ||
| cached in the dedicated gateway. If set to False or not provided, the integrated cache will be used if | ||
| configured. This option only applies to accounts configured with dedicated gateway. | ||
| :keyword str dedicated_gateway_shard_key: The shard key to use for the dedicated gateway request. Specifying | ||
| the shard key will help route the request to an instance that has cached data for this shard or bypass | ||
| the SQLx cache if the correlated instance is down. If not specified, the request will fall back to | ||
| randomly selecting an instance. This option only applies to accounts configured with dedicated gateway. | ||
| :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each | ||
| request. Once the user has reached their provisioned throughput, low priority requests are throttled | ||
| before high priority requests start getting throttled. Feature must first be enabled at the account level. | ||
|
|
@@ -386,6 +395,11 @@ async def read_item( | |
| if max_integrated_cache_staleness_in_ms is not None: | ||
| validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) | ||
| request_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms | ||
| if bypass_integrated_cache is not None: | ||
| request_options["bypassIntegratedCache"] = bypass_integrated_cache | ||
| if dedicated_gateway_shard_key is not None: | ||
| validate_shard_key_value(dedicated_gateway_shard_key) | ||
| request_options["dedicatedGatewayShardKey"] = dedicated_gateway_shard_key | ||
| await self._get_properties_with_options(request_options) | ||
| request_options["containerRID"] = self.__get_client_container_caches()[self.container_link]["_rid"] | ||
|
|
||
|
|
@@ -399,6 +413,8 @@ def read_all_items( | |
| session_token: Optional[str] = None, | ||
| initial_headers: Optional[dict[str, str]] = None, | ||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||
| bypass_integrated_cache: Optional[bool] = None, | ||
| dedicated_gateway_shard_key: Optional[str] = None, | ||
|
Comment on lines
413
to
+417
|
||
| priority: Optional[Literal["High", "Low"]] = None, | ||
| throughput_bucket: Optional[int] = None, | ||
| availability_strategy_config: Optional[dict[str, Any]] = _Unset, | ||
|
|
@@ -414,6 +430,13 @@ def read_all_items( | |
| :keyword int max_integrated_cache_staleness_in_ms: The max cache staleness for the integrated cache in | ||
| milliseconds. For accounts configured to use the integrated cache, using Session or Eventual consistency, | ||
| responses are guaranteed to be no staler than this value. | ||
| :keyword bool bypass_integrated_cache: If set to True, the read will be served by the backend and won't be | ||
| cached in the dedicated gateway. If set to False or not provided, the integrated cache will be used if | ||
| configured. This option only applies to accounts configured with dedicated gateway. | ||
| :keyword str dedicated_gateway_shard_key: The shard key to use for the dedicated gateway request. Specifying | ||
| the shard key will help route the request to an instance that has cached data for this shard or bypass | ||
| the SQLx cache if the correlated instance is down. If not specified, the request will fall back to | ||
| randomly selecting an instance. This option only applies to accounts configured with dedicated gateway. | ||
| :keyword Literal["High", "Low"] priority: Priority based execution allows users to set a priority for each | ||
| request. Once the user has reached their provisioned throughput, low priority requests are throttled | ||
| before high priority requests start getting throttled. Feature must first be enabled at the account level. | ||
|
|
@@ -442,9 +465,14 @@ def read_all_items( | |
| feed_options = _build_options(kwargs) | ||
| if max_item_count is not None: | ||
| feed_options["maxItemCount"] = max_item_count | ||
| if max_integrated_cache_staleness_in_ms: | ||
| if max_integrated_cache_staleness_in_ms is not None: | ||
| validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) | ||
| feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms | ||
| if bypass_integrated_cache is not None: | ||
| feed_options["bypassIntegratedCache"] = bypass_integrated_cache | ||
| if dedicated_gateway_shard_key is not None: | ||
| validate_shard_key_value(dedicated_gateway_shard_key) | ||
| feed_options["dedicatedGatewayShardKey"] = dedicated_gateway_shard_key | ||
| response_hook = kwargs.pop("response_hook", None) | ||
| if response_hook and hasattr(response_hook, "clear"): | ||
| response_hook.clear() | ||
|
|
@@ -537,6 +565,8 @@ def query_items( | |
| enable_scan_in_query: Optional[bool] = None, | ||
| initial_headers: Optional[dict[str, str]] = None, | ||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||
| bypass_integrated_cache: Optional[bool] = None, | ||
| dedicated_gateway_shard_key: Optional[str] = None, | ||
| max_item_count: Optional[int] = None, | ||
| parameters: Optional[list[dict[str, object]]] = None, | ||
| partition_key: PartitionKeyType, | ||
|
|
@@ -624,6 +654,8 @@ def query_items( | |
| feed_range: dict[str, Any], | ||
| initial_headers: Optional[dict[str, str]] = None, | ||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||
| bypass_integrated_cache: Optional[bool] = None, | ||
| dedicated_gateway_shard_key: Optional[str] = None, | ||
| max_item_count: Optional[int] = None, | ||
| parameters: Optional[list[dict[str, object]]] = None, | ||
| populate_index_metrics: Optional[bool] = None, | ||
|
|
@@ -706,6 +738,8 @@ def query_items( | |
| enable_scan_in_query: Optional[bool] = None, | ||
| initial_headers: Optional[dict[str, str]] = None, | ||
| max_integrated_cache_staleness_in_ms: Optional[int] = None, | ||
| bypass_integrated_cache: Optional[bool] = None, | ||
| dedicated_gateway_shard_key: Optional[str] = None, | ||
| max_item_count: Optional[int] = None, | ||
| parameters: Optional[list[dict[str, object]]] = None, | ||
| populate_index_metrics: Optional[bool] = None, | ||
|
|
@@ -868,6 +902,13 @@ def query_items( | |
| max_integrated_cache_staleness_in_ms = kwargs.pop("max_integrated_cache_staleness_in_ms") | ||
| validate_cache_staleness_value(max_integrated_cache_staleness_in_ms) | ||
| feed_options["maxIntegratedCacheStaleness"] = max_integrated_cache_staleness_in_ms | ||
| if utils.valid_key_value_exist(kwargs, "bypass_integrated_cache"): | ||
| bypass_integrated_cache = kwargs.pop("bypass_integrated_cache") | ||
| feed_options["bypassIntegratedCache"] = bypass_integrated_cache | ||
| if utils.valid_key_value_exist(kwargs, "dedicated_gateway_shard_key"): | ||
| dedicated_gateway_shard_key = kwargs.pop("dedicated_gateway_shard_key") | ||
| validate_shard_key_value(dedicated_gateway_shard_key) | ||
| feed_options["dedicatedGatewayShardKey"] = dedicated_gateway_shard_key | ||
|
Comment on lines
+905
to
+911
|
||
| if utils.valid_key_value_exist(kwargs, "continuation_token_limit"): | ||
| feed_options["responseContinuationTokenLimitInKb"] = kwargs.pop("continuation_token_limit") | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.