@@ -16,6 +16,12 @@ const MAX_SPANS_PER_ENVELOPE = 1000;
1616
1717const MAX_TRACE_WEIGHT_IN_BYTES = 5_000_000 ;
1818
19+ interface TraceBucket {
20+ spans : Set < SerializedStreamedSpanWithSegmentSpan > ;
21+ size : number ;
22+ timeout : ReturnType < typeof setTimeout > ;
23+ }
24+
1925export interface SpanBufferOptions {
2026 /**
2127 * Max spans per trace before auto-flush
@@ -26,7 +32,8 @@ export interface SpanBufferOptions {
2632 maxSpanLimit ?: number ;
2733
2834 /**
29- * Flush interval in ms
35+ * Per-trace flush timeout in ms. A timeout is started when a trace bucket is first created
36+ * and fires flush() for that specific trace when it expires.
3037 * Must be greater than 0.
3138 *
3239 * @default 5_000
@@ -44,7 +51,7 @@ export interface SpanBufferOptions {
4451
4552/**
4653 * A buffer for serialized streamed span JSON objects that flushes them to Sentry in Span v2 envelopes.
47- * Handles interval -based flushing, size thresholds, and graceful shutdown.
54+ * Handles per-trace timeout -based flushing, size thresholds, and graceful shutdown.
4855 * Also handles computation of the Dynamic Sampling Context (DSC) for the trace, if it wasn't yet
4956 * frozen onto the segment span.
5057 *
@@ -54,19 +61,16 @@ export interface SpanBufferOptions {
5461 * still active and modifyable when child spans are added to the buffer.
5562 */
5663export class SpanBuffer {
57- /* Bucket spans by their trace id */
58- private _traceMap : Map < string , Set < SerializedStreamedSpanWithSegmentSpan > > ;
59- private _traceWeightMap : Map < string , number > ;
64+ /* Bucket spans by their trace id, along with accumulated size and a per-trace flush timeout */
65+ private _traceBuckets : Map < string , TraceBucket > ;
6066
61- private _flushIntervalId : ReturnType < typeof setInterval > | null ;
6267 private _client : Client ;
6368 private _maxSpanLimit : number ;
6469 private _flushInterval : number ;
6570 private _maxTraceWeight : number ;
6671
6772 public constructor ( client : Client , options ?: SpanBufferOptions ) {
68- this . _traceMap = new Map ( ) ;
69- this . _traceWeightMap = new Map ( ) ;
73+ this . _traceBuckets = new Map ( ) ;
7074 this . _client = client ;
7175
7276 const { maxSpanLimit, flushInterval, maxTraceWeightInBytes } = options ?? { } ;
@@ -79,21 +83,17 @@ export class SpanBuffer {
7983 this . _maxTraceWeight =
8084 maxTraceWeightInBytes && maxTraceWeightInBytes > 0 ? maxTraceWeightInBytes : MAX_TRACE_WEIGHT_IN_BYTES ;
8185
82- this . _flushIntervalId = null ;
83- this . _debounceFlushInterval ( ) ;
84-
8586 this . _client . on ( 'flush' , ( ) => {
8687 this . drain ( ) ;
8788 } ) ;
8889
8990 this . _client . on ( 'close' , ( ) => {
9091 // No need to drain the buffer here as `Client.close()` internally already calls `Client.flush()`
9192 // which already invokes the `flush` hook and thus drains the buffer.
92- if ( this . _flushIntervalId ) {
93- clearInterval ( this . _flushIntervalId ) ;
94- }
95- this . _traceMap . clear ( ) ;
96- this . _traceWeightMap . clear ( ) ;
93+ this . _traceBuckets . forEach ( bucket => {
94+ clearTimeout ( bucket . timeout ) ;
95+ } ) ;
96+ this . _traceBuckets . clear ( ) ;
9797 } ) ;
9898 }
9999
@@ -102,57 +102,63 @@ export class SpanBuffer {
102102 */
103103 public add ( spanJSON : SerializedStreamedSpanWithSegmentSpan ) : void {
104104 const traceId = spanJSON . trace_id ;
105- let traceBucket = this . _traceMap . get ( traceId ) ;
106- if ( traceBucket ) {
107- traceBucket . add ( spanJSON ) ;
108- } else {
109- traceBucket = new Set ( [ spanJSON ] ) ;
110- this . _traceMap . set ( traceId , traceBucket ) ;
111- }
105+ const existingBucket = this . _traceBuckets . get ( traceId ) ;
112106
113- const newWeight = ( this . _traceWeightMap . get ( traceId ) ?? 0 ) + estimateSerializedSpanSizeInBytes ( spanJSON ) ;
114- this . _traceWeightMap . set ( traceId , newWeight ) ;
107+ if ( existingBucket ) {
108+ existingBucket . spans . add ( spanJSON ) ;
109+ existingBucket . size += estimateSerializedSpanSizeInBytes ( spanJSON ) ;
115110
116- if ( traceBucket . size >= this . _maxSpanLimit || newWeight >= this . _maxTraceWeight ) {
117- this . flush ( traceId ) ;
118- this . _debounceFlushInterval ( ) ;
111+ if ( existingBucket . spans . size >= this . _maxSpanLimit || existingBucket . size >= this . _maxTraceWeight ) {
112+ this . flush ( traceId ) ;
113+ }
114+ } else {
115+ const size = estimateSerializedSpanSizeInBytes ( spanJSON ) ;
116+ const timeout = safeUnref (
117+ setTimeout ( ( ) => {
118+ this . flush ( traceId ) ;
119+ } , this . _flushInterval ) ,
120+ ) ;
121+ this . _traceBuckets . set ( traceId , { spans : new Set ( [ spanJSON ] ) , size, timeout } ) ;
122+
123+ if ( size >= this . _maxTraceWeight ) {
124+ this . flush ( traceId ) ;
125+ }
119126 }
120127 }
121128
122129 /**
123130 * Drain and flush all buffered traces.
124131 */
125132 public drain ( ) : void {
126- if ( ! this . _traceMap . size ) {
133+ if ( ! this . _traceBuckets . size ) {
127134 return ;
128135 }
129136
130- DEBUG_BUILD && debug . log ( `Flushing span tree map with ${ this . _traceMap . size } traces` ) ;
137+ DEBUG_BUILD && debug . log ( `Flushing span tree map with ${ this . _traceBuckets . size } traces` ) ;
131138
132- this . _traceMap . forEach ( ( _ , traceId ) => {
139+ this . _traceBuckets . forEach ( ( _ , traceId ) => {
133140 this . flush ( traceId ) ;
134141 } ) ;
135- this . _debounceFlushInterval ( ) ;
136142 }
137143
138144 /**
139145 * Flush spans of a specific trace.
140- * In contrast to {@link SpanBuffer.flush }, this method does not flush all traces, but only the one with the given traceId.
146+ * In contrast to {@link SpanBuffer.drain }, this method does not flush all traces, but only the one with the given traceId.
141147 */
142148 public flush ( traceId : string ) : void {
143- const traceBucket = this . _traceMap . get ( traceId ) ;
144- if ( ! traceBucket ) {
149+ const bucket = this . _traceBuckets . get ( traceId ) ;
150+ if ( ! bucket ) {
145151 return ;
146152 }
147153
148- if ( ! traceBucket . size ) {
149- // we should never get here, given we always add a span when we create a new bucket
154+ if ( ! bucket . spans . size ) {
155+ // we should never get here, given we always add a span when we create a new bucket
150156 // and delete the bucket once we flush out the trace
151157 this . _removeTrace ( traceId ) ;
152158 return ;
153159 }
154160
155- const spans = Array . from ( traceBucket ) ;
161+ const spans = Array . from ( bucket . spans ) ;
156162
157163 const segmentSpan = spans [ 0 ] ?. _segmentSpan ;
158164 if ( ! segmentSpan ) {
@@ -181,18 +187,10 @@ export class SpanBuffer {
181187 }
182188
183189 private _removeTrace ( traceId : string ) : void {
184- this . _traceMap . delete ( traceId ) ;
185- this . _traceWeightMap . delete ( traceId ) ;
186- }
187-
188- private _debounceFlushInterval ( ) : void {
189- if ( this . _flushIntervalId ) {
190- clearInterval ( this . _flushIntervalId ) ;
190+ const bucket = this . _traceBuckets . get ( traceId ) ;
191+ if ( bucket ) {
192+ clearTimeout ( bucket . timeout ) ;
191193 }
192- this . _flushIntervalId = safeUnref (
193- setInterval ( ( ) => {
194- this . drain ( ) ;
195- } , this . _flushInterval ) ,
196- ) ;
194+ this . _traceBuckets . delete ( traceId ) ;
197195 }
198196}
0 commit comments