@@ -103,7 +103,18 @@ BlockInputStreams StorageDisaggregated::readThroughS3(const Context & db_context
103103{
104104 // Build InputStream according to the remote segment read tasks
105105 DAGPipeline pipeline;
106+ <<<<<<< HEAD
106107 buildRemoteSegmentInputStreams (db_context, buildReadTaskWithBackoff (db_context), num_streams, pipeline);
108+ =======
109+ buildRemoteSegmentInputStreams (
110+ db_context,
111+ buildReadTaskWithBackoff (db_context, scan_context),
112+ num_streams,
113+ pipeline,
114+ scan_context);
115+ // handle generated column if necessary.
116+ executeGeneratedColumnPlaceholder (generated_column_infos, log, pipeline);
117+ >>>>>>> a5e14033f8 (Fix three schema mismatch bugs under disaggregated arch (#10530 ))
107118
108119 NamesAndTypes source_columns;
109120 source_columns.reserve (table_scan.getColumnSize ());
@@ -131,8 +142,16 @@ void StorageDisaggregated::readThroughS3(
131142 exec_context,
132143 group_builder,
133144 db_context,
145+ <<<<<<< HEAD
134146 buildReadTaskWithBackoff (db_context),
135147 num_streams);
148+ =======
149+ buildReadTaskWithBackoff (db_context, scan_context),
150+ num_streams,
151+ scan_context);
152+ // handle generated column if necessary.
153+ executeGeneratedColumnPlaceholder (exec_context, group_builder, generated_column_infos, log);
154+ >>>>>>> a5e14033f8 (Fix three schema mismatch bugs under disaggregated arch (#10530 ))
136155
137156 NamesAndTypes source_columns;
138157 auto header = group_builder.getCurrentHeader ();
@@ -496,12 +515,23 @@ DM::RSOperatorPtr StorageDisaggregated::buildRSOperator(
496515 return DM::RSOperator::build (dag_query, table_scan.getColumns (), *columns_to_read, enable_rs_filter, log);
497516}
498517
518+ <<<<<<< HEAD
499519std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr> StorageDisaggregated::packSegmentReadTasks (
500520 const Context & db_context,
501521 DM::SegmentReadTasks && read_tasks,
502522 const DM::ColumnDefinesPtr & column_defines,
503523 size_t num_streams,
504524 int extra_table_id_index)
525+ =======
526+ std::tuple<std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr>, DM::ColumnDefinesPtr> StorageDisaggregated::
527+ packSegmentReadTasks(
528+ const Context & db_context,
529+ DM::SegmentReadTasks && read_tasks,
530+ const DM::ColumnDefinesPtr & column_defines,
531+ const DM::ScanContextPtr & scan_context,
532+ size_t num_streams,
533+ int extra_table_id_index)
534+ >>>>>>> a5e14033f8 (Fix three schema mismatch bugs under disaggregated arch (#10530 ))
505535{
506536 const auto & executor_id = table_scan.getTableScanExecutorID ();
507537
@@ -520,20 +550,29 @@ std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr> StorageDisagg
520550 push_down_filter);
521551 const UInt64 start_ts = sender_target_mpp_task_id.gather_id .query_id .start_ts ;
522552 const auto enable_read_thread = db_context.getSettingsRef ().dt_enable_read_thread ;
553+ <<<<<<< HEAD
554+ =======
555+ const auto & final_columns_defines = push_down_executor && push_down_executor->extra_cast
556+ ? push_down_executor->columns_after_cast
557+ : column_defines;
558+ RUNTIME_CHECK (num_streams > 0 , num_streams);
559+ >>>>>>> a5e14033f8 (Fix three schema mismatch bugs under disaggregated arch (#10530 ))
523560 LOG_INFO (
524561 log,
525562 " packSegmentReadTasks: enable_read_thread={} read_mode={} is_fast_scan={} keep_order={} task_count={} "
526- " num_streams={} column_defines={}" ,
563+ " num_streams={} column_defines={} final_columns_defines={} " ,
527564 enable_read_thread,
528565 magic_enum::enum_name (read_mode),
529566 table_scan.isFastScan (),
530567 table_scan.keepOrder (),
531568 read_tasks.size (),
532569 num_streams,
533- *column_defines);
570+ *column_defines,
571+ *final_columns_defines);
534572
535573 if (enable_read_thread)
536574 {
575+ <<<<<<< HEAD
537576 return std::make_shared<DM::SegmentReadTaskPool>(
538577 extra_table_id_index,
539578 *column_defines,
@@ -561,6 +600,44 @@ std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr> StorageDisagg
561600 .read_mode = read_mode,
562601 },
563602 num_streams);
603+ =======
604+ // Under disagg arch, now we use blocking IO to read data from cloud storage. So it require more active
605+ // segments to fully utilize the read threads.
606+ const size_t read_thread_num_active_seg = 10 * num_streams;
607+ return {
608+ std::make_shared<DM::SegmentReadTaskPool>(
609+ extra_table_id_index,
610+ *final_columns_defines,
611+ push_down_executor,
612+ start_ts,
613+ db_context.getSettingsRef ().max_block_size ,
614+ read_mode,
615+ std::move (read_tasks),
616+ /* after_segment_read*/ [](const DM::DMContextPtr &, const DM::SegmentPtr &) {},
617+ log->identifier (),
618+ /* enable_read_thread*/ true ,
619+ num_streams,
620+ read_thread_num_active_seg,
621+ context.getDAGContext ()->getKeyspaceID (),
622+ context.getDAGContext ()->getResourceGroupName ()),
623+ final_columns_defines};
624+ }
625+ else
626+ {
627+ return {
628+ DM::Remote::RNWorkers::create (
629+ db_context,
630+ std::move (read_tasks),
631+ {
632+ .log = log->getChild (executor_id),
633+ .columns_to_read = final_columns_defines,
634+ .start_ts = start_ts,
635+ .push_down_executor = push_down_executor,
636+ .read_mode = read_mode,
637+ },
638+ num_streams),
639+ final_columns_defines};
640+ >>>>>>> a5e14033f8 (Fix three schema mismatch bugs under disaggregated arch (#10530 ))
564641 }
565642}
566643
@@ -598,15 +675,29 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams(
598675 DAGPipeline & pipeline)
599676{
600677 // Build the input streams to read blocks from remote segments
678+ <<<<<<< HEAD
601679 auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead (table_scan);
602680 auto packed_read_tasks
603681 = packSegmentReadTasks (db_context, std::move (read_tasks), column_defines, num_streams, extra_table_id_index);
604682 RUNTIME_CHECK (num_streams > 0 , num_streams);
683+ =======
684+ DM::ColumnDefinesPtr column_defines;
685+ int extra_table_id_index;
686+ std::tie (column_defines, extra_table_id_index, generated_column_infos)
687+ = genColumnDefinesForDisaggregatedRead (table_scan);
688+ auto [packed_read_tasks, final_column_defines] = packSegmentReadTasks (
689+ db_context,
690+ std::move (read_tasks),
691+ column_defines,
692+ scan_context,
693+ num_streams,
694+ extra_table_id_index);
695+ >>>>>>> a5e14033f8 (Fix three schema mismatch bugs under disaggregated arch (#10530 ))
605696 pipeline.streams .reserve (num_streams);
606697
607698 InputStreamBuilder builder{
608699 .tracing_id = log->identifier (),
609- .columns_to_read = column_defines ,
700+ .columns_to_read = final_column_defines ,
610701 .extra_table_id_index = extra_table_id_index,
611702 };
612703 for (size_t stream_idx = 0 ; stream_idx < num_streams; ++stream_idx)
@@ -661,14 +752,28 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps(
661752 size_t num_streams)
662753{
663754 // Build the input streams to read blocks from remote segments
755+ <<<<<<< HEAD
664756 auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead (table_scan);
665757 auto packed_read_tasks
666758 = packSegmentReadTasks (db_context, std::move (read_tasks), column_defines, num_streams, extra_table_id_index);
759+ =======
760+ DM::ColumnDefinesPtr column_defines;
761+ int extra_table_id_index;
762+ std::tie (column_defines, extra_table_id_index, generated_column_infos)
763+ = genColumnDefinesForDisaggregatedRead (table_scan);
764+ auto [packed_read_tasks, final_column_defines] = packSegmentReadTasks (
765+ db_context,
766+ std::move (read_tasks),
767+ column_defines,
768+ scan_context,
769+ num_streams,
770+ extra_table_id_index);
771+ >>>>>>> a5e14033f8 (Fix three schema mismatch bugs under disaggregated arch (#10530 ))
667772
668773 RUNTIME_CHECK (num_streams > 0 , num_streams);
669774 SrouceOpBuilder builder{
670775 .tracing_id = log->identifier (),
671- .column_defines = column_defines ,
776+ .column_defines = final_column_defines ,
672777 .extra_table_id_index = extra_table_id_index,
673778 .exec_context = exec_context,
674779 };
0 commit comments