Skip to content

Commit 6acd2bd

Browse files
committed
PYTHON-5856 Move cursor execution logic into cursor class
Move the command preparation and response construction that lived in Server.run_operation into _AsyncCursorBase._run_with_conn, so cursors call run_cursor_command directly without routing through Server. - Extract _CURSOR_DOC_FIELDS, _split_message, and _operation_to_command as module-level helpers in cursor_base.py - Add abstract _unpack_response to _AsyncCursorBase to make the interface explicit - Add _run_with_conn (carrying the @_handle_reauth decorator) to _AsyncCursorBase; this is the new home for all of run_operation's logic - client._run_operation now accepts execute_fn(conn, op, rp) -> Response instead of routing through server.run_operation - Remove Server.run_operation, Server.operation_to_command, and Server._split_message entirely
1 parent f65a451 commit 6acd2bd

12 files changed

Lines changed: 255 additions & 321 deletions

File tree

pymongo/asynchronous/command_cursor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ async def _send_message(self, operation: _GetMore) -> None:
162162
client = self._collection.database.client
163163
try:
164164
response = await client._run_operation(
165-
operation, self._unpack_response, address=self._address
165+
operation, self._run_with_conn, address=self._address
166166
)
167167
except OperationFailure as exc:
168168
if exc.code in _CURSOR_CLOSED_ERRORS:

pymongo/asynchronous/cursor.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -980,7 +980,7 @@ async def _send_message(self, operation: Union[_Query, _GetMore]) -> None:
980980

981981
try:
982982
response = await client._run_operation(
983-
operation, self._unpack_response, address=self._address
983+
operation, self._run_with_conn, address=self._address
984984
)
985985
except OperationFailure as exc:
986986
if exc.code in _CURSOR_CLOSED_ERRORS or self._exhaust:

pymongo/asynchronous/cursor_base.py

Lines changed: 117 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,20 +16,57 @@
1616

1717
from __future__ import annotations
1818

19+
import datetime
1920
from abc import abstractmethod
20-
from typing import TYPE_CHECKING, Any, Optional
21+
from collections.abc import Mapping, Sequence
22+
from typing import TYPE_CHECKING, Any, Optional, Union
2123

2224
from pymongo import _csot
25+
from pymongo.asynchronous.command_runner import run_cursor_command
26+
from pymongo.asynchronous.helpers import _handle_reauth
2327
from pymongo.cursor_shared import _AgnosticCursorBase
2428
from pymongo.lock import _async_create_lock
25-
from pymongo.typings import _DocumentType
29+
from pymongo.message import _GetMore, _OpMsg, _Query
30+
from pymongo.response import PinnedResponse, Response
31+
from pymongo.typings import _DocumentOut, _DocumentType
2632

2733
if TYPE_CHECKING:
2834
from pymongo.asynchronous.client_session import AsyncClientSession
2935
from pymongo.asynchronous.pool import AsyncConnection
36+
from pymongo.read_preferences import _ServerMode
3037

3138
_IS_SYNC = False
3239

40+
_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}}
41+
42+
43+
def _split_message(
44+
message: Union[tuple[int, Any], tuple[int, Any, int]],
45+
) -> tuple[int, Any, int]:
46+
"""Return request_id, data, max_doc_size.
47+
48+
:param message: (request_id, data, max_doc_size) or (request_id, data)
49+
"""
50+
if len(message) == 3:
51+
return message # type: ignore[return-value]
52+
# get_more and kill_cursors messages don't include BSON documents.
53+
request_id, data = message # type: ignore[misc]
54+
return request_id, data, 0
55+
56+
57+
async def _operation_to_command(
58+
operation: Union[_Query, _GetMore],
59+
conn: AsyncConnection,
60+
use_cmd: bool,
61+
) -> tuple[dict[str, Any], str]:
62+
cmd, db = operation.as_command(conn, use_cmd)
63+
if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption:
64+
cmd = await operation.client._encrypter.encrypt( # type: ignore[misc, assignment]
65+
operation.db, cmd, operation.codec_options
66+
)
67+
operation.update_command(cmd)
68+
return cmd, db
69+
3370

