Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
272 changes: 266 additions & 6 deletions be/src/exec/pipeline/pipeline_fragment_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
#include "exec/pipeline/pipeline_fragment_context.h"

#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <pthread.h>
Expand All @@ -26,6 +28,8 @@
#include <cstdlib>
// IWYU pragma: no_include <bits/chrono.h>
#include <fmt/format.h>
#include <thrift/Thrift.h>
#include <thrift/transport/TTransportException.h>

#include <chrono> // IWYU pragma: keep
#include <map>
Expand Down Expand Up @@ -122,23 +126,24 @@
#include "runtime/result_buffer_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "util/client_cache.h"
#include "util/countdown_latch.h"
#include "util/debug_util.h"
#include "util/network_util.h"
#include "util/uid_util.h"

namespace doris {
PipelineFragmentContext::PipelineFragmentContext(
TUniqueId query_id, const TPipelineFragmentParams& request,
std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const std::function<void(RuntimeState*, Status*)>& call_back,
report_status_callback report_status_cb)
const std::function<void(RuntimeState*, Status*)>& call_back)
: _query_id(std::move(query_id)),
_fragment_id(request.fragment_id),
_exec_env(exec_env),
_query_ctx(std::move(query_ctx)),
_call_back(call_back),
_is_report_on_cancel(true),
_report_status_cb(std::move(report_status_cb)),
_params(request),
_parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0),
_need_notify_close(request.__isset.need_notify_close ? request.need_notify_close
Expand Down Expand Up @@ -1958,6 +1963,256 @@ std::string PipelineFragmentContext::get_first_error_msg() {
return "";
}

// Builds the HTTP URL under which this backend's web server exposes the given
// file for download (used for load error/delta files reported to the FE).
// The URL embeds this BE's auth token so the FE can fetch the file directly.
std::string PipelineFragmentContext::_to_http_path(const std::string& file_name) const {
    std::string path = "http://";
    path += BackendOptions::get_localhost();
    path += ':';
    path += std::to_string(config::webserver_port);
    path += "/api/_download_load?token=";
    path += _exec_env->token();
    path += "&file=";
    path += file_name;
    return path;
}

// Reports this fragment's execution status back to the coordinating FE via
// the reportExecStatus Thrift RPC. For load-type queries the report also
// carries aggregated load counters, delta/error URLs, txn id + label, export
// files and per-connector commit data (tablet, hive, iceberg, maxcompute).
// On an unrecoverable RPC failure the query is cancelled through
// req.cancel_fn so BE and FE do not diverge on the fragment's terminal state.
void PipelineFragmentContext::_coordinator_callback(const ReportStatusRequest& req) {
    // Debug hook: artificially delay the report (longer when the status is a
    // data-quality error) to exercise FE-side handling of slow reports.
    DBUG_EXECUTE_IF("FragmentMgr::coordinator_callback.report_delay", {
        int random_seconds = req.status.is<ErrorCode::DATA_QUALITY_ERROR>() ? 8 : 2;
        LOG_INFO("sleep : ").tag("time", random_seconds).tag("query_id", print_id(req.query_id));
        std::this_thread::sleep_for(std::chrono::seconds(random_seconds));
        LOG_INFO("sleep done").tag("query_id", print_id(req.query_id));
    });

    DCHECK(req.status.ok() || req.done); // if !status.ok() => done
    if (req.coord_addr.hostname == "external") {
        // External query (flink/spark read tablets) not need to report to FE.
        return;
    }
    // Obtain a pooled Thrift client to the coordinator, retrying up to 10
    // times one second apart — the FE may be transiently unreachable.
    int callback_retries = 10;
    const int sleep_ms = 1000;
    Status exec_status = req.status;
    Status coord_status;
    std::unique_ptr<FrontendServiceConnection> coord = nullptr;
    do {
        coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
                                                            req.coord_addr, &coord_status);
        if (!coord_status.ok()) {
            std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
        }
    } while (!coord_status.ok() && callback_retries-- > 0);

    if (!coord_status.ok()) {
        // Could not reach the coordinator at all: cancel locally so the
        // fragment does not linger with an unreported terminal state.
        UniqueId uid(req.query_id.hi, req.query_id.lo);
        static_cast<void>(req.cancel_fn(Status::InternalError(
                "query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
                PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
        return;
    }

    // Assemble the report. Optional Thrift fields are populated via
    // __set_* / __isset so only set fields are serialized.
    TReportExecStatusParams params;
    params.protocol_version = FrontendServiceVersion::V1;
    params.__set_query_id(req.query_id);
    params.__set_backend_num(req.backend_num);
    params.__set_fragment_instance_id(req.fragment_instance_id);
    params.__set_fragment_id(req.fragment_id);
    params.__set_status(exec_status.to_thrift());
    params.__set_done(req.done);
    params.__set_query_type(req.runtime_state->query_type());
    params.__isset.profile = false;

    DCHECK(req.runtime_state != nullptr);

    if (req.runtime_state->query_type() == TQueryType::LOAD) {
        params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
        params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
    } else {
        // Non-LOAD query: report any produced output files as download URLs.
        DCHECK(!req.runtime_states.empty());
        if (!req.runtime_state->output_files().empty()) {
            params.__isset.delta_urls = true;
            for (auto& it : req.runtime_state->output_files()) {
                params.delta_urls.push_back(_to_http_path(it));
            }
        }
        // NOTE(review): redundant — __isset.delta_urls was already set above
        // whenever delta_urls was populated.
        if (!params.delta_urls.empty()) {
            params.__isset.delta_urls = true;
        }
    }

    // Aggregate load counters. If the merged runtime_state already carries
    // totals, report from it directly; otherwise fall back to summing across
    // the per-instance runtime_states.
    static std::string s_dpp_normal_all = "dpp.norm.ALL";
    static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
    static std::string s_unselected_rows = "unselected.rows";
    int64_t num_rows_load_success = 0;
    int64_t num_rows_load_filtered = 0;
    int64_t num_rows_load_unselected = 0;
    if (req.runtime_state->num_rows_load_total() > 0 ||
        req.runtime_state->num_rows_load_filtered() > 0 ||
        req.runtime_state->num_finished_range() > 0) {
        params.__isset.load_counters = true;

        num_rows_load_success = req.runtime_state->num_rows_load_success();
        num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
        num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
        params.__isset.fragment_instance_reports = true;
        TFragmentInstanceReport t;
        t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
        t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
        t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
        t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
        params.fragment_instance_reports.push_back(t);
    } else if (!req.runtime_states.empty()) {
        // One TFragmentInstanceReport per instance that actually processed rows.
        for (auto* rs : req.runtime_states) {
            if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
                rs->num_finished_range() > 0) {
                params.__isset.load_counters = true;
                num_rows_load_success += rs->num_rows_load_success();
                num_rows_load_filtered += rs->num_rows_load_filtered();
                num_rows_load_unselected += rs->num_rows_load_unselected();
                params.__isset.fragment_instance_reports = true;
                TFragmentInstanceReport t;
                t.__set_fragment_instance_id(rs->fragment_instance_id());
                t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
                t.__set_loaded_rows(rs->num_rows_load_total());
                t.__set_loaded_bytes(rs->num_bytes_load_total());
                params.fragment_instance_reports.push_back(t);
            }
        }
    }
    // NOTE(review): the counters map is filled even when no branch above set
    // __isset.load_counters — presumably harmless since unset optional fields
    // are not serialized; confirm against FE expectations.
    params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
    params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
    params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));

    if (!req.load_error_url.empty()) {
        params.__set_tracking_url(req.load_error_url);
    }
    if (!req.first_error_msg.empty()) {
        params.__set_first_error_msg(req.first_error_msg);
    }
    // A positive wal_id marks a WAL-backed load; report its txn id and label.
    // If several instances carry one, the last iterated wins.
    for (auto* rs : req.runtime_states) {
        if (rs->wal_id() > 0) {
            params.__set_txn_id(rs->wal_id());
            params.__set_label(rs->import_label());
        }
    }
    // The blocks below share one pattern: prefer the merged runtime_state's
    // data; if it is empty, collect from each per-instance runtime_state.
    if (!req.runtime_state->export_output_files().empty()) {
        params.__isset.export_files = true;
        params.export_files = req.runtime_state->export_output_files();
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (!rs->export_output_files().empty()) {
                params.__isset.export_files = true;
                params.export_files.insert(params.export_files.end(),
                                           rs->export_output_files().begin(),
                                           rs->export_output_files().end());
            }
        }
    }
    if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
        params.__isset.commitInfos = true;
        params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
                params.__isset.commitInfos = true;
                params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
            }
        }
    }
    if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
        params.__isset.errorTabletInfos = true;
        params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
                params.__isset.errorTabletInfos = true;
                params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
                                               rs_eti.end());
            }
        }
    }
    if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
        params.__isset.hive_partition_updates = true;
        params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
                                             hpu.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
                params.__isset.hive_partition_updates = true;
                params.hive_partition_updates.insert(params.hive_partition_updates.end(),
                                                     rs_hpu.begin(), rs_hpu.end());
            }
        }
    }
    if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
        params.__isset.iceberg_commit_datas = true;
        params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
                                           icd.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
                params.__isset.iceberg_commit_datas = true;
                params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
                                                   rs_icd.begin(), rs_icd.end());
            }
        }
    }

    if (auto mcd = req.runtime_state->mc_commit_datas(); !mcd.empty()) {
        params.__isset.mc_commit_datas = true;
        params.mc_commit_datas.insert(params.mc_commit_datas.end(), mcd.begin(), mcd.end());
    } else if (!req.runtime_states.empty()) {
        for (auto* rs : req.runtime_states) {
            if (auto rs_mcd = rs->mc_commit_datas(); !rs_mcd.empty()) {
                params.__isset.mc_commit_datas = true;
                params.mc_commit_datas.insert(params.mc_commit_datas.end(), rs_mcd.begin(),
                                              rs_mcd.end());
            }
        }
    }

    // Drain error lines not yet sent to the FE into this report.
    req.runtime_state->get_unreported_errors(&(params.error_log));
    params.__isset.error_log = (!params.error_log.empty());

    // backend_id == 0 means the id is not assigned yet; skip it in that case.
    if (_exec_env->cluster_info()->backend_id != 0) {
        params.__set_backend_id(_exec_env->cluster_info()->backend_id);
    }

    TReportExecStatusResult res;
    Status rpc_status;

    VLOG_DEBUG << "reportExecStatus params is "
               << apache::thrift::ThriftDebugString(params).c_str();
    if (!exec_status.ok()) {
        LOG(WARNING) << "report error status: " << exec_status.msg()
                     << " to coordinator: " << req.coord_addr
                     << ", query id: " << print_id(req.query_id);
    }
    // Fire the RPC. A TTransportException usually means the pooled connection
    // is stale: reopen it once and retry. Any other Thrift error, or a failed
    // reopen, results in cancelling the query below.
    try {
        try {
            (*coord)->reportExecStatus(res, params);
        } catch ([[maybe_unused]] apache::thrift::transport::TTransportException& e) {
#ifndef ADDRESS_SANITIZER
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
                         << ", instance id: " << print_id(req.fragment_instance_id) << " to "
                         << req.coord_addr << ", err: " << e.what();
#endif
            rpc_status = coord->reopen();

            if (!rpc_status.ok()) {
                req.cancel_fn(rpc_status);
                return;
            }
            (*coord)->reportExecStatus(res, params);
        }

        // The RPC itself succeeded; surface the status the FE sent back.
        rpc_status = Status::create<false>(res.status);
    } catch (apache::thrift::TException& e) {
        rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
                                           PrintThriftNetworkAddress(req.coord_addr), e.what());
    }

    if (!rpc_status.ok()) {
        LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
                 print_id(req.query_id), rpc_status.to_string());
        req.cancel_fn(rpc_status);
    }
}

