Skip to content

Commit 63a9768

Browse files
committed
PYTHON-5745 Consolidate command telemetry into _CommandTelemetry
1 parent 4282767 commit 63a9768

9 files changed

Lines changed: 259 additions & 265 deletions

File tree

pymongo/_telemetry.py

Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
# Copyright 2015-present MongoDB, Inc.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# https://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
"""Internal helpers combining structured logging with APM event publishing."""
16+
17+
from __future__ import annotations
18+
19+
import datetime
20+
import logging
21+
from collections.abc import MutableMapping
22+
from typing import TYPE_CHECKING, Any, Optional
23+
24+
from pymongo.logger import _COMMAND_LOGGER, _CommandStatusMessage, _debug_log
25+
from pymongo.pool_shared import _ConnectionTelemetryInfo
26+
27+
if TYPE_CHECKING:
28+
from bson.objectid import ObjectId
29+
from pymongo.monitoring import _EventListeners
30+
from pymongo.typings import _DocumentOut
31+
32+
33+
class _CommandTelemetry:
34+
"""Combines structured logging and APM event publishing for a single command.
35+
36+
Construct once per command, call :meth:`started` before the network send,
37+
then call :meth:`succeeded` or :meth:`failed` when the outcome is known.
38+
Duration is measured from the :meth:`started` call.
39+
"""
40+
41+
__slots__ = (
42+
"_cmd",
43+
"_conn",
44+
"_dbname",
45+
"_duration",
46+
"_listeners",
47+
"_name",
48+
"_op_id",
49+
"_publish",
50+
"_request_id",
51+
"_should_log",
52+
"_start",
53+
"_topology_id",
54+
)
55+
56+
def __init__(
57+
self,
58+
topology_id: Optional[ObjectId],
59+
conn: _ConnectionTelemetryInfo,
60+
listeners: Optional[_EventListeners],
61+
cmd: MutableMapping[str, Any],
62+
dbname: str,
63+
request_id: int,
64+
op_id: Optional[int],
65+
) -> None:
66+
self._topology_id = topology_id
67+
self._should_log = topology_id is not None and _COMMAND_LOGGER.isEnabledFor(logging.DEBUG)
68+
self._publish = listeners is not None and listeners.enabled_for_commands
69+
self._listeners = listeners
70+
self._conn = conn
71+
self._cmd = cmd
72+
self._name = next(iter(cmd))
73+
self._dbname = dbname
74+
self._request_id = request_id
75+
self._op_id = op_id
76+
self._start: datetime.datetime
77+
self._duration: datetime.timedelta
78+
79+
def started(self, orig: MutableMapping[str, Any], ensure_db: bool) -> None:
80+
"""Emit the STARTED log entry and APM event, and start the duration clock."""
81+
self._start = datetime.datetime.now()
82+
if self._should_log:
83+
_debug_log(
84+
_COMMAND_LOGGER,
85+
message=_CommandStatusMessage.STARTED,
86+
clientId=self._topology_id,
87+
command=self._cmd,
88+
commandName=self._name,
89+
databaseName=self._dbname,
90+
requestId=self._request_id,
91+
operationId=self._request_id,
92+
driverConnectionId=self._conn.id,
93+
serverConnectionId=self._conn.server_connection_id,
94+
serverHost=self._conn.address[0],
95+
serverPort=self._conn.address[1],
96+
serviceId=self._conn.service_id,
97+
)
98+
if self._publish:
99+
assert self._listeners is not None
100+
if ensure_db and "$db" not in orig:
101+
orig["$db"] = self._dbname
102+
self._listeners.publish_command_start(
103+
orig,
104+
self._dbname,
105+
self._request_id,
106+
self._conn.address,
107+
self._conn.server_connection_id,
108+
self._op_id,
109+
service_id=self._conn.service_id,
110+
)
111+
112+
@property
113+
def duration(self) -> datetime.timedelta:
114+
"""Duration from :meth:`started` to :meth:`succeeded` or :meth:`failed`."""
115+
return self._duration
116+
117+
def succeeded(
118+
self,
119+
reply: _DocumentOut,
120+
command_name: str,
121+
speculative_hello: bool,
122+
) -> None:
123+
"""Emit the SUCCEEDED log entry and APM event."""
124+
self._duration = datetime.datetime.now() - self._start
125+
if not self._should_log and not self._publish:
126+
return
127+
duration = self._duration
128+
if self._should_log:
129+
_debug_log(
130+
_COMMAND_LOGGER,
131+
message=_CommandStatusMessage.SUCCEEDED,
132+
clientId=self._topology_id,
133+
durationMS=duration,
134+
reply=reply,
135+
commandName=self._name,
136+
databaseName=self._dbname,
137+
requestId=self._request_id,
138+
operationId=self._request_id,
139+
driverConnectionId=self._conn.id,
140+
serverConnectionId=self._conn.server_connection_id,
141+
serverHost=self._conn.address[0],
142+
serverPort=self._conn.address[1],
143+
serviceId=self._conn.service_id,
144+
speculative_authenticate=speculative_hello,
145+
)
146+
if self._publish:
147+
assert self._listeners is not None
148+
self._listeners.publish_command_success(
149+
duration,
150+
reply,
151+
command_name,
152+
self._request_id,
153+
self._conn.address,
154+
self._conn.server_connection_id,
155+
self._op_id,
156+
service_id=self._conn.service_id,
157+
speculative_hello=speculative_hello,
158+
database_name=self._dbname,
159+
)
160+
161+
def failed(
162+
self,
163+
failure: _DocumentOut,
164+
command_name: str,
165+
is_server_side_error: bool,
166+
) -> None:
167+
"""Emit the FAILED log entry and APM event."""
168+
self._duration = datetime.datetime.now() - self._start
169+
if not self._should_log and not self._publish:
170+
return
171+
duration = self._duration
172+
if self._should_log:
173+
_debug_log(
174+
_COMMAND_LOGGER,
175+
message=_CommandStatusMessage.FAILED,
176+
clientId=self._topology_id,
177+
durationMS=duration,
178+
failure=failure,
179+
commandName=self._name,
180+
databaseName=self._dbname,
181+
requestId=self._request_id,
182+
operationId=self._request_id,
183+
driverConnectionId=self._conn.id,
184+
serverConnectionId=self._conn.server_connection_id,
185+
serverHost=self._conn.address[0],
186+
serverPort=self._conn.address[1],
187+
serviceId=self._conn.service_id,
188+
isServerSideError=is_server_side_error,
189+
)
190+
if self._publish:
191+
assert self._listeners is not None
192+
self._listeners.publish_command_failure(
193+
duration,
194+
failure,
195+
command_name,
196+
self._request_id,
197+
self._conn.address,
198+
self._conn.server_connection_id,
199+
self._op_id,
200+
service_id=self._conn.service_id,
201+
database_name=self._dbname,
202+
)

0 commit comments

Comments
 (0)