Skip to content

Commit 2b20d3e

Browse files
committed
WIP
1 parent 469a32a commit 2b20d3e

6 files changed

Lines changed: 618 additions & 275 deletions

File tree

pymongo/asynchronous/network.py

Lines changed: 64 additions & 137 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
from __future__ import annotations
1717

1818
import datetime
19-
import logging
2019
from typing import (
2120
TYPE_CHECKING,
2221
Any,
@@ -31,17 +30,14 @@
3130
from bson import _decode_all_selective
3231
from pymongo import _csot, helpers_shared, message
3332
from pymongo.compression_support import _NO_COMPRESSION
34-
from pymongo.errors import (
35-
NotPrimaryError,
36-
OperationFailure,
37-
)
38-
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
3933
from pymongo.message import _OpMsg
4034
from pymongo.monitoring import _is_speculative_authenticate
4135
from pymongo.network_layer import (
4236
async_receive_message,
4337
async_sendall,
4438
)
39+
from pymongo.telemetry import command_telemetry
40+
from pymongo.tracing import add_cursor_id
4541

4642
if TYPE_CHECKING:
4743
from bson import CodecOptions
@@ -159,140 +155,71 @@ async def command(
159155

160156
if max_bson_size is not None and size > max_bson_size + message._COMMAND_OVERHEAD:
161157
message._raise_document_too_large(name, size, max_bson_size + message._COMMAND_OVERHEAD)
162-
if client is not None:
163-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
164-
_debug_log(
165-
_COMMAND_LOGGER,
166-
message=_CommandStatusMessage.STARTED,
167-
clientId=client._topology_settings._topology_id,
168-
command=spec,
169-
commandName=next(iter(spec)),
170-
databaseName=dbname,
171-
requestId=request_id,
172-
operationId=request_id,
173-
driverConnectionId=conn.id,
174-
serverConnectionId=conn.server_connection_id,
175-
serverHost=conn.address[0],
176-
serverPort=conn.address[1],
177-
serviceId=conn.service_id,
178-
)
179-
if publish:
180-
assert listeners is not None
181-
assert address is not None
182-
listeners.publish_command_start(
183-
orig,
184-
dbname,
185-
request_id,
186-
address,
187-
conn.server_connection_id,
188-
service_id=conn.service_id,
189-
)
190158

191-
try:
192-
await async_sendall(conn.conn.get_conn, msg)
193-
if use_op_msg and unacknowledged:
194-
# Unacknowledged, fake a successful command response.
195-
reply = None
196-
response_doc: _DocumentOut = {"ok": 1}
197-
else:
198-
reply = await async_receive_message(conn, request_id)
199-
conn.more_to_come = reply.more_to_come
200-
unpacked_docs = reply.unpack_response(
201-
codec_options=codec_options, user_fields=user_fields
202-
)
203-
204-
response_doc = unpacked_docs[0]
205-
if not conn.ready:
206-
cluster_time = response_doc.get("$clusterTime")
207-
if cluster_time:
208-
conn._cluster_time = cluster_time
209-
if client:
210-
await client._process_response(response_doc, session)
211-
if check:
212-
helpers_shared._check_command_response(
213-
response_doc,
214-
conn.max_wire_version,
215-
allowable_errors,
216-
parse_write_concern_error=parse_write_concern_error,
217-
)
218-
except Exception as exc:
219-
duration = datetime.datetime.now() - start
220-
if isinstance(exc, (NotPrimaryError, OperationFailure)):
221-
failure: _DocumentOut = exc.details # type: ignore[assignment]
222-
else:
223-
failure = message._convert_exception(exc)
224-
if client is not None:
225-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
226-
_debug_log(
227-
_COMMAND_LOGGER,
228-
message=_CommandStatusMessage.FAILED,
229-
clientId=client._topology_settings._topology_id,
230-
durationMS=duration,
231-
failure=failure,
232-
commandName=next(iter(spec)),
233-
databaseName=dbname,
234-
requestId=request_id,
235-
operationId=request_id,
236-
driverConnectionId=conn.id,
237-
serverConnectionId=conn.server_connection_id,
238-
serverHost=conn.address[0],
239-
serverPort=conn.address[1],
240-
serviceId=conn.service_id,
241-
isServerSideError=isinstance(exc, OperationFailure),
159+
with command_telemetry(
160+
command_name=name,
161+
database_name=dbname,
162+
spec=spec,
163+
address=address if address else conn.address,
164+
driver_connection_id=conn.id,
165+
server_connection_id=conn.server_connection_id,
166+
publish_event=publish,
167+
start_time=start,
168+
client=client,
169+
listeners=listeners,
170+
request_id=request_id,
171+
service_id=conn.service_id,
172+
) as telemetry:
173+
try:
174+
await async_sendall(conn.conn.get_conn, msg)
175+
if use_op_msg and unacknowledged:
176+
# Unacknowledged, fake a successful command response.
177+
reply = None
178+
response_doc: _DocumentOut = {"ok": 1}
179+
else:
180+
reply = await async_receive_message(conn, request_id)
181+
conn.more_to_come = reply.more_to_come
182+
unpacked_docs = reply.unpack_response(
183+
codec_options=codec_options, user_fields=user_fields
242184
)
243-
if publish:
244-
assert listeners is not None
245-
assert address is not None
246-
listeners.publish_command_failure(
247-
duration,
248-
failure,
249-
name,
250-
request_id,
251-
address,
252-
conn.server_connection_id,
253-
service_id=conn.service_id,
254-
database_name=dbname,
255-
)
256-
raise
257-
duration = datetime.datetime.now() - start
258-
if client is not None:
259-
if _COMMAND_LOGGER.isEnabledFor(logging.DEBUG):
260-
_debug_log(
261-
_COMMAND_LOGGER,
262-
message=_CommandStatusMessage.SUCCEEDED,
263-
clientId=client._topology_settings._topology_id,
264-
durationMS=duration,
265-
reply=response_doc,
266-
commandName=next(iter(spec)),
267-
databaseName=dbname,
268-
requestId=request_id,
269-
operationId=request_id,
270-
driverConnectionId=conn.id,
271-
serverConnectionId=conn.server_connection_id,
272-
serverHost=conn.address[0],
273-
serverPort=conn.address[1],
274-
serviceId=conn.service_id,
275-
speculative_authenticate="speculativeAuthenticate" in orig,
276-
)
277-
if publish:
278-
assert listeners is not None
279-
assert address is not None
280-
listeners.publish_command_success(
281-
duration,
282-
response_doc,
283-
name,
284-
request_id,
285-
address,
286-
conn.server_connection_id,
287-
service_id=conn.service_id,
185+
186+
response_doc = unpacked_docs[0]
187+
if not conn.ready:
188+
cluster_time = response_doc.get("$clusterTime")
189+
if cluster_time:
190+
conn._cluster_time = cluster_time
191+
if client:
192+
await client._process_response(response_doc, session)
193+
if check:
194+
helpers_shared._check_command_response(
195+
response_doc,
196+
conn.max_wire_version,
197+
allowable_errors,
198+
parse_write_concern_error=parse_write_concern_error,
199+
)
200+
except Exception as exc:
201+
telemetry.publish_failed(exc)
202+
raise
203+
204+
# Add cursor_id to span if present in response
205+
if telemetry.span is not None and isinstance(response_doc, dict):
206+
cursor_info = response_doc.get("cursor")
207+
if cursor_info and isinstance(cursor_info, dict):
208+
cursor_id = cursor_info.get("id", 0)
209+
if cursor_id:
210+
add_cursor_id(telemetry.span, cursor_id)
211+
212+
# Publish command succeeded event
213+
telemetry.publish_succeeded(
214+
reply=response_doc,
288215
speculative_hello=speculative_hello,
289-
database_name=dbname,
216+
speculative_authenticate="speculativeAuthenticate" in orig,
290217
)
291218

292-
if client and client._encrypter and reply:
293-
decrypted = await client._encrypter.decrypt(reply.raw_command_response())
294-
response_doc = cast(
295-
"_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0]
296-
)
219+
if client and client._encrypter and reply:
220+
decrypted = await client._encrypter.decrypt(reply.raw_command_response())
221+
response_doc = cast(
222+
"_DocumentOut", _decode_all_selective(decrypted, codec_options, user_fields)[0]
223+
)
297224

298-
return response_doc # type: ignore[return-value]
225+
return response_doc # type: ignore[return-value]

0 commit comments

Comments
 (0)