Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions src/query/service/tests/it/storages/system.rs
Original file line number Diff line number Diff line change
Expand Up @@ -651,3 +651,95 @@ async fn test_show_tables_ignores_broken_attached_table_refresh() -> anyhow::Res

Ok(())
}

// `system.columns` refreshes ATTACH table schemas, unlike `SHOW TABLES`. This guards the
// resilience part of that: one ATTACH table with unreachable storage must not drop the columns
// of healthy tables in the same database. (New-column visibility is covered by the EE SLT.)
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn test_system_columns_tolerates_broken_attached_table() -> anyhow::Result<()> {
let fixture = TestFixture::setup().await?;
let ctx = fixture.new_query_ctx().await?;
let tenant = ctx.get_tenant();
let catalog = ctx.get_catalog("default").await?;
let database = catalog.get_database(&tenant, "default").await?;

// ATTACH schema refresh is opt-in; enable it so the resilience path under test is exercised.
ctx.get_settings()
.set_setting("enable_table_schema_refresh".to_string(), "1".to_string())?;

execute_command(ctx.clone(), "create table default.healthy(a int)").await?;

// Simulate a broken attached-table storage: any refresh from this S3 endpoint fails with 403.
let mock_server = MockServer::start().await;
Mock::given(any())
.respond_with(ResponseTemplate::new(403))
.mount(&mock_server)
.await;

let broken_schema = Arc::new(TableSchema::new(vec![TableField::new(
"a",
TableDataType::Number(NumberDataType::Int32),
)]));

catalog
.create_table(CreateTableReq {
create_option: CreateOption::Create,
catalog_name: None,
name_ident: TableNameIdent {
tenant: tenant.clone(),
db_name: "default".to_string(),
table_name: "broken_attached".to_string(),
},
table_meta: TableMeta {
schema: broken_schema,
engine: "FUSE".to_string(),
options: [
(
OPT_KEY_DATABASE_ID.to_string(),
database.get_db_info().database_id.db_id.to_string(),
),
(
FUSE_OPT_KEY_ENABLE_AUTO_ANALYZE.to_string(),
"0".to_string(),
),
(
OPT_KEY_TABLE_ATTACHED_DATA_URI.to_string(),
"s3://broken-bucket/broken-attached/".to_string(),
),
]
.into(),
storage_params: Some(StorageParams::S3(StorageS3Config {
region: "us-east-2".to_string(),
endpoint_url: mock_server.uri(),
bucket: "broken-bucket".to_string(),
root: "/".to_string(),
access_key_id: "access_key_id".to_string(),
secret_access_key: "secret_access_key".to_string(),
disable_credential_loader: true,
..Default::default()
})),
..TableMeta::default()
},
as_dropped: false,
table_properties: None,
table_partition: None,
})
.await?;

let result = execute_query(
ctx.clone(),
"select database, table, name from system.columns \
where database = 'default' order by table, name",
)
.await?;
let blocks = result.try_collect::<Vec<_>>().await?;
let output = pretty_format_blocks(&blocks)?;
println!("{}", output);

assert!(
output.contains("healthy"),
"system.columns must still expose healthy table columns despite a broken ATTACH table: {output}"
);

Ok(())
}
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1121,6 +1121,13 @@ impl DefaultSettings {
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("enable_table_schema_refresh", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "Refresh table schema from storage when listing system.columns/statistics, so schema changes invisible to the meta server (currently only read-only ATTACH tables) are reflected. Disabled by default; each refreshed table costs one storage round-trip.",
mode: SettingMode::Both,
scope: SettingScope::Both,
range: Some(SettingRange::Numeric(0..=1)),
}),
("enable_experimental_row_access_policy", DefaultSettingValue {
value: UserSettingValue::UInt64(0),
desc: "experiment setting enable row access policy(disable by default).",
Expand Down
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ impl Settings {
Ok(self.try_get_u64("enable_auto_fix_missing_bloom_index")? != 0)
}

pub fn get_enable_table_schema_refresh(&self) -> Result<bool> {
Ok(self.try_get_u64("enable_table_schema_refresh")? != 0)
}

// Get max_block_size.
pub fn get_max_block_size(&self) -> Result<u64> {
self.try_get_u64("max_block_size")
Expand Down
54 changes: 48 additions & 6 deletions src/query/storages/system/src/columns_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ use databend_common_meta_app::tenant::Tenant;
use databend_common_sql::Planner;
use databend_common_storages_basic::view_table::QUERY;
use databend_common_storages_basic::view_table::VIEW_ENGINE;
use databend_common_storages_fuse::FuseTable;
use databend_common_storages_stream::stream_table::STREAM_ENGINE;
use databend_common_storages_stream::stream_table::StreamTable;
use log::warn;
Expand Down Expand Up @@ -269,24 +270,65 @@ pub(crate) async fn dump_tables(
push_downs: Option<PushDownInfo>,
catalog: &Arc<dyn Catalog>,
) -> Result<Vec<(String, Vec<Arc<dyn Table>>)>> {
// For performance considerations, we do not require the most up-to-date table information here
let catalog = disable_catalog_refresh(catalog.clone())?;
// List through a refresh-disabled catalog: refreshing every ATTACH table here costs one S3
// round-trip each, and a single unreachable one would fail the whole listing (#19759).
let disabled_catalog = disable_catalog_refresh(catalog.clone())?;

// Extract filters from push_downs
let func_ctx = ctx.get_function_context()?;
let (filtered_db_names, filtered_table_names) = extract_filters(&push_downs, &func_ctx)?;

// Use unified visibility collection from util.rs
let db_with_tables =
collect_visible_tables(ctx, &catalog, &filtered_db_names, &filtered_table_names).await?;
let db_with_tables = collect_visible_tables(
ctx,
&disabled_catalog,
&filtered_db_names,
&filtered_table_names,
)
.await?;

// Convert to the expected return format
// A read-only ATTACH table keeps its current schema in the source snapshot, not on the meta
// server, so the disabled catalog above hands back the schema frozen at ATTACH time. Refreshing
// each one through the original catalog picks up source schema changes, but costs one S3
// round-trip per ATTACH table, so it is opt-in via enable_table_schema_refresh. A refresh
// failure must not drop sibling tables, so fall back to the cached handle.
let refresh = ctx.get_settings().get_enable_table_schema_refresh()?;
Ok(db_with_tables
.into_iter()
.map(|db| (db.name, db.tables))
.map(|db| {
let tables = if refresh {
db.tables
.into_iter()
.map(|table| refresh_attach_table(catalog, table))
.collect()
} else {
db.tables
};
(db.name, tables)
})
.collect())
}

/// Refresh an ATTACH table's schema from its source storage, returning the cached handle on
/// failure. Other tables are returned as-is; their schema already comes from the meta server.
fn refresh_attach_table(catalog: &Arc<dyn Catalog>, table: Arc<dyn Table>) -> Arc<dyn Table> {
if !FuseTable::is_table_attached(&table.get_table_info().meta.options) {
return table;
}

match catalog.get_table_by_info(table.get_table_info()) {
Ok(refreshed) => refreshed,
Err(e) => {
warn!(
"failed to refresh schema of attach table {}, fallback to cached schema: {}",
table.get_table_info().desc,
e
);
table
}
}
}

fn extract_filters(
push_downs: &Option<PushDownInfo>,
func_ctx: &databend_common_expression::FunctionContext,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ c1 VARCHAR NO 'c1'
c2 VARCHAR NO 'c2'
expects one row, 3 columns
0 c1 c2
system.columns hides source-side added columns when refresh is disabled (default)
number
system.columns should reflect the added columns
c1
c2
number
information_schema.columns should reflect the added columns
c1
c2
number
refresh does NOT persist to meta server: disable refresh again, should still see frozen schema
number
alter table drop column
expects new columns: number, c2
number VARCHAR YES NULL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ echo "desc attach_read_only;" | $BENDSQL_CLIENT_CONNECT
echo "expects one row, 3 columns"
echo "select * from attach_read_only order by number;" | $BENDSQL_CLIENT_CONNECT

echo "system.columns hides source-side added columns when refresh is disabled (default)"
echo "select name from system.columns where database='default' and table='attach_read_only' order by name;" | $BENDSQL_CLIENT_CONNECT
echo "system.columns should reflect the added columns"
echo "set enable_table_schema_refresh=1; select name from system.columns where database='default' and table='attach_read_only' order by name;" | $BENDSQL_CLIENT_CONNECT
echo "information_schema.columns should reflect the added columns"
echo "set enable_table_schema_refresh=1; select column_name from information_schema.columns where table_schema='default' and table_name='attach_read_only' order by column_name;" | $BENDSQL_CLIENT_CONNECT

echo "refresh does NOT persist to meta server: disable refresh again, should still see frozen schema"
echo "select name from system.columns where database='default' and table='attach_read_only' order by name;" | $BENDSQL_CLIENT_CONNECT


echo "alter table drop column"
echo "alter table base drop column c1;" | $BENDSQL_CLIENT_CONNECT
Expand Down
Loading