2020from __future__ import annotations
2121
2222import copy
23- import datetime
24- import logging
2523from collections .abc import Iterator , Mapping , MutableMapping
2624from itertools import islice
2725from typing import (
3432from bson .objectid import ObjectId
3533from bson .raw_bson import RawBSONDocument
3634from pymongo import _csot , common
37- from pymongo .asynchronous .client_session import (
38- AsyncClientSession ,
39- _validate_session_write_concern ,
35+ from pymongo .asynchronous .client_session import AsyncClientSession , _validate_session_write_concern
36+ from pymongo . asynchronous . command_runner import (
37+ run_bulk_write_command ,
4038)
4139from pymongo .asynchronous .helpers import _handle_reauth
4240from pymongo .bulk_shared import (
5452from pymongo .errors import (
5553 ConfigurationError ,
5654 InvalidOperation ,
57- NotPrimaryError ,
5855 OperationFailure ,
5956)
6057from pymongo .helpers_shared import _RETRYABLE_ERROR_CODES
61- from pymongo .logger import _COMMAND_LOGGER , _CommandStatusMessage , _debug_log
6258from pymongo .message import (
6359 _DELETE ,
6460 _INSERT ,
6561 _UPDATE ,
6662 _BulkWriteContext ,
67- _convert_exception ,
68- _convert_write_result ,
6963 _EncryptedBulkWriteContext ,
7064 _randint ,
7165)
@@ -251,83 +245,16 @@ async def write_command(
251245 docs : list [Mapping [str , Any ]],
252246 client : AsyncMongoClient [Any ],
253247 ) -> dict [str , Any ]:
254- """A proxy for SocketInfo.write_command that handles event publishing ."""
248+ """Run a batch write command, returning the response as a dict ."""
255249 cmd [bwc .field ] = docs
256- if _COMMAND_LOGGER .isEnabledFor (logging .DEBUG ):
257- _debug_log (
258- _COMMAND_LOGGER ,
259- message = _CommandStatusMessage .STARTED ,
260- clientId = client ._topology_settings ._topology_id ,
261- command = cmd ,
262- commandName = next (iter (cmd )),
263- databaseName = bwc .db_name ,
264- requestId = request_id ,
265- operationId = request_id ,
266- driverConnectionId = bwc .conn .id ,
267- serverConnectionId = bwc .conn .server_connection_id ,
268- serverHost = bwc .conn .address [0 ],
269- serverPort = bwc .conn .address [1 ],
270- serviceId = bwc .conn .service_id ,
271- )
272- if bwc .publish :
273- bwc ._start (cmd , request_id , docs )
274- try :
275- if bwc .session is not None and bwc .session ._starting_transaction :
276- bwc .session ._transaction .set_in_progress ()
277- reply = await bwc .conn .write_command (request_id , msg , bwc .codec ) # type: ignore[misc]
278- duration = datetime .datetime .now () - bwc .start_time
279- if _COMMAND_LOGGER .isEnabledFor (logging .DEBUG ):
280- _debug_log (
281- _COMMAND_LOGGER ,
282- message = _CommandStatusMessage .SUCCEEDED ,
283- clientId = client ._topology_settings ._topology_id ,
284- durationMS = duration ,
285- reply = reply ,
286- commandName = next (iter (cmd )),
287- databaseName = bwc .db_name ,
288- requestId = request_id ,
289- operationId = request_id ,
290- driverConnectionId = bwc .conn .id ,
291- serverConnectionId = bwc .conn .server_connection_id ,
292- serverHost = bwc .conn .address [0 ],
293- serverPort = bwc .conn .address [1 ],
294- serviceId = bwc .conn .service_id ,
295- )
296- if bwc .publish :
297- bwc ._succeed (request_id , reply , duration ) # type: ignore[arg-type]
298- await client ._process_response (reply , bwc .session ) # type: ignore[arg-type]
299- except Exception as exc :
300- duration = datetime .datetime .now () - bwc .start_time
301- if isinstance (exc , (NotPrimaryError , OperationFailure )):
302- failure : _DocumentOut = exc .details # type: ignore[assignment]
303- else :
304- failure = _convert_exception (exc )
305- if _COMMAND_LOGGER .isEnabledFor (logging .DEBUG ):
306- _debug_log (
307- _COMMAND_LOGGER ,
308- message = _CommandStatusMessage .FAILED ,
309- clientId = client ._topology_settings ._topology_id ,
310- durationMS = duration ,
311- failure = failure ,
312- commandName = next (iter (cmd )),
313- databaseName = bwc .db_name ,
314- requestId = request_id ,
315- operationId = request_id ,
316- driverConnectionId = bwc .conn .id ,
317- serverConnectionId = bwc .conn .server_connection_id ,
318- serverHost = bwc .conn .address [0 ],
319- serverPort = bwc .conn .address [1 ],
320- serviceId = bwc .conn .service_id ,
321- isServerSideError = isinstance (exc , OperationFailure ),
322- )
323-
324- if bwc .publish :
325- bwc ._fail (request_id , failure , duration )
326- # Process the response from the server.
327- if isinstance (exc , (NotPrimaryError , OperationFailure )):
328- await client ._process_response (exc .details , bwc .session ) # type: ignore[arg-type]
329- raise
330- return reply # type: ignore[return-value]
250+ result_docs , _ , _ = await run_bulk_write_command (
251+ bwc ,
252+ cmd ,
253+ request_id ,
254+ msg ,
255+ client = client ,
256+ )
257+ return result_docs [0 ]
331258
332259 async def unack_write (
333260 self ,
@@ -339,83 +266,23 @@ async def unack_write(
339266 docs : list [Mapping [str , Any ]],
340267 client : AsyncMongoClient [Any ],
341268 ) -> Optional [Mapping [str , Any ]]:
342- """A proxy for AsyncConnection.unack_write that handles event publishing."""
343- if _COMMAND_LOGGER .isEnabledFor (logging .DEBUG ):
344- _debug_log (
345- _COMMAND_LOGGER ,
346- message = _CommandStatusMessage .STARTED ,
347- clientId = client ._topology_settings ._topology_id ,
348- command = cmd ,
349- commandName = next (iter (cmd )),
350- databaseName = bwc .db_name ,
351- requestId = request_id ,
352- operationId = request_id ,
353- driverConnectionId = bwc .conn .id ,
354- serverConnectionId = bwc .conn .server_connection_id ,
355- serverHost = bwc .conn .address [0 ],
356- serverPort = bwc .conn .address [1 ],
357- serviceId = bwc .conn .service_id ,
358- )
359- if bwc .publish :
360- cmd = bwc ._start (cmd , request_id , docs )
361- try :
362- result = await bwc .conn .unack_write (msg , max_doc_size ) # type: ignore[func-returns-value, misc, override]
363- duration = datetime .datetime .now () - bwc .start_time
364- if result is not None :
365- reply = _convert_write_result (bwc .name , cmd , result ) # type: ignore[arg-type]
366- else :
367- # Comply with APM spec.
368- reply = {"ok" : 1 }
369- if _COMMAND_LOGGER .isEnabledFor (logging .DEBUG ):
370- _debug_log (
371- _COMMAND_LOGGER ,
372- message = _CommandStatusMessage .SUCCEEDED ,
373- clientId = client ._topology_settings ._topology_id ,
374- durationMS = duration ,
375- reply = reply ,
376- commandName = next (iter (cmd )),
377- databaseName = bwc .db_name ,
378- requestId = request_id ,
379- operationId = request_id ,
380- driverConnectionId = bwc .conn .id ,
381- serverConnectionId = bwc .conn .server_connection_id ,
382- serverHost = bwc .conn .address [0 ],
383- serverPort = bwc .conn .address [1 ],
384- serviceId = bwc .conn .service_id ,
385- )
386- if bwc .publish :
387- bwc ._succeed (request_id , reply , duration )
388- except Exception as exc :
389- duration = datetime .datetime .now () - bwc .start_time
390- if isinstance (exc , OperationFailure ):
391- failure : _DocumentOut = _convert_write_result (bwc .name , cmd , exc .details ) # type: ignore[arg-type]
392- elif isinstance (exc , NotPrimaryError ):
393- failure = exc .details # type: ignore[assignment]
394- else :
395- failure = _convert_exception (exc )
396- if _COMMAND_LOGGER .isEnabledFor (logging .DEBUG ):
397- _debug_log (
398- _COMMAND_LOGGER ,
399- message = _CommandStatusMessage .FAILED ,
400- clientId = client ._topology_settings ._topology_id ,
401- durationMS = duration ,
402- failure = failure ,
403- commandName = next (iter (cmd )),
404- databaseName = bwc .db_name ,
405- requestId = request_id ,
406- operationId = request_id ,
407- driverConnectionId = bwc .conn .id ,
408- serverConnectionId = bwc .conn .server_connection_id ,
409- serverHost = bwc .conn .address [0 ],
410- serverPort = bwc .conn .address [1 ],
411- serviceId = bwc .conn .service_id ,
412- isServerSideError = isinstance (exc , OperationFailure ),
413- )
414- if bwc .publish :
415- assert bwc .start_time is not None
416- bwc ._fail (request_id , failure , duration )
417- raise
418- return result # type: ignore[return-value]
269+ """Send an unacknowledged batch write command."""
270+ # Historically the STARTED log omits the documents while the published
271+ # CommandStartedEvent includes them, so log ``cmd`` but publish a copy
272+ # carrying the ``docs`` field.
273+ published = dict (cmd )
274+ published [bwc .field ] = docs
275+ await run_bulk_write_command (
276+ bwc ,
277+ cmd ,
278+ request_id ,
279+ msg ,
280+ client = client ,
281+ orig = published ,
282+ max_doc_size = max_doc_size ,
283+ unacknowledged = True ,
284+ )
285+ return None
419286
420287 async def _execute_batch_unack (
421288 self ,
@@ -487,7 +354,7 @@ async def _execute_command(
487354 run = self .current_run
488355
489356 # AsyncConnection.command validates the session, but we use
490- # AsyncConnection.write_command
357+ # run_bulk_write_command.
491358 conn .validate_session (client , session )
492359 last_run = False
493360
0 commit comments