Skip to content

Commit e0b815d

Browse files
committed
add wap branch
fix fix fix fix fix fix fix fix
1 parent 20ba839 commit e0b815d

33 files changed

Lines changed: 1102 additions & 43 deletions

src/query/catalog/src/table.rs

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::sync::Arc;
2020
use chrono::DateTime;
2121
use chrono::Utc;
2222
use databend_common_ast::ast::Expr;
23+
use databend_common_ast::ast::quote::QuotedIdent;
2324
use databend_common_ast::parser::Dialect;
2425
use databend_common_ast::parser::parse_comma_separated_exprs;
2526
use databend_common_ast::parser::tokenize_sql;
@@ -61,6 +62,22 @@ use crate::statistics::BasicColumnStatistics;
6162
use crate::table_args::TableArgs;
6263
use crate::table_context::TableContext;
6364

65+
pub fn quoted_table_reference(
66+
database_name: &str,
67+
table_name: &str,
68+
branch_name: Option<&str>,
69+
quote: char,
70+
) -> String {
71+
let database = QuotedIdent(database_name, quote);
72+
let table = QuotedIdent(table_name, quote);
73+
if let Some(branch_name) = branch_name {
74+
let branch = QuotedIdent(branch_name, quote);
75+
format!("{database}.{table}/{branch}")
76+
} else {
77+
format!("{database}.{table}")
78+
}
79+
}
80+
6481
#[async_trait::async_trait]
6582
pub trait Table: Sync + Send {
6683
fn name(&self) -> &str {
@@ -366,9 +383,10 @@ pub trait Table: Sync + Send {
366383
ctx: Arc<dyn TableContext>,
367384
database_name: &str,
368385
table_name: &str,
386+
branch_name: Option<&str>,
369387
with_options: &str,
370388
) -> Result<String> {
371-
let (_, _, _, _) = (ctx, database_name, table_name, with_options);
389+
let (_, _, _, _, _) = (ctx, database_name, table_name, branch_name, with_options);
372390

373391
Err(ErrorCode::Unimplemented(format!(
374392
"Change tracking operation is not supported for the table '{}', which uses the '{}' engine.",

src/query/service/src/interpreters/access/privilege_access.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1799,13 +1799,17 @@ impl AccessChecker for PrivilegeAccess {
17991799
}
18001800
}
18011801
Plan::CreateView(plan) => {
1802-
let mut planner = Planner::new(self.ctx.clone());
1802+
// Check stored view SQL against base tables.
1803+
let mut planner =
1804+
Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
18031805
let (plan, _) = planner.plan_sql(&plan.subquery).await?;
18041806
self.check(ctx, &plan).await?
18051807
}
18061808
Plan::AlterView(plan) => {
18071809
self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Alter, false).await?;
1808-
let mut planner = Planner::new(self.ctx.clone());
1810+
// Check stored view SQL against base tables.
1811+
let mut planner =
1812+
Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
18091813
let (plan, _) = planner.plan_sql(&plan.subquery).await?;
18101814
self.check(ctx, &plan).await?
18111815
}

src/query/service/src/interpreters/interpreter_table_describe.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ impl Interpreter for DescribeTableInterpreter {
6565

6666
let schema = if tbl_info.engine() == VIEW_ENGINE {
6767
if let Some(query) = tbl_info.options().get(QUERY) {
68-
let mut planner = Planner::new(self.ctx.clone());
68+
// Replay stored view SQL against base tables.
69+
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
6970
let (plan, _) = planner.plan_sql(query).await?;
7071
infer_table_schema(&plan.schema())
7172
} else {

src/query/service/src/interpreters/interpreter_view_alter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ impl Interpreter for AlterViewInterpreter {
5757
.get_table(&self.plan.tenant, &self.plan.database, &self.plan.view_name)
5858
.await
5959
{
60-
let mut planner = Planner::new(self.ctx.clone());
60+
// Replay stored view SQL against base tables.
61+
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
6162
let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;
6263

6364
// Detect circular dependency: ALTER VIEW can introduce cycles

src/query/service/src/interpreters/interpreter_view_create.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ impl Interpreter for CreateViewInterpreter {
5959
let tenant = self.ctx.get_tenant();
6060
let table_function = catalog.list_table_functions();
6161
let mut options = BTreeMap::new();
62-
let mut planner = Planner::new(self.ctx.clone());
62+
// Replay stored view SQL against base tables.
63+
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
6364
let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;
6465
match plan.clone() {
6566
Plan::Query { metadata, .. } => {

src/query/service/src/interpreters/interpreter_view_describe.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ impl Interpreter for DescribeViewInterpreter {
6363
let engine = table.get_table_info().engine();
6464
let schema = if engine == VIEW_ENGINE {
6565
if let Some(query) = tbl_info.options().get(QUERY) {
66-
let mut planner = Planner::new(self.ctx.clone());
66+
// Replay stored view SQL against base tables.
67+
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
6768
let (plan, _) = planner.plan_sql(query).await?;
6869
infer_table_schema(&plan.schema())
6970
} else {

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1675,6 +1675,13 @@ impl DefaultSettings {
16751675
scope: SettingScope::Both,
16761676
range: Some(SettingRange::Numeric(0..=1)),
16771677
}),
1678+
("wap_branch", DefaultSettingValue {
1679+
value: UserSettingValue::String("".to_string()),
1680+
desc: "WAP branch for table reads and writes. Empty string disables.",
1681+
mode: SettingMode::Both,
1682+
scope: SettingScope::Session,
1683+
range: None,
1684+
}),
16781685
("force_aggregate_shuffle_mode", DefaultSettingValue {
16791686
value: UserSettingValue::String(String::from("auto")),
16801687
desc: "For testing only. Shuffle mode for aggregation. Options are 'auto', 'row', 'bucket'. Default is 'auto'.",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,15 @@ impl Settings {
12241224
Ok(self.try_get_u64("enable_experimental_table_ref")? != 0)
12251225
}
12261226

1227+
pub fn get_wap_branch(&self) -> Result<Option<String>> {
1228+
let branch = self.try_get_string("wap_branch")?;
1229+
if branch.is_empty() {
1230+
Ok(None)
1231+
} else {
1232+
Ok(Some(branch))
1233+
}
1234+
}
1235+
12271236
pub fn get_force_aggregate_shuffle_mode(&self) -> Result<String> {
12281237
self.try_get_string("force_aggregate_shuffle_mode")
12291238
}

src/query/sql/src/planner/binder/bind_context.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,10 @@ pub struct BindContext {
197197
/// It's used to avoid infinite loop.
198198
pub planning_agg_index: bool,
199199

200+
/// If true, table refs in this scope ignore the session `wap_branch`.
201+
/// Used while replaying persisted definitions against base tables.
202+
pub suppress_wap_branch: bool,
203+
200204
pub window_definitions: DashMap<String, WindowSpec>,
201205
}
202206

@@ -276,6 +280,7 @@ impl BindContext {
276280
expr_context: ExprContext::default(),
277281
group_by_column_first: false,
278282
planning_agg_index: false,
283+
suppress_wap_branch: false,
279284
window_definitions: DashMap::new(),
280285
}
281286
}
@@ -324,6 +329,7 @@ impl BindContext {
324329
expr_context: ExprContext::default(),
325330
group_by_column_first: parent.group_by_column_first,
326331
planning_agg_index: false,
332+
suppress_wap_branch: parent.suppress_wap_branch,
327333
window_definitions: DashMap::new(),
328334
})
329335
}
@@ -335,6 +341,8 @@ impl BindContext {
335341
bind_context.cte_context = self.cte_context.clone();
336342
bind_context.udf_cache = self.udf_cache.clone();
337343
bind_context.binding_views = self.binding_views.clone();
344+
// Keep WAP suppression across `replace()` (used by bind_join).
345+
bind_context.suppress_wap_branch = self.suppress_wap_branch;
338346
bind_context.group_by_column_first = self.group_by_column_first;
339347
bind_context
340348
}

src/query/sql/src/planner/binder/bind_mutation/bind.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,13 @@ impl Binder {
145145
target_table_identifier.table_name_alias(),
146146
);
147147

148+
let branch_name = self.resolve_write_branch_with_wap_branch(
149+
&catalog_name,
150+
&database_name,
151+
&table_name,
152+
branch_name,
153+
)?;
154+
148155
// Add table lock before execution.
149156
let lock_guard = if strategy != MutationStrategy::NotMatchedOnly {
150157
self.ctx

0 commit comments

Comments
 (0)