Skip to content

Commit a0bcc72

Browse files
authored
chore(query): dump slow async running graph separately (#19926)
* chore(query): dump slow async running graph separately * chore(query): add exchange sender details status
1 parent 9ba446d commit a0bcc72

6 files changed

Lines changed: 88 additions & 5 deletions

File tree

src/common/tracing/src/init.rs

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,7 @@ fn env_filter(level: &str) -> EnvFilter {
128128
.filter(Some("databend::log::structlog"), LevelFilter::Off)
129129
.filter(Some("databend::log::time_series"), LevelFilter::Off)
130130
.filter(Some("databend::log::access"), LevelFilter::Off)
131+
.filter(Some("databend::log::dump_graph"), LevelFilter::Off)
131132
.parse(level),
132133
)
133134
}
@@ -462,3 +463,35 @@ pub fn init_logging(
462463

463464
_drop_guards
464465
}
466+
467+
#[cfg(test)]
468+
mod tests {
469+
use log::Level;
470+
use log::Metadata;
471+
use logforth::Filter;
472+
use logforth::filter::FilterResult;
473+
474+
use super::env_filter;
475+
476+
#[test]
477+
fn test_dump_graph_filter_is_off_by_default() {
478+
let filter = env_filter("TRACE");
479+
let metadata = Metadata::builder()
480+
.target("databend::log::dump_graph")
481+
.level(Level::Trace)
482+
.build();
483+
484+
assert_eq!(filter.enabled(&metadata), FilterResult::Reject);
485+
}
486+
487+
#[test]
488+
fn test_dump_graph_filter_can_be_enabled_by_level() {
489+
let filter = env_filter("WARN,databend::log::dump_graph=TRACE");
490+
let metadata = Metadata::builder()
491+
.target("databend::log::dump_graph")
492+
.level(Level::Trace)
493+
.build();
494+
495+
assert_eq!(filter.enabled(&metadata), FilterResult::Neutral);
496+
}
497+
}

src/query/service/src/pipelines/executor/processor_async_task.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ use databend_common_pipeline::core::ProcessorPtr;
3131
use futures_util::FutureExt;
3232
use futures_util::future::BoxFuture;
3333
use futures_util::future::Either;
34+
use log::error;
3435
use log::warn;
3536
use petgraph::prelude::NodeIndex;
3637
use tokio::time::sleep;
@@ -41,6 +42,8 @@ use crate::pipelines::executor::QueryExecutorTasksQueue;
4142
use crate::pipelines::executor::RunningGraph;
4243
use crate::pipelines::executor::WorkersCondvar;
4344

45+
const TARGET_DUMP_GRAPH: &str = "databend::log::dump_graph";
46+
4447
pub enum ExecutorTasksQueue {
4548
QueryExecutorTasksQueue(Arc<QueryExecutorTasksQueue>),
4649
QueriesExecutorTasksQueue(Arc<QueriesExecutorTasksQueue>),
@@ -124,9 +127,11 @@ impl ProcessorAsyncTask {
124127
let processor_id = unsafe { processor.id() };
125128
let processor_name = unsafe { processor.name() };
126129
let queue_clone = queue.clone();
130+
let graph_clone = graph.clone();
127131
let inner = async move {
128132
let start = Instant::now();
129133
let mut inner = inner.boxed();
134+
let mut log_graph = false;
130135

131136
loop {
132137
let interval = Box::pin(sleep(Duration::from_secs(5)));
@@ -139,6 +144,20 @@ impl ProcessorAsyncTask {
139144
"Slow async task detected - query: {:?}, processor: {:?} ({}), elapsed: {:?}, active workers: {:?}",
140145
query_id, processor_id, processor_name, elapsed, active_workers
141146
);
147+
if elapsed >= Duration::from_secs(200) && active_workers == 0 && !log_graph
148+
{
149+
log_graph = true;
150+
error!(
151+
target: TARGET_DUMP_GRAPH,
152+
"Slow async task running graph dump - query: {:?}, processor: {:?} ({}), elapsed: {:?}, active workers: {:?}, {}",
153+
query_id,
154+
processor_id,
155+
processor_name,
156+
elapsed,
157+
active_workers,
158+
graph_clone.format_graph_nodes(false)
159+
);
160+
}
142161
}
143162
Either::Right((res, _)) => {
144163
return res;

src/query/service/src/servers/flight/v1/exchange/broadcast_send_transform.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -170,12 +170,12 @@ impl Processor for BroadcastSendTransform {
170170

171171
fn details_status(&self) -> Option<String> {
172172
Some(format!(
173-
"BroadcastSendTransform {} {:?}",
173+
"handle_pending={}, local_pos={}, closed_channels={}/{}, closed={:?}",
174174
self.handle.is_some(),
175-
self.channels
176-
.iter()
177-
.map(|(_, x)| x.is_closed())
178-
.collect::<Vec<_>>()
175+
self.local_pos,
176+
self.channels.closed_count(),
177+
self.channels.len(),
178+
self.channels.closed_status(),
179179
))
180180
}
181181

src/query/service/src/servers/flight/v1/exchange/hash_send_sink.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,17 @@ impl Processor for HashSendSink {
187187
Ok(Event::NeedData)
188188
}
189189

190+
fn details_status(&self) -> Option<String> {
191+
Some(format!(
192+
"handle_pending={}, closed_channels={}/{}, closed={:?}, buffered_partitions={:?}",
193+
self.handle.is_some(),
194+
self.channels.closed_count(),
195+
self.channels.len(),
196+
self.channels.closed_status(),
197+
self.partition_stream.partition_ids(),
198+
))
199+
}
200+
190201
fn set_id(&mut self, id: NodeIndex) {
191202
self.id = id;
192203
}

src/query/service/src/servers/flight/v1/exchange/hash_send_transform.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,18 @@ impl Processor for HashSendTransform {
223223
Ok(Event::NeedData)
224224
}
225225

226+
fn details_status(&self) -> Option<String> {
227+
Some(format!(
228+
"handle_pending={}, local_pos={}, closed_channels={}/{}, closed={:?}, buffered_partitions={:?}",
229+
self.handle.is_some(),
230+
self.local_pos,
231+
self.channels.closed_count(),
232+
self.channels.len(),
233+
self.channels.closed_status(),
234+
self.partition_stream.partition_ids(),
235+
))
236+
}
237+
226238
fn set_id(&mut self, id: NodeIndex) {
227239
self.id = id;
228240
}

src/query/service/src/servers/flight/v1/exchange/outbound_send_channels.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,14 @@ impl OutboundSendChannels {
6161
.all(|(idx, ch)| idx == except_idx || ch.is_closed())
6262
}
6363

64+
pub(super) fn closed_status(&self) -> Vec<bool> {
65+
self.channels.iter().map(|ch| ch.is_closed()).collect()
66+
}
67+
68+
pub(super) fn closed_count(&self) -> usize {
69+
self.channels.iter().filter(|ch| ch.is_closed()).count()
70+
}
71+
6472
pub(super) fn close(&mut self, idx: usize) {
6573
if !self.channels[idx].is_closed() {
6674
let mut closed = DummyOutboundChannel::create();

0 commit comments

Comments
 (0)