Skip to content

Commit 0a47a19

Browse files
authored
PYTHON-5752 - Only retry overload errors if retries are enabled (#2726)
1 parent cc5b9c4 commit 0a47a19

File tree

11 files changed

+3210
-766
lines changed

11 files changed

+3210
-766
lines changed

pymongo/asynchronous/collection.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2946,6 +2946,7 @@ async def _aggregate(
29462946
session,
29472947
retryable=not cmd._performs_write,
29482948
operation=_Op.AGGREGATE,
2949+
is_aggregate_write=cmd._performs_write,
29492950
)
29502951

29512952
async def aggregate(

pymongo/asynchronous/database.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -952,7 +952,7 @@ async def inner(
952952
)
953953

954954
return await self._client._retryable_read(
955-
inner, read_preference, session, command_name, None, False
955+
inner, read_preference, session, command_name, None, False, is_run_command=True
956956
)
957957

958958
@_csot.apply

pymongo/asynchronous/mongo_client.py

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2010,6 +2010,8 @@ async def _retry_internal(
20102010
read_pref: Optional[_ServerMode] = None,
20112011
retryable: bool = False,
20122012
operation_id: Optional[int] = None,
2013+
is_run_command: bool = False,
2014+
is_aggregate_write: bool = False,
20132015
) -> T:
20142016
"""Internal retryable helper for all client transactions.
20152017
@@ -2021,6 +2023,8 @@ async def _retry_internal(
20212023
:param address: Server Address, defaults to None
20222024
:param read_pref: Topology of read operation, defaults to None
20232025
:param retryable: If the operation should be retried once, defaults to None
2026+
:param is_run_command: If this is a runCommand operation, defaults to False
2027+
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
20242028
20252029
:return: Output of the calling func()
20262030
"""
@@ -2035,6 +2039,8 @@ async def _retry_internal(
20352039
address=address,
20362040
retryable=retryable,
20372041
operation_id=operation_id,
2042+
is_run_command=is_run_command,
2043+
is_aggregate_write=is_aggregate_write,
20382044
).run()
20392045

20402046
async def _retryable_read(
@@ -2046,6 +2052,8 @@ async def _retryable_read(
20462052
address: Optional[_Address] = None,
20472053
retryable: bool = True,
20482054
operation_id: Optional[int] = None,
2055+
is_run_command: bool = False,
2056+
is_aggregate_write: bool = False,
20492057
) -> T:
20502058
"""Execute an operation with consecutive retries if possible
20512059
@@ -2061,6 +2069,8 @@ async def _retryable_read(
20612069
:param address: Optional address when sending a message, defaults to None
20622070
:param retryable: if we should attempt retries
20632071
(may not always be supported even if supplied), defaults to False
2072+
:param is_run_command: If this is a runCommand operation, defaults to False.
2073+
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
20642074
"""
20652075

20662076
# Ensure that the client supports retrying on reads and there is no session in
@@ -2079,6 +2089,8 @@ async def _retryable_read(
20792089
read_pref=read_pref,
20802090
retryable=retryable,
20812091
operation_id=operation_id,
2092+
is_run_command=is_run_command,
2093+
is_aggregate_write=is_aggregate_write,
20822094
)
20832095

20842096
async def _retryable_write(
@@ -2748,6 +2760,8 @@ def __init__(
27482760
address: Optional[_Address] = None,
27492761
retryable: bool = False,
27502762
operation_id: Optional[int] = None,
2763+
is_run_command: bool = False,
2764+
is_aggregate_write: bool = False,
27512765
):
27522766
self._last_error: Optional[Exception] = None
27532767
self._retrying = False
@@ -2770,6 +2784,8 @@ def __init__(
27702784
self._operation = operation
27712785
self._operation_id = operation_id
27722786
self._attempt_number = 0
2787+
self._is_run_command = is_run_command
2788+
self._is_aggregate_write = is_aggregate_write
27732789

27742790
async def run(self) -> T:
27752791
"""Runs the supplied func() and attempts a retry
@@ -2810,18 +2826,30 @@ async def run(self) -> T:
28102826
always_retryable = False
28112827
overloaded = False
28122828
exc_to_check = exc
2829+
2830+
if self._is_run_command and not (
2831+
self._client.options.retry_reads and self._client.options.retry_writes
2832+
):
2833+
raise
2834+
if self._is_aggregate_write and not self._client.options.retry_writes:
2835+
raise
2836+
28132837
# Execute specialized catch on read
28142838
if self._is_read:
28152839
if isinstance(exc, (ConnectionFailure, OperationFailure)):
28162840
# ConnectionFailures do not supply a code property
28172841
exc_code = getattr(exc, "code", None)
28182842
overloaded = exc.has_error_label("SystemOverloadedError")
28192843
always_retryable = exc.has_error_label("RetryableError") and overloaded
2820-
if not always_retryable and (
2821-
self._is_not_eligible_for_retry()
2822-
or (
2823-
isinstance(exc, OperationFailure)
2824-
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
2844+
if (
2845+
not self._client.options.retry_reads
2846+
or not always_retryable
2847+
and (
2848+
self._is_not_eligible_for_retry()
2849+
or (
2850+
isinstance(exc, OperationFailure)
2851+
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
2852+
)
28252853
)
28262854
):
28272855
raise
@@ -2852,7 +2880,12 @@ async def run(self) -> T:
28522880
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
28532881
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
28542882
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
2855-
if not self._retryable and not always_retryable:
2883+
2884+
# Always retry abortTransaction and commitTransaction up to once
2885+
if self._operation not in ["abortTransaction", "commitTransaction"] and (
2886+
not self._client.options.retry_writes
2887+
or not (self._retryable or always_retryable)
2888+
):
28562889
raise
28572890
if retryable_write_label or always_retryable:
28582891
assert self._session

pymongo/synchronous/collection.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2939,6 +2939,7 @@ def _aggregate(
29392939
session,
29402940
retryable=not cmd._performs_write,
29412941
operation=_Op.AGGREGATE,
2942+
is_aggregate_write=cmd._performs_write,
29422943
)
29432944

29442945
def aggregate(

pymongo/synchronous/database.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -952,7 +952,7 @@ def inner(
952952
)
953953

954954
return self._client._retryable_read(
955-
inner, read_preference, session, command_name, None, False
955+
inner, read_preference, session, command_name, None, False, is_run_command=True
956956
)
957957

958958
@_csot.apply

pymongo/synchronous/mongo_client.py

Lines changed: 39 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2006,6 +2006,8 @@ def _retry_internal(
20062006
read_pref: Optional[_ServerMode] = None,
20072007
retryable: bool = False,
20082008
operation_id: Optional[int] = None,
2009+
is_run_command: bool = False,
2010+
is_aggregate_write: bool = False,
20092011
) -> T:
20102012
"""Internal retryable helper for all client transactions.
20112013
@@ -2017,6 +2019,8 @@ def _retry_internal(
20172019
:param address: Server Address, defaults to None
20182020
:param read_pref: Topology of read operation, defaults to None
20192021
:param retryable: If the operation should be retried once, defaults to None
2022+
:param is_run_command: If this is a runCommand operation, defaults to False
2023+
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
20202024
20212025
:return: Output of the calling func()
20222026
"""
@@ -2031,6 +2035,8 @@ def _retry_internal(
20312035
address=address,
20322036
retryable=retryable,
20332037
operation_id=operation_id,
2038+
is_run_command=is_run_command,
2039+
is_aggregate_write=is_aggregate_write,
20342040
).run()
20352041

20362042
def _retryable_read(
@@ -2042,6 +2048,8 @@ def _retryable_read(
20422048
address: Optional[_Address] = None,
20432049
retryable: bool = True,
20442050
operation_id: Optional[int] = None,
2051+
is_run_command: bool = False,
2052+
is_aggregate_write: bool = False,
20452053
) -> T:
20462054
"""Execute an operation with consecutive retries if possible
20472055
@@ -2057,6 +2065,8 @@ def _retryable_read(
20572065
:param address: Optional address when sending a message, defaults to None
20582066
:param retryable: if we should attempt retries
20592067
(may not always be supported even if supplied), defaults to False
2068+
:param is_run_command: If this is a runCommand operation, defaults to False.
2069+
:param is_aggregate_write: If this is a aggregate operation with a write, defaults to False.
20602070
"""
20612071

20622072
# Ensure that the client supports retrying on reads and there is no session in
@@ -2075,6 +2085,8 @@ def _retryable_read(
20752085
read_pref=read_pref,
20762086
retryable=retryable,
20772087
operation_id=operation_id,
2088+
is_run_command=is_run_command,
2089+
is_aggregate_write=is_aggregate_write,
20782090
)
20792091

20802092
def _retryable_write(
@@ -2738,6 +2750,8 @@ def __init__(
27382750
address: Optional[_Address] = None,
27392751
retryable: bool = False,
27402752
operation_id: Optional[int] = None,
2753+
is_run_command: bool = False,
2754+
is_aggregate_write: bool = False,
27412755
):
27422756
self._last_error: Optional[Exception] = None
27432757
self._retrying = False
@@ -2760,6 +2774,8 @@ def __init__(
27602774
self._operation = operation
27612775
self._operation_id = operation_id
27622776
self._attempt_number = 0
2777+
self._is_run_command = is_run_command
2778+
self._is_aggregate_write = is_aggregate_write
27632779

27642780
def run(self) -> T:
27652781
"""Runs the supplied func() and attempts a retry
@@ -2800,18 +2816,30 @@ def run(self) -> T:
28002816
always_retryable = False
28012817
overloaded = False
28022818
exc_to_check = exc
2819+
2820+
if self._is_run_command and not (
2821+
self._client.options.retry_reads and self._client.options.retry_writes
2822+
):
2823+
raise
2824+
if self._is_aggregate_write and not self._client.options.retry_writes:
2825+
raise
2826+
28032827
# Execute specialized catch on read
28042828
if self._is_read:
28052829
if isinstance(exc, (ConnectionFailure, OperationFailure)):
28062830
# ConnectionFailures do not supply a code property
28072831
exc_code = getattr(exc, "code", None)
28082832
overloaded = exc.has_error_label("SystemOverloadedError")
28092833
always_retryable = exc.has_error_label("RetryableError") and overloaded
2810-
if not always_retryable and (
2811-
self._is_not_eligible_for_retry()
2812-
or (
2813-
isinstance(exc, OperationFailure)
2814-
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
2834+
if (
2835+
not self._client.options.retry_reads
2836+
or not always_retryable
2837+
and (
2838+
self._is_not_eligible_for_retry()
2839+
or (
2840+
isinstance(exc, OperationFailure)
2841+
and exc_code not in helpers_shared._RETRYABLE_ERROR_CODES
2842+
)
28152843
)
28162844
):
28172845
raise
@@ -2842,7 +2870,12 @@ def run(self) -> T:
28422870
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
28432871
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
28442872
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded
2845-
if not self._retryable and not always_retryable:
2873+
2874+
# Always retry abortTransaction and commitTransaction up to once
2875+
if self._operation not in ["abortTransaction", "commitTransaction"] and (
2876+
not self._client.options.retry_writes
2877+
or not (self._retryable or always_retryable)
2878+
):
28462879
raise
28472880
if retryable_write_label or always_retryable:
28482881
assert self._session

test/asynchronous/test_client_backpressure.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -227,7 +227,7 @@ async def asyncSetUp(self) -> None:
227227
self.listener.reset()
228228
self.app_name = self.__class__.__name__.lower()
229229
self.client = await self.async_rs_or_single_client(
230-
event_listeners=[self.listener], retryWrites=False, appName=self.app_name
230+
event_listeners=[self.listener], appName=self.app_name
231231
)
232232

233233
@patch("random.random")

0 commit comments

Comments
 (0)