Skip to content

Commit 19fd1b1

Browse files
authored
PYTHON-5745 Consolidate command telemetry (#2891)
1 parent 7cea267 commit 19fd1b1

11 files changed

Lines changed: 252 additions & 271 deletions

File tree

pymongo/_telemetry.py

Lines changed: 185 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,185 @@
1+
# Copyright 2026-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Internal helpers combining structured logging with APM event publishing."""
16+
17+
from __future__ import annotations
18+
19+
import datetime
20+
import logging
21+
from collections.abc import MutableMapping
22+
from typing import TYPE_CHECKING, Any, Optional
23+
24+
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
25+
from pymongo.pool_shared import _ConnectionTelemetryInfo
26+
27+
if TYPE_CHECKING:
28+
from bson.objectid import ObjectId
29+
from pymongo.monitoring import _EventListeners
30+
from pymongo.typings import _DocumentOut
31+
32+
33+
class _CommandTelemetry:
34+
"""Combines structured logging and APM event publishing for a single command.
35+
36+
Construct once per command, call :meth:`started` before the network send,
37+
then call :meth:`succeeded` or :meth:`failed` when the outcome is known.
38+
Duration is measured from the :meth:`started` call.
39+
"""
40+
41+
__slots__ = (
42+
"_active",
43+
"_cmd",
44+
"_conn",
45+
"_dbname",
46+
"_duration",
47+
"_listeners",
48+
"_name",
49+
"_op_id",
50+
"_publish",
51+
"_request_id",
52+
"_should_log",
53+
"_start",
54+
"_topology_id",
55+
)
56+
57+
def __init__(
58+
self,
59+
topology_id: Optional[ObjectId],
60+
conn: _ConnectionTelemetryInfo,
61+
listeners: Optional[_EventListeners],
62+
cmd: MutableMapping[str, Any],
63+
dbname: str,
64+
request_id: int,
65+
op_id: Optional[int],
66+
) -> None:
67+
self._topology_id = topology_id
68+
self._should_log = topology_id is not None and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG)
69+
self._publish = listeners is not None and listeners.enabled_for_commands
70+
self._active = self._should_log or self._publish
71+
self._listeners = listeners
72+
self._conn = conn
73+
self._cmd = cmd
74+
self._name = next(iter(cmd))
75+
self._dbname = dbname
76+
self._request_id = request_id
77+
self._op_id = op_id
78+
self._start: datetime.datetime
79+
self._duration: datetime.timedelta
80+
81+
def _emit_log(self, message: _CommandStatusMessage, **extra: Any) -> None:
82+
_debug_log(
83+
_COMMAND_LOGGER,
84+
message=message,
85+
clientId=self._topology_id,
86+
commandName=self._name,
87+
databaseName=self._dbname,
88+
requestId=self._request_id,
89+
operationId=self._request_id,
90+
driverConnectionId=self._conn.id,
91+
serverConnectionId=self._conn.server_connection_id,
92+
serverHost=self._conn.address[0],
93+
serverPort=self._conn.address[1],
94+
serviceId=self._conn.service_id,
95+
**extra,
96+
)
97+
98+
def started(self, orig: MutableMapping[str, Any], ensure_db: bool) -> None:
99+
"""Emit the STARTED log entry and APM event, and start the duration clock."""
100+
self._start = datetime.datetime.now()
101+
if not self._active:
102+
return
103+
if self._should_log:
104+
self._emit_log(_CommandStatusMessage.STARTED, command=self._cmd)
105+
if self._publish:
106+
assert self._listeners is not None
107+
if ensure_db and "$db" not in orig:
108+
orig["$db"] = self._dbname
109+
self._listeners.publish_command_start(
110+
orig,
111+
self._dbname,
112+
self._request_id,
113+
self._conn.address,
114+
self._conn.server_connection_id,
115+
self._op_id,
116+
service_id=self._conn.service_id,
117+
)
118+
119+
@property
120+
def duration(self) -> datetime.timedelta:
121+
"""Duration from :meth:`started` to :meth:`succeeded` or :meth:`failed`."""
122+
return self._duration
123+
124+
def succeeded(
125+
self,
126+
reply: _DocumentOut,
127+
command_name: str,
128+
speculative_hello: bool,
129+
) -> None:
130+
"""Emit the SUCCEEDED log entry and APM event."""
131+
self._duration = datetime.datetime.now() - self._start
132+
if not self._active:
133+
return
134+
if self._should_log:
135+
self._emit_log(
136+
_CommandStatusMessage.SUCCEEDED,
137+
durationMS=self._duration,
138+
reply=reply,
139+
speculative_authenticate=speculative_hello,
140+
)
141+
if self._publish:
142+
assert self._listeners is not None
143+
self._listeners.publish_command_success(
144+
self._duration,
145+
reply,
146+
command_name,
147+
self._request_id,
148+
self._conn.address,
149+
self._conn.server_connection_id,
150+
self._op_id,
151+
service_id=self._conn.service_id,
152+
speculative_hello=speculative_hello,
153+
database_name=self._dbname,
154+
)
155+
156+
def failed(
157+
self,
158+
failure: _DocumentOut,
159+
command_name: str,
160+
is_server_side_error: bool,
161+
) -> None:
162+
"""Emit the FAILED log entry and APM event."""
163+
self._duration = datetime.datetime.now() - self._start
164+
if not self._active:
165+
return
166+
if self._should_log:
167+
self._emit_log(
168+
_CommandStatusMessage.FAILED,
169+
durationMS=self._duration,
170+
failure=failure,
171+
isServerSideError=is_server_side_error,
172+
)
173+
if self._publish:
174+
assert self._listeners is not None
175+
self._listeners.publish_command_failure(
176+
self._duration,
177+
failure,
178+
command_name,
179+
self._request_id,
180+
self._conn.address,
181+
self._conn.server_connection_id,
182+
self._op_id,
183+
service_id=self._conn.service_id,
184+
database_name=self._dbname,
185+
)

0 commit comments

Comments
 (0)