Skip to content

Commit bf574bc

Browse files
feat(clickhouse_driver): Support span streaming (#6508)
Drop the `db.params`, `db.result`, `query`, and `db.query_id` attributes in streaming lifecycle mode. Replace deprecated `db.system` and `db.name` attributes with `db.system.name` and `db.namespace` attributes in the streaming lifecycle mode.
1 parent 0358263 commit bf574bc

2 files changed

Lines changed: 1022 additions & 467 deletions

File tree

sentry_sdk/integrations/clickhouse_driver.py

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,21 @@
1+
import functools
12
from typing import TYPE_CHECKING, TypeVar
23

34
import sentry_sdk
45
from sentry_sdk.consts import OP, SPANDATA
56
from sentry_sdk.integrations import DidNotEnable, Integration, _check_minimum_version
67
from sentry_sdk.scope import should_send_default_pii
8+
from sentry_sdk.traces import StreamedSpan
79
from sentry_sdk.tracing import Span
8-
from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled
10+
from sentry_sdk.tracing_utils import has_span_streaming_enabled
11+
from sentry_sdk.utils import capture_internal_exceptions
912

1013
# Hack to get new Python features working in older versions
1114
# without introducing a hard dependency on `typing_extensions`
1215
# from: https://stackoverflow.com/a/71944042/300572
1316
if TYPE_CHECKING:
1417
from collections.abc import Iterator
15-
from typing import Any, Callable, ParamSpec
18+
from typing import Any, Callable, ParamSpec, Union
1619
else:
1720
# Fake ParamSpec
1821
class ParamSpec:
@@ -70,30 +73,43 @@ def setup_once() -> None:
7073

7174

7275
def _wrap_start(f: "Callable[P, T]") -> "Callable[P, T]":
73-
@ensure_integration_enabled(ClickhouseDriverIntegration, f)
76+
@functools.wraps(f)
7477
def _inner(*args: "P.args", **kwargs: "P.kwargs") -> "T":
78+
client = sentry_sdk.get_client()
79+
if client.get_integration(ClickhouseDriverIntegration) is None:
80+
return f(*args, **kwargs)
81+
7582
connection = args[0]
7683
query = args[1]
7784
query_id = args[2] if len(args) > 2 else kwargs.get("query_id")
7885
params = args[3] if len(args) > 3 else kwargs.get("params")
7986

80-
span = sentry_sdk.start_span(
81-
op=OP.DB,
82-
name=query,
83-
origin=ClickhouseDriverIntegration.origin,
84-
)
87+
if has_span_streaming_enabled(client.options):
88+
span = sentry_sdk.traces.start_span(
89+
name=query, # type: ignore
90+
attributes={
91+
"sentry.op": OP.DB,
92+
"sentry.origin": ClickhouseDriverIntegration.origin,
93+
},
94+
)
95+
else:
96+
span = sentry_sdk.start_span(
97+
op=OP.DB,
98+
name=query,
99+
origin=ClickhouseDriverIntegration.origin,
100+
)
85101

86-
connection._sentry_span = span # type: ignore[attr-defined]
102+
span.set_data("query", query)
87103

88-
_set_db_data(span, connection)
104+
if query_id:
105+
span.set_data("db.query_id", query_id)
89106

90-
span.set_data("query", query)
107+
if params and should_send_default_pii():
108+
span.set_data("db.params", params)
91109

92-
if query_id:
93-
span.set_data("db.query_id", query_id)
110+
connection._sentry_span = span # type: ignore[attr-defined]
94111

95-
if params and should_send_default_pii():
96-
span.set_data("db.params", params)
112+
_set_db_data(span, connection)
97113

98114
# run the original code
99115
ret = f(*args, **kwargs)
@@ -109,7 +125,12 @@ def _inner_end(*args: "P.args", **kwargs: "P.kwargs") -> "T":
109125
instance = args[0]
110126
span = getattr(instance.connection, "_sentry_span", None) # type: ignore[attr-defined]
111127

112-
if span is not None:
128+
if span is None:
129+
return res
130+
131+
if isinstance(span, StreamedSpan):
132+
span.end()
133+
else:
113134
if res is not None and should_send_default_pii():
114135
span.set_data("db.result", res)
115136

@@ -133,6 +154,12 @@ def _inner_send_data( # type: ignore[no-untyped-def] # clickhouse-driver does n
133154
):
134155
span = getattr(self.connection, "_sentry_span", None)
135156

157+
if isinstance(span, StreamedSpan):
158+
_set_db_data(span, self.connection)
159+
return original_send_data(
160+
self, sample_block, data, types_check, columnar, *args, **kwargs
161+
)
162+
136163
if span is not None:
137164
_set_db_data(span, self.connection)
138165

@@ -165,10 +192,19 @@ def wrapped_generator() -> "Iterator[Any]":
165192
Client.send_data = _inner_send_data
166193

167194

168-
def _set_db_data(span: "Span", connection: "Connection") -> None:
169-
span.set_data(SPANDATA.DB_SYSTEM, "clickhouse")
170-
span.set_data(SPANDATA.DB_DRIVER_NAME, "clickhouse-driver")
171-
span.set_data(SPANDATA.SERVER_ADDRESS, connection.host)
172-
span.set_data(SPANDATA.SERVER_PORT, connection.port)
173-
span.set_data(SPANDATA.DB_NAME, connection.database)
174-
span.set_data(SPANDATA.DB_USER, connection.user)
195+
def _set_db_data(span: "Union[Span, StreamedSpan]", connection: "Connection") -> None:
196+
if isinstance(span, StreamedSpan):
197+
span.set_attribute(SPANDATA.DB_SYSTEM_NAME, "clickhouse")
198+
span.set_attribute(SPANDATA.DB_NAMESPACE, connection.database)
199+
200+
set_on_span = span.set_attribute
201+
else:
202+
span.set_data(SPANDATA.DB_SYSTEM, "clickhouse")
203+
span.set_data(SPANDATA.DB_NAME, connection.database)
204+
205+
set_on_span = span.set_data
206+
207+
set_on_span(SPANDATA.DB_DRIVER_NAME, "clickhouse-driver")
208+
set_on_span(SPANDATA.SERVER_ADDRESS, connection.host)
209+
set_on_span(SPANDATA.SERVER_PORT, connection.port)
210+
set_on_span(SPANDATA.DB_USER, connection.user)

0 commit comments

Comments
 (0)