Skip to content

Commit 5da9183

Browse files
NoahStappCopilotJibola
authored
PYTHON-5794 - Add prose tests to verify correct retry behavior when a… (mongodb#2755)
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Jib <Jibzade@gmail.com>
1 parent 35e51a5 commit 5da9183

6 files changed

Lines changed: 490 additions & 6 deletions

File tree

pymongo/asynchronous/mongo_client.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2779,7 +2779,7 @@ def __init__(
27792779
self._last_error: Optional[Exception] = None
27802780
self._retrying = False
27812781
self._always_retryable = False
2782-
self._multiple_retries = _csot.get_timeout() is not None
2782+
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
27832783
self._client = mongo_client
27842784
self._retry_policy = mongo_client._retry_policy
27852785
self._func = func
@@ -2852,6 +2852,8 @@ async def run(self) -> T:
28522852
# ConnectionFailures do not supply a code property
28532853
exc_code = getattr(exc, "code", None)
28542854
overloaded = exc.has_error_label("SystemOverloadedError")
2855+
if overloaded:
2856+
self._max_retries = self._client.options.max_adaptive_retries
28552857
always_retryable = exc.has_error_label("RetryableError") and overloaded
28562858
if not self._client.options.retry_reads or (
28572859
not always_retryable
@@ -2890,6 +2892,8 @@ async def run(self) -> T:
28902892
exc_to_check = exc.error
28912893
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
28922894
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
2895+
if overloaded:
2896+
self._max_retries = self._client.options.max_adaptive_retries
28932897
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
28942898

28952899
# Always retry abortTransaction and commitTransaction up to once
@@ -2943,7 +2947,9 @@ async def run(self) -> T:
29432947

29442948
def _is_not_eligible_for_retry(self) -> bool:
29452949
"""Checks if the exchange is not eligible for retry"""
2946-
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
2950+
return not self._retryable or (
2951+
self._is_retrying() and self._attempt_number >= self._max_retries
2952+
)
29472953

29482954
def _is_retrying(self) -> bool:
29492955
"""Checks if the exchange is currently undergoing a retry"""

pymongo/synchronous/mongo_client.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2769,7 +2769,7 @@ def __init__(
27692769
self._last_error: Optional[Exception] = None
27702770
self._retrying = False
27712771
self._always_retryable = False
2772-
self._multiple_retries = _csot.get_timeout() is not None
2772+
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
27732773
self._client = mongo_client
27742774
self._retry_policy = mongo_client._retry_policy
27752775
self._func = func
@@ -2842,6 +2842,8 @@ def run(self) -> T:
28422842
# ConnectionFailures do not supply a code property
28432843
exc_code = getattr(exc, "code", None)
28442844
overloaded = exc.has_error_label("SystemOverloadedError")
2845+
if overloaded:
2846+
self._max_retries = self._client.options.max_adaptive_retries
28452847
always_retryable = exc.has_error_label("RetryableError") and overloaded
28462848
if not self._client.options.retry_reads or (
28472849
not always_retryable
@@ -2880,6 +2882,8 @@ def run(self) -> T:
28802882
exc_to_check = exc.error
28812883
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
28822884
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
2885+
if overloaded:
2886+
self._max_retries = self._client.options.max_adaptive_retries
28832887
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
28842888

28852889
# Always retry abortTransaction and commitTransaction up to once
@@ -2933,7 +2937,9 @@ def run(self) -> T:
29332937

29342938
def _is_not_eligible_for_retry(self) -> bool:
29352939
"""Checks if the exchange is not eligible for retry"""
2936-
return not self._retryable or (self._is_retrying() and not self._multiple_retries)
2940+
return not self._retryable or (
2941+
self._is_retrying() and self._attempt_number >= self._max_retries
2942+
)
29372943

29382944
def _is_retrying(self) -> bool:
29392945
"""Checks if the exchange is currently undergoing a retry"""

test/asynchronous/test_retryable_reads.py

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
import sys
2121
import threading
2222
from test.asynchronous.utils import async_set_fail_point
23+
from unittest import mock
2324

24-
from pymongo.errors import OperationFailure
25+
from pymongo import MongoClient
26+
from pymongo.common import MAX_ADAPTIVE_RETRIES
27+
from pymongo.errors import OperationFailure, PyMongoError
2528

2629
sys.path[0:0] = [""]
2730

@@ -38,6 +41,7 @@
3841
)
3942

4043
from pymongo.monitoring import (
44+
CommandFailedEvent,
4145
ConnectionCheckedOutEvent,
4246
ConnectionCheckOutFailedEvent,
4347
ConnectionCheckOutFailedReason,
@@ -145,6 +149,19 @@ async def test_pool_paused_error_is_retryable(self):
145149

146150

147151
class TestRetryableReads(AsyncIntegrationTest):
152+
async def asyncSetUp(self) -> None:
153+
await super().asyncSetUp()
154+
self.setup_client = MongoClient(**async_client_context.client_options)
155+
self.addCleanup(self.setup_client.close)
156+
157+
# TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
158+
def configure_fail_point_sync(self, command_args, off=False) -> None:
159+
cmd = {"configureFailPoint": "failCommand", **command_args}
160+
if off:
161+
cmd["mode"] = "off"
162+
cmd.pop("data", None)
163+
self.setup_client.admin.command(cmd)
164+
148165
@async_client_context.require_multiple_mongoses
149166
@async_client_context.require_failCommand_fail_point
150167
async def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self):
@@ -383,6 +400,117 @@ async def test_03_03_retryable_reads_caused_by_overload_errors_are_retried_on_th
383400
# 6. Assert that both events occurred on the same server.
384401
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id
385402

403+
@async_client_context.require_failCommand_fail_point
404+
@async_client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator]
405+
async def test_overload_then_nonoverload_retries_increased_reads(self) -> None:
406+
# Create a client.
407+
listener = OvertCommandListener()
408+
409+
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
410+
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
411+
overload_fail_point = {
412+
"configureFailPoint": "failCommand",
413+
"mode": {"times": 1},
414+
"data": {
415+
"failCommands": ["find"],
416+
"errorLabels": ["RetryableError", "SystemOverloadedError"],
417+
"errorCode": 91,
418+
},
419+
}
420+
421+
# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
422+
non_overload_fail_point = {
423+
"configureFailPoint": "failCommand",
424+
"mode": "alwaysOn",
425+
"data": {
426+
"failCommands": ["find"],
427+
"errorCode": 91,
428+
"errorLabels": ["RetryableError"],
429+
},
430+
}
431+
432+
def failed(event: CommandFailedEvent) -> None:
433+
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
434+
if listener.failed_events:
435+
return
436+
assert event.failure["code"] == 91
437+
self.configure_fail_point_sync(non_overload_fail_point)
438+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
439+
listener.failed_events.append(event)
440+
441+
listener.failed = failed
442+
443+
client = await self.async_rs_client(event_listeners=[listener])
444+
await client.test.test.insert_one({})
445+
446+
self.configure_fail_point_sync(overload_fail_point)
447+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
448+
449+
with self.assertRaises(PyMongoError):
450+
await client.test.test.find_one()
451+
452+
started_finds = [e for e in listener.started_events if e.command_name == "find"]
453+
self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1)
454+
455+
@async_client_context.require_failCommand_fail_point
456+
@async_client_context.require_version_min(4, 4, 0) # type:ignore[untyped-decorator]
457+
async def test_backoff_is_not_applied_for_non_overload_errors(self):
458+
if _IS_SYNC:
459+
mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff"
460+
else:
461+
mock_target = "pymongo.asynchronous.helpers._RetryPolicy.backoff"
462+
463+
# Create a client.
464+
listener = OvertCommandListener()
465+
466+
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
467+
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
468+
overload_fail_point = {
469+
"configureFailPoint": "failCommand",
470+
"mode": {"times": 1},
471+
"data": {
472+
"failCommands": ["find"],
473+
"errorLabels": ["RetryableError", "SystemOverloadedError"],
474+
"errorCode": 91,
475+
},
476+
}
477+
478+
# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
479+
non_overload_fail_point = {
480+
"configureFailPoint": "failCommand",
481+
"mode": "alwaysOn",
482+
"data": {
483+
"failCommands": ["find"],
484+
"errorCode": 91,
485+
"errorLabels": ["RetryableError"],
486+
},
487+
}
488+
489+
def failed(event: CommandFailedEvent) -> None:
490+
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
491+
if listener.failed_events:
492+
return
493+
assert event.failure["code"] == 91
494+
self.configure_fail_point_sync(non_overload_fail_point)
495+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
496+
listener.failed_events.append(event)
497+
498+
listener.failed = failed
499+
500+
client = await self.async_rs_client(event_listeners=[listener])
501+
await client.test.test.insert_one({})
502+
503+
self.configure_fail_point_sync(overload_fail_point)
504+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
505+
506+
# Perform a findOne operation with coll. Expect the operation to fail.
507+
with mock.patch(mock_target, return_value=0) as mock_backoff:
508+
with self.assertRaises(PyMongoError):
509+
await client.test.test.find_one()
510+
511+
# Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors.
512+
self.assertEqual(mock_backoff.call_count, 1)
513+
386514

387515
if __name__ == "__main__":
388516
unittest.main()

test/asynchronous/test_retryable_writes.py

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@
2121
import sys
2222
import threading
2323
from test.asynchronous.utils import async_set_fail_point, flaky
24+
from unittest import mock
25+
26+
from pymongo.common import MAX_ADAPTIVE_RETRIES
2427

2528
sys.path[0:0] = [""]
2629

@@ -784,6 +787,111 @@ def failed(event: CommandFailedEvent) -> None:
784787
# Assert that the error does not contain the error label `NoWritesPerformed`.
785788
assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"]
786789

790+
async def test_overload_then_nonoverload_retries_increased_writes(self) -> None:
791+
# Create a client with retryWrites=true.
792+
listener = OvertCommandListener()
793+
794+
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
795+
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
796+
overload_fail_point = {
797+
"configureFailPoint": "failCommand",
798+
"mode": {"times": 1},
799+
"data": {
800+
"failCommands": ["insert"],
801+
"errorLabels": ["RetryableError", "SystemOverloadedError"],
802+
"errorCode": 91,
803+
},
804+
}
805+
806+
# Configure a fail point with error code `91` (ShutdownInProgress) with the `RetryableError` and `RetryableWriteError` error labels.
807+
non_overload_fail_point = {
808+
"configureFailPoint": "failCommand",
809+
"mode": "alwaysOn",
810+
"data": {
811+
"failCommands": ["insert"],
812+
"errorCode": 91,
813+
"errorLabels": ["RetryableError", "RetryableWriteError"],
814+
},
815+
}
816+
817+
def failed(event: CommandFailedEvent) -> None:
818+
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
819+
if listener.failed_events:
820+
return
821+
assert event.failure["code"] == 91
822+
self.configure_fail_point_sync(non_overload_fail_point)
823+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
824+
listener.failed_events.append(event)
825+
826+
listener.failed = failed
827+
828+
client = await self.async_rs_client(retryWrites=True, event_listeners=[listener])
829+
830+
self.configure_fail_point_sync(overload_fail_point)
831+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
832+
833+
with self.assertRaises(PyMongoError):
834+
await client.test.test.insert_one({"x": 1})
835+
836+
started_inserts = [e for e in listener.started_events if e.command_name == "insert"]
837+
self.assertEqual(len(started_inserts), MAX_ADAPTIVE_RETRIES + 1)
838+
839+
async def test_backoff_is_not_applied_for_non_overload_errors(self):
840+
if _IS_SYNC:
841+
mock_target = "pymongo.synchronous.helpers._RetryPolicy.backoff"
842+
else:
843+
mock_target = "pymongo.asynchronous.helpers._RetryPolicy.backoff"
844+
845+
# Create a client.
846+
listener = OvertCommandListener()
847+
848+
# Configure the client to listen to CommandFailedEvents. In the attached listener, configure a fail point with error
849+
# code `91` (ShutdownInProgress) and `RetryableError` and `SystemOverloadedError` labels.
850+
overload_fail_point = {
851+
"configureFailPoint": "failCommand",
852+
"mode": {"times": 1},
853+
"data": {
854+
"failCommands": ["insert"],
855+
"errorLabels": ["RetryableError", "SystemOverloadedError"],
856+
"errorCode": 91,
857+
},
858+
}
859+
860+
# Configure a fail point with error code `91` (ShutdownInProgress) with only the `RetryableError` error label.
861+
non_overload_fail_point = {
862+
"configureFailPoint": "failCommand",
863+
"mode": "alwaysOn",
864+
"data": {
865+
"failCommands": ["insert"],
866+
"errorCode": 91,
867+
"errorLabels": ["RetryableError", "RetryableWriteError"],
868+
},
869+
}
870+
871+
def failed(event: CommandFailedEvent) -> None:
872+
# Configure the fail point command only if the failed event is for the 91 error configured in step 2.
873+
if listener.failed_events:
874+
return
875+
assert event.failure["code"] == 91
876+
self.configure_fail_point_sync(non_overload_fail_point)
877+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
878+
listener.failed_events.append(event)
879+
880+
listener.failed = failed
881+
882+
client = await self.async_rs_client(event_listeners=[listener])
883+
884+
self.configure_fail_point_sync(overload_fail_point)
885+
self.addCleanup(self.configure_fail_point_sync, {}, off=True)
886+
887+
# Perform a findOne operation with coll. Expect the operation to fail.
888+
with mock.patch(mock_target, return_value=0) as mock_backoff:
889+
with self.assertRaises(PyMongoError):
890+
await client.test.test.insert_one({})
891+
892+
# Assert that backoff was applied only once for the initial overload error and not for the subsequent non-overload retryable errors.
893+
self.assertEqual(mock_backoff.call_count, 1)
894+
787895

788896
if __name__ == "__main__":
789897
unittest.main()

0 commit comments

Comments
 (0)