-
Notifications
You must be signed in to change notification settings - Fork 629
Expand file tree
/
Copy pathfastapi.py
More file actions
181 lines (139 loc) · 5.9 KB
/
Copy pathfastapi.py
File metadata and controls
181 lines (139 loc) · 5.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import sys
from copy import deepcopy
from functools import wraps
from typing import TYPE_CHECKING
import sentry_sdk
from sentry_sdk.consts import SPANDATA
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.traces import StreamedSpan, _get_current_streamed_span
from sentry_sdk.tracing import SOURCE_FOR_STYLE, TransactionSource
from sentry_sdk.tracing_utils import has_span_streaming_enabled
from sentry_sdk.utils import transaction_from_function
if TYPE_CHECKING:
from typing import Any, Awaitable, Callable, Dict
from sentry_sdk._types import Event
try:
from sentry_sdk.integrations.starlette import (
StarletteIntegration,
StarletteRequestExtractor,
_get_cached_request_body_attribute,
)
except DidNotEnable:
raise DidNotEnable("Starlette is not installed")
try:
import fastapi # type: ignore
except ImportError:
raise DidNotEnable("FastAPI is not installed")
_DEFAULT_TRANSACTION_NAME = "generic FastAPI request"
# Vendored: https://github.com/Kludex/starlette/blob/0a29b5ccdcbd1285c75c4fdb5d62ae1d244a21b0/starlette/_utils.py#L11-L17
if sys.version_info >= (3, 13): # pragma: no cover
from inspect import iscoroutinefunction
else:
from asyncio import iscoroutinefunction
class FastApiIntegration(StarletteIntegration):
    """Sentry integration for FastAPI, built on top of the Starlette integration."""

    # Name under which this integration is identified.
    identifier = "fastapi"

    @staticmethod
    def setup_once() -> None:
        """Install the FastAPI-specific patch on the request-handler factory."""
        patch_get_request_handler()
def _set_transaction_name_and_source(
    scope: "sentry_sdk.Scope", transaction_style: str, request: "Any"
) -> None:
    """
    Work out the transaction name for ``request`` and record it on ``scope``.

    * ``transaction_style == "endpoint"``: the name is derived from the
      ``"endpoint"`` entry of ``request.scope`` via ``transaction_from_function``.
    * ``transaction_style == "url"``: the name is the ``path`` attribute of the
      ``"route"`` entry of ``request.scope``.

    When neither yields a non-empty name, ``_DEFAULT_TRANSACTION_NAME`` is used
    with ``TransactionSource.ROUTE`` as the source; otherwise the source is
    looked up in ``SOURCE_FOR_STYLE`` by ``transaction_style``.
    """
    if transaction_style == "endpoint":
        endpoint = request.scope.get("endpoint")
        candidate = (transaction_from_function(endpoint) or "") if endpoint else ""
    elif transaction_style == "url":
        route = request.scope.get("route")
        route_path = getattr(route, "path", None) if route else None
        candidate = "" if route_path is None else route_path
    else:
        # Unknown style: nothing to derive, fall through to the default name.
        candidate = ""

    if candidate:
        source = SOURCE_FOR_STYLE[transaction_style]
    else:
        candidate = _DEFAULT_TRANSACTION_NAME
        source = TransactionSource.ROUTE

    scope.set_transaction_name(candidate, source=source)