3471
class _ConnectionManager:
3572
"""Used with exhaust cursors to ensure the connection is returned."""
@@ -66,6 +103,84 @@ def session(self) -> Optional[AsyncClientSession]:
66103
async def _next_batch(self, result: list, total: Optional[int] = None) -> bool: # type: ignore[type-arg]
67104
...
68105

106+
@abstractmethod
107+
def _unpack_response(
108+
self,
109+
response: _OpMsg,
110+
cursor_id: Optional[int],
111+
codec_options: Any,
112+
user_fields: Optional[Mapping[str, Any]] = None,
113+
legacy_response: bool = False,
114+
) -> Sequence[_DocumentOut]: ...
115+
116+
@_handle_reauth
117+
async def _run_with_conn(
118+
self,
119+
conn: AsyncConnection,
120+
operation: Union[_Query, _GetMore],
121+
read_preference: _ServerMode,
122+
) -> Response:
123+
"""Execute a cursor operation on the given connection and return a Response."""
124+
client = self._collection.database.client
125+
use_cmd = operation.use_command(conn)
126+
more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come)
127+
cmd, dbn = await _operation_to_command(operation, conn, use_cmd)
128+
if more_to_come:
129+
request_id, data, max_doc_size = 0, b"", 0
130+
else:
131+
message = operation.get_message(read_preference, conn, use_cmd)
132+
request_id, data, max_doc_size = _split_message(message)
133+
user_fields = _CURSOR_DOC_FIELDS if use_cmd else None
134+
docs, reply, duration = await run_cursor_command(
135+
conn,
136+
cmd,
137+
dbn,
138+
request_id,
139+
data,
140+
client=client,
141+
session=operation.session, # type: ignore[arg-type]
142+
listeners=client._event_listeners,
143+
address=conn.address,
144+
start=datetime.datetime.now(),
145+
codec_options=operation.codec_options,
146+
user_fields=user_fields,
147+
command_name=operation.name,
148+
pool_opts=conn.opts,
149+
max_doc_size=max_doc_size,
150+
more_to_come=more_to_come,
151+
unpack_res=self._unpack_response,
152+
cursor_id=operation.cursor_id,
153+
)
154+
assert reply is not None
155+
if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type]
156+
conn.pin_cursor()
157+
if isinstance(reply, _OpMsg):
158+
# In OP_MSG, the server keeps sending only if the more_to_come flag is set.
159+
more_to_come = reply.more_to_come
160+
else:
161+
# In OP_REPLY, the server keeps sending until cursor_id is 0.
162+
more_to_come = bool(operation.exhaust and reply.cursor_id)
163+
if operation.conn_mgr:
164+
operation.conn_mgr.update_exhaust(more_to_come)
165+
return PinnedResponse(
166+
data=reply,
167+
address=conn.address,
168+
conn=conn,
169+
duration=duration,
170+
request_id=request_id,
171+
from_command=use_cmd,
172+
docs=docs, # type: ignore[arg-type]
173+
more_to_come=more_to_come,
174+
)
175+
return Response(
176+
data=reply,
177+
address=conn.address,
178+
duration=duration,
179+
request_id=request_id,
180+
from_command=use_cmd,
181+
docs=docs, # type: ignore[arg-type]
182+
)
183+
69184
async def _die_lock(self) -> None:
70185
"""Closes this cursor."""
71186
try:

pymongo/asynchronous/mongo_client.py

