@@ -24,14 +24,20 @@ internal sealed class DataStreamsWriter : IDataStreamsWriter
2424{
2525 private const TaskCreationOptions TaskOptions = TaskCreationOptions . RunContinuationsAsynchronously ;
2626
27+ // Drain the queues when this many items accumulate OR after DrainTimeoutMs — whichever comes first.
28+ // At the queue limit of 10_000, this leaves 50% headroom even at 2× current peak throughput.
29+ private const int DrainThreshold = 5_000 ;
30+ private const int DrainTimeoutMs = 1_000 ;
31+
2732 private static readonly IDatadogLogger Log = DatadogLogging . GetLoggerFor < DataStreamsWriter > ( ) ;
2833
2934 private readonly object _initLock = new ( ) ;
3035 private readonly long _bucketDurationMs ;
36+
3137 private readonly BoundedConcurrentQueue < StatsPoint > _buffer = new ( queueLimit : 10_000 ) ;
3238 private readonly BoundedConcurrentQueue < BacklogPoint > _backlogBuffer = new ( queueLimit : 10_000 ) ;
3339 private readonly BoundedConcurrentQueue < DataStreamsTransactionInfo > _transactionBuffer = new ( queueLimit : 10_000 ) ;
34- private readonly TimeSpan _waitTimeSpan = TimeSpan . FromMilliseconds ( 10 ) ;
40+ private readonly ManualResetEventSlim _drainSignal = new ( false ) ;
3541 private readonly TimeSpan _flushSemaphoreWaitTime = TimeSpan . FromSeconds ( 1 ) ;
3642 private readonly DataStreamsAggregator _aggregator ;
3743 private readonly IDiscoveryService _discoveryService ;
@@ -125,6 +131,11 @@ public void Add(in StatsPoint point)
125131
126132 if ( _buffer . TryEnqueue ( point ) )
127133 {
134+ if ( ! _drainSignal . IsSet && _buffer . Count >= DrainThreshold )
135+ {
136+ _drainSignal . Set ( ) ;
137+ }
138+
128139 return ;
129140 }
130141 }
@@ -143,6 +154,11 @@ public void AddTransaction(in DataStreamsTransactionInfo transaction)
143154 {
144155 if ( _transactionBuffer . TryEnqueue ( transaction ) )
145156 {
157+ if ( ! _drainSignal . IsSet && _transactionBuffer . Count >= DrainThreshold )
158+ {
159+ _drainSignal . Set ( ) ;
160+ }
161+
146162 return ;
147163 }
148164 }
@@ -181,6 +197,7 @@ public async Task DisposeAsync()
181197#endif
182198 await FlushAndCloseAsync ( ) . ConfigureAwait ( false ) ;
183199 _flushSemaphore . Dispose ( ) ;
200+ _drainSignal . Dispose ( ) ;
184201 }
185202
186203 private async Task FlushAndCloseAsync ( )
@@ -190,6 +207,9 @@ private async Task FlushAndCloseAsync()
190207 return ;
191208 }
192209
210+ // Wake ProcessQueueLoop immediately so it can observe _processExit and return.
211+ _drainSignal . Set ( ) ;
212+
193213 // nothing else to do, since the writer was not fully initialized
194214 if ( ! Volatile . Read ( ref _isInitialized ) || _processTask == null || _flushTask == null )
195215 {
@@ -347,7 +367,8 @@ private void ProcessQueueLoop()
347367 {
348368 while ( true )
349369 {
350- Thread . Sleep ( _waitTimeSpan ) ;
370+ _drainSignal . Wait ( DrainTimeoutMs ) ;
371+ _drainSignal . Reset ( ) ;
351372
352373 if ( ! _flushSemaphore . Wait ( _flushSemaphoreWaitTime ) )
353374 {
0 commit comments