Skip to content

Commit 8972b39

Browse files
add metrics for fragment cancel counts
1 parent aa88a9c commit 8972b39

5 files changed

Lines changed: 27 additions & 1 deletion

File tree

be/src/runtime/fragment_mgr.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,9 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::
100100
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
101101

102102
bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
103+
104+
bvar::Adder<uint64_t> g_timeout_canceled_fragment_count("timeout_canceled_fragment_count");
105+
103106
bvar::Status<uint64_t> g_fragment_last_active_time(
104107
"fragment_last_active_time", duration_cast<std::chrono::milliseconds>(
105108
std::chrono::system_clock::now().time_since_epoch())
@@ -882,6 +885,7 @@ Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
882885
return Status::OK();
883886
}
884887

888+
// Cancel call issued from FE to BE
885889
void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
886890
std::shared_ptr<QueryContext> query_ctx = nullptr;
887891
{
@@ -893,6 +897,13 @@ void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
893897
return;
894898
}
895899
}
900+
901+
// cancel_query has two kinds of initiators,
902+
if (reason.is<ErrorCode::TIMEOUT>()) {
903+
g_timeout_canceled_fragment_count << 1;
904+
LOG(WARNING) << "Query " << print_id(query_id) << " it is cancel by be";
905+
}
906+
896907
query_ctx->cancel(reason);
897908
_query_ctx_map.erase(query_id);
898909
LOG(INFO) << "Query " << print_id(query_id)

be/src/service/internal_service.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -646,6 +646,10 @@ void PInternalService::cancel_plan_fragment(google::protobuf::RpcController* /*c
646646
// to cancel status here.
647647
if (request->cancel_reason() == PPlanFragmentCancelReason::LIMIT_REACH) {
648648
actual_cancel_status = Status::Error<ErrorCode::LIMIT_REACH>("limit reach");
649+
} else if (request->has_cancel_reason() == PPlanFragmentCancelReason::TIMEOUT) {
650+
actual_cancel_status = Status::Error<ErrorCode::TIMEOUT>(
651+
"cancelled by timeout, reason: {}", PPlanFragmentCancelReason_Name(
652+
request->cancel_reason()));
649653
} else {
650654
// Use cancel reason as error message
651655
actual_cancel_status = Status::InternalError(

fe/fe-core/src/main/java/org/apache/doris/common/Status.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ public void updateStatus(TStatusCode code, String errorMessage) {
8383
this.errorMsg = errorMessage;
8484
}
8585

86+
/**
 * Overwrites the stored status code with {@code code}.
 * Only the error code field is assigned here; this method does not touch the error message.
 *
 * @param code the status code to store
 */
public void updateErrorCode(TStatusCode code) {
87+
this.errorCode = code;
88+
}
89+
8690
public boolean ok() {
8791
return this.errorCode == TStatusCode.OK;
8892
}

fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1018,7 +1018,11 @@ protected Map<TNetworkAddress, List<Long>> waitPipelineRpc(List<Pair<Long, Trip
10181018
if (exception != null && errMsg == null) {
10191019
errMsg = operation + " failed. " + exception.getMessage();
10201020
}
1021-
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
1021+
if (code == TStatusCode.TIMEOUT) {
1022+
queryStatus.updateStatus(TStatusCode.TIMEOUT, errMsg);
1023+
} else {
1024+
queryStatus.updateStatus(TStatusCode.INTERNAL_ERROR, errMsg);
1025+
}
10221026
cancelInternal(queryStatus);
10231027
switch (code) {
10241028
case TIMEOUT:

fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1652,6 +1652,9 @@ public void executeAndSendResult(boolean isOutfileQuery, boolean isSendFields,
16521652
Status internalErrorSt = new Status(TStatusCode.INTERNAL_ERROR,
16531653
"cancel fragment query_id:{} cause {}",
16541654
DebugUtil.printId(context.queryId()), e.getMessage());
1655+
if (e.getMessage().contains("query timeout")) {
1656+
internalErrorSt.updateErrorCode(TStatusCode.TIMEOUT);
1657+
}
16551658
LOG.warn(internalErrorSt.getErrorMsg());
16561659
coordBase.cancel(internalErrorSt);
16571660
throw e;

0 commit comments

Comments
 (0)