Lines changed: 7 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1906,13 +1906,14 @@ async def _conn_for_reads(
19061906
async def _run_operation(
19071907
self,
19081908
operation: Union[_Query, _GetMore],
1909-
unpack_res: Callable, # type: ignore[type-arg]
1909+
execute_fn: Callable, # type: ignore[type-arg]
19101910
address: Optional[_Address] = None,
19111911
) -> Response:
19121912
"""Run a _Query/_GetMore operation and return a Response.
19131913
19141914
:param operation: a _Query or _GetMore object.
1915-
:param unpack_res: A callable that decodes the wire protocol response.
1915+
:param execute_fn: A callable ``(conn, operation, read_preference) -> Response``
1916+
that executes the operation on a given connection.
19161917
:param address: Optional address when sending a message
19171918
to a specific server, used for getMore.
19181919
"""
@@ -1927,30 +1928,18 @@ async def _run_operation(
19271928
async with operation.conn_mgr._lock:
19281929
async with _MongoClientErrorHandler(self, server, operation.session) as err_handler: # type: ignore[arg-type]
19291930
err_handler.contribute_socket(operation.conn_mgr.conn)
1930-
return await server.run_operation(
1931-
operation.conn_mgr.conn,
1932-
operation,
1933-
operation.read_preference,
1934-
self._event_listeners,
1935-
unpack_res,
1936-
self,
1931+
return await execute_fn(
1932+
operation.conn_mgr.conn, operation, operation.read_preference
19371933
)
19381934

19391935
async def _cmd(
19401936
_session: Optional[AsyncClientSession],
1941-
server: Server,
1937+
_server: Server,
19421938
conn: AsyncConnection,
19431939
read_preference: _ServerMode,
19441940
) -> Response:
19451941
operation.reset() # Reset op in case of retry.
1946-
return await server.run_operation(
1947-
conn,
1948-
operation,
1949-
read_preference,
1950-
self._event_listeners,
1951-
unpack_res,
1952-
self,
1953-
)
1942+
return await execute_fn(conn, operation, read_preference)
19541943

19551944
return await self._retryable_read(
19561945
_cmd,

pymongo/asynchronous/server.py

Lines changed: 1 addition & 135 deletions
Original file line numberDiff line numberDiff line change
@@ -18,42 +18,31 @@
1818

1919
import logging
2020
from contextlib import AbstractAsyncContextManager
21-
from datetime import datetime
2221
from typing import (
2322
TYPE_CHECKING,
2423
Any,
25-
Callable,
2624
Optional,
27-
Union,
2825
)
2926

30-
from pymongo.asynchronous.command_runner import run_cursor_command
31-
from pymongo.asynchronous.helpers import _handle_reauth
3227
from pymongo.logger import (
3328
_SDAM_LOGGER,
3429
_debug_log,
3530
_SDAMStatusMessage,
3631
)
37-
from pymongo.message import _GetMore, _OpMsg, _Query
38-
from pymongo.response import PinnedResponse, Response
3932

4033
if TYPE_CHECKING:
4134
from queue import Queue
4235
from weakref import ReferenceType
4336

4437
from bson.objectid import ObjectId
45-
from pymongo.asynchronous.mongo_client import AsyncMongoClient, _MongoClientErrorHandler
38+
from pymongo.asynchronous.mongo_client import _MongoClientErrorHandler
4639
from pymongo.asynchronous.monitor import Monitor
4740
from pymongo.asynchronous.pool import AsyncConnection, Pool
4841
from pymongo.monitoring import _EventListeners
49-
from pymongo.read_preferences import _ServerMode
5042
from pymongo.server_description import ServerDescription
51-
from pymongo.typings import _DocumentOut
5243

5344
_IS_SYNC = False
5445

55-
_CURSOR_DOC_FIELDS = {"cursor": {"firstBatch": 1, "nextBatch": 1}}
56-
5746

5847
class Server:
5948
def __init__(
@@ -118,115 +107,6 @@ def request_check(self) -> None:
118107
"""Check the server's state soon."""
119108
self._monitor.request_check()
120109

121-
async def operation_to_command(
122-
self, operation: Union[_Query, _GetMore], conn: AsyncConnection, apply_timeout: bool = False
123-
) -> tuple[dict[str, Any], str]:
124-
cmd, db = operation.as_command(conn, apply_timeout)
125-
# Support auto encryption
126-
if operation.client._encrypter and not operation.client._encrypter._bypass_auto_encryption:
127-
cmd = await operation.client._encrypter.encrypt( # type: ignore[misc, assignment]
128-
operation.db, cmd, operation.codec_options
129-
)
130-
operation.update_command(cmd)
131-
132-
return cmd, db
133-
134-
@_handle_reauth
135-
async def run_operation(
136-
self,
137-
conn: AsyncConnection,
138-
operation: Union[_Query, _GetMore],
139-
read_preference: _ServerMode,
140-
listeners: Optional[_EventListeners],
141-
unpack_res: Callable[..., list[_DocumentOut]],
142-
client: AsyncMongoClient[Any],
143-
) -> Response:
144-
"""Run a _Query or _GetMore operation and return a Response object.
145-
146-
This method is used only to run _Query/_GetMore operations from
147-
cursors.
148-
Can raise ConnectionFailure, OperationFailure, etc.
149-
150-
:param conn: An AsyncConnection instance.
151-
:param operation: A _Query or _GetMore object.
152-
:param read_preference: The read preference to use.
153-
:param listeners: Instance of _EventListeners or None.
154-
:param unpack_res: A callable that decodes the wire protocol response.
155-
:param client: An AsyncMongoClient instance.
156-
"""
157-
assert listeners is not None
158-
start = datetime.now()
159-
160-
use_cmd = operation.use_command(conn)
161-
more_to_come = bool(operation.conn_mgr and operation.conn_mgr.more_to_come)
162-
cmd, dbn = await self.operation_to_command(operation, conn, use_cmd)
163-
if more_to_come:
164-
request_id = 0
165-
data = b""
166-
max_doc_size = 0
167-
else:
168-
message = operation.get_message(read_preference, conn, use_cmd)
169-
request_id, data, max_doc_size = self._split_message(message)
170-
171-
user_fields = _CURSOR_DOC_FIELDS if use_cmd else None
172-
173-
docs, reply, duration = await run_cursor_command(
174-
conn,
175-
cmd,
176-
dbn,
177-
request_id,
178-
data,
179-
client=client,
180-
session=operation.session, # type: ignore[arg-type]
181-
listeners=listeners,
182-
address=conn.address,
183-
start=start,
184-
codec_options=operation.codec_options,
185-
user_fields=user_fields,
186-
command_name=operation.name,
187-
pool_opts=conn.opts,
188-
max_doc_size=max_doc_size,
189-
more_to_come=more_to_come,
190-
unpack_res=unpack_res,
191-
cursor_id=operation.cursor_id,
192-
)
193-
assert reply is not None
194-
195-
response: Response
196-
197-
if client._should_pin_cursor(operation.session) or operation.exhaust: # type: ignore[arg-type]
198-
conn.pin_cursor()
199-
if isinstance(reply, _OpMsg):
200-
# In OP_MSG, the server keeps sending only if the
201-
# more_to_come flag is set.
202-
more_to_come = reply.more_to_come
203-
else:
204-
# In OP_REPLY, the server keeps sending until cursor_id is 0.
205-
more_to_come = bool(operation.exhaust and reply.cursor_id)
206-
if operation.conn_mgr:
207-
operation.conn_mgr.update_exhaust(more_to_come)
208-
response = PinnedResponse(
209-
data=reply,
210-
address=self._description.address,
211-
conn=conn,
212-
duration=duration,
213-
request_id=request_id,
214-
from_command=use_cmd,
215-
docs=docs, # type: ignore[arg-type]
216-
more_to_come=more_to_come,
217-
)
218-
else:
219-
response = Response(
220-
data=reply,
221-
address=self._description.address,
222-
duration=duration,
223-
request_id=request_id,
224-
from_command=use_cmd,
225-
docs=docs, # type: ignore[arg-type]
226-
)
227-
228-
return response
229-
230110
async def checkout(
231111
self, handler: Optional[_MongoClientErrorHandler] = None
232112
) -> AbstractAsyncContextManager[AsyncConnection]:
@@ -245,19 +125,5 @@ def description(self, server_description: ServerDescription) -> None:
245125
def pool(self) -> Pool:
246126
return self._pool
247127

248-
def _split_message(
249-
self, message: Union[tuple[int, Any], tuple[int, Any, int]]
250-
) -> tuple[int, Any, int]:
251-
"""Return request_id, data, max_doc_size.
252-
253-
:param message: (request_id, data, max_doc_size) or (request_id, data)
254-
"""
255-
if len(message) == 3:
256-
return message # type: ignore[return-value]
257-
else:
258-
# get_more and kill_cursors messages don't include BSON documents.
259-
request_id, data = message # type: ignore[misc]
260-
return request_id, data, 0
261-
262128
def __repr__(self) -> str:
263129
return f"<{self.__class__.__name__} {self._description!r}>"

0 commit comments

Comments
 (0)