Status PipelineFragmentContext::send_report(bool done) {
Status exec_status = _query_ctx->exec_status();
// If plan is done successfully, but _is_report_success is false,
Expand Down Expand Up @@ -2009,9 +2264,14 @@ Status PipelineFragmentContext::send_report(bool done) {
.load_error_url = load_eror_url,
.first_error_msg = first_error_msg,
.cancel_fn = [this](const Status& reason) { cancel(reason); }};

return _report_status_cb(
req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()));
auto ctx = std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this());
return _exec_env->fragment_mgr()->get_thread_pool()->submit_func([this, req, ctx]() {
SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
_coordinator_callback(req);
if (!req.done) {
ctx->refresh_next_report_time();
}
});
}

size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const {
Expand Down
22 changes: 4 additions & 18 deletions be/src/exec/pipeline/pipeline_fragment_context.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,18 +52,9 @@ class Dependency;
class PipelineFragmentContext : public TaskExecutionContext {
public:
ENABLE_FACTORY_CREATOR(PipelineFragmentContext);
// Callback to report execution status of plan fragment.
// 'profile' is the cumulative profile, 'done' indicates whether the execution
// is done or still continuing.
// Note: this does not take a const RuntimeProfile&, because it might need to call
// functions like PrettyPrint() or to_thrift(), neither of which is const
// because they take locks.
using report_status_callback = std::function<Status(
const ReportStatusRequest, std::shared_ptr<PipelineFragmentContext>&&)>;
PipelineFragmentContext(TUniqueId query_id, const TPipelineFragmentParams& request,
std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const std::function<void(RuntimeState*, Status*)>& call_back,
report_status_callback report_status_cb);
const std::function<void(RuntimeState*, Status*)>& call_back);

~PipelineFragmentContext() override;

Expand Down Expand Up @@ -157,6 +148,9 @@ class PipelineFragmentContext : public TaskExecutionContext {
}

private:
void _coordinator_callback(const ReportStatusRequest& req);
std::string _to_http_path(const std::string& file_name) const;

void _release_resource();

Status _build_and_prepare_full_pipeline(ThreadPool* thread_pool);
Expand Down Expand Up @@ -257,14 +251,6 @@ class PipelineFragmentContext : public TaskExecutionContext {
std::atomic_bool _disable_period_report = true;
std::atomic_uint64_t _previous_report_time = 0;

// This callback is used to notify the FE of the status of the fragment.
// For example:
// 1. when the fragment is cancelled, it will be called.
// 2. when the fragment is finished, it will be called. especially, when the fragment is
// a insert into select statement, it should notfiy FE every fragment's status.
// And also, this callback is called periodly to notify FE the load process.
report_status_callback _report_status_cb;

DescriptorTbl* _desc_tbl = nullptr;
int _num_instances = 1;

Expand Down
Loading
Loading