async def _wrap_async_handler(
    handler: "Callable[..., Awaitable[Any]]", *args: "Any", **kwargs: "Any"
) -> "Any":
    """
    Await ``handler`` while attaching request information to Sentry.

    The first positional argument is taken to be the request. Before the
    handler runs, the transaction name/source is set on the current scope and
    an event processor carrying the extracted request info (cookies only when
    sending default PII is allowed, plus request data) is registered on the
    isolation scope. After the handler finishes, whether it returned or raised,
    the cached request body is written to the segment of the current streamed
    span, if there is one.

    Note carried over from the original implementation: the request body cached
    on the Starlette Request object is attached to streamed spans, but
    consuming the request body in the event processor can still cause
    application hangs.

    If the FastAPI integration is not enabled on the client, ``handler`` is
    awaited directly with no instrumentation.
    """
    client = sentry_sdk.get_client()
    integration = client.get_integration(FastApiIntegration)
    if integration is None:
        return await handler(*args, **kwargs)

    request = args[0]
    _set_transaction_name_and_source(
        sentry_sdk.get_current_scope(), integration.transaction_style, request
    )

    isolation_scope = sentry_sdk.get_isolation_scope()
    extracted = await StarletteRequestExtractor(request).extract_request_info()

    def event_processor(event: "Event", hint: "Dict[str, Any]") -> "Event":
        # Merge the request info captured above into the outgoing event.
        payload = event.get("request", {})
        if extracted:
            if "cookies" in extracted and should_send_default_pii():
                payload["cookies"] = extracted["cookies"]
            if "data" in extracted:
                payload["data"] = extracted["data"]
        event["request"] = deepcopy(payload)
        return event

    def _attach_cached_body_to_segment() -> None:
        # Exact type check (not isinstance): subclasses of StreamedSpan are
        # deliberately left alone, as in the original code.
        span = _get_current_streamed_span()
        if type(span) is not StreamedSpan:
            return
        body = _get_cached_request_body_attribute(client=client, request=request)
        if body:
            span._segment.set_attribute(SPANDATA.HTTP_REQUEST_BODY_DATA, body)

    isolation_scope._name = FastApiIntegration.identifier
    isolation_scope.add_event_processor(event_processor)

    try:
        return await handler(*args, **kwargs)
    finally:
        # Runs on both success and failure of the handler.
        _attach_cached_body_to_segment()
def patch_get_request_handler() -> None:
    """
    Replace ``fastapi.routing.get_request_handler`` with an instrumented version.

    The replacement does two things:

    * If the ``dependant`` keyword argument carries a non-coroutine ``call``,
      that callable is wrapped so the active thread recorded on the current
      segment/transaction and on the profile is refreshed right before the
      endpoint runs.
    * The request handler produced by the original factory is wrapped so every
      request goes through ``_wrap_async_handler``.
    """
    original_factory = fastapi.routing.get_request_handler

    def _instrument_sync_endpoint(dependant: "Any") -> None:
        # Swap ``dependant.call`` for a wrapper that refreshes thread info.
        endpoint = dependant.call

        @wraps(endpoint)
        def _sentry_call(*args: "Any", **kwargs: "Any") -> "Any":
            # NOTE(review): presumably needed because sync endpoints execute on
            # a different thread than the one that started the span - confirm.
            scope = sentry_sdk.get_current_scope()
            options = sentry_sdk.get_client().options

            if has_span_streaming_enabled(options):
                span = scope.streamed_span
                # Exact type check: only plain StreamedSpan instances qualify.
                if type(span) is StreamedSpan:
                    span._segment._update_active_thread()
            elif scope.transaction is not None:
                scope.transaction.update_active_thread()

            profile = sentry_sdk.get_isolation_scope().profile
            if profile is not None:
                profile.update_active_thread_id()

            return endpoint(*args, **kwargs)

        dependant.call = _sentry_call

    def _sentry_get_request_handler(*args: "Any", **kwargs: "Any") -> "Any":
        dependant = kwargs.get("dependant")
        if (
            dependant
            and dependant.call is not None
            and not iscoroutinefunction(dependant.call)
        ):
            _instrument_sync_endpoint(dependant)

        inner_app = original_factory(*args, **kwargs)

        async def _sentry_app(*args: "Any", **kwargs: "Any") -> "Any":
            return await _wrap_async_handler(inner_app, *args, **kwargs)

        return _sentry_app

    fastapi.routing.get_request_handler = _sentry_get_request_handler