Skip to content

Commit e8120f8

Browse files
sentrivana, ericapisani
authored and committed
ref: Add streaming trace decorator (7) (#5594)
### Description Add the streaming equivalent of the `@trace` decorator. #### Issues <!-- * resolves: #1234 * resolves: LIN-1234 --> #### Reminders - Please add tests to validate your changes, and lint your code using `tox -e linters`. - Add GH Issue ID _&_ Linear ID (if applicable) - PR title should use [conventional commit](https://develop.sentry.dev/engineering-practices/commit-messages/#type) style (`feat:`, `fix:`, `ref:`, `meta:`) - For external contributors: [CONTRIBUTING.md](https://github.com/getsentry/sentry-python/blob/master/CONTRIBUTING.md), [Sentry SDK development docs](https://develop.sentry.dev/sdk/), [Discord community](https://discord.gg/Ww9hbqr)
1 parent 8ffcab9 commit e8120f8

File tree

3 files changed

+203
-33
lines changed

3 files changed

+203
-33
lines changed

sentry_sdk/_span_batcher.py

Lines changed: 58 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,15 @@
1515

1616

1717
class SpanBatcher(Batcher["StreamedSpan"]):
18-
# TODO[span-first]: size-based flushes
19-
# TODO[span-first]: adjust flush/drop defaults
18+
# MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
19+
# a bit of a buffer for spans that appear between setting the flush event
20+
# and actually flushing the buffer.
21+
#
22+
# The max limits are all per trace.
23+
MAX_ENVELOPE_SIZE = 1000 # spans
2024
MAX_BEFORE_FLUSH = 1000
21-
MAX_BEFORE_DROP = 5000
25+
MAX_BEFORE_DROP = 2000
26+
MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB
2227
FLUSH_WAIT_TIME = 5.0
2328

2429
TYPE = "span"
@@ -35,6 +40,7 @@ def __init__(
3540
# envelope.
3641
# trace_id -> span buffer
3742
self._span_buffer: dict[str, list["StreamedSpan"]] = defaultdict(list)
43+
self._running_size: dict[str, int] = defaultdict(lambda: 0)
3844
self._capture_func = capture_func
3945
self._record_lost_func = record_lost_func
4046
self._running = True
@@ -45,16 +51,12 @@ def __init__(
4551
self._flusher: "Optional[threading.Thread]" = None
4652
self._flusher_pid: "Optional[int]" = None
4753

48-
def get_size(self) -> int:
49-
# caller is responsible for locking before checking this
50-
return sum(len(buffer) for buffer in self._span_buffer.values())
51-
5254
def add(self, span: "StreamedSpan") -> None:
5355
if not self._ensure_thread() or self._flusher is None:
5456
return None
5557

5658
with self._lock:
57-
size = self.get_size()
59+
size = len(self._span_buffer[span.trace_id])
5860
if size >= self.MAX_BEFORE_DROP:
5961
self._record_lost_func(
6062
reason="queue_overflow",
@@ -64,18 +66,36 @@ def add(self, span: "StreamedSpan") -> None:
6466
return None
6567

6668
self._span_buffer[span.trace_id].append(span)
69+
self._running_size[span.trace_id] += self._estimate_size(span)
70+
6771
if size + 1 >= self.MAX_BEFORE_FLUSH:
6872
self._flush_event.set()
73+
return
74+
75+
if self._running_size[span.trace_id] >= self.MAX_BYTES_BEFORE_FLUSH:
76+
self._flush_event.set()
77+
return
78+
79+
@staticmethod
def _estimate_size(item: "StreamedSpan") -> int:
    """
    Return a cheap, approximate serialized size of ``item`` in bytes.

    The estimate is 210 bytes for the payload without attributes, plus
    70 bytes for every attribute on the span. It is deliberately rough so
    that it stays quick to compute. A span whose ``_attributes`` is empty or
    ``None`` is estimated at the 210-byte base size.

    :param item: The span to estimate; only ``item._attributes`` is read.
    :returns: The estimated size in bytes.
    """
    # Rough size of the payload without attributes.
    base_size = 210
    # Rough additional size per attribute.
    per_attribute_size = 70

    # Tolerate a missing/None attribute mapping, mirroring the truthiness
    # check that _to_transport_format applies to the same field.
    attributes = item._attributes or ()
    return base_size + per_attribute_size * len(attributes)
6985

7086
@staticmethod
7187
def _to_transport_format(item: "StreamedSpan") -> "Any":
7288
# TODO[span-first]
7389
res: "dict[str, Any]" = {
90+
"trace_id": item.trace_id,
7491
"span_id": item.span_id,
7592
"name": item._name,
7693
"status": item._status,
7794
}
7895

96+
if item._parent_span_id:
97+
res["parent_span_id"] = item._parent_span_id
98+
7999
if item._attributes:
80100
res["attributes"] = {
81101
k: serialize_attribute(v) for (k, v) in item._attributes.items()
@@ -86,7 +106,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any":
86106
def _flush(self) -> None:
87107
with self._lock:
88108
if len(self._span_buffer) == 0:
89-
return None
109+
return
90110

91111
envelopes = []
92112
for trace_id, spans in self._span_buffer.items():
@@ -95,34 +115,40 @@ def _flush(self) -> None:
95115
# dsc = spans[0].dynamic_sampling_context()
96116
dsc = None
97117

98-
envelope = Envelope(
99-
headers={
100-
"sent_at": format_timestamp(datetime.now(timezone.utc)),
101-
"trace": dsc,
102-
}
103-
)
104-
105-
envelope.add_item(
106-
Item(
107-
type="span",
108-
content_type="application/vnd.sentry.items.span.v2+json",
118+
# Max per envelope is 1000, so if we happen to have more than
119+
# 1000 spans in one bucket, we'll need to separate them.
120+
for start in range(0, len(spans), self.MAX_ENVELOPE_SIZE):
121+
end = min(start + self.MAX_ENVELOPE_SIZE, len(spans))
122+
123+
envelope = Envelope(
109124
headers={
110-
"item_count": len(spans),
111-
},
112-
payload=PayloadRef(
113-
json={
114-
"items": [
115-
self._to_transport_format(span)
116-
for span in spans
117-
]
118-
}
119-
),
125+
"sent_at": format_timestamp(datetime.now(timezone.utc)),
126+
"trace": dsc,
127+
}
128+
)
129+
130+
envelope.add_item(
131+
Item(
132+
type=self.TYPE,
133+
content_type=self.CONTENT_TYPE,
134+
headers={
135+
"item_count": end - start,
136+
},
137+
payload=PayloadRef(
138+
json={
139+
"items": [
140+
self._to_transport_format(spans[j])
141+
for j in range(start, end)
142+
]
143+
}
144+
),
145+
)
120146
)
121-
)
122147

123-
envelopes.append(envelope)
148+
envelopes.append(envelope)
124149

125150
self._span_buffer.clear()
151+
self._running_size.clear()
126152

127153
for envelope in envelopes:
128154
self._capture_func(envelope)

sentry_sdk/traces.py

Lines changed: 91 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,12 @@
1414
from sentry_sdk.utils import format_attribute, logger
1515

1616
if TYPE_CHECKING:
17-
from typing import Any, Optional, Union
17+
from typing import Any, Callable, Optional, ParamSpec, TypeVar, Union
1818
from sentry_sdk._types import Attributes, AttributeValue
1919

20+
P = ParamSpec("P")
21+
R = TypeVar("R")
22+
2023

2124
class SpanStatus(str, Enum):
2225
OK = "ok"
@@ -235,6 +238,14 @@ def __repr__(self) -> str:
235238
f"active={self._active})>"
236239
)
237240

241+
def __enter__(self) -> "StreamedSpan":
    """Enter the ``with`` block, handing back this span as the context value."""
    return self
243+
244+
def __exit__(
    self, ty: "Optional[Any]", value: "Optional[Any]", tb: "Optional[Any]"
) -> None:
    """
    Leave the ``with`` block without doing any work.

    Always returns ``None``, so an exception raised inside the block is not
    suppressed.
    """
    return None
248+
238249
def get_attributes(self) -> "Attributes":
239250
return self._attributes
240251

@@ -306,6 +317,14 @@ def __init__(self) -> None:
306317
def __repr__(self) -> str:
307318
return f"<{self.__class__.__name__}(sampled={self.sampled})>"
308319

320+
def __enter__(self) -> "NoOpStreamedSpan":
    """Enter the ``with`` block, handing back this no-op span as the context value."""
    return self
322+
323+
def __exit__(
    self, ty: "Optional[Any]", value: "Optional[Any]", tb: "Optional[Any]"
) -> None:
    """
    Leave the ``with`` block without doing any work.

    Always returns ``None``, so an exception raised inside the block is not
    suppressed.
    """
    return None
327+
309328
def get_attributes(self) -> "Attributes":
310329
return {}
311330

@@ -349,3 +368,74 @@ def trace_id(self) -> str:
349368
@property
350369
def sampled(self) -> "Optional[bool]":
351370
return False
371+
372+
373+
def trace(
    func: "Optional[Callable[P, R]]" = None,
    *,
    name: "Optional[str]" = None,
    attributes: "Optional[dict[str, Any]]" = None,
    active: bool = True,
) -> "Union[Callable[P, R], Callable[[Callable[P, R]], Callable[P, R]]]":
    """
    Decorator to start a span around a function call.

    This decorator automatically creates a new span when the decorated function
    is called, and finishes the span when the function returns or raises an exception.

    :param func: The function to trace. When used as a decorator without parentheses,
        this is the function being decorated. When used with parameters (e.g.,
        ``@trace(name="custom")``), this should be None.
    :type func: Callable or None

    :param name: The human-readable name/description for the span. If not provided,
        defaults to the function name. This provides more specific details about
        what the span represents (e.g., "GET /api/users", "process_user_data").
    :type name: str or None

    :param attributes: A dictionary of key-value pairs to add as attributes to the span.
        Attribute values must be strings, integers, floats, or booleans. These
        attributes provide additional context about the span's execution.
    :type attributes: dict[str, Any] or None

    :param active: Controls whether spans started while this span is running
        will automatically become its children. That's the default behavior. If
        you want to create a span that shouldn't have any children (unless
        provided explicitly via the `parent_span` argument), set this to False.
    :type active: bool

    :returns: When used as ``@trace``, returns the decorated function. When used as
        ``@trace(...)`` with parameters, returns a decorator function.
    :rtype: Callable or decorator function

    Example::

        import sentry_sdk

        # Simple usage with default values
        @sentry_sdk.trace
        def process_data():
            # Function implementation
            pass

        # With custom parameters
        @sentry_sdk.trace(
            name="Get user data",
            attributes={"postgres": True}
        )
        def make_db_query(sql):
            # Function implementation
            pass
    """
    # Imported lazily; presumably to avoid a circular import, since
    # sentry_sdk.tracing_utils itself imports from sentry_sdk.traces.
    from sentry_sdk.tracing_utils import create_streaming_span_decorator

    decorator = create_streaming_span_decorator(
        name=name,
        attributes=attributes,
        active=active,
    )

    # Compare against None rather than relying on truthiness: a callable
    # object may define __bool__/__len__ and evaluate as falsy, which would
    # wrongly be treated as the parameterised ``@trace(...)`` form.
    if func is not None:
        return decorator(func)

    return decorator

sentry_sdk/tracing_utils.py

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -942,6 +942,57 @@ def sync_wrapper(*args: "Any", **kwargs: "Any") -> "Any":
942942
return span_decorator
943943

944944

945+
def create_streaming_span_decorator(
    name: "Optional[str]" = None,
    attributes: "Optional[dict[str, Any]]" = None,
    active: bool = True,
) -> "Any":
    """
    Build a decorator that runs the wrapped callable inside a streaming span.

    Works for both regular functions and coroutine functions: the returned
    wrapper is itself a coroutine function when the decorated one is.

    :param name: Span name. When falsy, the name is taken from
        ``qualname_from_function`` for the wrapped function, falling back to
        an empty string.
    :param attributes: Passed through as ``attributes`` to
        ``start_streaming_span``.
    :param active: Passed through as ``active`` to ``start_streaming_span``.
    :returns: A decorator taking the function to wrap.
    """

    def _open_span(f: "Any") -> "Any":
        # The span name is resolved on every call of the wrapper, right
        # before the span is started.
        return start_streaming_span(
            name=name or qualname_from_function(f) or "",
            attributes=attributes,
            active=active,
        )

    def _copy_signature(wrapper: "Any", f: "Any") -> "Any":
        # Best effort: if the signature of ``f`` cannot be introspected, the
        # wrapper is simply returned without an explicit ``__signature__``.
        try:
            wrapper.__signature__ = inspect.signature(f)  # type: ignore[attr-defined]
        except Exception:
            pass
        return wrapper

    def span_decorator(f: "Any") -> "Any":
        """
        Wrap ``f`` so that each call runs inside a new streaming span.
        """
        if inspect.iscoroutinefunction(f):

            @functools.wraps(f)
            async def async_wrapper(*args: "Any", **kwargs: "Any") -> "Any":
                with _open_span(f):
                    return await f(*args, **kwargs)

            return _copy_signature(async_wrapper, f)

        @functools.wraps(f)
        def sync_wrapper(*args: "Any", **kwargs: "Any") -> "Any":
            with _open_span(f):
                return f(*args, **kwargs)

        return _copy_signature(sync_wrapper, f)

    return span_decorator
994+
995+
945996
def get_current_span(scope: "Optional[sentry_sdk.Scope]" = None) -> "Optional[Span]":
946997
"""
947998
Returns the currently active span if there is one running, otherwise `None`
@@ -1317,6 +1368,9 @@ def add_sentry_baggage_to_headers(
13171368
LOW_QUALITY_TRANSACTION_SOURCES,
13181369
SENTRY_TRACE_HEADER_NAME,
13191370
)
1371+
from sentry_sdk.traces import (
1372+
start_span as start_streaming_span,
1373+
)
13201374

13211375
if TYPE_CHECKING:
13221376
from sentry_sdk.tracing import Span

0 commit comments

Comments
 (0)