Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pymongo/asynchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2779,7 +2779,7 @@ def __init__(
self._last_error: Optional[Exception] = None
self._retrying = False
self._always_retryable = False
self._multiple_retries = _csot.get_timeout() is not None
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
self._client = mongo_client
self._retry_policy = mongo_client._retry_policy
self._func = func
Expand Down Expand Up @@ -2852,6 +2852,8 @@ async def run(self) -> T:
# ConnectionFailures do not supply a code property
exc_code = getattr(exc, "code", None)
overloaded = exc.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc.has_error_label("RetryableError") and overloaded
if not self._client.options.retry_reads or (
not always_retryable
Expand Down Expand Up @@ -2890,6 +2892,8 @@ async def run(self) -> T:
exc_to_check = exc.error
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded

# Always retry abortTransaction and commitTransaction up to once
Expand Down Expand Up @@ -2943,7 +2947,9 @@ async def run(self) -> T:

def _is_not_eligible_for_retry(self) -> bool:
    """Return True when the operation must not be retried again.

    An operation is ineligible when it is not retryable at all, or when a
    retry is already in progress and the attempt counter has reached
    ``self._max_retries``. That limit is unbounded while a CSOT timeout is
    active and 1 otherwise, and it is replaced by the client's
    ``max_adaptive_retries`` option once an overload error is observed.
    """
    if not self._retryable:
        return True
    # Only a retry that is already under way can exhaust the retry budget.
    return self._is_retrying() and self._attempt_number >= self._max_retries

def _is_retrying(self) -> bool:
"""Checks if the exchange is currently undergoing a retry"""
Expand Down
10 changes: 8 additions & 2 deletions pymongo/synchronous/mongo_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2769,7 +2769,7 @@ def __init__(
self._last_error: Optional[Exception] = None
self._retrying = False
self._always_retryable = False
self._multiple_retries = _csot.get_timeout() is not None
self._max_retries = float("inf") if _csot.get_timeout() is not None else 1
self._client = mongo_client
self._retry_policy = mongo_client._retry_policy
self._func = func
Expand Down Expand Up @@ -2842,6 +2842,8 @@ def run(self) -> T:
# ConnectionFailures do not supply a code property
exc_code = getattr(exc, "code", None)
overloaded = exc.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc.has_error_label("RetryableError") and overloaded
if not self._client.options.retry_reads or (
not always_retryable
Expand Down Expand Up @@ -2880,6 +2882,8 @@ def run(self) -> T:
exc_to_check = exc.error
retryable_write_label = exc_to_check.has_error_label("RetryableWriteError")
overloaded = exc_to_check.has_error_label("SystemOverloadedError")
if overloaded:
self._max_retries = self._client.options.max_adaptive_retries
always_retryable = exc_to_check.has_error_label("RetryableError") and overloaded

# Always retry abortTransaction and commitTransaction up to once
Expand Down Expand Up @@ -2933,7 +2937,9 @@ def run(self) -> T:

def _is_not_eligible_for_retry(self) -> bool:
    """Return True when the operation must not be retried again.

    An operation is ineligible when it is not retryable at all, or when a
    retry is already in progress and the attempt counter has reached
    ``self._max_retries``. That limit is unbounded while a CSOT timeout is
    active and 1 otherwise, and it is replaced by the client's
    ``max_adaptive_retries`` option once an overload error is observed.
    """
    if not self._retryable:
        return True
    # Only a retry that is already under way can exhaust the retry budget.
    return self._is_retrying() and self._attempt_number >= self._max_retries

def _is_retrying(self) -> bool:
"""Checks if the exchange is currently undergoing a retry"""
Expand Down
39 changes: 37 additions & 2 deletions test/asynchronous/test_client_backpressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import pathlib
import sys
from time import perf_counter
from unittest import mock
from unittest.mock import patch

from pymongo.common import MAX_ADAPTIVE_RETRIES
Expand Down Expand Up @@ -228,7 +229,7 @@ async def test_01_operation_retry_uses_exponential_backoff(self, random_func):
self.assertTrue(abs((end1 - start1) - (end0 - start0 + 0.3)) < 0.3)

@async_client_context.require_failCommand_appName
async def test_03_overload_retries_limited(self):
async def test_02_overload_retries_limited(self):
# Drivers should test that overload errors are retried a maximum of two times.

# 1. Let `client` be a `MongoClient`.
Expand Down Expand Up @@ -260,7 +261,7 @@ async def test_03_overload_retries_limited(self):
self.assertEqual(len(self.listener.started_events), MAX_ADAPTIVE_RETRIES + 1)

@async_client_context.require_failCommand_appName
async def test_04_overload_retries_limited_configured(self):
async def test_03_overload_retries_limited_configured(self):
# Drivers should test that overload errors are retried a maximum of maxAdaptiveRetries times.
max_retries = 1

Expand Down Expand Up @@ -294,6 +295,40 @@ async def test_04_overload_retries_limited_configured(self):
# 6. Assert that the total number of started commands is max_retries + 1.
self.assertEqual(len(self.listener.started_events), max_retries + 1)

@async_client_context.require_failCommand_fail_point
async def test_04_backoff_is_not_applied_for_non_overload_errors(self):
    """Check that a retryable error without the overload label is retried with no backoff."""
    # Drivers should test that backoff is not applied for non-overload retryable errors.
    mock_target = (
        "pymongo.synchronous.helpers._RetryPolicy.backoff"
        if _IS_SYNC
        else "pymongo.asynchronous.helpers._RetryPolicy.backoff"
    )

    # 1-2. Take a collection from the shared `MongoClient` and insert one document.
    collection = self.client.test.test
    await collection.insert_one({})

    # 3. Fail a single `find` with a retryable error that carries no overload label.
    fail_once = {
        "configureFailPoint": "failCommand",
        "mode": {"times": 1},
        "data": {
            "failCommands": ["find"],
            "errorCode": 91,  # ShutdownInProgress
            "errorLabels": ["RetryableError"],
        },
    }

    # 4. Run a find that fails once and then succeeds on its first retry.
    with mock.patch(mock_target, return_value=1) as backoff_spy:
        async with self.fail_point(fail_once):
            await collection.find_one({})

    # 5. The retry must not have consulted the backoff policy.
    backoff_spy.assert_not_called()


# Location of JSON test specifications.
if _IS_SYNC:
Expand Down
71 changes: 70 additions & 1 deletion test/asynchronous/test_retryable_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import threading
from test.asynchronous.utils import async_set_fail_point

from pymongo.errors import OperationFailure
from pymongo import MongoClient
from pymongo.common import MAX_ADAPTIVE_RETRIES
from pymongo.errors import OperationFailure, PyMongoError

sys.path[0:0] = [""]

Expand All @@ -38,6 +40,7 @@
)

from pymongo.monitoring import (
CommandFailedEvent,
ConnectionCheckedOutEvent,
ConnectionCheckOutFailedEvent,
ConnectionCheckOutFailedReason,
Expand Down Expand Up @@ -145,6 +148,20 @@ async def test_pool_paused_error_is_retryable(self):


class TestRetryableReads(AsyncIntegrationTest):
async def asyncSetUp(self) -> None:
    """Run the base setup, then open a blocking client that is closed at cleanup."""
    await super().asyncSetUp()
    # A synchronous MongoClient built from the shared test-context options.
    blocking_client = MongoClient(**async_client_context.client_options)
    self.setup_client = blocking_client
    self.addCleanup(blocking_client.close)

# TODO: After PYTHON-4595 we can use async event handlers and remove this workaround.
def configure_fail_point_sync(self, command_args, off=False) -> None:
    """Send a ``failCommand`` fail point command through the blocking setup client.

    Args:
        command_args: Fields merged over ``{"configureFailPoint": "failCommand"}``
            (for example ``mode`` and ``data``). The mapping itself is not modified.
        off: When true, force ``mode`` to ``"off"`` and drop any ``data`` field so
            the command turns the fail point off.
    """
    # "configureFailPoint" is listed first so it stays the leading key of the command.
    cmd = {"configureFailPoint": "failCommand", **command_args}
    if off:
        cmd["mode"] = "off"
        cmd.pop("data", None)
    self.setup_client.admin.command(cmd)

@async_client_context.require_multiple_mongoses
@async_client_context.require_failCommand_fail_point
async def test_retryable_reads_are_retried_on_a_different_mongos_when_one_is_available(self):
Expand Down Expand Up @@ -383,6 +400,58 @@ async def test_03_03_retryable_reads_caused_by_overload_errors_are_retried_on_th
# 6. Assert that both events occurred on the same server.
assert listener.failed_events[0].connection_id == listener.succeeded_events[0].connection_id

@async_client_context.require_failCommand_fail_point
@async_client_context.require_version_min(4, 4, 0)  # type:ignore[untyped-decorator]
async def test_overload_then_nonoverload_retries_increased_reads(self) -> None:
    """Check the retry count of a read that hits an overload error, then non-overload errors.

    The first ``find`` fails with an overload-labelled error. When that failure is
    reported, the listener installs an always-on fail point whose error carries only
    the ``RetryableError`` label. The read is expected to raise, with exactly
    ``MAX_ADAPTIVE_RETRIES + 1`` ``find`` commands started.
    """
    # Create a client.
    listener = OvertCommandListener()

    # Configure the client to listen to CommandFailedEvents. In the attached listener,
    # configure a fail point with error code `91` (ShutdownInProgress) and
    # `RetryableError` and `SystemOverloadedError` labels.
    overload_fail_point = {
        "configureFailPoint": "failCommand",
        "mode": {"times": 1},
        "data": {
            "failCommands": ["find"],
            "errorLabels": ["RetryableError", "SystemOverloadedError"],
            "errorCode": 91,
        },
    }

    # Configure a fail point with error code `91` (ShutdownInProgress) with only the
    # `RetryableError` error label.
    non_overload_fail_point = {
        "configureFailPoint": "failCommand",
        "mode": "alwaysOn",
        "data": {
            "failCommands": ["find"],
            "errorCode": 91,
            "errorLabels": ["RetryableError"],
        },
    }

    def failed(event: CommandFailedEvent) -> None:
        # Configure the fail point command only if the failed event is for the 91 error
        # configured in step 2. Only the first failure is acted on and recorded; later
        # failures return immediately.
        if listener.failed_events:
            return
        assert event.failure["code"] == 91
        # This callback is a plain function, so the blocking helper is used here.
        self.configure_fail_point_sync(non_overload_fail_point)
        self.addCleanup(self.configure_fail_point_sync, {}, off=True)
        listener.failed_events.append(event)

    listener.failed = failed

    client = await self.async_rs_client(event_listeners=[listener])
    await client.test.test.insert_one({})

    self.configure_fail_point_sync(overload_fail_point)
    self.addCleanup(self.configure_fail_point_sync, {}, off=True)

    with self.assertRaises(PyMongoError):
        await client.test.test.find_one()

    # One initial attempt plus MAX_ADAPTIVE_RETRIES retries.
    started_finds = [e for e in listener.started_events if e.command_name == "find"]
    self.assertEqual(len(started_finds), MAX_ADAPTIVE_RETRIES + 1)


if __name__ == "__main__":
unittest.main()
51 changes: 51 additions & 0 deletions test/asynchronous/test_retryable_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import threading
from test.asynchronous.utils import async_set_fail_point, flaky

from pymongo.common import MAX_ADAPTIVE_RETRIES

sys.path[0:0] = [""]

from test.asynchronous import (
Expand Down Expand Up @@ -784,6 +786,55 @@ def failed(event: CommandFailedEvent) -> None:
# Assert that the error does not contain the error label `NoWritesPerformed`.
assert "NoWritesPerformed" not in exc.exception.errors["errorLabels"]

async def test_overload_then_nonoverload_retries_increased_writes(self) -> None:
    """Check the retry count of a write that hits an overload error, then non-overload errors.

    The first ``insert`` fails with an overload-labelled error. When that failure is
    reported, the listener installs an always-on fail point whose error carries the
    ``RetryableError`` and ``RetryableWriteError`` labels. The write is expected to
    raise, with exactly ``MAX_ADAPTIVE_RETRIES + 1`` ``insert`` commands started.
    """
    # NOTE(review): `PyMongoError` and `self.configure_fail_point_sync` are not visible
    # in this part of the module - confirm both are available to this test class.

    # Create a client with retryWrites=true.
    listener = OvertCommandListener()

    # Configure the client to listen to CommandFailedEvents. In the attached listener,
    # configure a fail point with error code `91` (ShutdownInProgress) and
    # `RetryableError` and `SystemOverloadedError` labels.
    overload_fail_point = {
        "configureFailPoint": "failCommand",
        "mode": {"times": 1},
        "data": {
            "failCommands": ["insert"],
            "errorLabels": ["RetryableError", "SystemOverloadedError"],
            "errorCode": 91,
        },
    }

    # Configure a fail point with error code `91` (ShutdownInProgress) with the
    # `RetryableError` and `RetryableWriteError` error labels.
    non_overload_fail_point = {
        "configureFailPoint": "failCommand",
        "mode": "alwaysOn",
        "data": {
            "failCommands": ["insert"],
            "errorCode": 91,
            "errorLabels": ["RetryableError", "RetryableWriteError"],
        },
    }

    def failed(event: CommandFailedEvent) -> None:
        # Configure the fail point command only if the failed event is for the 91 error
        # configured in step 2. Only the first failure is acted on and recorded; later
        # failures return immediately.
        if listener.failed_events:
            return
        assert event.failure["code"] == 91
        self.configure_fail_point_sync(non_overload_fail_point)
        self.addCleanup(self.configure_fail_point_sync, {}, off=True)
        listener.failed_events.append(event)

    listener.failed = failed

    client = await self.async_rs_client(retryWrites=True, event_listeners=[listener])

    self.configure_fail_point_sync(overload_fail_point)
    self.addCleanup(self.configure_fail_point_sync, {}, off=True)

    with self.assertRaises(PyMongoError):
        await client.test.test.insert_one({"x": 1})

    # One initial attempt plus MAX_ADAPTIVE_RETRIES retries.
    started_inserts = [e for e in listener.started_events if e.command_name == "insert"]
    self.assertEqual(len(started_inserts), MAX_ADAPTIVE_RETRIES + 1)


if __name__ == "__main__":
unittest.main()
39 changes: 37 additions & 2 deletions test/test_client_backpressure.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import pathlib
import sys
from time import perf_counter
from unittest import mock
from unittest.mock import patch

from pymongo.common import MAX_ADAPTIVE_RETRIES
Expand Down Expand Up @@ -228,7 +229,7 @@ def test_01_operation_retry_uses_exponential_backoff(self, random_func):
self.assertTrue(abs((end1 - start1) - (end0 - start0 + 0.3)) < 0.3)

@client_context.require_failCommand_appName
def test_03_overload_retries_limited(self):
def test_02_overload_retries_limited(self):
# Drivers should test that overload errors are retried a maximum of two times.

# 1. Let `client` be a `MongoClient`.
Expand Down Expand Up @@ -260,7 +261,7 @@ def test_03_overload_retries_limited(self):
self.assertEqual(len(self.listener.started_events), MAX_ADAPTIVE_RETRIES + 1)

@client_context.require_failCommand_appName
def test_04_overload_retries_limited_configured(self):
def test_03_overload_retries_limited_configured(self):
# Drivers should test that overload errors are retried a maximum of maxAdaptiveRetries times.
max_retries = 1

Expand Down Expand Up @@ -292,6 +293,40 @@ def test_04_overload_retries_limited_configured(self):
# 6. Assert that the total number of started commands is max_retries + 1.
self.assertEqual(len(self.listener.started_events), max_retries + 1)

@client_context.require_failCommand_fail_point
def test_04_backoff_is_not_applied_for_non_overload_errors(self):
    """Check that a retryable error without the overload label is retried with no backoff."""
    # Drivers should test that backoff is not applied for non-overload retryable errors.
    mock_target = (
        "pymongo.synchronous.helpers._RetryPolicy.backoff"
        if _IS_SYNC
        else "pymongo.helpers._RetryPolicy.backoff"
    )

    # 1-2. Take a collection from the shared `MongoClient` and insert one document.
    collection = self.client.test.test
    collection.insert_one({})

    # 3. Fail a single `find` with a retryable error that carries no overload label.
    fail_once = {
        "configureFailPoint": "failCommand",
        "mode": {"times": 1},
        "data": {
            "failCommands": ["find"],
            "errorCode": 91,  # ShutdownInProgress
            "errorLabels": ["RetryableError"],
        },
    }

    # 4. Run a find that fails once and then succeeds on its first retry.
    with mock.patch(mock_target, return_value=1) as backoff_spy, self.fail_point(fail_once):
        collection.find_one({})

    # 5. The retry must not have consulted the backoff policy.
    backoff_spy.assert_not_called()


# Location of JSON test specifications.
if _IS_SYNC:
Expand Down
Loading
Loading