@@ -41,16 +41,16 @@ Status WalReader::init_reader(const TupleDescriptor* tuple_descriptor) {
4141
4242// ---- Unified init_reader(ReaderInitContext*) overrides ----
4343
44- Status WalReader::_open_file_reader (ReaderInitContext* /* ctx*/ ) {
44+ Status WalReader::_open_file_reader (ReaderInitContext* ctx) {
45+ auto * wal_ctx = checked_context_cast<WalInitContext>(ctx);
46+ _tuple_descriptor = wal_ctx->output_tuple_descriptor ;
4547 RETURN_IF_ERROR (_state->exec_env ()->wal_mgr ()->get_wal_path (_wal_id, _wal_path));
4648 _wal_reader = std::make_shared<doris::WalFileReader>(_wal_path);
4749 RETURN_IF_ERROR (_wal_reader->init ());
4850 return Status::OK ();
4951}
5052
51- Status WalReader::_do_init_reader (ReaderInitContext* base_ctx) {
52- auto * ctx = checked_context_cast<WalInitContext>(base_ctx);
53- _tuple_descriptor = ctx->output_tuple_descriptor ;
53+ Status WalReader::_do_init_reader (ReaderInitContext* /* base_ctx*/ ) {
5454 return Status::OK ();
5555}
5656
@@ -131,6 +131,15 @@ Status WalReader::_get_columns_impl(std::unordered_map<std::string, DataTypePtr>
131131 } catch (const std::invalid_argument& e) {
132132 return Status::InvalidArgument (" Invalid format, {}" , e.what ());
133133 }
134+ // Report WAL columns so on_before_init_reader does not mark them as missing.
135+ if (_tuple_descriptor) {
136+ for (auto * slot_desc : _tuple_descriptor->slots ()) {
137+ if (_column_pos_map.contains (slot_desc->col_unique_id ())) {
138+ name_to_type->emplace (slot_desc->col_name (),
139+ slot_desc->get_data_type_ptr ());
140+ }
141+ }
142+ }
134143 return Status::OK ();
135144}
136145
0 commit comments