Skip to content

Commit bb4f71d

Browse files
committed
refactor(query): collapse task gating to task-support
1 parent 327869a commit bb4f71d

File tree

16 files changed

+72
-371
lines changed

16 files changed

+72
-371
lines changed

src/query/service/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,11 @@ cloud-control = [
2424
"databend-common-sql/cloud-control",
2525
"databend-common-storages-system/cloud-control",
2626
]
27-
task-support = ["databend-common-storages-system/task-support"]
27+
task-support = [
28+
"dep:databend-common-cloud-control",
29+
"databend-common-storages-system/cloud-control",
30+
"databend-common-storages-system/task-support",
31+
]
2832
virtual-column = [
2933
"dep:databend-enterprise-virtual-column",
3034
"databend-common-storages-system/virtual-column",

src/query/service/src/databases/system/system_database.rs

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,9 +67,9 @@ use databend_common_storages_system::TableFunctionsTable;
6767
use databend_common_storages_system::TablesTableWithHistory;
6868
use databend_common_storages_system::TablesTableWithoutHistory;
6969
use databend_common_storages_system::TagsTable;
70-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
70+
#[cfg(feature = "task-support")]
7171
use databend_common_storages_system::TaskHistoryTable;
72-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
72+
#[cfg(feature = "task-support")]
7373
use databend_common_storages_system::TasksTable;
7474
use databend_common_storages_system::TempFilesTable;
7575
use databend_common_storages_system::TerseStreamsTable;
@@ -202,9 +202,7 @@ impl SystemDatabase {
202202
table_list.push(PrivateTasksTable::create(sys_db_meta.next_table_id()));
203203
table_list.push(PrivateTaskHistoryTable::create(sys_db_meta.next_table_id()));
204204
} else {
205-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
206205
table_list.push(TasksTable::create(sys_db_meta.next_table_id()));
207-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
208206
table_list.push(TaskHistoryTable::create(sys_db_meta.next_table_id()));
209207
}
210208
disable_system_table_load = config.query.common.disable_system_table_load;

src/query/service/src/interpreters/common/mod.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ mod notification;
2121
mod query_log;
2222
mod stream;
2323
mod table;
24-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
24+
#[cfg(feature = "task-support")]
2525
mod task;
2626
mod util;
2727
#[cfg(feature = "cloud-control")]
@@ -40,11 +40,11 @@ pub use query_log::InterpreterQueryLog;
4040
pub use stream::dml_build_update_stream_req;
4141
pub use stream::query_build_update_stream_req;
4242
pub use table::check_referenced_computed_columns;
43-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
43+
#[cfg(feature = "task-support")]
4444
pub use task::get_task_client_config;
45-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
45+
#[cfg(feature = "task-support")]
4646
pub use task::make_schedule_options;
47-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
47+
#[cfg(feature = "task-support")]
4848
pub use task::make_warehouse_options;
4949
pub use util::check_deduplicate_label;
5050
#[cfg(feature = "cloud-control")]

src/query/service/src/interpreters/task/cloud.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ use databend_common_cloud_control::pb::ExecuteTaskRequest;
2828
use databend_common_cloud_control::pb::ShowTasksRequest;
2929
use databend_common_cloud_control::pb::WarehouseOptions;
3030
use databend_common_cloud_control::pb::alter_task_request::AlterTaskType;
31+
use databend_common_cloud_control::task_utils;
3132
use databend_common_config::GlobalConfig;
3233
use databend_common_exception::ErrorCode;
3334
use databend_common_exception::Result;
@@ -39,7 +40,6 @@ use databend_common_sql::plans::DescribeTaskPlan;
3940
use databend_common_sql::plans::DropTaskPlan;
4041
use databend_common_sql::plans::ExecuteTaskPlan;
4142
use databend_common_sql::plans::ShowTasksPlan;
42-
use databend_common_storages_system::TaskRecord;
4343

4444
use crate::interpreters::common::get_task_client_config;
4545
use crate::interpreters::common::make_schedule_options;
@@ -307,7 +307,7 @@ impl TaskInterpreter for CloudTaskInterpreter {
307307
&self,
308308
ctx: &Arc<QueryContext>,
309309
plan: &DescribeTaskPlan,
310-
) -> Result<Option<TaskRecord>> {
310+
) -> Result<Option<task_utils::Task>> {
311311
let config = GlobalConfig::instance();
312312
if config
313313
.query
@@ -331,7 +331,7 @@ impl TaskInterpreter for CloudTaskInterpreter {
331331
let req = make_request(req, config);
332332
let resp = task_client.describe_task(req).await?;
333333

334-
resp.task.map(TaskRecord::try_from).transpose()
334+
resp.task.map(task_utils::Task::try_from).transpose()
335335
}
336336

337337
async fn drop_task(&self, ctx: &Arc<QueryContext>, plan: &DropTaskPlan) -> Result<()> {
@@ -364,7 +364,7 @@ impl TaskInterpreter for CloudTaskInterpreter {
364364
&self,
365365
ctx: &Arc<QueryContext>,
366366
plan: &ShowTasksPlan,
367-
) -> Result<Vec<TaskRecord>> {
367+
) -> Result<Vec<task_utils::Task>> {
368368
let config = GlobalConfig::instance();
369369
if config
370370
.query

src/query/service/src/interpreters/task/mod.rs

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,8 @@
1515
use std::sync::Arc;
1616

1717
use databend_common_catalog::table_context::TableContext;
18+
use databend_common_cloud_control::task_utils;
1819
use databend_common_config::GlobalConfig;
19-
#[cfg(not(feature = "cloud-control"))]
20-
use databend_common_exception::ErrorCode;
2120
use databend_common_exception::Result;
2221
use databend_common_license::license::Feature;
2322
use databend_common_license::license_manager::LicenseManagerSwitch;
@@ -27,14 +26,11 @@ use databend_common_sql::plans::DescribeTaskPlan;
2726
use databend_common_sql::plans::DropTaskPlan;
2827
use databend_common_sql::plans::ExecuteTaskPlan;
2928
use databend_common_sql::plans::ShowTasksPlan;
30-
use databend_common_storages_system::TaskRecord;
3129

32-
#[cfg(feature = "cloud-control")]
3330
use crate::interpreters::task::cloud::CloudTaskInterpreter;
3431
use crate::interpreters::task::private::PrivateTaskInterpreter;
3532
use crate::sessions::QueryContext;
3633

37-
#[cfg(feature = "cloud-control")]
3834
mod cloud;
3935
mod private;
4036

@@ -47,22 +43,11 @@ impl TaskInterpreterManager {
4743
.check_enterprise_enabled(ctx.get_license_key(), Feature::PrivateTask)?;
4844
return Ok(TaskInterpreterImpl::Private(PrivateTaskInterpreter));
4945
}
50-
#[cfg(feature = "cloud-control")]
51-
{
52-
return Ok(TaskInterpreterImpl::Cloud(CloudTaskInterpreter));
53-
}
54-
55-
#[cfg(not(feature = "cloud-control"))]
56-
{
57-
Err(ErrorCode::Unimplemented(
58-
"Cloud task support is disabled, rebuild with cargo feature 'cloud-control'",
59-
))
60-
}
46+
Ok(TaskInterpreterImpl::Cloud(CloudTaskInterpreter))
6147
}
6248
}
6349

6450
pub(crate) enum TaskInterpreterImpl {
65-
#[cfg(feature = "cloud-control")]
6651
Cloud(CloudTaskInterpreter),
6752
Private(PrivateTaskInterpreter),
6853
}
@@ -78,37 +63,34 @@ pub(crate) trait TaskInterpreter {
7863
&self,
7964
ctx: &Arc<QueryContext>,
8065
plan: &DescribeTaskPlan,
81-
) -> Result<Option<TaskRecord>>;
66+
) -> Result<Option<task_utils::Task>>;
8267

8368
async fn drop_task(&self, ctx: &Arc<QueryContext>, plan: &DropTaskPlan) -> Result<()>;
8469

8570
async fn show_tasks(
8671
&self,
8772
ctx: &Arc<QueryContext>,
8873
plan: &ShowTasksPlan,
89-
) -> Result<Vec<TaskRecord>>;
74+
) -> Result<Vec<task_utils::Task>>;
9075
}
9176

