@@ -115,6 +115,8 @@ BlockInputStreams StorageDisaggregated::readThroughS3(const Context & db_context
115115 num_streams,
116116 pipeline,
117117 scan_context);
118+ // handle generated column if necessary.
119+ executeGeneratedColumnPlaceholder (generated_column_infos, log, pipeline);
118120
119121 NamesAndTypes source_columns;
120122 source_columns.reserve (table_scan.getColumnSize ());
@@ -149,6 +151,8 @@ void StorageDisaggregated::readThroughS3(
149151 buildReadTaskWithBackoff (db_context, scan_context),
150152 num_streams,
151153 scan_context);
154+ // handle generated column if necessary.
155+ executeGeneratedColumnPlaceholder (exec_context, group_builder, generated_column_infos, log);
152156
153157 NamesAndTypes source_columns;
154158 auto header = group_builder.getCurrentHeader ();
@@ -609,13 +613,14 @@ std::tuple<DM::RSOperatorPtr, DM::ColumnRangePtr> StorageDisaggregated::buildRSO
609613 return {rs_operator, column_range};
610614}
611615
612- std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr> StorageDisaggregated::packSegmentReadTasks (
613- const Context & db_context,
614- DM::SegmentReadTasks && read_tasks,
615- const DM::ColumnDefinesPtr & column_defines,
616- const DM::ScanContextPtr & scan_context,
617- size_t num_streams,
618- int extra_table_id_index)
616+ std::tuple<std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr>, DM::ColumnDefinesPtr> StorageDisaggregated::
617+ packSegmentReadTasks (
618+ const Context & db_context,
619+ DM::SegmentReadTasks && read_tasks,
620+ const DM::ColumnDefinesPtr & column_defines,
621+ const DM::ScanContextPtr & scan_context,
622+ size_t num_streams,
623+ int extra_table_id_index)
619624{
620625 const auto & executor_id = table_scan.getTableScanExecutorID ();
621626
@@ -651,53 +656,61 @@ std::variant<DM::Remote::RNWorkersPtr, DM::SegmentReadTaskPoolPtr> StorageDisagg
651656 scan_context->read_mode = read_mode;
652657 const UInt64 start_ts = sender_target_mpp_task_id.gather_id .query_id .start_ts ;
653658 const auto enable_read_thread = db_context.getSettingsRef ().dt_enable_read_thread ;
659+ const auto & final_columns_defines = push_down_executor && push_down_executor->extra_cast
660+ ? push_down_executor->columns_after_cast
661+ : column_defines;
654662 RUNTIME_CHECK (num_streams > 0 , num_streams);
655663 LOG_INFO (
656664 log,
657665 " packSegmentReadTasks: enable_read_thread={} read_mode={} is_fast_scan={} keep_order={} task_count={} "
658- " num_streams={} column_defines={}" ,
666+ " num_streams={} column_defines={} final_columns_defines={} " ,
659667 enable_read_thread,
660668 magic_enum::enum_name (read_mode),
661669 table_scan.isFastScan (),
662670 table_scan.keepOrder (),
663671 read_tasks.size (),
664672 num_streams,
665- *column_defines);
673+ *column_defines,
674+ *final_columns_defines);
666675
667676 if (enable_read_thread)
668677 {
669679 // Under disagg arch, now we use blocking IO to read data from cloud storage, so it requires more active
670679 // segments to fully utilize the read threads.
671680 const size_t read_thread_num_active_seg = 10 * num_streams;
672- return std::make_shared<DM::SegmentReadTaskPool>(
673- extra_table_id_index,
674- *column_defines,
675- push_down_executor,
676- start_ts,
677- db_context.getSettingsRef ().max_block_size ,
678- read_mode,
679- std::move (read_tasks),
680- /* after_segment_read*/ [](const DM::DMContextPtr &, const DM::SegmentPtr &) {},
681- log->identifier (),
682- /* enable_read_thread*/ true ,
683- num_streams,
684- read_thread_num_active_seg,
685- context.getDAGContext ()->getKeyspaceID (),
686- context.getDAGContext ()->getResourceGroupName ());
681+ return {
682+ std::make_shared<DM::SegmentReadTaskPool>(
683+ extra_table_id_index,
684+ *final_columns_defines,
685+ push_down_executor,
686+ start_ts,
687+ db_context.getSettingsRef ().max_block_size ,
688+ read_mode,
689+ std::move (read_tasks),
690+ /* after_segment_read*/ [](const DM::DMContextPtr &, const DM::SegmentPtr &) {},
691+ log->identifier (),
692+ /* enable_read_thread*/ true ,
693+ num_streams,
694+ read_thread_num_active_seg,
695+ context.getDAGContext ()->getKeyspaceID (),
696+ context.getDAGContext ()->getResourceGroupName ()),
697+ final_columns_defines};
687698 }
688699 else
689700 {
690- return DM::Remote::RNWorkers::create (
691- db_context,
692- std::move (read_tasks),
693- {
694- .log = log->getChild (executor_id),
695- .columns_to_read = column_defines,
696- .start_ts = start_ts,
697- .push_down_executor = push_down_executor,
698- .read_mode = read_mode,
699- },
700- num_streams);
701+ return {
702+ DM::Remote::RNWorkers::create (
703+ db_context,
704+ std::move (read_tasks),
705+ {
706+ .log = log->getChild (executor_id),
707+ .columns_to_read = final_columns_defines,
708+ .start_ts = start_ts,
709+ .push_down_executor = push_down_executor,
710+ .read_mode = read_mode,
711+ },
712+ num_streams),
713+ final_columns_defines};
701714 }
702715}
703716
@@ -739,8 +752,11 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams(
739752 const DM::ScanContextPtr & scan_context)
740753{
741754 // Build the input streams to read blocks from remote segments
742- auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead (table_scan);
743- auto packed_read_tasks = packSegmentReadTasks (
755+ DM::ColumnDefinesPtr column_defines;
756+ int extra_table_id_index;
757+ std::tie (column_defines, extra_table_id_index, generated_column_infos)
758+ = genColumnDefinesForDisaggregatedRead (table_scan);
759+ auto [packed_read_tasks, final_column_defines] = packSegmentReadTasks (
744760 db_context,
745761 std::move (read_tasks),
746762 column_defines,
@@ -751,7 +767,7 @@ void StorageDisaggregated::buildRemoteSegmentInputStreams(
751767
752768 InputStreamBuilder builder{
753769 .tracing_id = log->identifier (),
754- .columns_to_read = column_defines ,
770+ .columns_to_read = final_column_defines ,
755771 .extra_table_id_index = extra_table_id_index,
756772 };
757773 for (size_t stream_idx = 0 ; stream_idx < num_streams; ++stream_idx)
@@ -810,8 +826,11 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps(
810826 const DM::ScanContextPtr & scan_context)
811827{
812828 // Build the source ops to read blocks from remote segments
813- auto [column_defines, extra_table_id_index] = genColumnDefinesForDisaggregatedRead (table_scan);
814- auto packed_read_tasks = packSegmentReadTasks (
829+ DM::ColumnDefinesPtr column_defines;
830+ int extra_table_id_index;
831+ std::tie (column_defines, extra_table_id_index, generated_column_infos)
832+ = genColumnDefinesForDisaggregatedRead (table_scan);
833+ auto [packed_read_tasks, final_column_defines] = packSegmentReadTasks (
815834 db_context,
816835 std::move (read_tasks),
817836 column_defines,
@@ -821,7 +840,7 @@ void StorageDisaggregated::buildRemoteSegmentSourceOps(
821840
822841 SourceOpBuilder builder{
823842 .tracing_id = log->identifier (),
824- .column_defines = column_defines ,
843+ .column_defines = final_column_defines ,
825844 .extra_table_id_index = extra_table_id_index,
826845 .exec_context = exec_context,
827846 };
0 commit comments