Skip to content

Commit 02e86a3

Browse files
committed
add wap branch
fix fix fix fix fix fix fix fix fix fix fix fix fix
1 parent 20ba839 commit 02e86a3

36 files changed

Lines changed: 1191 additions & 65 deletions

src/query/catalog/src/table.rs

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ use std::sync::Arc;
2020
use chrono::DateTime;
2121
use chrono::Utc;
2222
use databend_common_ast::ast::Expr;
23+
use databend_common_ast::ast::quote::QuotedIdent;
2324
use databend_common_ast::parser::Dialect;
2425
use databend_common_ast::parser::parse_comma_separated_exprs;
2526
use databend_common_ast::parser::tokenize_sql;
@@ -61,6 +62,22 @@ use crate::statistics::BasicColumnStatistics;
6162
use crate::table_args::TableArgs;
6263
use crate::table_context::TableContext;
6364

65+
pub fn quoted_table_reference(
66+
database_name: &str,
67+
table_name: &str,
68+
branch_name: Option<&str>,
69+
quote: char,
70+
) -> String {
71+
let database = QuotedIdent(database_name, quote);
72+
let table = QuotedIdent(table_name, quote);
73+
if let Some(branch_name) = branch_name {
74+
let branch = QuotedIdent(branch_name, quote);
75+
format!("{database}.{table}/{branch}")
76+
} else {
77+
format!("{database}.{table}")
78+
}
79+
}
80+
6481
#[async_trait::async_trait]
6582
pub trait Table: Sync + Send {
6683
fn name(&self) -> &str {
@@ -366,9 +383,10 @@ pub trait Table: Sync + Send {
366383
ctx: Arc<dyn TableContext>,
367384
database_name: &str,
368385
table_name: &str,
386+
branch_name: Option<&str>,
369387
with_options: &str,
370388
) -> Result<String> {
371-
let (_, _, _, _) = (ctx, database_name, table_name, with_options);
389+
let (_, _, _, _, _) = (ctx, database_name, table_name, branch_name, with_options);
372390

373391
Err(ErrorCode::Unimplemented(format!(
374392
"Change tracking operation is not supported for the table '{}', which uses the '{}' engine.",
@@ -551,6 +569,17 @@ pub enum TimeNavigation {
551569
},
552570
}
553571

572+
impl TimeNavigation {
573+
pub fn contains_table_tag(&self) -> bool {
574+
match self {
575+
TimeNavigation::TimeTravel(point) => point.is_table_tag(),
576+
TimeNavigation::Changes { at, end, .. } => {
577+
at.is_table_tag() || end.as_ref().is_some_and(NavigationPoint::is_table_tag)
578+
}
579+
}
580+
}
581+
}
582+
554583
#[derive(Debug, Clone, Eq, PartialEq)]
555584
pub enum NavigationPoint {
556585
SnapshotID(String),
@@ -559,6 +588,12 @@ pub enum NavigationPoint {
559588
TableTag(String),
560589
}
561590

591+
impl NavigationPoint {
592+
pub fn is_table_tag(&self) -> bool {
593+
matches!(self, NavigationPoint::TableTag(_))
594+
}
595+
}
596+
562597
#[derive(Debug, Copy, Clone, Default, serde::Serialize, serde::Deserialize)]
563598
pub struct TableStatistics {
564599
pub num_rows: Option<u64>,

src/query/service/src/interpreters/access/privilege_access.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1799,13 +1799,17 @@ impl AccessChecker for PrivilegeAccess {
17991799
}
18001800
}
18011801
Plan::CreateView(plan) => {
1802-
let mut planner = Planner::new(self.ctx.clone());
1802+
// Check stored view SQL against base tables.
1803+
let mut planner =
1804+
Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
18031805
let (plan, _) = planner.plan_sql(&plan.subquery).await?;
18041806
self.check(ctx, &plan).await?
18051807
}
18061808
Plan::AlterView(plan) => {
18071809
self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Alter, false).await?;
1808-
let mut planner = Planner::new(self.ctx.clone());
1810+
// Check stored view SQL against base tables.
1811+
let mut planner =
1812+
Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
18091813
let (plan, _) = planner.plan_sql(&plan.subquery).await?;
18101814
self.check(ctx, &plan).await?
18111815
}

src/query/service/src/interpreters/interpreter_table_add_column.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ impl Interpreter for AddTableColumnInterpreter {
242242
field.name(),
243243
field.default_expr().unwrap()
244244
);
245-
let mut planner = Planner::new(self.ctx.clone());
245+
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
246246
let (plan, _) = planner.plan_sql(&query).await?;
247247
if let Plan::DataMutation { s_expr, schema, .. } = plan {
248248
let mutation: Mutation = s_expr.plan().clone().try_into()?;

src/query/service/src/interpreters/interpreter_table_describe.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,8 @@ impl Interpreter for DescribeTableInterpreter {
6565

6666
let schema = if tbl_info.engine() == VIEW_ENGINE {
6767
if let Some(query) = tbl_info.options().get(QUERY) {
68-
let mut planner = Planner::new(self.ctx.clone());
68+
// Replay stored view SQL against base tables.
69+
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
6970
let (plan, _) = planner.plan_sql(query).await?;
7071
infer_table_schema(&plan.schema())
7172
} else {

src/query/service/src/interpreters/interpreter_table_modify_column.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -871,7 +871,7 @@ pub(crate) async fn build_select_insert_plan(
871871
table_meta_timestamps: TableMetaTimestamps,
872872
) -> Result<PipelineBuildResult> {
873873
// 1. build plan by sql
874-
let mut planner = Planner::new(ctx.clone());
874+
let mut planner = Planner::new(ctx.clone()).with_suppress_wap_branch(true);
875875
let (plan, _extras) = planner.plan_sql(&sql).await?;
876876
let select_schema = plan.schema();
877877

src/query/service/src/interpreters/interpreter_view_alter.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,8 @@ impl Interpreter for AlterViewInterpreter {
5757
.get_table(&self.plan.tenant, &self.plan.database, &self.plan.view_name)
5858
.await
5959
{
60-
let mut planner = Planner::new(self.ctx.clone());
60+
// Replay stored view SQL against base tables.
61+
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
6162
let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;
6263

6364
// Detect circular dependency: ALTER VIEW can introduce cycles

src/query/service/src/interpreters/interpreter_view_create.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ impl Interpreter for CreateViewInterpreter {
5959
let tenant = self.ctx.get_tenant();
6060
let table_function = catalog.list_table_functions();
6161
let mut options = BTreeMap::new();
62-
let mut planner = Planner::new(self.ctx.clone());
62+
// Replay stored view SQL against base tables.
63+
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
6364
let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;
6465
match plan.clone() {
6566
Plan::Query { metadata, .. } => {

src/query/service/src/interpreters/interpreter_view_describe.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ impl Interpreter for DescribeViewInterpreter {
6363
let engine = table.get_table_info().engine();
6464
let schema = if engine == VIEW_ENGINE {
6565
if let Some(query) = tbl_info.options().get(QUERY) {
66-
let mut planner = Planner::new(self.ctx.clone());
66+
// Replay stored view SQL against base tables.
67+
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
6768
let (plan, _) = planner.plan_sql(query).await?;
6869
infer_table_schema(&plan.schema())
6970
} else {

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1675,6 +1675,13 @@ impl DefaultSettings {
16751675
scope: SettingScope::Both,
16761676
range: Some(SettingRange::Numeric(0..=1)),
16771677
}),
1678+
("wap_branch", DefaultSettingValue {
1679+
value: UserSettingValue::String("".to_string()),
1680+
desc: "WAP branch for table reads and writes. Empty string disables.",
1681+
mode: SettingMode::Both,
1682+
scope: SettingScope::Session,
1683+
range: None,
1684+
}),
16781685
("force_aggregate_shuffle_mode", DefaultSettingValue {
16791686
value: UserSettingValue::String(String::from("auto")),
16801687
desc: "For testing only. Shuffle mode for aggregation. Options are 'auto', 'row', 'bucket'. Default is 'auto'.",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1224,6 +1224,15 @@ impl Settings {
12241224
Ok(self.try_get_u64("enable_experimental_table_ref")? != 0)
12251225
}
12261226

1227+
pub fn get_wap_branch(&self) -> Result<Option<String>> {
1228+
let branch = self.try_get_string("wap_branch")?;
1229+
if branch.is_empty() {
1230+
Ok(None)
1231+
} else {
1232+
Ok(Some(branch))
1233+
}
1234+
}
1235+
12271236
pub fn get_force_aggregate_shuffle_mode(&self) -> Result<String> {
12281237
self.try_get_string("force_aggregate_shuffle_mode")
12291238
}

0 commit comments

Comments
 (0)