Skip to content

Commit cf8f234

Browse files
committed
Use command_telemetry
1 parent 2b20d3e commit cf8f234

10 files changed

Lines changed: 450 additions & 1061 deletions

File tree

pymongo/asynchronous/bulk.py

Lines changed: 53 additions & 151 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
from __future__ import annotations
2020

2121
import copy
22-
import datetime
23-
import logging
2422
from collections.abc import MutableMapping
2523
from itertools import islice
2624
from typing import (
@@ -57,18 +55,17 @@
5755
OperationFailure,
5856
)
5957
from pymongo.helpers_shared import _RETRYABLE_ERROR_CODES
60-
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
6158
from pymongo.message import (
6259
_DELETE,
6360
_INSERT,
6461
_UPDATE,
6562
_BulkWriteContext,
66-
_convert_exception,
6763
_convert_write_result,
6864
_EncryptedBulkWriteContext,
6965
_randint,
7066
)
7167
from pymongo.read_preferences import ReadPreference
68+
from pymongo.telemetry import command_telemetry
7269
from pymongo.write_concern import WriteConcern
7370

7471
if TYPE_CHECKING:
@@ -252,78 +249,31 @@ async def write_command(
252249
) -> dict[str, Any]:
253250
"""A proxy for SocketInfo.write_command that handles event publishing."""
254251
cmd[bwc.field] = docs
255-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
256-
_debug_log(
257-
_COMMAND_LOGGER,
258-
message=_CommandStatusMessage.STARTED,
259-
clientId=client._topology_settings._topology_id,
260-
command=cmd,
261-
commandName=next(iter(cmd)),
262-
databaseName=bwc.db_name,
263-
requestId=request_id,
264-
operationId=request_id,
265-
driverConnectionId=bwc.conn.id,
266-
serverConnectionId=bwc.conn.server_connection_id,
267-
serverHost=bwc.conn.address[0],
268-
serverPort=bwc.conn.address[1],
269-
serviceId=bwc.conn.service_id,
270-
)
271-
if bwc.publish:
272-
bwc._start(cmd, request_id, docs)
273-
try:
274-
reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc]
275-
duration = datetime.datetime.now() - bwc.start_time
276-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
277-
_debug_log(
278-
_COMMAND_LOGGER,
279-
message=_CommandStatusMessage.SUCCEEDED,
280-
clientId=client._topology_settings._topology_id,
281-
durationMS=duration,
282-
reply=reply,
283-
commandName=next(iter(cmd)),
284-
databaseName=bwc.db_name,
285-
requestId=request_id,
286-
operationId=request_id,
287-
driverConnectionId=bwc.conn.id,
288-
serverConnectionId=bwc.conn.server_connection_id,
289-
serverHost=bwc.conn.address[0],
290-
serverPort=bwc.conn.address[1],
291-
serviceId=bwc.conn.service_id,
292-
)
293-
if bwc.publish:
294-
bwc._succeed(request_id, reply, duration) # type: ignore[arg-type]
295-
await client._process_response(reply, bwc.session) # type: ignore[arg-type]
296-
except Exception as exc:
297-
duration = datetime.datetime.now() - bwc.start_time
298-
if isinstance(exc, (NotPrimaryError, OperationFailure)):
299-
failure: _DocumentOut = exc.details # type: ignore[assignment]
300-
else:
301-
failure = _convert_exception(exc)
302-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
303-
_debug_log(
304-
_COMMAND_LOGGER,
305-
message=_CommandStatusMessage.FAILED,
306-
clientId=client._topology_settings._topology_id,
307-
durationMS=duration,
308-
failure=failure,
309-
commandName=next(iter(cmd)),
310-
databaseName=bwc.db_name,
311-
requestId=request_id,
312-
operationId=request_id,
313-
driverConnectionId=bwc.conn.id,
314-
serverConnectionId=bwc.conn.server_connection_id,
315-
serverHost=bwc.conn.address[0],
316-
serverPort=bwc.conn.address[1],
317-
serviceId=bwc.conn.service_id,
318-
isServerSideError=isinstance(exc, OperationFailure),
319-
)
320-
321-
if bwc.publish:
322-
bwc._fail(request_id, failure, duration)
323-
# Process the response from the server.
324-
if isinstance(exc, (NotPrimaryError, OperationFailure)):
325-
await client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
326-
raise
252+
with command_telemetry(
253+
command_name=bwc.name,
254+
database_name=bwc.db_name,
255+
spec=cmd,
256+
driver_connection_id=bwc.conn.id,
257+
server_connection_id=bwc.conn.server_connection_id,
258+
publish_event=bwc.publish,
259+
start_time=bwc.start_time,
260+
address=bwc.conn.address,
261+
listeners=bwc.listeners,
262+
client=client,
263+
request_id=request_id,
264+
service_id=bwc.conn.service_id,
265+
operation_id=bwc.op_id,
266+
) as telemetry:
267+
try:
268+
reply = await bwc.conn.write_command(request_id, msg, bwc.codec) # type: ignore[misc]
269+
telemetry.publish_succeeded(reply)
270+
await client._process_response(reply, bwc.session) # type: ignore[arg-type]
271+
except Exception as exc:
272+
telemetry.publish_failed(exc)
273+
# Process the response from the server.
274+
if isinstance(exc, (NotPrimaryError, OperationFailure)):
275+
await client._process_response(exc.details, bwc.session) # type: ignore[arg-type]
276+
raise
327277
return reply # type: ignore[return-value]
328278

