Skip to content

Commit a39b361

Browse files
authored
PYTHON-5809 - Send afterClusterTime on writes in causally-consistent sessions (#2851)
1 parent 1a584e6 commit a39b361

18 files changed

Lines changed: 1683 additions & 193 deletions

.evergreen/resync-specs.sh

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,9 @@ do
117117
apm|APM|command-monitoring|command_monitoring)
118118
cpjson command-logging-and-monitoring/tests/monitoring command_monitoring
119119
;;
120+
causal|causal-consistency|causal_consistency)
121+
cpjson causal-consistency/tests/ causal_consistency
122+
;;
120123
command-logging|command_logging)
121124
cpjson command-logging-and-monitoring/tests/logging command_logging
122125
;;

.evergreen/spec-patch/PYTHON-5809.patch

Lines changed: 0 additions & 97 deletions
This file was deleted.

pymongo/asynchronous/client_session.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@
172172
WTimeoutError,
173173
)
174174
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
175+
from pymongo.operations import _WRITES_WITH_CLUSTER_TIME
175176
from pymongo.read_concern import ReadConcern
176177
from pymongo.read_preferences import ReadPreference, _ServerMode
177178
from pymongo.server_type import SERVER_TYPE
@@ -1111,7 +1112,12 @@ def _apply_to(
11111112
return
11121113
self._check_ended()
11131114
self._materialize(conn.logical_session_timeout_minutes)
1114-
if self.options.snapshot:
1115+
# Add afterClusterTime on snapshot reads or writes in causally-consistent sessions
1116+
if self.options.snapshot or (
1117+
self.options.causal_consistency
1118+
and not self.in_transaction
1119+
and operation in _WRITES_WITH_CLUSTER_TIME
1120+
):
11151121
self._update_read_concern(command, conn)
11161122

11171123
self._server_session.last_use = time.monotonic()

pymongo/operations.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,22 @@ class _Op(str, enum.Enum):
8282
TEST = "testOperation"
8383

8484

85+
_WRITES_WITH_CLUSTER_TIME = frozenset(
86+
{
87+
_Op.INSERT.value,
88+
_Op.UPDATE.value,
89+
_Op.FIND_AND_MODIFY.value,
90+
_Op.DELETE.value,
91+
_Op.BULK_WRITE.value,
92+
_Op.CREATE.value,
93+
_Op.CREATE_INDEXES.value,
94+
_Op.DROP.value,
95+
_Op.DROP_DATABASE.value,
96+
_Op.DROP_INDEXES.value,
97+
}
98+
)
99+
100+
85101
class InsertOne(Generic[_DocumentType]):
86102
"""Represents an insert_one operation."""
87103

pymongo/synchronous/client_session.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@
169169
WTimeoutError,
170170
)
171171
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
172+
from pymongo.operations import _WRITES_WITH_CLUSTER_TIME
172173
from pymongo.read_concern import ReadConcern
173174
from pymongo.read_preferences import ReadPreference, _ServerMode
174175
from pymongo.server_type import SERVER_TYPE
@@ -1107,7 +1108,12 @@ def _apply_to(
11071108
return
11081109
self._check_ended()
11091110
self._materialize(conn.logical_session_timeout_minutes)
1110-
if self.options.snapshot:
1111+
# Add afterClusterTime on snapshot reads or writes in causally-consistent sessions
1112+
if self.options.snapshot or (
1113+
self.options.causal_consistency
1114+
and not self.in_transaction
1115+
and operation in _WRITES_WITH_CLUSTER_TIME
1116+
):
11111117
self._update_read_concern(command, conn)
11121118

11131119
self._server_session.last_use = time.monotonic()
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Copyright 2021-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Test the causal consistency unified spec tests."""
16+
from __future__ import annotations
17+
18+
import os
19+
import pathlib
20+
import sys
21+
22+
sys.path[0:0] = [""]
23+
24+
from test import unittest
25+
from test.asynchronous.unified_format import generate_test_classes, get_test_path
26+
27+
_IS_SYNC = False
28+
29+
# Generate unified tests.
30+
globals().update(generate_test_classes(get_test_path("causal_consistency"), module=__name__))
31+
32+
if __name__ == "__main__":
33+
unittest.main()

test/asynchronous/test_session.py

Lines changed: 1 addition & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1138,52 +1138,7 @@ async def _test_no_read_concern(self, op):
11381138
self.assertIsNone(rc)
11391139

11401140
@async_client_context.require_no_standalone
1141-
async def test_writes_do_not_include_read_concern(self):
1142-
await self._test_no_read_concern(
1143-
lambda coll, session: coll.bulk_write([InsertOne[dict]({})], session=session)
1144-
)
1145-
await self._test_no_read_concern(lambda coll, session: coll.insert_one({}, session=session))
1146-
await self._test_no_read_concern(
1147-
lambda coll, session: coll.insert_many([{}], session=session)
1148-
)
1149-
await self._test_no_read_concern(
1150-
lambda coll, session: coll.replace_one({"_id": 1}, {"x": 1}, session=session)
1151-
)
1152-
await self._test_no_read_concern(
1153-
lambda coll, session: coll.update_one({}, {"$set": {"X": 1}}, session=session)
1154-
)
1155-
await self._test_no_read_concern(
1156-
lambda coll, session: coll.update_many({}, {"$set": {"x": 1}}, session=session)
1157-
)
1158-
await self._test_no_read_concern(lambda coll, session: coll.delete_one({}, session=session))
1159-
await self._test_no_read_concern(
1160-
lambda coll, session: coll.delete_many({}, session=session)
1161-
)
1162-
await self._test_no_read_concern(
1163-
lambda coll, session: coll.find_one_and_replace({"x": 1}, {"y": 1}, session=session)
1164-
)
1165-
await self._test_no_read_concern(
1166-
lambda coll, session: coll.find_one_and_update(
1167-
{"y": 1}, {"$set": {"x": 1}}, session=session
1168-
)
1169-
)
1170-
await self._test_no_read_concern(
1171-
lambda coll, session: coll.find_one_and_delete({"x": 1}, session=session)
1172-
)
1173-
await self._test_no_read_concern(
1174-
lambda coll, session: coll.create_index("foo", session=session)
1175-
)
1176-
await self._test_no_read_concern(
1177-
lambda coll, session: coll.create_indexes(
1178-
[IndexModel([("bar", ASCENDING)])], session=session
1179-
)
1180-
)
1181-
await self._test_no_read_concern(
1182-
lambda coll, session: coll.drop_index("foo_1", session=session)
1183-
)
1184-
await self._test_no_read_concern(lambda coll, session: coll.drop_indexes(session=session))
1185-
1186-
# Not a write, but explain also doesn't support readConcern.
1141+
async def test_explain_does_not_include_read_concern(self):
11871142
await self._test_no_read_concern(
11881143
lambda coll, session: coll.find({}, session=session).explain()
11891144
)

test/asynchronous/unified_format.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,12 @@ async def _databaseOperation_createChangeStream(self, target, *args, **kwargs):
761761
async def _collectionOperation_createChangeStream(self, target, *args, **kwargs):
762762
return await self.__entityOperation_createChangeStream(target, *args, **kwargs)
763763

764+
async def _clientOperation_dropDatabase(self, target, **kwargs):
765+
self.__raise_if_unsupported("dropDatabase", target, AsyncMongoClient)
766+
return await target.drop_database(
767+
name_or_database=kwargs.pop("database"), session=kwargs.pop("session", None)
768+
)
769+
764770
async def _databaseOperation_runCommand(self, target, **kwargs):
765771
self.__raise_if_unsupported("runCommand", target, AsyncDatabase)
766772
# Ensure the first key is the command name.

0 commit comments

Comments
 (0)