Skip to content

Commit e7a5247

Browse files
NoahStapptadjik1
andauthored
PYTHON-5528 - Token buckets disabled by default (#2713)
Co-authored-by: Sergey Zelenov <mail@zelenov.su>
1 parent 1d219a9 commit e7a5247

File tree

11 files changed

+259
-60
lines changed

11 files changed

+259
-60
lines changed

pymongo/asynchronous/helpers.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ async def inner(*args: Any, **kwargs: Any) -> Any:
7979
_MAX_RETRIES = 5
8080
_BACKOFF_INITIAL = 0.1
8181
_BACKOFF_MAX = 10
82-
# DRIVERS-3240 will determine these defaults.
8382
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
8483
DEFAULT_RETRY_TOKEN_RETURN = 0.1
8584

@@ -101,7 +100,6 @@ def __init__(
101100
):
102101
self.lock = _async_create_lock()
103102
self.capacity = capacity
104-
# DRIVERS-3240 will determine how full the bucket should start.
105103
self.tokens = capacity
106104
self.return_rate = return_rate
107105

@@ -123,7 +121,7 @@ async def deposit(self, retry: bool = False) -> None:
123121
class _RetryPolicy:
124122
"""A retry limiter that performs exponential backoff with jitter.
125123
126-
Retry attempts are limited by a token bucket to prevent overwhelming the server during
124+
When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
127125
a prolonged outage or high load.
128126
"""
129127

@@ -133,15 +131,18 @@ def __init__(
133131
attempts: int = _MAX_RETRIES,
134132
backoff_initial: float = _BACKOFF_INITIAL,
135133
backoff_max: float = _BACKOFF_MAX,
134+
adaptive_retry: bool = False,
136135
):
137136
self.token_bucket = token_bucket
138137
self.attempts = attempts
139138
self.backoff_initial = backoff_initial
140139
self.backoff_max = backoff_max
140+
self.adaptive_retry = adaptive_retry
141141

142142
async def record_success(self, retry: bool) -> None:
143143
"""Record a successful operation."""
144-
await self.token_bucket.deposit(retry)
144+
if self.adaptive_retry:
145+
await self.token_bucket.deposit(retry)
145146

146147
def backoff(self, attempt: int) -> float:
147148
"""Return the backoff duration for the given ."""
@@ -158,7 +159,7 @@ async def should_retry(self, attempt: int, delay: float) -> bool:
158159
return False
159160

160161
# Check token bucket last since we only want to consume a token if we actually retry.
161-
if not await self.token_bucket.consume():
162+
if self.adaptive_retry and not await self.token_bucket.consume():
162163
# DRIVERS-3246 Improve diagnostics when this case happens.
163164
# We could add info to the exception and log.
164165
return False

pymongo/asynchronous/mongo_client.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,8 +615,18 @@ def __init__(
615615
client to use Stable API. See `versioned API <https://www.mongodb.com/docs/manual/reference/stable-api/#what-is-the-stable-api--and-should-you-use-it->`_ for
616616
details.
617617
618+
| **Adaptive retry options:**
619+
| (If not enabled explicitly, adaptive retries will not be enabled.)
620+
621+
- `adaptive_retries`: (boolean) Whether the adaptive retry mechanism is enabled for this client.
622+
If enabled, server overload errors will use a token-bucket based system to mitigate further overload.
623+
Defaults to ``False``.
624+
618625
.. seealso:: The MongoDB documentation on `connections <https://dochub.mongodb.org/core/connections>`_.
619626
627+
.. versionchanged:: 4.17
628+
Added the ``adaptive_retries`` URI and keyword argument.
629+
620630
.. versionchanged:: 4.5
621631
Added the ``serverMonitoringMode`` keyword argument.
622632
@@ -778,7 +788,6 @@ def __init__(
778788
self._timeout: float | None = None
779789
self._topology_settings: TopologySettings = None # type: ignore[assignment]
780790
self._event_listeners: _EventListeners | None = None
781-
self._retry_policy = _RetryPolicy(_TokenBucket())
782791

783792
# _pool_class, _monitor_class, and _condition_class are for deep
784793
# customization of PyMongo, e.g. Motor.
@@ -890,6 +899,10 @@ def __init__(
890899
self._opened = False
891900
self._closed = False
892901
self._loop: Optional[asyncio.AbstractEventLoop] = None
902+
903+
self._retry_policy = _RetryPolicy(
904+
_TokenBucket(), adaptive_retry=self._options.adaptive_retries
905+
)
893906
if not is_srv:
894907
self._init_background()
895908

pymongo/client_options.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,11 @@ def __init__(
235235
self.__server_monitoring_mode = options.get(
236236
"servermonitoringmode", common.SERVER_MONITORING_MODE
237237
)
238+
self.__adaptive_retries = (
239+
options.get("adaptive_retries", common.ADAPTIVE_RETRIES)
240+
if "adaptive_retries" in options
241+
else options.get("adaptiveretries", common.ADAPTIVE_RETRIES)
242+
)
238243

239244
@property
240245
def _options(self) -> Mapping[str, Any]:
@@ -346,3 +351,11 @@ def server_monitoring_mode(self) -> str:
346351
.. versionadded:: 4.5
347352
"""
348353
return self.__server_monitoring_mode
354+
355+
@property
356+
def adaptive_retries(self) -> bool:
357+
"""The configured adaptiveRetries option.
358+
359+
.. versionadded:: 4.XX
360+
"""
361+
return self.__adaptive_retries

pymongo/common.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,9 @@
140140
# Default value for serverMonitoringMode
141141
SERVER_MONITORING_MODE = "auto" # poll/stream/auto
142142

143+
# Default value for adaptiveRetries
144+
ADAPTIVE_RETRIES = False
145+
143146
# Auth mechanism properties that must raise an error instead of warning if they invalidate.
144147
_MECH_PROP_MUST_RAISE = ["CANONICALIZE_HOST_NAME"]
145148

@@ -738,6 +741,7 @@ def validate_server_monitoring_mode(option: str, value: str) -> str:
738741
"srvmaxhosts": validate_non_negative_integer,
739742
"timeoutms": validate_timeoutms,
740743
"servermonitoringmode": validate_server_monitoring_mode,
744+
"adaptiveretries": validate_boolean_or_string,
741745
}
742746

743747
# Dictionary where keys are the names of URI options specific to pymongo,
@@ -771,6 +775,7 @@ def validate_server_monitoring_mode(option: str, value: str) -> str:
771775
"server_selector": validate_is_callable_or_none,
772776
"auto_encryption_opts": validate_auto_encryption_opts_or_none,
773777
"authoidcallowedhosts": validate_list,
778+
"adaptive_retries": validate_boolean_or_string,
774779
}
775780

776781
# Dictionary where keys are any URI option name, and values are the

pymongo/synchronous/helpers.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@ def inner(*args: Any, **kwargs: Any) -> Any:
7979
_MAX_RETRIES = 5
8080
_BACKOFF_INITIAL = 0.1
8181
_BACKOFF_MAX = 10
82-
# DRIVERS-3240 will determine these defaults.
8382
DEFAULT_RETRY_TOKEN_CAPACITY = 1000.0
8483
DEFAULT_RETRY_TOKEN_RETURN = 0.1
8584

@@ -101,7 +100,6 @@ def __init__(
101100
):
102101
self.lock = _create_lock()
103102
self.capacity = capacity
104-
# DRIVERS-3240 will determine how full the bucket should start.
105103
self.tokens = capacity
106104
self.return_rate = return_rate
107105

@@ -123,7 +121,7 @@ def deposit(self, retry: bool = False) -> None:
123121
class _RetryPolicy:
124122
"""A retry limiter that performs exponential backoff with jitter.
125123
126-
Retry attempts are limited by a token bucket to prevent overwhelming the server during
124+
When adaptive retries are enabled, retry attempts are limited by a token bucket to prevent overwhelming the server during
127125
a prolonged outage or high load.
128126
"""
129127

@@ -133,15 +131,18 @@ def __init__(
133131
attempts: int = _MAX_RETRIES,
134132
backoff_initial: float = _BACKOFF_INITIAL,
135133
backoff_max: float = _BACKOFF_MAX,
134+
adaptive_retry: bool = False,
136135
):
137136
self.token_bucket = token_bucket
138137
self.attempts = attempts
139138
self.backoff_initial = backoff_initial
140139
self.backoff_max = backoff_max
140+
self.adaptive_retry = adaptive_retry
141141

142142
def record_success(self, retry: bool) -> None:
143143
"""Record a successful operation."""
144-
self.token_bucket.deposit(retry)
144+
if self.adaptive_retry:
145+
self.token_bucket.deposit(retry)
145146

146147
def backoff(self, attempt: int) -> float:
147148
"""Return the backoff duration for the given ."""
@@ -158,7 +159,7 @@ def should_retry(self, attempt: int, delay: float) -> bool:
158159
return False
159160

160161
# Check token bucket last since we only want to consume a token if we actually retry.
161-
if not self.token_bucket.consume():
162+
if self.adaptive_retry and not self.token_bucket.consume():
162163
# DRIVERS-3246 Improve diagnostics when this case happens.
163164
# We could add info to the exception and log.
164165
return False

pymongo/synchronous/mongo_client.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -615,8 +615,18 @@ def __init__(
615615
client to use Stable API. See `versioned API <https://www.mongodb.com/docs/manual/reference/stable-api/#what-is-the-stable-api--and-should-you-use-it->`_ for
616616
details.
617617
618+
| **Adaptive retry options:**
619+
| (If not enabled explicitly, adaptive retries will not be enabled.)
620+
621+
- `adaptive_retries`: (boolean) Whether the adaptive retry mechanism is enabled for this client.
622+
If enabled, server overload errors will use a token-bucket based system to mitigate further overload.
623+
Defaults to ``False``.
624+
618625
.. seealso:: The MongoDB documentation on `connections <https://dochub.mongodb.org/core/connections>`_.
619626
627+
.. versionchanged:: 4.17
628+
Added the ``adaptive_retries`` URI and keyword argument.
629+
620630
.. versionchanged:: 4.5
621631
Added the ``serverMonitoringMode`` keyword argument.
622632
@@ -778,7 +788,6 @@ def __init__(
778788
self._timeout: float | None = None
779789
self._topology_settings: TopologySettings = None # type: ignore[assignment]
780790
self._event_listeners: _EventListeners | None = None
781-
self._retry_policy = _RetryPolicy(_TokenBucket())
782791

783792
# _pool_class, _monitor_class, and _condition_class are for deep
784793
# customization of PyMongo, e.g. Motor.
@@ -890,6 +899,10 @@ def __init__(
890899
self._opened = False
891900
self._closed = False
892901
self._loop: Optional[asyncio.AbstractEventLoop] = None
902+
903+
self._retry_policy = _RetryPolicy(
904+
_TokenBucket(), adaptive_retry=self._options.adaptive_retries
905+
)
893906
if not is_srv:
894907
self._init_background()
895908

test/asynchronous/test_client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,21 @@ async def test_detected_environment_warning(self, mock_get_hosts):
652652
with self.assertWarns(UserWarning):
653653
self.simple_client(multi_host)
654654

655+
async def test_adaptive_retries(self):
656+
# Assert that adaptive retries are disabled by default.
657+
c = self.simple_client(connect=False)
658+
self.assertFalse(c.options.adaptive_retries)
659+
660+
# Assert that adaptive retries can be enabled through connection or client options.
661+
c = self.simple_client(connect=False, adaptive_retries=True)
662+
self.assertTrue(c.options.adaptive_retries)
663+
664+
c = self.simple_client(connect=False, adaptiveRetries=True)
665+
self.assertTrue(c.options.adaptive_retries)
666+
667+
c = self.simple_client(host="mongodb://localhost/?adaptiveretries=true", connect=False)
668+
self.assertTrue(c.options.adaptive_retries)
669+
655670

656671
class TestClient(AsyncIntegrationTest):
657672
def test_multiple_uris(self):

test/asynchronous/test_client_backpressure.py

Lines changed: 69 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -168,34 +168,11 @@ async def test_retry_overload_error_getMore(self):
168168
self.assertIn("RetryableError", str(error.exception))
169169
self.assertIn("SystemOverloadedError", str(error.exception))
170170

171-
@async_client_context.require_failCommand_appName
172-
async def test_limit_retry_command(self):
173-
client = await self.async_rs_or_single_client()
174-
client._retry_policy.token_bucket.tokens = 1
175-
db = client.pymongo_test
176-
await db.t.insert_one({"x": 1})
177-
178-
# Ensure command is retried once overload error.
179-
fail_many = mock_overload_error.copy()
180-
fail_many["mode"] = {"times": 1}
181-
async with self.fail_point(fail_many):
182-
await db.command("find", "t")
183-
184-
# Ensure command stops retrying when there are no tokens left.
185-
fail_too_many = mock_overload_error.copy()
186-
fail_too_many["mode"] = {"times": 2}
187-
async with self.fail_point(fail_too_many):
188-
with self.assertRaises(PyMongoError) as error:
189-
await db.command("find", "t")
190-
191-
self.assertIn("RetryableError", str(error.exception))
192-
self.assertIn("SystemOverloadedError", str(error.exception))
193-
194171

195172
class TestRetryPolicy(AsyncPyMongoTestCase):
196173
async def test_retry_policy(self):
197174
capacity = 10
198-
retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity))
175+
retry_policy = _RetryPolicy(_TokenBucket(capacity=capacity), adaptive_retry=True)
199176
self.assertEqual(retry_policy.attempts, helpers._MAX_RETRIES)
200177
self.assertEqual(retry_policy.backoff_initial, helpers._BACKOFF_INITIAL)
201178
self.assertEqual(retry_policy.backoff_max, helpers._BACKOFF_MAX)
@@ -300,6 +277,74 @@ async def test_01_operation_retry_uses_exponential_backoff(self, random_func):
300277
# runs.
301278
self.assertTrue(abs((end1 - start1) - (end0 - start0 + 3.1)) < 1)
302279

280+
@async_client_context.require_failCommand_appName
281+
async def test_03_overload_retries_limited(self):
282+
# Drivers should test that without adaptive retries enabled, overload errors are retried a maximum of five times.
283+
284+
# 1. Let `client` be a `MongoClient`.
285+
client = self.client
286+
# 2. Let `coll` be a collection.
287+
coll = client.pymongo_test.coll
288+
289+
# 3. Configure the following failpoint:
290+
failpoint = {
291+
"configureFailPoint": "failCommand",
292+
"mode": "alwaysOn",
293+
"data": {
294+
"failCommands": ["find"],
295+
"errorCode": 462, # IngressRequestRateLimitExceeded
296+
"errorLabels": ["RetryableError", "SystemOverloadedError"],
297+
},
298+
}
299+
300+
# 4. Perform a find operation with `coll` that fails.
301+
async with self.fail_point(failpoint):
302+
with self.assertRaises(PyMongoError) as error:
303+
await coll.find_one({})
304+
305+
# 5. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels.
306+
self.assertIn("RetryableError", str(error.exception))
307+
self.assertIn("SystemOverloadedError", str(error.exception))
308+
309+
# 6. Assert that the total number of started commands is MAX_RETRIES + 1.
310+
self.assertEqual(len(self.listener.started_events), _MAX_RETRIES + 1)
311+
312+
@async_client_context.require_failCommand_appName
313+
async def test_04_adaptive_retries_limited_by_tokens(self):
314+
# Drivers should test that when enabled, adaptive retries are limited by the number of tokens in the bucket.
315+
316+
# 1. Let `client` be a `MongoClient` with adaptiveRetries=True.
317+
client = await self.async_rs_or_single_client(
318+
adaptive_retries=True, event_listeners=[self.listener]
319+
)
320+
# 2. Set `client`'s retry token bucket to have 2 tokens.
321+
client._retry_policy.token_bucket.tokens = 2
322+
# 3. Let `coll` be a collection.
323+
coll = client.pymongo_test.coll
324+
325+
# 4. Configure the following failpoint:
326+
failpoint = {
327+
"configureFailPoint": "failCommand",
328+
"mode": {"times": 3},
329+
"data": {
330+
"failCommands": ["find"],
331+
"errorCode": 462, # IngressRequestRateLimitExceeded
332+
"errorLabels": ["RetryableError", "SystemOverloadedError"],
333+
},
334+
}
335+
336+
# 5. Perform a find operation with `coll` that fails.
337+
async with self.fail_point(failpoint):
338+
with self.assertRaises(PyMongoError) as error:
339+
await coll.find_one({})
340+
341+
# 6. Assert that the raised error contains both the `RetryableError` and `SystemOverLoadedError` error labels.
342+
self.assertIn("RetryableError", str(error.exception))
343+
self.assertIn("SystemOverloadedError", str(error.exception))
344+
345+
# 7. Assert that the total number of started commands is 3: one for the initial attempt and two for the retries.
346+
self.assertEqual(len(self.listener.started_events), 3)
347+
303348

304349
# Location of JSON test specifications.
305350
if _IS_SYNC:

test/test_client.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -645,6 +645,21 @@ def test_detected_environment_warning(self, mock_get_hosts):
645645
with self.assertWarns(UserWarning):
646646
self.simple_client(multi_host)
647647

648+
def test_adaptive_retries(self):
649+
# Assert that adaptive retries are disabled by default.
650+
c = self.simple_client(connect=False)
651+
self.assertFalse(c.options.adaptive_retries)
652+
653+
# Assert that adaptive retries can be enabled through connection or client options.
654+
c = self.simple_client(connect=False, adaptive_retries=True)
655+
self.assertTrue(c.options.adaptive_retries)
656+
657+
c = self.simple_client(connect=False, adaptiveRetries=True)
658+
self.assertTrue(c.options.adaptive_retries)
659+
660+
c = self.simple_client(host="mongodb://localhost/?adaptiveretries=true", connect=False)
661+
self.assertTrue(c.options.adaptive_retries)
662+
648663

649664
class TestClient(IntegrationTest):
650665
def test_multiple_uris(self):

0 commit comments

Comments
 (0)