329279
async def unack_write(
@@ -337,81 +287,33 @@ async def unack_write(
337287
client: AsyncMongoClient[Any],
338288
) -> Optional[Mapping[str, Any]]:
339289
"""A proxy for AsyncConnection.unack_write that handles event publishing."""
340-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
341-
_debug_log(
342-
_COMMAND_LOGGER,
343-
message=_CommandStatusMessage.STARTED,
344-
clientId=client._topology_settings._topology_id,
345-
command=cmd,
346-
commandName=next(iter(cmd)),
347-
databaseName=bwc.db_name,
348-
requestId=request_id,
349-
operationId=request_id,
350-
driverConnectionId=bwc.conn.id,
351-
serverConnectionId=bwc.conn.server_connection_id,
352-
serverHost=bwc.conn.address[0],
353-
serverPort=bwc.conn.address[1],
354-
serviceId=bwc.conn.service_id,
355-
)
356-
if bwc.publish:
357-
cmd = bwc._start(cmd, request_id, docs)
358-
try:
359-
result = await bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override]
360-
duration = datetime.datetime.now() - bwc.start_time
361-
if result is not None:
362-
reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type]
363-
else:
364-
# Comply with APM spec.
365-
reply = {"ok": 1}
366-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
367-
_debug_log(
368-
_COMMAND_LOGGER,
369-
message=_CommandStatusMessage.SUCCEEDED,
370-
clientId=client._topology_settings._topology_id,
371-
durationMS=duration,
372-
reply=reply,
373-
commandName=next(iter(cmd)),
374-
databaseName=bwc.db_name,
375-
requestId=request_id,
376-
operationId=request_id,
377-
driverConnectionId=bwc.conn.id,
378-
serverConnectionId=bwc.conn.server_connection_id,
379-
serverHost=bwc.conn.address[0],
380-
serverPort=bwc.conn.address[1],
381-
serviceId=bwc.conn.service_id,
382-
)
383-
if bwc.publish:
384-
bwc._succeed(request_id, reply, duration)
385-
except Exception as exc:
386-
duration = datetime.datetime.now() - bwc.start_time
387-
if isinstance(exc, OperationFailure):
388-
failure: _DocumentOut = _convert_write_result(bwc.name, cmd, exc.details) # type: ignore[arg-type]
389-
elif isinstance(exc, NotPrimaryError):
390-
failure = exc.details # type: ignore[assignment]
391-
else:
392-
failure = _convert_exception(exc)
393-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
394-
_debug_log(
395-
_COMMAND_LOGGER,
396-
message=_CommandStatusMessage.FAILED,
397-
clientId=client._topology_settings._topology_id,
398-
durationMS=duration,
399-
failure=failure,
400-
commandName=next(iter(cmd)),
401-
databaseName=bwc.db_name,
402-
requestId=request_id,
403-
operationId=request_id,
404-
driverConnectionId=bwc.conn.id,
405-
serverConnectionId=bwc.conn.server_connection_id,
406-
serverHost=bwc.conn.address[0],
407-
serverPort=bwc.conn.address[1],
408-
serviceId=bwc.conn.service_id,
409-
isServerSideError=isinstance(exc, OperationFailure),
410-
)
411-
if bwc.publish:
412-
assert bwc.start_time is not None
413-
bwc._fail(request_id, failure, duration)
414-
raise
290+
cmd[bwc.field] = docs
291+
with command_telemetry(
292+
command_name=bwc.name,
293+
database_name=bwc.db_name,
294+
spec=cmd,
295+
driver_connection_id=bwc.conn.id,
296+
server_connection_id=bwc.conn.server_connection_id,
297+
publish_event=bwc.publish,
298+
start_time=bwc.start_time,
299+
address=bwc.conn.address,
300+
listeners=bwc.listeners,
301+
client=client,
302+
request_id=request_id,
303+
service_id=bwc.conn.service_id,
304+
operation_id=bwc.op_id,
305+
) as telemetry:
306+
try:
307+
result = await bwc.conn.unack_write(msg, max_doc_size) # type: ignore[func-returns-value, misc, override]
308+
if result is not None:
309+
reply = _convert_write_result(bwc.name, cmd, result) # type: ignore[arg-type]
310+
else:
311+
# Comply with APM spec.
312+
reply = {"ok": 1}
313+
telemetry.publish_succeeded(reply)
314+
except Exception as exc:
315+
telemetry.publish_failed(exc)
316+
raise
415317
return result # type: ignore[return-value]
416318

417319
async def _execute_batch_unack(

0 commit comments

Comments
 (0)