9277
impl TaskInterpreter for TaskInterpreterImpl {
9378
async fn create_task(&self, ctx: &Arc<QueryContext>, plan: &CreateTaskPlan) -> Result<()> {
9479
match self {
95-
#[cfg(feature = "cloud-control")]
9680
TaskInterpreterImpl::Cloud(interpreter) => interpreter.create_task(ctx, plan).await,
9781
TaskInterpreterImpl::Private(interpreter) => interpreter.create_task(ctx, plan).await,
9882
}
9983
}
10084

10185
async fn execute_task(&self, ctx: &Arc<QueryContext>, plan: &ExecuteTaskPlan) -> Result<()> {
10286
match self {
103-
#[cfg(feature = "cloud-control")]
10487
TaskInterpreterImpl::Cloud(interpreter) => interpreter.execute_task(ctx, plan).await,
10588
TaskInterpreterImpl::Private(interpreter) => interpreter.execute_task(ctx, plan).await,
10689
}
10790
}
10891

10992
async fn alter_task(&self, ctx: &Arc<QueryContext>, plan: &AlterTaskPlan) -> Result<()> {
11093
match self {
111-
#[cfg(feature = "cloud-control")]
11294
TaskInterpreterImpl::Cloud(interpreter) => interpreter.alter_task(ctx, plan).await,
11395
TaskInterpreterImpl::Private(interpreter) => interpreter.alter_task(ctx, plan).await,
11496
}
@@ -118,17 +100,15 @@ impl TaskInterpreter for TaskInterpreterImpl {
118100
&self,
119101
ctx: &Arc<QueryContext>,
120102
plan: &DescribeTaskPlan,
121-
) -> Result<Option<TaskRecord>> {
103+
) -> Result<Option<task_utils::Task>> {
122104
match self {
123-
#[cfg(feature = "cloud-control")]
124105
TaskInterpreterImpl::Cloud(interpreter) => interpreter.describe_task(ctx, plan).await,
125106
TaskInterpreterImpl::Private(interpreter) => interpreter.describe_task(ctx, plan).await,
126107
}
127108
}
128109

129110
async fn drop_task(&self, ctx: &Arc<QueryContext>, plan: &DropTaskPlan) -> Result<()> {
130111
match self {
131-
#[cfg(feature = "cloud-control")]
132112
TaskInterpreterImpl::Cloud(interpreter) => interpreter.drop_task(ctx, plan).await,
133113
TaskInterpreterImpl::Private(interpreter) => interpreter.drop_task(ctx, plan).await,
134114
}
@@ -138,9 +118,8 @@ impl TaskInterpreter for TaskInterpreterImpl {
138118
&self,
139119
ctx: &Arc<QueryContext>,
140120
plan: &ShowTasksPlan,
141-
) -> Result<Vec<TaskRecord>> {
121+
) -> Result<Vec<task_utils::Task>> {
142122
match self {
143-
#[cfg(feature = "cloud-control")]
144123
TaskInterpreterImpl::Cloud(interpreter) => interpreter.show_tasks(ctx, plan).await,
145124
TaskInterpreterImpl::Private(interpreter) => interpreter.show_tasks(ctx, plan).await,
146125
}

src/query/service/src/interpreters/task/private.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use std::sync::Arc;
1717
use chrono::Utc;
1818
use databend_common_ast::ast::TaskSql;
1919
use databend_common_catalog::table_context::TableContext;
20+
use databend_common_cloud_control::task_utils;
2021
use databend_common_exception::Result;
2122
use databend_common_management::task::TaskMgr;
2223
use databend_common_meta_app::principal::Status;
@@ -28,7 +29,6 @@ use databend_common_sql::plans::DropTaskPlan;
2829
use databend_common_sql::plans::ExecuteTaskPlan;
2930
use databend_common_sql::plans::ShowTasksPlan;
3031
use databend_common_storages_system::PrivateTasksTable;
31-
use databend_common_storages_system::TaskRecord;
3232
use databend_common_users::UserApiProvider;
3333

3434
use crate::interpreters::task::TaskInterpreter;
@@ -100,7 +100,7 @@ impl TaskInterpreter for PrivateTaskInterpreter {
100100
&self,
101101
_ctx: &Arc<QueryContext>,
102102
plan: &DescribeTaskPlan,
103-
) -> Result<Option<TaskRecord>> {
103+
) -> Result<Option<task_utils::Task>> {
104104
let task = UserApiProvider::instance()
105105
.task_api(&plan.tenant)
106106
.describe_task(&plan.task_name)
@@ -122,7 +122,7 @@ impl TaskInterpreter for PrivateTaskInterpreter {
122122
&self,
123123
_ctx: &Arc<QueryContext>,
124124
plan: &ShowTasksPlan,
125-
) -> Result<Vec<TaskRecord>> {
125+
) -> Result<Vec<task_utils::Task>> {
126126
let tasks = UserApiProvider::instance()
127127
.task_api(&plan.tenant)
128128
.list_task()

src/query/service/src/table_functions/cloud/task_dependents.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,6 @@ use databend_common_pipeline::core::Pipeline;
5151
use databend_common_pipeline::core::ProcessorPtr;
5252
use databend_common_pipeline::sources::AsyncSource;
5353
use databend_common_pipeline::sources::AsyncSourcer;
54-
use databend_common_storages_system::TaskRecord;
5554

5655
pub struct TaskDependentsTable {
5756
table_info: TableInfo,
@@ -200,12 +199,12 @@ impl TaskDependentsSource {
200199

201200
for task in tasks {
202201
let task = task.clone();
203-
let tsk: TaskRecord = task.try_into()?;
202+
let tsk: databend_common_cloud_control::task_utils::Task = task.try_into()?;
204203
created_on.push(tsk.created_at.timestamp_micros());
205204
name.push(tsk.task_name.clone());
206205
owner.push(tsk.owner.clone());
207206
comment.push(tsk.comment.clone());
208-
warehouse.push(tsk.warehouse.clone());
207+
warehouse.push(tsk.warehouse_options.and_then(|s| s.warehouse.clone()));
209208
schedule.push(tsk.schedule_options.clone());
210209
predecessors.push(tsk.after.clone());
211210
state.push(tsk.status.to_string());

src/query/service/src/table_functions/cloud/task_history.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ use databend_common_catalog::table_function::TableFunction;
2626
use databend_common_cloud_control::client_config::build_client_config;
2727
use databend_common_cloud_control::cloud_api::CloudControlApiProvider;
2828
use databend_common_cloud_control::pb::ShowTaskRunsRequest;
29+
use databend_common_cloud_control::task_utils::TaskRun;
2930
use databend_common_config::GlobalConfig;
3031
use databend_common_exception::ErrorCode;
3132
use databend_common_exception::Result;
@@ -221,7 +222,13 @@ impl AsyncSource for TaskHistorySource {
221222
.into_iter()
222223
.flat_map(|r| r.task_runs)
223224
.collect::<Vec<_>>();
224-
parse_task_runs_to_datablock(trs).map(Some)
225+
226+
let task_runs = trs
227+
.into_iter()
228+
.map(TaskRun::try_from)
229+
.collect::<Result<Vec<_>>>()?;
230+
231+
parse_task_runs_to_datablock(task_runs).map(Some)
225232
}
226233
}
227234

src/query/service/src/table_functions/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
// limitations under the License.
1414

1515
mod async_crash_me;
16-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
16+
#[cfg(feature = "task-support")]
1717
mod cloud;
1818
mod copy_history;
1919
mod fuse_vacuum2;

src/query/service/src/table_functions/table_function_factory.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,11 +51,11 @@ use crate::storages::fuse::table_functions::FuseSegmentFunc;
5151
use crate::storages::fuse::table_functions::FuseSnapshotFunc;
5252
use crate::table_functions::TableFunction;
5353
use crate::table_functions::async_crash_me::AsyncCrashMeTable;
54-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
54+
#[cfg(feature = "task-support")]
5555
use crate::table_functions::cloud::TaskDependentsEnableTable;
56-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
56+
#[cfg(feature = "task-support")]
5757
use crate::table_functions::cloud::TaskDependentsTable;
58-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
58+
#[cfg(feature = "task-support")]
5959
use crate::table_functions::cloud::TaskHistoryTable;
6060
use crate::table_functions::copy_history::CopyHistoryTable;
6161
use crate::table_functions::fuse_vacuum2::FuseVacuum2Table;
@@ -108,7 +108,7 @@ pub struct TableFunctionFactory {
108108

109109
impl TableFunctionFactory {
110110
pub fn create(config: &InnerConfig) -> Self {
111-
#[cfg(not(all(feature = "cloud-control", feature = "task-support")))]
111+
#[cfg(not(feature = "task-support"))]
112112
let _ = config;
113113

114114
let mut id = SYS_TBL_FUNC_ID_BEGIN;
@@ -312,7 +312,7 @@ impl TableFunctionFactory {
312312
),
313313
);
314314

315-
#[cfg(all(feature = "cloud-control", feature = "task-support"))]
315+
#[cfg(feature = "task-support")]
316316
if !config.task.on {
317317
creators.insert(
318318
"task_dependents".to_string(),

0 commit comments

Comments
 (0)