Skip to content

Commit a71dd7a

Browse files
authored
Merge branch 'master' into PYTHON-5814
2 parents 9663485 + 4282767 commit a71dd7a

15 files changed

Lines changed: 1286 additions & 1723 deletions

File tree

justfile

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,6 @@ default:
1616
resync:
1717
@uv sync --quiet
1818

19-
[private]
20-
run-synchro:
21-
uv run --group unasync ./tools/synchro.py
22-
2319
# Set up the development environment
2420
install:
2521
bash .evergreen/scripts/setup-dev-env.sh
@@ -70,7 +66,7 @@ lint-manual *args="": && resync
7066

7167
# Run pytest (e.g. just test test/test_uri_parser.py)
7268
[group('test')]
73-
test *args="-v --durations=5 --maxfail=10": run-synchro && resync
69+
test *args="-v --durations=5 --maxfail=10": && resync
7470
#!/usr/bin/env bash
7571
set -euo pipefail
7672
uv run ${USE_ACTIVE_VENV:+--active} --extra test python -m pytest {{args}}
@@ -83,7 +79,7 @@ test-numpy *args="": && resync
8379

8480
# Run tests via the Evergreen test runner script
8581
[group('test')]
86-
run-tests *args: run-synchro && resync
82+
run-tests *args: && resync
8783
bash ./.evergreen/run-tests.sh {{args}}
8884

8985
# Set up the test environment (auth, TLS, etc.)

pymongo/asynchronous/bulk.py

Lines changed: 30 additions & 163 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@
2020
from __future__ import annotations
2121

