Skip to content

Commit 3afb936

Browse files
authored
refactor(query): isolate http timeout tasks (#19838)
1 parent 882d34b commit 3afb936

5 files changed

Lines changed: 66 additions & 38 deletions

File tree

src/common/base/src/runtime/global_runtime.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use crate::runtime::Runtime;
2121

2222
pub struct GlobalIORuntime;
2323

24+
/// Newtype wrapper around a [`Runtime`], declared alongside `GlobalIORuntime` and
/// `GlobalQueryRuntime`. The wrapped runtime is the public tuple field `0`.
pub struct GlobalControlRuntime(pub Runtime);
25+
2426
pub struct GlobalQueryRuntime(pub Runtime);
2527

2628
impl GlobalQueryRuntime {
@@ -47,6 +49,23 @@ impl GlobalIORuntime {
4749
}
4850
}
4951

52+
// Construction of and access to the process-wide `GlobalControlRuntime`.
impl GlobalControlRuntime {
53+
/// Borrows the wrapped [`Runtime`] (tuple field `0`).
///
/// The receiver is `&Arc<Self>`, so this is called on the handle returned by
/// [`GlobalControlRuntime::instance`].
#[inline(always)]
54+
pub fn runtime(self: &Arc<Self>) -> &Runtime {
55+
&self.0
56+
}
57+
58+
/// Creates the runtime and registers it with `GlobalInstance`.
///
/// Calls `Runtime::with_worker_threads(2, Some("control-worker"))`, wraps the
/// result in an `Arc<GlobalControlRuntime>`, and hands it to `GlobalInstance::set`.
///
/// NOTE(review): `"control-worker"` is presumably the worker thread name —
/// confirm against `Runtime::with_worker_threads`.
///
/// # Errors
///
/// Propagates the error returned by `Runtime::with_worker_threads`.
pub fn init() -> Result<()> {
59+
let rt = Runtime::with_worker_threads(2, Some("control-worker".to_owned()))?;
60+
GlobalInstance::set(Arc::new(GlobalControlRuntime(rt)));
61+
Ok(())
62+
}
63+
64+
/// Returns the shared handle obtained from `GlobalInstance::get()`.
///
/// NOTE(review): presumably requires [`GlobalControlRuntime::init`] to have run
/// first; the behavior otherwise depends on `GlobalInstance::get` — confirm.
pub fn instance() -> Arc<GlobalControlRuntime> {
65+
GlobalInstance::get()
66+
}
67+
}
68+
5069
impl GlobalQueryRuntime {
5170
pub fn init(num_cpus: usize) -> Result<()> {
5271
let thread_num = std::cmp::max(num_cpus, num_cpus::get() / 2);

src/common/base/src/runtime/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ pub use defer::defer;
3939
pub use executor_stats::ExecutorStats;
4040
pub use executor_stats::ExecutorStatsSlot;
4141
pub use executor_stats::ExecutorStatsSnapshot;
42+
pub use global_runtime::GlobalControlRuntime;
4243
pub use global_runtime::GlobalIORuntime;
4344
pub use global_runtime::GlobalQueryRuntime;
4445
pub use memory::GLOBAL_MEM_STAT;

src/query/service/src/global_services.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use std::sync::Arc;
1818
use databend_common_base::base::BuildInfoRef;
1919
use databend_common_base::base::GlobalInstance;
2020
use databend_common_base::runtime::GLOBAL_QUERIES_MANAGER;
21+
use databend_common_base::runtime::GlobalControlRuntime;
2122
use databend_common_base::runtime::GlobalIORuntime;
2223
use databend_common_base::runtime::GlobalQueryRuntime;
2324
use databend_common_catalog::catalog::CatalogCreator;
@@ -110,6 +111,7 @@ impl GlobalServices {
110111

111112
// 3. runtime init.
112113
GlobalIORuntime::init(config.storage.num_cpus as usize)?;
114+
GlobalControlRuntime::init()?;
113115
GlobalQueryRuntime::init(config.storage.num_cpus as usize)?;
114116

115117
// 4. cluster discovery init.

src/query/service/src/servers/http/v1/query/http_query_manager.rs

Lines changed: 40 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use std::time::SystemTime;
2424
use chrono::SecondsFormat;
2525
use databend_common_base::JoinHandle;
2626
use databend_common_base::base::GlobalInstance;
27-
use databend_common_base::runtime::GlobalIORuntime;
27+
use databend_common_base::runtime::GlobalControlRuntime;
2828
use databend_common_config::InnerConfig;
2929
use databend_common_exception::ErrorCode;
3030
use databend_common_exception::Result;
@@ -161,34 +161,36 @@ impl HttpQueryManager {
161161
// the query may fail to be destroyed by final or kill while we hold a ref of Arc<HttpQuery>
162162
let http_query_weak = Arc::downgrade(&query);
163163

164-
GlobalIORuntime::instance().spawn(async move {
165-
loop {
166-
let expire_res = match http_query_weak.upgrade() {
167-
None => {
168-
break;
169-
}
170-
Some(query) => query.check_timeout().await,
171-
};
164+
GlobalControlRuntime::instance()
165+
.runtime()
166+
.spawn(async move {
167+
loop {
168+
let expire_res = match http_query_weak.upgrade() {
169+
None => {
170+
break;
171+
}
172+
Some(query) => query.check_timeout().await,
173+
};
172174

173-
match expire_res {
174-
TimeoutResult::TimedOut => {
175-
_ = self_clone
176-
.close_query(&query_id_clone, CloseReason::TimedOut, &None, false)
177-
.await
178-
.ok();
179-
}
180-
TimeoutResult::Sleep(t) => {
181-
sleep(t).await;
182-
}
183-
TimeoutResult::Remove => {
184-
let mut queries = self_clone.queries.write();
185-
queries.remove(&query_id_clone);
186-
log::info!("Query {query_id_clone} removed");
187-
break;
175+
match expire_res {
176+
TimeoutResult::TimedOut => {
177+
_ = self_clone
178+
.close_query(&query_id_clone, CloseReason::TimedOut, &None, false)
179+
.await
180+
.ok();
181+
}
182+
TimeoutResult::Sleep(t) => {
183+
sleep(t).await;
184+
}
185+
TimeoutResult::Remove => {
186+
let mut queries = self_clone.queries.write();
187+
queries.remove(&query_id_clone);
188+
log::info!("Query {query_id_clone} removed");
189+
break;
190+
}
188191
}
189192
}
190-
}
191-
});
193+
});
192194
Ok(query)
193195
}
194196

@@ -230,16 +232,18 @@ impl HttpQueryManager {
230232
let deleter = {
231233
let self_clone = self.clone();
232234
let last_query_id_clone = last_query_id.clone();
233-
GlobalIORuntime::instance().spawn(async move {
234-
sleep(Duration::from_secs(timeout_secs)).await;
235-
if self_clone.get_txn(&last_query_id_clone).is_some() {
236-
log::info!(
237-
"Transaction timed out after {} seconds, last_query_id = {}",
238-
timeout_secs,
239-
last_query_id_clone
240-
);
241-
}
242-
})
235+
GlobalControlRuntime::instance()
236+
.runtime()
237+
.spawn(async move {
238+
sleep(Duration::from_secs(timeout_secs)).await;
239+
if self_clone.get_txn(&last_query_id_clone).is_some() {
240+
log::info!(
241+
"Transaction timed out after {} seconds, last_query_id = {}",
242+
timeout_secs,
243+
last_query_id_clone
244+
);
245+
}
246+
})
243247
};
244248
txn_managers.insert(last_query_id, (txn_mgr, deleter));
245249
}

src/query/service/src/servers/http/v1/session/client_session_manager.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use std::time::Duration;
1919
use std::time::SystemTime;
2020

2121
use databend_common_base::base::GlobalInstance;
22-
use databend_common_base::runtime::GlobalIORuntime;
22+
use databend_common_base::runtime::GlobalControlRuntime;
2323
use databend_common_cache::Cache;
2424
use databend_common_cache::LruCache;
2525
use databend_common_config::InnerConfig;
@@ -121,7 +121,9 @@ impl ClientSessionManager {
121121
});
122122
GlobalInstance::set(mgr.clone());
123123

124-
GlobalIORuntime::instance().spawn(async move { Self::check_timeout(mgr).await });
124+
GlobalControlRuntime::instance()
125+
.runtime()
126+
.spawn(async move { Self::check_timeout(mgr).await });
125127
Ok(())
126128
}
127129

0 commit comments

Comments
 (0)