Skip to content

Commit fff9bbf

Browse files
authored
PYTHON-5676 Consolidate Command Execution Logic (#2789)
1 parent f7084ac commit fff9bbf

13 files changed

Lines changed: 820 additions & 1280 deletions

File tree

pymongo/_telemetry.py

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
# Copyright 2025-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Unified per-command telemetry: logging, monitoring, and OpenTelemetry.
16+
17+
Currently wires the command logging channel; APM event publishing and
18+
OpenTelemetry spans will be layered on top of :class:`_CommandTelemetry`.
19+
"""
20+
from __future__ import annotations
21+
22+
import datetime
23+
import logging
24+
from typing import Any, Mapping, Optional
25+
26+
from pymongo.errors import NotPrimaryError, OperationFailure
27+
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
28+
from pymongo.message import _convert_exception
29+
30+
31+
class _CommandTelemetry:
    """Context manager that emits the log events for a single command.

    Only the command *logging* channel is wired up here; APM event publishing
    stays at the call site. The class name reflects the intended scope once
    the APM and OpenTelemetry channels are added on top.

    ``STARTED`` is logged on entry. The caller then reports the outcome with
    :meth:`handle_succeeded` (passing the server reply) or
    :meth:`handle_failed` (passing the raised exception). If an exception
    leaves the ``with`` block before either was called, ``__exit__`` logs
    ``FAILED`` on the caller's behalf.

    The elapsed time is measured from the ``start`` value given to the
    constructor and stored on :attr:`duration`; the failure document computed
    by :meth:`handle_failed` is stored on :attr:`failure`. Both are public so
    call sites can reuse them when publishing APM events.

    Usage::

        with _CommandTelemetry(client, conn, cmd, dbname, request_id, start) as cmd_telemetry:
            reply = do_network_call()
            duration = cmd_telemetry.handle_succeeded(reply)
        # Failures are logged automatically in __exit__.
    """

    __slots__ = (
        "_client",
        "_conn",
        "_spec",
        "_dbname",
        "_request_id",
        "_operation_id",
        "_start",
        "duration",
        "failure",
        "_handled",
    )

    def __init__(
        self,
        client: Any,
        conn: Any,
        spec: Mapping[str, Any],
        dbname: str,
        request_id: int,
        start: datetime.datetime,
        operation_id: Optional[int] = None,
    ) -> None:
        """Record the command context; nothing is logged until ``__enter__``.

        :param client: Owning client, or ``None`` to disable logging entirely.
        :param conn: Connection the command is sent on.
        :param spec: Command document; its first key is logged as the
            command name.
        :param dbname: Name of the database the command targets.
        :param request_id: Request id logged with every event.
        :param start: Time the command started; durations are measured
            from it.
        :param operation_id: Operation id to log; falls back to *request_id*
            when ``None``.
        """
        self._client = client
        self._conn = conn
        self._spec = spec
        self._dbname = dbname
        self._request_id = request_id
        # Explicit ``None`` test so that an operation id of 0 is kept as-is.
        self._operation_id = request_id if operation_id is None else operation_id
        self._start = start
        self.duration: Optional[datetime.timedelta] = None
        self.failure: Any = None
        # True once the outcome was reported, so __exit__ does not log twice.
        self._handled = False

    def _enabled(self) -> bool:
        """Return True when there is a client and DEBUG command logging is on."""
        return self._client is not None and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG)

    def _log_fields(self) -> Mapping[str, Any]:
        """Build the fields shared by the STARTED, SUCCEEDED and FAILED events.

        Kept as one insertion-ordered dict so that all three events list these
        keys identically and in the same order.
        """
        conn = self._conn
        return {
            "commandName": next(iter(self._spec)),
            "databaseName": self._dbname,
            "requestId": self._request_id,
            "operationId": self._operation_id,
            "driverConnectionId": conn.id,
            "serverConnectionId": conn.server_connection_id,
            "serverHost": conn.address[0],
            "serverPort": conn.address[1],
            "serviceId": conn.service_id,
        }

    def __enter__(self) -> _CommandTelemetry:
        """Log the ``STARTED`` event (when enabled) and return ``self``."""
        if self._enabled():
            _debug_log(
                _COMMAND_LOGGER,
                message=_CommandStatusMessage.STARTED,
                clientId=self._client._topology_settings._topology_id,
                command=self._spec,
                **self._log_fields(),
            )
        return self

    def handle_succeeded(
        self,
        reply: Any,
        speculative_hello: bool = False,
    ) -> datetime.timedelta:
        """Log the ``SUCCEEDED`` event and return the elapsed duration.

        :param reply: Server reply, logged as ``reply``.
        :param speculative_hello: Logged as ``speculative_authenticate``.
        :return: Time elapsed since ``start``; also stored on
            :attr:`duration`.
        """
        # Measured even when logging is off: callers reuse it for APM events.
        self.duration = datetime.datetime.now() - self._start
        if self._enabled():
            _debug_log(
                _COMMAND_LOGGER,
                message=_CommandStatusMessage.SUCCEEDED,
                clientId=self._client._topology_settings._topology_id,
                durationMS=self.duration,
                reply=reply,
                **self._log_fields(),
                speculative_authenticate=speculative_hello,
            )
        self._handled = True
        return self.duration

    def handle_failed(
        self,
        exc: BaseException,
        failure: Optional[Any] = None,
        is_server_side_error: Optional[bool] = None,
    ) -> datetime.timedelta:
        """Log the ``FAILED`` event and return the elapsed duration.

        In the common case the failure document and the server-side-error flag
        are derived from *exc*. Callers that must transform the failure
        document (e.g. unacknowledged bulk writes) pass *failure* explicitly.

        :param exc: The exception that ended the command.
        :param failure: Failure document to log. When ``None`` it is
            ``exc.details`` for ``NotPrimaryError``/``OperationFailure`` and
            ``_convert_exception(exc)`` for anything else.
        :param is_server_side_error: Value logged as ``isServerSideError``.
            When ``None`` it is true only for ``OperationFailure`` instances.
        :return: Time elapsed since ``start``; also stored on
            :attr:`duration`.
        """
        # Measured even when logging is off: callers reuse it for APM events.
        self.duration = datetime.datetime.now() - self._start
        if failure is None:
            if isinstance(exc, (NotPrimaryError, OperationFailure)):
                failure = exc.details
            else:
                failure = _convert_exception(exc)  # type: ignore[arg-type]
        if is_server_side_error is None:
            is_server_side_error = isinstance(exc, OperationFailure)
        # Stored regardless of the logging state so APM publishing can reuse it.
        self.failure = failure
        if self._enabled():
            _debug_log(
                _COMMAND_LOGGER,
                message=_CommandStatusMessage.FAILED,
                clientId=self._client._topology_settings._topology_id,
                durationMS=self.duration,
                failure=failure,
                **self._log_fields(),
                isServerSideError=is_server_side_error,
            )
        self._handled = True
        return self.duration

    def __exit__(
        self,
        exc_type: Optional[type],
        exc_val: Optional[BaseException],
        exc_tb: Any,
    ) -> None:
        """Log ``FAILED`` for an exception whose outcome was never reported.

        Returns ``None``, so the exception always propagates to the caller.
        """
        # Safety net: log a failure if an exception propagates without the
        # outcome having been recorded explicitly by the caller.
        if exc_val is not None and not self._handled:
            self.handle_failed(exc_val)

0 commit comments

Comments
 (0)