@@ -18,6 +18,19 @@ export type DynamicFlushSchedulerConfig<T> = {
1818 isDroppableEvent ?: ( item : T ) => boolean ; // Function to determine if an event can be dropped
1919} ;
2020
21+ // Bound on the recursive batch-split safety net. Depth 4 → at most 2^4 = 16 leaf sub-batches,
22+ // so a 16k-row batch narrows to ~1k-row pieces; a single bad row is fully isolated only in batches of ≤ 16 rows.
23+ const MAX_SPLIT_DEPTH = 4 ;
24+
/**
 * Reports whether a thrown value looks like a ClickHouse JSON parse failure.
 *
 * The check is purely textual: the value's message text is searched for the
 * marker "Cannot parse JSON object".
 *
 * @param error - Whatever was caught; may be an Error, a plain object, a
 *   primitive, or nothing at all.
 * @returns `true` when the extracted text contains the marker, otherwise
 *   `false` (including for every falsy input).
 */
function isClickHouseJsonParseError(error: unknown): boolean {
  // Falsy throwables (undefined, null, "", 0, false) never match.
  if (!error) {
    return false;
  }

  let text: string;
  if (typeof error === "object" && error !== null && "message" in error) {
    // Object exposing `message`: stringify it, treating null/undefined as empty.
    const { message } = error as { message?: unknown };
    text = String(message ?? "");
  } else {
    // Primitives and message-less objects fall back to their string form.
    text = String(error);
  }

  return text.includes("Cannot parse JSON object");
}
33+
2134export class DynamicFlushScheduler < T > {
2235 private batchQueue : T [ ] [ ] ;
2336 private currentBatch : T [ ] ;
@@ -43,6 +56,10 @@ export class DynamicFlushScheduler<T> {
4356 totalItemsFlushed : 0 ,
4457 droppedEvents : 0 ,
4558 droppedEventsByKind : new Map < string , number > ( ) ,
59+ // Rows dropped at flush time because ClickHouse rejected them and the
60+ // batch-split safety net couldn't isolate them further. Distinct from
61+ // `droppedEvents`, which counts pre-batch load-shedding drops.
62+ droppedRows : 0 ,
4663 } ;
4764 private isShuttingDown : boolean = false ;
4865
@@ -196,40 +213,112 @@ export class DynamicFlushScheduler<T> {
196213 // Schedule all batches for concurrent processing
197214 const flushPromises = batchesToFlush . map ( ( batch ) =>
198215 this . limiter ( async ( ) => {
199- const itemCount = batch . length ;
200-
201216 const self = this ;
202217
203- async function tryFlush ( flushId : string , batchToFlush : T [ ] , attempt : number = 1 ) {
218+ async function tryFlush (
219+ flushId : string ,
220+ batchToFlush : T [ ] ,
221+ attempt : number = 1 ,
222+ splitDepth : number = 0
223+ ) {
224+ const subBatchSize = batchToFlush . length ;
225+
204226 try {
205227 const startTime = Date . now ( ) ;
206228 await self . callback ( flushId , batchToFlush ) ;
207229
208230 const duration = Date . now ( ) - startTime ;
209- self . totalQueuedItems -= itemCount ;
231+ self . totalQueuedItems -= subBatchSize ;
210232 self . consecutiveFlushFailures = 0 ;
211233 self . lastFlushTime = Date . now ( ) ;
212234 self . metrics . flushedBatches ++ ;
213- self . metrics . totalItemsFlushed += itemCount ;
235+ self . metrics . totalItemsFlushed += subBatchSize ;
214236
215237 self . logger . debug ( "Batch flushed successfully" , {
216238 flushId,
217- itemCount,
239+ itemCount : subBatchSize ,
218240 duration,
219241 remainingQueueDepth : self . totalQueuedItems ,
220242 activeConcurrency : self . limiter . activeCount ,
221243 pendingConcurrency : self . limiter . pendingCount ,
222244 } ) ;
223245 } catch ( error ) {
246+ // ClickHouse rejects an entire batch when a single row's
247+ // attributes JSON is unparseable. Retrying the same batch will
248+ // just fail again, so split-and-retry isolates the offender
249+ // instead of poisoning the whole 5–10k-row batch.
250+ const isParseError = isClickHouseJsonParseError ( error ) ;
251+
252+ if ( isParseError && subBatchSize > 1 && splitDepth < MAX_SPLIT_DEPTH ) {
253+ const mid = Math . floor ( subBatchSize / 2 ) ;
254+ const left = batchToFlush . slice ( 0 , mid ) ;
255+ const right = batchToFlush . slice ( mid ) ;
256+
257+ self . logger . warn (
258+ "Splitting OTel batch after ClickHouse JSON parse failure" ,
259+ {
260+ flushId,
261+ itemCount : subBatchSize ,
262+ splitDepth,
263+ leftSize : left . length ,
264+ rightSize : right . length ,
265+ }
266+ ) ;
267+
268+ // Run halves concurrently and tolerate independent failures —
269+ // a rejection from one half must not prevent the other half
270+ // from completing. Each leaf's tryFlush updates totalQueuedItems
271+ // and metrics on its own success/drop paths.
272+ const results = await Promise . allSettled ( [
273+ tryFlush ( flushId + "-L" , left , 1 , splitDepth + 1 ) ,
274+ tryFlush ( flushId + "-R" , right , 1 , splitDepth + 1 ) ,
275+ ] ) ;
276+
277+ for ( const [ index , result ] of results . entries ( ) ) {
278+ if ( result . status === "rejected" ) {
279+ self . metrics . failedBatches ++ ;
280+ self . logger . error (
281+ "Split half failed after exhausting retries" ,
282+ {
283+ flushId : flushId + ( index === 0 ? "-L" : "-R" ) ,
284+ error : result . reason ,
285+ splitDepth : splitDepth + 1 ,
286+ }
287+ ) ;
288+ }
289+ }
290+ return ;
291+ }
292+
293+ if ( isParseError && subBatchSize === 1 ) {
294+ // Singleton row that ClickHouse still rejects. Drop it so
295+ // the rest of the queue keeps flowing, and log enough of
296+ // the offending event for someone to investigate later
297+ // without dumping multi-KB of attributes into the log.
298+ self . metrics . droppedRows += 1 ;
299+ self . metrics . failedBatches ++ ;
300+ self . totalQueuedItems -= 1 ;
301+ self . logger . error (
302+ "Dropping single OTel row rejected by ClickHouse JSON parser" ,
303+ {
304+ flushId,
305+ sample : JSON . stringify ( batchToFlush [ 0 ] ) . slice ( 0 , 1024 ) ,
306+ splitDepth,
307+ }
308+ ) ;
309+ return ;
310+ }
311+
224312 self . consecutiveFlushFailures ++ ;
225313 self . metrics . failedBatches ++ ;
226314
227315 self . logger . error ( "Error attempting to flush batch" , {
228316 flushId,
229- itemCount,
317+ itemCount : subBatchSize ,
230318 error,
231319 consecutiveFailures : self . consecutiveFlushFailures ,
232320 attempt,
321+ splitDepth,
233322 } ) ;
234323
235324 // Back off on failures
@@ -239,7 +328,7 @@ export class DynamicFlushScheduler<T> {
239328
240329 if ( attempt <= 3 ) {
241330 await new Promise ( ( resolve ) => setTimeout ( resolve , 500 ) ) ;
242- return await tryFlush ( flushId , batchToFlush , attempt + 1 ) ;
331+ return await tryFlush ( flushId , batchToFlush , attempt + 1 , splitDepth ) ;
243332 } else {
244333 throw error ;
245334 }
0 commit comments