2222
import copy
23-
import datetime
24-
import logging
2523
from collections.abc import Iterator, Mapping, MutableMapping
2624
from itertools import islice
2725
from typing import (
@@ -34,9 +32,9 @@
3432
from bson.objectid import ObjectId
3533
from bson.raw_bson import RawBSONDocument
3634
from pymongo import _csot, common
37-
from pymongo.asynchronous.client_session import (
38-
AsyncClientSession,
39-
_validate_session_write_concern,
35+
from pymongo.asynchronous.client_session import AsyncClientSession, _validate_session_write_concern
36+
from pymongo.asynchronous.command_runner import (
37+
run_bulk_write_command,
4038
)
4139
from pymongo.asynchronous.helpers import _handle_reauth
4240
from pymongo.bulk_shared import (
@@ -54,18 +52,14 @@
5452
from pymongo.errors import (
5553
ConfigurationError,
5654
InvalidOperation,
57-
NotPrimaryError,
5855
OperationFailure,
5956
)
6057
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
61-
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
6258
from pymongo.message import (
6359
_DELETE,
6460
_INSERT,
6561
_UPDATE,
6662
_BulkWriteContext,
67-
_convert_exception,
68-
_convert_write_result,
6963
_EncryptedBulkWriteContext,
7064
_randint,
7165
)
@@ -251,83 +245,16 @@ async def write_command(
251245
docs: list[Mapping[str, Any]],
252246
client: AsyncMongoClient[Any],
253247
) -> dict[str, Any]:
254-
"""A proxy for SocketInfo.write_command that handles event publishing."""
248+
"""Run a batch write command, returning the response as a dict."""
255249
cmd[bwc.field] = docs
256-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
257-
_debug_log(
258-
_COMMAND_LOGGER,
259-
message=_CommandStatusMessage.STARTED,
260-
clientId=client._topology_settings._topology_id,
261-
command=cmd,
262-
commandName=next(iter(cmd)),
263-
databaseName=bwc.db_name,
264-
requestId=request_id,
265-
operationId=request_id,
266-
driverConnectionId=bwc.conn.id,
267-
serverConnectionId=bwc.conn.server_connection_id,
268-
serverHost=bwc.conn.address[0],
269-
serverPort=bwc.conn.address[1],
270-
serviceId=bwc.conn.service_id,
271-
)
272-
if bwc.publish:
273-
bwc._start(cmd, request_id, docs)
274-
try:
275-
if bwc.session is not None and bwc.session._starting_transaction:
276-
bwc.session._transaction.set_in_progress()
277-
reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc]
278-
duration = datetime.datetime.now() - bwc.start_time
279-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
280-
_debug_log(
281-
_COMMAND_LOGGER,
282-
message=_CommandStatusMessage.SUCCEEDED,
283-
clientId=client._topology_settings._topology_id,
284-
durationMS=duration,
285-
reply=reply,
286-
commandName=next(iter(cmd)),
287-
databaseName=bwc.db_name,
288-
requestId=request_id,
289-
operationId=request_id,
290-
driverConnectionId=bwc.conn.id,
291-
serverConnectionId=bwc.conn.server_connection_id,
292-
serverHost=bwc.conn.address[0],
293-
serverPort=bwc.conn.address[1],
294-
serviceId=bwc.conn.service_id,
295-
)
296-
if bwc.publish:
297-
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
298-
await client._process_response(reply, bwc.session) # type: ignore[arg-type]
299-
except Exception as exc:
300-
duration = datetime.datetime.now() - bwc.start_time
301-
if isinstance(exc, (NotPrimaryError, OperationFailure)):
302-
failure: _DocumentOut = exc.details # type: ignore[assignment]
303-
else:
304-
failure = _convert_exception(exc)
305-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
306-
_debug_log(
307-
_COMMAND_LOGGER,
308-
message=_CommandStatusMessage.FAILED,
309-
clientId=client._topology_settings._topology_id,
310-
durationMS=duration,
311-
failure=failure,
312-
commandName=next(iter(cmd)),
313-
databaseName=bwc.db_name,
314-
requestId=request_id,
315-
operationId=request_id,
316-
driverConnectionId=bwc.conn.id,
317-
serverConnectionId=bwc.conn.server_connection_id,
318-
serverHost=bwc.conn.address[0],
319-
serverPort=bwc.conn.address[1],
320-
serviceId=bwc.conn.service_id,
321-
isServerSideError=isinstance(exc, OperationFailure),
322-
)
323-
324-
if bwc.publish:
325-
bwc._fail(request_id, failure, duration)
326-
# Process the response from the server.
327-
if isinstance(exc, (NotPrimaryError, OperationFailure)):
328-
await client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
329-
raise
330-
return reply # type: ignore[return-value]
250+
result_docs, _, _ = await run_bulk_write_command(
251+
bwc,
252+
cmd,
253+
request_id,
254+
msg,
255+
client=client,
256+
)
257+
return result_docs[0]
331258

332259
async def unack_write(
333260
self,
@@ -339,83 +266,23 @@ async def unack_write(
339266
docs: list[Mapping[str, Any]],
340267
client: AsyncMongoClient[Any],
341268
) -> Optional[Mapping[str, Any]]:
342-
"""A proxy for AsyncConnection.unack_write that handles event publishing."""
343-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
344-
_debug_log(
345-
_COMMAND_LOGGER,
346-
message=_CommandStatusMessage.STARTED,
347-
clientId=client._topology_settings._topology_id,
348-
command=cmd,
349-
commandName=next(iter(cmd)),
350-
databaseName=bwc.db_name,
351-
requestId=request_id,
352-
operationId=request_id,
353-
driverConnectionId=bwc.conn.id,
354-
serverConnectionId=bwc.conn.server_connection_id,
355-
serverHost=bwc.conn.address[0],
356-
serverPort=bwc.conn.address[1],
357-
serviceId=bwc.conn.service_id,
358-
)
359-
if bwc.publish:
360-
cmd = bwc._start(cmd, request_id, docs)
361-
try:
362-
result = await bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override]
363-
duration = datetime.datetime.now() - bwc.start_time
364-
if result is not None:
365-
reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type]
366-
else:
367-
# Comply with APM spec.
368-
reply = {"ok": 1}
369-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
370-
_debug_log(
371-
_COMMAND_LOGGER,
372-
message=_CommandStatusMessage.SUCCEEDED,
373-
clientId=client._topology_settings._topology_id,
374-
durationMS=duration,
375-
reply=reply,
376-
commandName=next(iter(cmd)),
377-
databaseName=bwc.db_name,
378-
requestId=request_id,
379-
operationId=request_id,
380-
driverConnectionId=bwc.conn.id,
381-
serverConnectionId=bwc.conn.server_connection_id,
382-
serverHost=bwc.conn.address[0],
383-
serverPort=bwc.conn.address[1],
384-
serviceId=bwc.conn.service_id,
385-
)
386-
if bwc.publish:
387-
bwc._succeed(request_id, reply, duration)
388-
except Exception as exc:
389-
duration = datetime.datetime.now() - bwc.start_time
390-
if isinstance(exc, OperationFailure):
391-
failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type]
392-
elif isinstance(exc, NotPrimaryError):
393-
failure = exc.details # type: ignore[assignment]
394-
else:
395-
failure = _convert_exception(exc)
396-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
397-
_debug_log(
398-
_COMMAND_LOGGER,
399-
message=_CommandStatusMessage.FAILED,
400-
clientId=client._topology_settings._topology_id,
401-
durationMS=duration,
402-
failure=failure,
403-
commandName=next(iter(cmd)),
404-
databaseName=bwc.db_name,
405-
requestId=request_id,
406-
operationId=request_id,
407-
driverConnectionId=bwc.conn.id,
408-
serverConnectionId=bwc.conn.server_connection_id,
409-
serverHost=bwc.conn.address[0],
410-
serverPort=bwc.conn.address[1],
411-
serviceId=bwc.conn.service_id,
412-
isServerSideError=isinstance(exc, OperationFailure),
413-
)
414-
if bwc.publish:
415-
assert bwc.start_time is not None
416-
bwc._fail(request_id, failure, duration)
417-
raise
418-
return result # type: ignore[return-value]
269+
"""Send an unacknowledged batch write command."""
270+
# Historically the STARTED log omits the documents while the published
271+
# CommandStartedEvent includes them, so log ``cmd`` but publish a copy
272+
# carrying the ``docs`` field.
273+
published = dict(cmd)
274+
published[bwc.field] = docs
275+
await run_bulk_write_command(
276+
bwc,
277+
cmd,
278+
request_id,
279+
msg,
280+
client=client,
281+
orig=published,
282+
max_doc_size=max_doc_size,
283+
unacknowledged=True,
284+
)
285+
return None
419286

420287
async def _execute_batch_unack(
421288
self,
@@ -487,7 +354,7 @@ async def _execute_command(
487354
run = self.current_run
488355

489356
# AsyncConnection.command validates the session, but we use
490-
# AsyncConnection.write_command
357+
# run_bulk_write_command.
491358
conn.validate_session(client, session)
492359
last_run = False
493360

0 commit comments

Comments
 (0)