1515
1616
1717class SpanBatcher (Batcher ["StreamedSpan" ]):
18- # TODO[span-first]: size-based flushes
19- # TODO[span-first]: adjust flush/drop defaults
18+ # MAX_BEFORE_FLUSH should be lower than MAX_BEFORE_DROP, so that there is
19+ # a bit of a buffer for spans that appear between setting the flush event
20+ # and actually flushing the buffer.
21+ #
22+ # The max limits are all per trace.
23+ MAX_ENVELOPE_SIZE = 1000 # spans
2024 MAX_BEFORE_FLUSH = 1000
21- MAX_BEFORE_DROP = 5000
25+ MAX_BEFORE_DROP = 2000
26+ MAX_BYTES_BEFORE_FLUSH = 5 * 1024 * 1024 # 5 MB
2227 FLUSH_WAIT_TIME = 5.0
2328
2429 TYPE = "span"
@@ -35,6 +40,7 @@ def __init__(
3540 # envelope.
3641 # trace_id -> span buffer
3742 self ._span_buffer : dict [str , list ["StreamedSpan" ]] = defaultdict (list )
43+ self ._running_size : dict [str , int ] = defaultdict (lambda : 0 )
3844 self ._capture_func = capture_func
3945 self ._record_lost_func = record_lost_func
4046 self ._running = True
@@ -45,16 +51,12 @@ def __init__(
4551 self ._flusher : "Optional[threading.Thread]" = None
4652 self ._flusher_pid : "Optional[int]" = None
4753
48- def get_size (self ) -> int :
49- # caller is responsible for locking before checking this
50- return sum (len (buffer ) for buffer in self ._span_buffer .values ())
51-
5254 def add (self , span : "StreamedSpan" ) -> None :
5355 if not self ._ensure_thread () or self ._flusher is None :
5456 return None
5557
5658 with self ._lock :
57- size = self .get_size ( )
59+ size = len ( self ._span_buffer [ span . trace_id ] )
5860 if size >= self .MAX_BEFORE_DROP :
5961 self ._record_lost_func (
6062 reason = "queue_overflow" ,
@@ -64,18 +66,36 @@ def add(self, span: "StreamedSpan") -> None:
6466 return None
6567
6668 self ._span_buffer [span .trace_id ].append (span )
69+ self ._running_size [span .trace_id ] += self ._estimate_size (span )
70+
6771 if size + 1 >= self .MAX_BEFORE_FLUSH :
6872 self ._flush_event .set ()
73+ return
74+
75+ if self ._running_size [span .trace_id ] >= self .MAX_BYTES_BEFORE_FLUSH :
76+ self ._flush_event .set ()
77+ return
78+
79+ @staticmethod
80+ def _estimate_size (item : "StreamedSpan" ) -> int :
81+ # Rough estimate of serialized span size that's quick to compute.
82+ # 210 is the rough size of the payload without attributes, and we
83+ # estimate additional 70 bytes on top of that per attribute.
84+ return 210 + 70 * len (item ._attributes )
6985
7086 @staticmethod
7187 def _to_transport_format (item : "StreamedSpan" ) -> "Any" :
7288 # TODO[span-first]
7389 res : "dict[str, Any]" = {
90+ "trace_id" : item .trace_id ,
7491 "span_id" : item .span_id ,
7592 "name" : item ._name ,
7693 "status" : item ._status ,
7794 }
7895
96+ if item ._parent_span_id :
97+ res ["parent_span_id" ] = item ._parent_span_id
98+
7999 if item ._attributes :
80100 res ["attributes" ] = {
81101 k : serialize_attribute (v ) for (k , v ) in item ._attributes .items ()
@@ -86,7 +106,7 @@ def _to_transport_format(item: "StreamedSpan") -> "Any":
86106 def _flush (self ) -> None :
87107 with self ._lock :
88108 if len (self ._span_buffer ) == 0 :
89- return None
109+ return
90110
91111 envelopes = []
92112 for trace_id , spans in self ._span_buffer .items ():
@@ -95,34 +115,40 @@ def _flush(self) -> None:
95115 # dsc = spans[0].dynamic_sampling_context()
96116 dsc = None
97117
98- envelope = Envelope (
99- headers = {
100- "sent_at" : format_timestamp (datetime .now (timezone .utc )),
101- "trace" : dsc ,
102- }
103- )
104-
105- envelope .add_item (
106- Item (
107- type = "span" ,
108- content_type = "application/vnd.sentry.items.span.v2+json" ,
118+ # Max per envelope is 1000, so if we happen to have more than
119+ # 1000 spans in one bucket, we'll need to separate them.
120+ for start in range (0 , len (spans ), self .MAX_ENVELOPE_SIZE ):
121+ end = min (start + self .MAX_ENVELOPE_SIZE , len (spans ))
122+
123+ envelope = Envelope (
109124 headers = {
110- "item_count" : len (spans ),
111- },
112- payload = PayloadRef (
113- json = {
114- "items" : [
115- self ._to_transport_format (span )
116- for span in spans
117- ]
118- }
119- ),
125+ "sent_at" : format_timestamp (datetime .now (timezone .utc )),
126+ "trace" : dsc ,
127+ }
128+ )
129+
130+ envelope .add_item (
131+ Item (
132+ type = self .TYPE ,
133+ content_type = self .CONTENT_TYPE ,
134+ headers = {
135+ "item_count" : end - start ,
136+ },
137+ payload = PayloadRef (
138+ json = {
139+ "items" : [
140+ self ._to_transport_format (spans [j ])
141+ for j in range (start , end )
142+ ]
143+ }
144+ ),
145+ )
120146 )
121- )
122147
123- envelopes .append (envelope )
148+ envelopes .append (envelope )
124149
125150 self ._span_buffer .clear ()
151+ self ._running_size .clear ()
126152
127153 for envelope in envelopes :
128154 self ._capture_func (envelope )
0 commit comments