Skip to content

Commit e9058e3

Browse files
authored
PYTHON-5713 Allow using OP_MSG exclusively (#2799)
1 parent a24fc91 commit e9058e3

24 files changed

Lines changed: 288 additions & 453 deletions

pymongo/asynchronous/command_cursor.py

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,14 +24,13 @@
2424
NoReturn,
2525
Optional,
2626
Sequence,
27-
Union,
2827
)
2928

3029
from bson import CodecOptions, _convert_raw_document_lists_to_streams
3130
from pymongo.asynchronous.cursor_base import _AsyncCursorBase, _ConnectionManager
3231
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS
3332
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
34-
from pymongo.message import _GetMore, _OpMsg, _OpReply, _RawBatchGetMore
33+
from pymongo.message import _GetMore, _OpMsg, _RawBatchGetMore
3534
from pymongo.response import PinnedResponse
3635
from pymongo.typings import _Address, _DocumentOut, _DocumentType
3736

@@ -145,7 +144,7 @@ async def _maybe_pin_connection(self, conn: AsyncConnection) -> None:
145144

146145
def _unpack_response(
147146
self,
148-
response: Union[_OpReply, _OpMsg],
147+
response: _OpMsg,
149148
cursor_id: Optional[int],
150149
codec_options: CodecOptions[Mapping[str, Any]],
151150
user_fields: Optional[Mapping[str, Any]] = None,
@@ -189,15 +188,10 @@ async def _send_message(self, operation: _GetMore) -> None:
189188
if isinstance(response, PinnedResponse):
190189
if not self._sock_mgr:
191190
self._sock_mgr = _ConnectionManager(response.conn, response.more_to_come) # type: ignore[arg-type]
192-
if response.from_command:
193-
cursor = response.docs[0]["cursor"]
194-
documents = cursor["nextBatch"]
195-
self._postbatchresumetoken = cursor.get("postBatchResumeToken")
196-
self._id = cursor["id"]
197-
else:
198-
documents = response.docs
199-
assert isinstance(response.data, _OpReply)
200-
self._id = response.data.cursor_id
191+
cursor = response.docs[0]["cursor"]
192+
documents = cursor["nextBatch"]
193+
self._postbatchresumetoken = cursor.get("postBatchResumeToken")
194+
self._id = cursor["id"]
201195

202196
if self._id == 0:
203197
await self.close()
@@ -333,7 +327,7 @@ def __init__(
333327

334328
def _unpack_response( # type: ignore[override]
335329
self,
336-
response: Union[_OpReply, _OpMsg],
330+
response: _OpMsg,
337331
cursor_id: Optional[int],
338332
codec_options: CodecOptions[dict[str, Any]],
339333
user_fields: Optional[Mapping[str, Any]] = None,

pymongo/asynchronous/cursor.py

Lines changed: 16 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@
4848
from pymongo.message import (
4949
_GetMore,
5050
_OpMsg,
51-
_OpReply,
5251
_Query,
5352
_RawBatchGetMore,
5453
_RawBatchQuery,
@@ -864,7 +863,7 @@ def collation(self, collation: Optional[_CollationIn]) -> AsyncCursor[_DocumentT
864863

865864
def _unpack_response(
866865
self,
867-
response: Union[_OpReply, _OpMsg],
866+
response: _OpMsg,
868867
cursor_id: Optional[int],
869868
codec_options: CodecOptions, # type: ignore[type-arg]
870869
user_fields: Optional[Mapping[str, Any]] = None,
@@ -1020,29 +1019,23 @@ async def _send_message(self, operation: Union[_Query, _GetMore]) -> None:
10201019

10211020
cmd_name = operation.name
10221021
docs = response.docs
1023-
if response.from_command:
1024-
if cmd_name != "explain":
1025-
cursor = docs[0]["cursor"]
1026-
self._id = cursor["id"]
1027-
if cmd_name == "find":
1028-
documents = cursor["firstBatch"]
1029-
# Update the namespace used for future getMore commands.
1030-
ns = cursor.get("ns")
1031-
if ns:
1032-
self._dbname, self._collname = ns.split(".", 1)
1033-
else:
1034-
documents = cursor["nextBatch"]
1035-
self._data = deque(documents)
1036-
self._retrieved += len(documents)
1022+
if cmd_name != "explain":
1023+
cursor = docs[0]["cursor"]
1024+
self._id = cursor["id"]
1025+
if cmd_name == "find":
1026+
documents = cursor["firstBatch"]
1027+
# Update the namespace used for future getMore commands.
1028+
ns = cursor.get("ns")
1029+
if ns:
1030+
self._dbname, self._collname = ns.split(".", 1)
10371031
else:
1038-
self._id = 0
1039-
self._data = deque(docs)
1040-
self._retrieved += len(docs)
1032+
documents = cursor["nextBatch"]
1033+
self._data = deque(documents)
1034+
self._retrieved += len(documents)
10411035
else:
1042-
assert isinstance(response.data, _OpReply)
1043-
self._id = response.data.cursor_id
1036+
self._id = 0
10441037
self._data = deque(docs)
1045-
self._retrieved += response.data.number_returned
1038+
self._retrieved += len(docs)
10461039

10471040
if self._id == 0:
10481041
# Don't wait for garbage collection to call __del__, return the
@@ -1195,7 +1188,7 @@ def __init__(
11951188

11961189
def _unpack_response(
11971190
self,
1198-
response: Union[_OpReply, _OpMsg],
1191+
response: _OpMsg,
11991192
cursor_id: Optional[int],
12001193
codec_options: CodecOptions[Mapping[str, Any]],
12011194
user_fields: Optional[Mapping[str, Any]] = None,

pymongo/asynchronous/network.py

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ async def command(
6262
conn: AsyncConnection,
6363
dbname: str,
6464
spec: MutableMapping[str, Any],
65-
is_mongos: bool,
65+
is_mongos: bool, # noqa: ARG001
6666
read_preference: Optional[_ServerMode],
6767
codec_options: CodecOptions[_DocumentType],
6868
session: Optional[AsyncClientSession],
@@ -76,7 +76,6 @@ async def command(
7676
parse_write_concern_error: bool = False,
7777
collation: Optional[_CollationIn] = None,
7878
compression_ctx: Union[SnappyContext, ZlibContext, ZstdContext, None] = None,
79-
use_op_msg: bool = False,
8079
unacknowledged: bool = False,
8180
user_fields: Optional[Mapping[str, Any]] = None,
8281
exhaust_allowed: bool = False,
@@ -102,22 +101,17 @@ async def command(
102101
field in the command response.
103102
:param collation: The collation for this command.
104103
:param compression_ctx: optional compression Context.
105-
:param use_op_msg: True if we should use OP_MSG.
106104
:param unacknowledged: True if this is an unacknowledged command.
107105
:param user_fields: Response fields that should be decoded
108106
using the TypeDecoders from codec_options, passed to
109107
bson._decode_all_selective.
110108
:param exhaust_allowed: True if we should enable OP_MSG exhaustAllowed.
111109
"""
112110
name = next(iter(spec))
113-
ns = dbname + ".$cmd"
114111
speculative_hello = False
115112

116113
# Publish the original command document, perhaps with lsid and $clusterTime.
117114
orig = spec
118-
if is_mongos and not use_op_msg:
119-
assert read_preference is not None
120-
spec = message._maybe_add_read_preference(spec, read_preference)
121115
if read_concern and not (session and session.in_transaction):
122116
if read_concern.level:
123117
spec["readConcern"] = read_concern.document
@@ -142,20 +136,15 @@ async def command(
142136
conn.apply_timeout(client, spec)
143137
_csot.apply_write_concern(spec, write_concern)
144138

145-
if use_op_msg:
146-
flags = _OpMsg.MORE_TO_COME if unacknowledged else 0
147-
flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0
148-
request_id, msg, size, max_doc_size = message._op_msg(
149-
flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx
150-
)
151-
# If this is an unacknowledged write then make sure the encoded doc(s)
152-
# are small enough, otherwise rely on the server to return an error.
153-
if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size:
154-
message._raise_document_too_large(name, size, max_bson_size)
155-
else:
156-
request_id, msg, size = message._query(
157-
0, ns, 0, -1, spec, None, codec_options, compression_ctx
158-
)
139+
flags = _OpMsg.MORE_TO_COME if unacknowledged else 0
140+
flags |= _OpMsg.EXHAUST_ALLOWED if exhaust_allowed else 0
141+
request_id, msg, size, max_doc_size = message._op_msg(
142+
flags, spec, dbname, read_preference, codec_options, ctx=compression_ctx
143+
)
144+
# If this is an unacknowledged write then make sure the encoded doc(s)
145+
# are small enough, otherwise rely on the server to return an error.
146+
if unacknowledged and max_bson_size is not None and max_doc_size > max_bson_size:
147+
message._raise_document_too_large(name, size, max_bson_size)
159148

160149
if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD:
161150
message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD)
@@ -190,7 +179,7 @@ async def command(
190179

191180
try:
192181
await async_sendall(conn.conn.get_conn, msg)
193-
if use_op_msg and unacknowledged:
182+
if unacknowledged:
194183
# Unacknowledged, fake a successful command response.
195184
reply = None
196185
response_doc: _DocumentOut = {"ok": 1}

pymongo/asynchronous/pool.py

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@
102102
ZlibContext,
103103
ZstdContext,
104104
)
105-
from pymongo.message import _OpMsg, _OpReply
105+
from pymongo.message import _OpMsg
106106
from pymongo.read_concern import ReadConcern
107107
from pymongo.read_preferences import _ServerMode
108108
from pymongo.typings import _Address, _CollationIn
@@ -146,7 +146,6 @@ def __init__(
146146
self.supports_sessions = False
147147
self.hello_ok: bool = False
148148
self.is_mongos = False
149-
self.op_msg_enabled = False
150149
self.listeners = pool.opts._event_listeners
151150
self.enabled_for_cmap = pool.enabled_for_cmap
152151
self.enabled_for_logging = pool.enabled_for_logging
@@ -235,13 +234,11 @@ async def unpin(self) -> None:
235234
await self.close_conn(ConnectionClosedReason.STALE)
236235

237236
def hello_cmd(self) -> dict[str, Any]:
238-
# Handshake spec requires us to use OP_MSG+hello command for the
239-
# initial handshake in load balanced or stable API mode.
237+
# As of PYTHON-5713, always use OP_MSG for the handshake since all
238+
# supported servers (MongoDB 4.2+, wire version >= 8) support it.
240239
if self.opts.server_api or self.hello_ok or self.opts.load_balanced:
241-
self.op_msg_enabled = True
242240
return {HelloCompat.CMD: 1}
243-
else:
244-
return {HelloCompat.LEGACY_CMD: 1, "helloOk": True}
241+
return {HelloCompat.LEGACY_CMD: 1, "helloOk": True}
245242

246243
async def hello(self) -> Hello[dict[str, Any]]:
247244
return await self._hello(None, None)
@@ -314,7 +311,6 @@ async def _hello(
314311
ctx = self.compression_settings.get_compression_context(hello.compressors)
315312
self.compression_context = ctx
316313

317-
self.op_msg_enabled = True
318314
self.server_connection_id = hello.connection_id
319315
if creds:
320316
self.negotiated_mechs = hello.sasl_supported_mechs
@@ -397,8 +393,7 @@ async def command(
397393
self.send_cluster_time(spec, session, client)
398394
listeners = self.listeners if publish_events else None
399395
unacknowledged = bool(write_concern and not write_concern.acknowledged)
400-
if self.op_msg_enabled:
401-
self._raise_if_not_writable(unacknowledged)
396+
self._raise_if_not_writable(unacknowledged)
402397
try:
403398
return await command(
404399
self,
@@ -418,7 +413,6 @@ async def command(
418413
parse_write_concern_error=parse_write_concern_error,
419414
collation=collation,
420415
compression_ctx=self.compression_context,
421-
use_op_msg=self.op_msg_enabled,
422416
unacknowledged=unacknowledged,
423417
user_fields=user_fields,
424418
exhaust_allowed=exhaust_allowed,
@@ -447,7 +441,7 @@ async def send_message(self, message: bytes, max_doc_size: int) -> None:
447441
except BaseException as error:
448442
await self._raise_connection_failure(error)
449443

450-
async def receive_message(self, request_id: Optional[int]) -> Union[_OpReply, _OpMsg]:
444+
async def receive_message(self, request_id: Optional[int]) -> _OpMsg:
451445
"""Receive a raw BSON message or raise ConnectionFailure.
452446
453447
If any exception is raised, the socket is closed.

pymongo/asynchronous/server.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -265,16 +265,7 @@ async def run_operation(
265265
duration = datetime.now() - start
266266
# Must publish in find / getMore / explain command response
267267
# format.
268-
if use_cmd:
269-
res = docs[0]
270-
elif operation.name == "explain":
271-
res = docs[0] if docs else {}
272-
else:
273-
res = {"cursor": {"id": reply.cursor_id, "ns": operation.namespace()}, "ok": 1} # type: ignore[union-attr]
274-
if operation.name == "find":
275-
res["cursor"]["firstBatch"] = docs
276-
else:
277-
res["cursor"]["nextBatch"] = docs
268+
res = docs[0]
278269
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
279270
_debug_log(
280271
_COMMAND_LOGGER,

0 commit comments

Comments
 (0)