Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 36 additions & 1 deletion src/query/catalog/src/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::sync::Arc;
use chrono::DateTime;
use chrono::Utc;
use databend_common_ast::ast::Expr;
use databend_common_ast::ast::quote::QuotedIdent;
use databend_common_ast::parser::Dialect;
use databend_common_ast::parser::parse_comma_separated_exprs;
use databend_common_ast::parser::tokenize_sql;
Expand Down Expand Up @@ -61,6 +62,22 @@ use crate::statistics::BasicColumnStatistics;
use crate::table_args::TableArgs;
use crate::table_context::TableContext;

pub fn quoted_table_reference(
database_name: &str,
table_name: &str,
branch_name: Option<&str>,
quote: char,
) -> String {
let database = QuotedIdent(database_name, quote);
let table = QuotedIdent(table_name, quote);
if let Some(branch_name) = branch_name {
let branch = QuotedIdent(branch_name, quote);
format!("{database}.{table}/{branch}")
} else {
format!("{database}.{table}")
}
}

#[async_trait::async_trait]
pub trait Table: Sync + Send {
fn name(&self) -> &str {
Expand Down Expand Up @@ -366,9 +383,10 @@ pub trait Table: Sync + Send {
ctx: Arc<dyn TableContext>,
database_name: &str,
table_name: &str,
branch_name: Option<&str>,
with_options: &str,
) -> Result<String> {
let (_, _, _, _) = (ctx, database_name, table_name, with_options);
let (_, _, _, _, _) = (ctx, database_name, table_name, branch_name, with_options);

Err(ErrorCode::Unimplemented(format!(
"Change tracking operation is not supported for the table '{}', which uses the '{}' engine.",
Expand Down Expand Up @@ -551,6 +569,17 @@ pub enum TimeNavigation {
},
}

impl TimeNavigation {
pub fn contains_table_tag(&self) -> bool {
match self {
TimeNavigation::TimeTravel(point) => point.is_table_tag(),
TimeNavigation::Changes { at, end, .. } => {
at.is_table_tag() || end.as_ref().is_some_and(NavigationPoint::is_table_tag)
}
}
}
}

#[derive(Debug, Clone, Eq, PartialEq)]
pub enum NavigationPoint {
SnapshotID(String),
Expand All @@ -559,6 +588,12 @@ pub enum NavigationPoint {
TableTag(String),
}

impl NavigationPoint {
pub fn is_table_tag(&self) -> bool {
matches!(self, NavigationPoint::TableTag(_))
}
}

#[derive(Debug, Copy, Clone, Default, serde::Serialize, serde::Deserialize)]
pub struct TableStatistics {
pub num_rows: Option<u64>,
Expand Down
8 changes: 6 additions & 2 deletions src/query/service/src/interpreters/access/privilege_access.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1799,13 +1799,17 @@ impl AccessChecker for PrivilegeAccess {
}
}
Plan::CreateView(plan) => {
let mut planner = Planner::new(self.ctx.clone());
// Check stored view SQL against base tables.
let mut planner =
Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
let (plan, _) = planner.plan_sql(&plan.subquery).await?;
self.check(ctx, &plan).await?
}
Plan::AlterView(plan) => {
self.validate_db_access(&plan.catalog, &plan.database, UserPrivilegeType::Alter, false).await?;
let mut planner = Planner::new(self.ctx.clone());
// Check stored view SQL against base tables.
let mut planner =
Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
let (plan, _) = planner.plan_sql(&plan.subquery).await?;
self.check(ctx, &plan).await?
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ impl Interpreter for AddTableColumnInterpreter {
field.name(),
field.default_expr().unwrap()
);
let mut planner = Planner::new(self.ctx.clone());
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
let (plan, _) = planner.plan_sql(&query).await?;
if let Plan::DataMutation { s_expr, schema, .. } = plan {
let mutation: Mutation = s_expr.plan().clone().try_into()?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ impl AnalyzeTableInterpreter {
sql: String,
force_disable_distributed_optimization: bool,
) -> Result<(PhysicalPlan, BindContext)> {
let mut planner = Planner::new(self.ctx.clone());
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
let extras = planner.parse_sql(&sql)?;
let plan = planner
.plan_stmt(&extras.statement, force_disable_distributed_optimization)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ impl Interpreter for DescribeTableInterpreter {

let schema = if tbl_info.engine() == VIEW_ENGINE {
if let Some(query) = tbl_info.options().get(QUERY) {
let mut planner = Planner::new(self.ctx.clone());
// Replay stored view SQL against base tables.
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
let (plan, _) = planner.plan_sql(query).await?;
infer_table_schema(&plan.schema())
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -871,7 +871,7 @@ pub(crate) async fn build_select_insert_plan(
table_meta_timestamps: TableMetaTimestamps,
) -> Result<PipelineBuildResult> {
// 1. build plan by sql
let mut planner = Planner::new(ctx.clone());
let mut planner = Planner::new(ctx.clone()).with_suppress_wap_branch(true);
let (plan, _extras) = planner.plan_sql(&sql).await?;
let select_schema = plan.schema();

Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/interpreters/interpreter_view_alter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,8 @@ impl Interpreter for AlterViewInterpreter {
.get_table(&self.plan.tenant, &self.plan.database, &self.plan.view_name)
.await
{
let mut planner = Planner::new(self.ctx.clone());
// Replay stored view SQL against base tables.
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;

// Detect circular dependency: ALTER VIEW can introduce cycles
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ impl Interpreter for CreateViewInterpreter {
let tenant = self.ctx.get_tenant();
let table_function = catalog.list_table_functions();
let mut options = BTreeMap::new();
let mut planner = Planner::new(self.ctx.clone());
// Replay stored view SQL against base tables.
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
let (plan, _) = planner.plan_sql(&self.plan.subquery.clone()).await?;
match plan.clone() {
Plan::Query { metadata, .. } => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@ impl Interpreter for DescribeViewInterpreter {
let engine = table.get_table_info().engine();
let schema = if engine == VIEW_ENGINE {
if let Some(query) = tbl_info.options().get(QUERY) {
let mut planner = Planner::new(self.ctx.clone());
// Replay stored view SQL against base tables.
let mut planner = Planner::new(self.ctx.clone()).with_suppress_wap_branch(true);
let (plan, _) = planner.plan_sql(query).await?;
infer_table_schema(&plan.schema())
} else {
Expand Down
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1675,6 +1675,13 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("wap_branch", DefaultSettingValue {
value: UserSettingValue::String("".to_string()),
desc: "WAP branch for table reads and writes. Empty string disables.",
mode: SettingMode::Both,
scope: SettingScope::Session,
range: None,
}),
("force_aggregate_shuffle_mode", DefaultSettingValue {
value: UserSettingValue::String(String::from("auto")),
desc: "For testing only. Shuffle mode for aggregation. Options are 'auto', 'row', 'bucket'. Default is 'auto'.",
Expand Down
9 changes: 9 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1224,6 +1224,15 @@ impl Settings {
Ok(self.try_get_u64("enable_experimental_table_ref")? != 0)
}

pub fn get_wap_branch(&self) -> Result<Option<String>> {
let branch = self.try_get_string("wap_branch")?;
if branch.is_empty() {
Ok(None)
} else {
Ok(Some(branch))
}
}

pub fn get_force_aggregate_shuffle_mode(&self) -> Result<String> {
self.try_get_string("force_aggregate_shuffle_mode")
}
Expand Down
8 changes: 8 additions & 0 deletions src/query/sql/src/planner/binder/bind_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ pub struct BindContext {
/// It's used to avoid infinite loop.
pub planning_agg_index: bool,

/// If true, table refs in this scope ignore the session `wap_branch`.
/// Used while replaying persisted definitions against base tables.
pub suppress_wap_branch: bool,

pub window_definitions: DashMap<String, WindowSpec>,
}

Expand Down Expand Up @@ -276,6 +280,7 @@ impl BindContext {
expr_context: ExprContext::default(),
group_by_column_first: false,
planning_agg_index: false,
suppress_wap_branch: false,
window_definitions: DashMap::new(),
}
}
Expand Down Expand Up @@ -324,6 +329,7 @@ impl BindContext {
expr_context: ExprContext::default(),
group_by_column_first: parent.group_by_column_first,
planning_agg_index: false,
suppress_wap_branch: parent.suppress_wap_branch,
window_definitions: DashMap::new(),
})
}
Expand All @@ -335,6 +341,8 @@ impl BindContext {
bind_context.cte_context = self.cte_context.clone();
bind_context.udf_cache = self.udf_cache.clone();
bind_context.binding_views = self.binding_views.clone();
// Keep WAP suppression across `replace()` (used by bind_join).
bind_context.suppress_wap_branch = self.suppress_wap_branch;
bind_context.group_by_column_first = self.group_by_column_first;
bind_context
}
Expand Down
8 changes: 8 additions & 0 deletions src/query/sql/src/planner/binder/bind_mutation/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,14 @@ impl Binder {
target_table_identifier.table_name_alias(),
);

let branch_name = self.resolve_write_branch_with_wap_branch(
&catalog_name,
&database_name,
&table_name,
branch_name,
bind_context.suppress_wap_branch,
)?;

// Add table lock before execution.
let lock_guard = if strategy != MutationStrategy::NotMatchedOnly {
self.ctx
Expand Down
2 changes: 2 additions & 0 deletions src/query/sql/src/planner/binder/bind_query/bind.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,8 @@ impl Binder {
let create_table_sql = create_table_stmt.to_string();
log::info!("[CTE]create_table_sql: {create_table_sql}");
if let Some(subquery_executor) = &self.subquery_executor {
// This CTAS does not inherit WAP suppression. Safe while persisted views
// reject explicit MATERIALIZED CTEs; see `v_mat_cte_in_view_rejected`.
let _ = databend_common_base::runtime::block_on(async move {
subquery_executor
.execute_query_with_sql_string(&create_table_sql)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ impl Binder {
&cte_name,
&bind_context.cte_context.cte_map,
&cte.query,
bind_context,
)?;
// The physical CTE registry is query-global, while CTE aliases are scoped.
// Give each materialized producer a unique execution name.
Expand Down Expand Up @@ -111,6 +112,7 @@ impl Binder {
table_name,
bind_context.cte_context.cte_map.as_ref(),
&cte_info.query,
bind_context,
)?;

let (table_alias, column_alias) = match alias {
Expand Down Expand Up @@ -191,6 +193,7 @@ impl Binder {
cte_name: &str,
cte_map: &IndexMap<String, CteInfo>,
query: &Query,
parent: &BindContext,
) -> Result<(SExpr, BindContext)> {
let mut prev_cte_map = Box::new(IndexMap::new());
for (name, cte_info) in cte_map.iter() {
Expand All @@ -199,11 +202,15 @@ impl Binder {
}
prev_cte_map.insert(name.clone(), cte_info.clone());
}
// Fresh CTE context: preserve view WAP suppression, but not agg-index state
// because agg-index queries reject WITH.
let mut cte_bind_context = BindContext {
cte_context: CteContext {
cte_name: Some(cte_name.to_string()),
cte_map: prev_cte_map,
},
suppress_wap_branch: parent.suppress_wap_branch,
binding_views: parent.binding_views.clone(),
..Default::default()
};
let (s_expr, cte_bind_context) = self.bind_query(&mut cte_bind_context, query)?;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ impl Binder {
new_bind_context
.cte_context
.set_cte_context_and_name(bind_context.cte_context.clone());
// Preserve WAP suppression while using the outer parent's scope.
new_bind_context.binding_views = bind_context.binding_views.clone();
new_bind_context.suppress_wap_branch = bind_context.suppress_wap_branch;
self.bind_query(&mut new_bind_context, subquery)?
};

Expand Down
Loading
Loading