@@ -43,6 +43,12 @@ use crate::{AdapterError, ExecuteContext, ExecuteResponse};
4343/// unbounded in-memory growth in a single giant batch.
4444const COPY_FROM_STDIN_MAX_BATCH_BYTES : usize = 32 * 1024 * 1024 ;
4545
46+ /// Cap on the number of parallel decode workers spawned per COPY FROM STDIN.
47+ /// A single network-bound stream sees only marginal gains past a handful of
48+ /// decoders, and the cap limits how much of the blocking pool any one COPY can
49+ /// occupy while actively decoding.
50+ const COPY_FROM_STDIN_MAX_WORKERS : usize = 8 ;
51+
4652impl Coordinator {
4753 pub ( crate ) async fn sequence_copy_from (
4854 & mut self ,
@@ -415,10 +421,14 @@ impl Coordinator {
415421 . collect :: < Vec < _ > > ( )
416422 . into ( ) ;
417423
418- // Determine number of parallel workers.
419- let num_workers = std:: thread:: available_parallelism ( )
420- . map ( |n| n. get ( ) )
421- . unwrap_or ( 1 ) ;
424+ // Determine number of parallel workers, capped so that a single COPY
425+ // cannot reserve an unbounded share of the shared blocking pool.
426+ let num_workers = std:: cmp:: min (
427+ std:: thread:: available_parallelism ( )
428+ . map ( |n| n. get ( ) )
429+ . unwrap_or ( 1 ) ,
430+ COPY_FROM_STDIN_MAX_WORKERS ,
431+ ) ;
422432 tracing:: info!(
423433 %target_id, num_workers,
424434 "starting parallel COPY FROM STDIN batch builders"
@@ -430,11 +440,12 @@ impl Coordinator {
430440 let collection_desc = Arc :: new ( collection_desc) ;
431441 let persist_client = self . persist_client . clone ( ) ;
432442
433- // Create per-worker channels and spawn workers on blocking threads.
434- // Each worker does CPU-intensive TSV decoding + columnar encoding,
435- // so they need dedicated OS threads (not tokio async tasks) for
436- // true parallelism.
437- let rt_handle = tokio:: runtime:: Handle :: current ( ) ;
443+ // Create per-worker channels and spawn one async task per worker. Each
444+ // worker offloads the CPU-intensive processing of a chunk (decode plus
445+ // the per-row transform/constraint-check/columnar encode) to the
446+ // blocking pool for the duration of that chunk (see
447+ // `copy_from_stdin_batch_builder`), so workers run in parallel while
448+ // doing CPU work but hold no thread while idle between chunks.
438449 let mut batch_txs = Vec :: with_capacity ( num_workers) ;
439450 let mut worker_handles = Vec :: with_capacity ( num_workers) ;
440451
@@ -464,24 +475,21 @@ impl Coordinator {
464475 // Only worker 0 receives the first chunk (round-robin), so only
465476 // it needs to skip the CSV header on its first chunk.
466477 let skip_header_on_first_chunk = worker_id == 0 && first_chunk_has_header;
467- let rt = rt_handle. clone ( ) ;
468478
469- let handle = mz_ore:: task:: spawn_blocking (
479+ let handle = mz_ore:: task:: spawn (
470480 || format ! ( "copy_from_stdin_worker:{target_id}:{worker_id}" ) ,
471- move || {
472- rt. block_on ( Self :: copy_from_stdin_batch_builder (
473- persist_client,
474- shard_id,
475- collection_id,
476- collection_desc,
477- target_desc,
478- column_transform,
479- column_types,
480- params,
481- skip_header_on_first_chunk,
482- batch_rx,
483- ) )
484- } ,
481+ Self :: copy_from_stdin_batch_builder (
482+ persist_client,
483+ shard_id,
484+ collection_id,
485+ collection_desc,
486+ target_desc,
487+ column_transform,
488+ column_types,
489+ params,
490+ skip_header_on_first_chunk,
491+ batch_rx,
492+ ) ,
485493 ) ;
486494 worker_handles. push ( handle) ;
487495 }
@@ -555,10 +563,11 @@ impl Coordinator {
555563 let mut batch_bytes: usize = 0 ;
556564 let mut proto_batches = Vec :: new ( ) ;
557565
566+ let rt = tokio:: runtime:: Handle :: current ( ) ;
558567 let mut is_first_chunk = true ;
559568 while let Some ( raw_bytes) = batch_rx. recv ( ) . await {
560- // Decode raw bytes into rows. For the first chunk of worker 0,
561- // re-enable header skipping so the real CSV header line is skipped.
569+ // For the first chunk of worker 0, re-enable header skipping so the
570+ // real CSV header line is skipped.
562571 let chunk_params = if is_first_chunk && skip_header_on_first_chunk {
563572 let mut p = params. clone ( ) ;
564573 if let CopyFormatParams :: Csv ( ref mut csv) = p {
@@ -569,34 +578,73 @@ impl Coordinator {
569578 params. clone ( )
570579 } ;
571580 is_first_chunk = false ;
572- let rows = mz_pgcopy:: decode_copy_format ( & raw_bytes, & column_types, chunk_params)
573- . map_err ( |e| AdapterError :: CopyFormatError ( e. to_string ( ) ) ) ?;
574-
575- for row in rows {
576- // Apply column transform if needed (add defaults, reorder).
577- let full_row = if let Some ( ref transform) = * column_transform {
578- transform. apply ( & row)
579- } else {
580- row
581- } ;
582-
583- // Check constraints.
584- for ( i, datum) in full_row. iter ( ) . enumerate ( ) {
585- target_desc. constraints_met ( i, & datum) . map_err ( |e| {
586- AdapterError :: Unstructured ( anyhow:: anyhow!( "constraint violation: {e}" ) )
587- } ) ?;
588- }
589-
590- let data = SourceData ( Ok ( full_row) ) ;
591- batch_builder
592- . add ( & data, & ( ) , & lower, & 1 )
593- . await
594- . map_err ( |e| AdapterError :: Unstructured ( anyhow:: anyhow!( "persist add: {e}" ) ) ) ?;
595- row_count += 1 ;
596- row_count_in_batch += 1 ;
597- }
581+ let raw_len = raw_bytes. len ( ) ;
582+
583+ // Offload the entire CPU-bound per-chunk pipeline -- decode, column
584+ // transform, constraint checks, and the columnar persist encode
585+ // (`BatchBuilder::add` -> `PartBuilder::push`) -- to the blocking
586+ // pool. There is no yield point in the row loop until a batch fills
587+ // (`add` only awaits `flush_part`, and only once an *encoded* part
588+ // reaches `blob_target_size`, far beyond the 32 MiB *raw* batch
589+ // boundary), so if left on the async runtime, each chunk's rows would
590+ // run as one uninterrupted burst on a shared runtime worker thread,
591+ // starving other connections. The blocking thread is held only
592+ // while a chunk is in flight and released back to the pool between
593+ // chunks (during `recv().await`), so idle workers still hold no
594+ // thread. `block_on` is invoked once per chunk -- not per row -- to
595+ // drive the row loop and the rare `flush_part` it may await.
596+ let chunk_column_types = Arc :: clone ( & column_types) ;
597+ let chunk_transform = Arc :: clone ( & column_transform) ;
598+ let chunk_target_desc = Arc :: clone ( & target_desc) ;
599+ let chunk_rt = rt. clone ( ) ;
600+ let ( returned_builder, added_rows) = mz_ore:: task:: spawn_blocking (
601+ || "copy_from_stdin_process_chunk" ,
602+ move || {
603+ let rows = mz_pgcopy:: decode_copy_format (
604+ & raw_bytes,
605+ & chunk_column_types,
606+ chunk_params,
607+ )
608+ . map_err ( |e| AdapterError :: CopyFormatError ( e. to_string ( ) ) ) ?;
609+
610+ chunk_rt. block_on ( async move {
611+ let mut added: u64 = 0 ;
612+ for row in rows {
613+ // Apply column transform if needed (add defaults, reorder).
614+ let full_row = if let Some ( ref transform) = * chunk_transform {
615+ transform. apply ( & row)
616+ } else {
617+ row
618+ } ;
619+
620+ // Check constraints.
621+ for ( i, datum) in full_row. iter ( ) . enumerate ( ) {
622+ chunk_target_desc. constraints_met ( i, & datum) . map_err ( |e| {
623+ AdapterError :: Unstructured ( anyhow:: anyhow!(
624+ "constraint violation: {e}"
625+ ) )
626+ } ) ?;
627+ }
628+
629+ let data = SourceData ( Ok ( full_row) ) ;
630+ batch_builder
631+ . add ( & data, & ( ) , & lower, & 1 )
632+ . await
633+ . map_err ( |e| {
634+ AdapterError :: Unstructured ( anyhow:: anyhow!( "persist add: {e}" ) )
635+ } ) ?;
636+ added += 1 ;
637+ }
638+ Ok :: < _ , AdapterError > ( ( batch_builder, added) )
639+ } )
640+ } ,
641+ )
642+ . await ?;
643+ batch_builder = returned_builder;
644+ row_count += added_rows;
645+ row_count_in_batch += added_rows;
598646
599- batch_bytes = batch_bytes. saturating_add ( raw_bytes . len ( ) ) ;
647+ batch_bytes = batch_bytes. saturating_add ( raw_len ) ;
600648 if batch_bytes >= COPY_FROM_STDIN_MAX_BATCH_BYTES {
601649 let batch = batch_builder. finish ( upper. clone ( ) ) . await . map_err ( |e| {
602650 AdapterError :: Unstructured ( anyhow:: anyhow!( "persist finish: {e}" ) )
0 commit comments