Skip to content

Commit a8a4992

Browse files
feat(sqlalchemy): Support span streaming (#6132)
Adapt `add_query_source()` to mirror the changes made to `add_http_request_source()` in #6084. Add `record_sql_queries_supporting_streaming()`, based on `record_sql_queries()`, so that the streaming path is not triggered for ORM integrations that have not yet been ported to the streaming trace lifecycle. In streaming lifecycle mode: drop the `db.params`, `db.paramstyle`, `db.executemany`, and `db.cursor` attributes; replace the deprecated `db.system` and `db.name` attributes with `db.system.name` and `db.namespace`; and set `<unknown SQL query>` as the span name when the description would previously have been `None`.
1 parent 2b30add commit a8a4992

7 files changed

Lines changed: 1055 additions & 374 deletions

File tree

sentry_sdk/integrations/sqlalchemy.py

Lines changed: 43 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,16 @@
11
from sentry_sdk.consts import SPANSTATUS, SPANDATA
22
from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable
3-
from sentry_sdk.tracing_utils import add_query_source, record_sql_queries
3+
from sentry_sdk.tracing_utils import (
4+
add_query_source,
5+
record_sql_queries_supporting_streaming,
6+
)
47
from sentry_sdk.utils import (
58
capture_internal_exceptions,
69
ensure_integration_enabled,
710
parse_version,
811
)
12+
from sentry_sdk.traces import StreamedSpan, SpanStatus
13+
from sentry_sdk.tracing import Span
914

1015
try:
1116
from sqlalchemy.engine import Engine # type: ignore
@@ -20,8 +25,7 @@
2025
from typing import Any
2126
from typing import ContextManager
2227
from typing import Optional
23-
24-
from sentry_sdk.tracing import Span
28+
from typing import Union
2529

2630

2731
class SqlalchemyIntegration(Integration):
@@ -48,7 +52,7 @@ def _before_cursor_execute(
4852
executemany: bool,
4953
*args: "Any",
5054
) -> None:
51-
ctx_mgr = record_sql_queries(
55+
ctx_mgr = record_sql_queries_supporting_streaming(
5256
cursor,
5357
statement,
5458
parameters,
@@ -78,12 +82,19 @@ def _after_cursor_execute(
7882
context, "_sentry_sql_span_manager", None
7983
)
8084

85+
# Record the query source immediately before the span is finished, so that the end timestamp is accurate and the source is attached before the span is flushed.
86+
span: "Optional[Union[Span, StreamedSpan]]" = getattr(
87+
context, "_sentry_sql_span", None
88+
)
89+
if isinstance(span, StreamedSpan):
90+
with capture_internal_exceptions():
91+
add_query_source(span)
92+
8193
if ctx_mgr is not None:
8294
context._sentry_sql_span_manager = None
8395
ctx_mgr.__exit__(None, None, None)
8496

85-
span: "Optional[Span]" = getattr(context, "_sentry_sql_span", None)
86-
if span is not None:
97+
if isinstance(span, Span):
8798
with capture_internal_exceptions():
8899
add_query_source(span)
89100

@@ -96,7 +107,10 @@ def _handle_error(context: "Any", *args: "Any") -> None:
96107
span: "Optional[Span]" = getattr(execution_context, "_sentry_sql_span", None)
97108

98109
if span is not None:
99-
span.set_status(SPANSTATUS.INTERNAL_ERROR)
110+
if isinstance(span, StreamedSpan):
111+
span.status = SpanStatus.ERROR
112+
else:
113+
span.set_status(SPANSTATUS.INTERNAL_ERROR)
100114

101115
# _after_cursor_execute does not get called for crashing SQL stmts. Judging
102116
# from SQLAlchemy codebase it does seem like any error coming into this
@@ -132,29 +146,43 @@ def _get_db_system(name: str) -> "Optional[str]":
132146
return None
133147

134148

135-
def _set_db_data(span: "Span", conn: "Any") -> None:
149+
def _set_db_data(span: "Union[Span, StreamedSpan]", conn: "Any") -> None:
136150
db_system = _get_db_system(conn.engine.name)
137-
if db_system is not None:
138-
span.set_data(SPANDATA.DB_SYSTEM, db_system)
151+
152+
if isinstance(span, StreamedSpan):
153+
if db_system is not None:
154+
span.set_attribute(SPANDATA.DB_SYSTEM_NAME, db_system)
155+
else:
156+
if db_system is not None:
157+
span.set_data(SPANDATA.DB_SYSTEM, db_system)
158+
159+
if isinstance(span, StreamedSpan):
160+
set_on_span = span.set_attribute
161+
else:
162+
set_on_span = span.set_data
139163

140164
try:
141165
driver = conn.dialect.driver
142166
if driver:
143-
span.set_data(SPANDATA.DB_DRIVER_NAME, driver)
167+
set_on_span(SPANDATA.DB_DRIVER_NAME, driver)
144168
except Exception:
145169
pass
146170

147171
if conn.engine.url is None:
148172
return
149173

150174
db_name = conn.engine.url.database
151-
if db_name is not None:
152-
span.set_data(SPANDATA.DB_NAME, db_name)
175+
if isinstance(span, StreamedSpan):
176+
if db_name is not None:
177+
span.set_attribute(SPANDATA.DB_NAMESPACE, db_name)
178+
else:
179+
if db_name is not None:
180+
span.set_data(SPANDATA.DB_NAME, db_name)
153181

154182
server_address = conn.engine.url.host
155183
if server_address is not None:
156-
span.set_data(SPANDATA.SERVER_ADDRESS, server_address)
184+
set_on_span(SPANDATA.SERVER_ADDRESS, server_address)
157185

158186
server_port = conn.engine.url.port
159187
if server_port is not None:
160-
span.set_data(SPANDATA.SERVER_PORT, server_port)
188+
set_on_span(SPANDATA.SERVER_PORT, server_port)

sentry_sdk/tracing_utils.py

Lines changed: 75 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,65 @@ def record_sql_queries(
170170
yield span
171171

172172

173+
# Mirrors record_sql_queries() temporarily so the Django and asyncpg integrations don't crash with span streaming enabled.
174+
# Once both are ported, remove record_sql_queries() and rename record_sql_queries_supporting_streaming() to record_sql_queries().
175+
@contextlib.contextmanager
176+
def record_sql_queries_supporting_streaming(
177+
cursor: "Any",
178+
query: "Any",
179+
params_list: "Any",
180+
paramstyle: "Optional[str]",
181+
executemany: bool,
182+
record_cursor_repr: bool = False,
183+
span_origin: str = "manual",
184+
) -> "Generator[Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan], None, None]":
185+
# TODO: Bring back capturing of params by default
186+
client = sentry_sdk.get_client()
187+
if client.options["_experiments"].get("record_sql_params", False):
188+
if not params_list or params_list == [None]:
189+
params_list = None
190+
191+
if paramstyle == "pyformat":
192+
paramstyle = "format"
193+
else:
194+
params_list = None
195+
paramstyle = None
196+
197+
query = _format_sql(cursor, query)
198+
199+
data = {}
200+
if params_list is not None:
201+
data["db.params"] = params_list
202+
if paramstyle is not None:
203+
data["db.paramstyle"] = paramstyle
204+
if executemany:
205+
data["db.executemany"] = True
206+
if record_cursor_repr and cursor is not None:
207+
data["db.cursor"] = cursor
208+
209+
with capture_internal_exceptions():
210+
sentry_sdk.add_breadcrumb(message=query, category="query", data=data)
211+
212+
if has_span_streaming_enabled(client.options):
213+
with sentry_sdk.traces.start_span(
214+
name="<unknown SQL query>" if query is None else query,
215+
attributes={
216+
"sentry.origin": span_origin,
217+
"sentry.op": OP.DB,
218+
},
219+
) as span:
220+
yield span
221+
else:
222+
with sentry_sdk.start_span(
223+
op=OP.DB,
224+
name=query,
225+
origin=span_origin,
226+
) as span:
227+
for k, v in data.items():
228+
span.set_data(k, v)
229+
yield span
230+
231+
173232
def maybe_create_breadcrumbs_from_span(
174233
scope: "sentry_sdk.Scope", span: "sentry_sdk.tracing.Span"
175234
) -> None:
@@ -316,22 +375,35 @@ def add_source(
316375
span.set_attribute(SPANDATA.CODE_FUNCTION, frame.f_code.co_name)
317376

318377

319-
def add_query_source(span: "sentry_sdk.tracing.Span") -> None:
378+
def add_query_source(
379+
span: "Union[sentry_sdk.tracing.Span, sentry_sdk.traces.StreamedSpan]",
380+
) -> None:
320381
"""
321382
Adds OTel compatible source code information to a database query span
322383
"""
323384
client = sentry_sdk.get_client()
324385
if not client.is_active():
325386
return
326387

327-
if span.timestamp is None or span.start_timestamp is None:
388+
if isinstance(span, LegacySpan):
389+
# In the StreamedSpan case, we need to add the extra span information before
390+
# the span finishes, so it's expected that span.timestamp will be None. In the LegacySpan case,
391+
# it should already be finished.
392+
if span.timestamp is None:
393+
return
394+
395+
if span.start_timestamp is None:
328396
return
329397

330398
should_add_query_source = client.options.get("enable_db_query_source", True)
331399
if not should_add_query_source:
332400
return
333401

334-
duration = span.timestamp - span.start_timestamp
402+
end_timestamp = (
403+
datetime.now(timezone.utc) if span.timestamp is None else span.timestamp
404+
)
405+
406+
duration = end_timestamp - span.start_timestamp
335407
threshold = client.options.get("db_query_source_threshold_ms", 0)
336408
slow_query = duration / timedelta(milliseconds=1) > threshold
337409

tests/conftest.py

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -476,23 +476,34 @@ def maybe_monkeypatched_threading(request):
476476

477477
@pytest.fixture
478478
def render_span_tree():
479-
def inner(event):
480-
assert event["type"] == "transaction"
479+
def inner(spans, root_span=None):
480+
streamed_spans = False
481+
if root_span is None:
482+
streamed_spans = True
481483

482484
by_parent = {}
483-
for span in event["spans"]:
485+
for span in spans:
486+
if "parent_span_id" not in span:
487+
root_span = span
488+
continue
489+
484490
by_parent.setdefault(span["parent_span_id"], []).append(span)
485491

486492
def render_span(span):
487-
yield "- op={}: description={}".format(
488-
json.dumps(span.get("op")), json.dumps(span.get("description"))
489-
)
493+
if streamed_spans:
494+
yield "- sentry.op={}: name={}".format(
495+
json.dumps(span["attributes"].get("sentry.op")),
496+
json.dumps(span["name"]),
497+
)
498+
else:
499+
yield "- op={}: description={}".format(
500+
json.dumps(span.get("op")), json.dumps(span.get("description"))
501+
)
502+
490503
for subspan in by_parent.get(span["span_id"]) or ():
491504
for line in render_span(subspan):
492505
yield " {}".format(line)
493506

494-
root_span = event["contexts"]["trace"]
495-
496507
return "\n".join(render_span(root_span))
497508

498509
return inner

tests/integrations/django/asgi/test_asgi.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,8 +292,9 @@ async def test_async_middleware_spans(
292292

293293
(transaction,) = events
294294

295+
assert transaction["type"] == "transaction"
295296
assert (
296-
render_span_tree(transaction)
297+
render_span_tree(transaction["spans"], transaction["contexts"]["trace"])
297298
== """\
298299
- op="http.server": description=null
299300
- op="event.django": description="django.db.reset_queries"

tests/integrations/django/test_basic.py

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -457,7 +457,7 @@ def test_response_trace(sentry_init, client, capture_events, render_span_tree):
457457

458458
assert (
459459
'- op="view.response.render": description="serialize response"'
460-
in render_span_tree(events[0])
460+
in render_span_tree(events[0]["spans"], events[0]["contexts"]["trace"])
461461
)
462462

463463

@@ -596,7 +596,9 @@ def test_django_connect_trace(sentry_init, client, capture_events, render_span_t
596596
data = span.get("data")
597597
assert data.get(SPANDATA.DB_SYSTEM) == "postgresql"
598598

599-
assert '- op="db": description="connect"' in render_span_tree(event)
599+
assert '- op="db": description="connect"' in render_span_tree(
600+
event["spans"], event["contexts"]["trace"]
601+
)
600602

601603

602604
@pytest.mark.forked
@@ -954,7 +956,9 @@ def test_render_spans(sentry_init, client, capture_events, render_span_tree):
954956
events = capture_events()
955957
client.get(url)
956958
transaction = events[0]
957-
assert expected_line in render_span_tree(transaction)
959+
assert expected_line in render_span_tree(
960+
transaction["spans"], transaction["contexts"]["trace"]
961+
)
958962

959963

960964
@pytest.mark.skipif(DJANGO_VERSION < (1, 9), reason="Requires Django >= 1.9")
@@ -1034,7 +1038,10 @@ def test_middleware_spans(sentry_init, client, capture_events, render_span_tree)
10341038
message, transaction = events
10351039

10361040
assert message["message"] == "hi"
1037-
assert render_span_tree(transaction) == EXPECTED_MIDDLEWARE_SPANS
1041+
assert (
1042+
render_span_tree(transaction["spans"], transaction["contexts"]["trace"])
1043+
== EXPECTED_MIDDLEWARE_SPANS
1044+
)
10381045

10391046

10401047
def test_middleware_spans_disabled(sentry_init, client, capture_events):
@@ -1075,7 +1082,10 @@ def test_signals_spans(sentry_init, client, capture_events, render_span_tree):
10751082
message, transaction = events
10761083

10771084
assert message["message"] == "hi"
1078-
assert render_span_tree(transaction) == EXPECTED_SIGNALS_SPANS
1085+
assert (
1086+
render_span_tree(transaction["spans"], transaction["contexts"]["trace"])
1087+
== EXPECTED_SIGNALS_SPANS
1088+
)
10791089

10801090
assert transaction["spans"][0]["op"] == "event.django"
10811091
assert transaction["spans"][0]["description"] == "django.db.reset_queries"
@@ -1127,7 +1137,10 @@ def test_signals_spans_filtering(sentry_init, client, capture_events, render_spa
11271137

11281138
(transaction,) = events
11291139

1130-
assert render_span_tree(transaction) == EXPECTED_SIGNALS_SPANS_FILTERED
1140+
assert (
1141+
render_span_tree(transaction["spans"], transaction["contexts"]["trace"])
1142+
== EXPECTED_SIGNALS_SPANS_FILTERED
1143+
)
11311144

11321145
assert transaction["spans"][0]["op"] == "event.django"
11331146
assert transaction["spans"][0]["description"] == "django.db.reset_queries"
@@ -1206,7 +1219,9 @@ def test_custom_urlconf_middleware(
12061219
event = events.pop(0)
12071220
assert event["transaction"] == "/custom/ok"
12081221
if middleware_spans:
1209-
assert "custom_urlconf_middleware" in render_span_tree(event)
1222+
assert "custom_urlconf_middleware" in render_span_tree(
1223+
event["spans"], event["contexts"]["trace"]
1224+
)
12101225

12111226
_content, status, _headers = unpack_werkzeug_response(client.get("/custom/exc"))
12121227
assert status.lower() == "500 internal server error"
@@ -1216,7 +1231,9 @@ def test_custom_urlconf_middleware(
12161231
assert error_event["exception"]["values"][-1]["mechanism"]["type"] == "django"
12171232
assert transaction_event["transaction"] == "/custom/exc"
12181233
if middleware_spans:
1219-
assert "custom_urlconf_middleware" in render_span_tree(transaction_event)
1234+
assert "custom_urlconf_middleware" in render_span_tree(
1235+
transaction_event["spans"], transaction_event["contexts"]["trace"]
1236+
)
12201237

12211238
settings.MIDDLEWARE.pop(0)
12221239

0 commit comments

Comments
 (0)