Skip to content

Commit cefbc5e

Browse files
authored
feat: impl table & database pushdown scan for system.indexes (#18191)
* feat: impl `table` & `database` pushdown scan for `system.indexes` * chore: fix `columns_table` * test: add tests to 01_0000_system_indexes.test * chore: when table_names exist and are few, then use Database::get_table to further reduce table loading. * chore: codefmt * chore: split the err msg * fix: indexes table filter pushdown does not distinguish between `and` & `or`
1 parent be55479 commit cefbc5e

3 files changed

Lines changed: 186 additions & 9 deletions

File tree

src/query/service/tests/it/storages/testdata/columns_table.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
105105
| 'database' | 'system' | 'clustering_history' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
106106
| 'database' | 'system' | 'columns' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
107107
| 'database' | 'system' | 'dictionaries' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
108+
| 'database' | 'system' | 'indexes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
108109
| 'database' | 'system' | 'processes' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
109110
| 'database' | 'system' | 'streams' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
110111
| 'database' | 'system' | 'streams_terse' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
@@ -360,6 +361,7 @@ DB.Table: 'system'.'columns', Table: columns-table_id:1, ver:0, Engine: SystemCo
360361
| 'syntax' | 'system' | 'functions' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
361362
| 'table' | 'system' | 'clustering_history' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
362363
| 'table' | 'system' | 'columns' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
364+
| 'table' | 'system' | 'indexes' | 'Nullable(String)' | 'VARCHAR' | '' | '' | 'YES' | '' |
363365
| 'table' | 'system' | 'virtual_columns' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
364366
| 'table_catalog' | 'information_schema' | 'columns' | 'String' | 'VARCHAR' | '' | '' | 'NO' | '' |
365367
| 'table_catalog' | 'information_schema' | 'key_column_usage' | 'NULL' | 'NULL' | '' | '' | 'NO' | '' |

src/query/storages/system/src/indexes_table.rs

Lines changed: 117 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,24 @@ use databend_common_expression::types::StringType;
2222
use databend_common_expression::types::TimestampType;
2323
use databend_common_expression::DataBlock;
2424
use databend_common_expression::FromData;
25+
use databend_common_expression::Scalar;
2526
use databend_common_expression::TableDataType;
2627
use databend_common_expression::TableField;
2728
use databend_common_expression::TableSchemaRefExt;
29+
use databend_common_functions::BUILTIN_FUNCTIONS;
2830
use databend_common_meta_app::schema::ListIndexesReq;
2931
use databend_common_meta_app::schema::TableIdent;
3032
use databend_common_meta_app::schema::TableInfo;
3133
use databend_common_meta_app::schema::TableMeta;
3234
use databend_common_storages_fuse::TableContext;
35+
use futures::future::try_join_all;
3336
use log::warn;
3437

3538
use crate::table::AsyncOneBlockSystemTable;
3639
use crate::table::AsyncSystemTable;
40+
use crate::util::find_eq_or_filter;
41+
42+
// Threshold used when choosing how to load tables for a database: the
// comparison against this value selects between fetching each pushed-down
// table name individually via `db.get_table` and listing every table in the
// database via `catalog.list_tables`.
// NOTE(review): the visible comparison is `table_names.iter().len() <= POINT_GET_TABLE_LIMIT`
// where `table_names` is an `Option<&[String]>`; `Option::iter` yields at most
// one item, so this looks like it never exceeds the limit — confirm whether
// the slice length was intended.
const POINT_GET_TABLE_LIMIT: usize = 20;
3743

3844
pub struct IndexesTable {
3945
table_info: TableInfo,
@@ -50,19 +56,69 @@ impl AsyncSystemTable for IndexesTable {
5056
async fn get_full_data(
5157
&self,
5258
ctx: Arc<dyn TableContext>,
53-
_push_downs: Option<PushDownInfo>,
59+
push_downs: Option<PushDownInfo>,
5460
) -> Result<DataBlock> {
61+
let mut filtered_db_names = None;
62+
let mut filtered_table_names = None;
63+
let mut invalid_optimize = false;
64+
65+
if let Some(filters) = push_downs.and_then(|info| info.filters) {
66+
let expr = filters.filter.as_expr(&BUILTIN_FUNCTIONS);
67+
68+
let mut databases: Vec<String> = Vec::new();
69+
let mut tables: Vec<String> = Vec::new();
70+
71+
invalid_optimize = find_eq_or_filter(
72+
&expr,
73+
&mut |col_name, scalar| {
74+
if col_name == "database" {
75+
if let Scalar::String(database) = scalar {
76+
if !databases.contains(database) {
77+
databases.push(database.clone());
78+
}
79+
}
80+
} else if col_name == "table" {
81+
if let Scalar::String(table) = scalar {
82+
if !tables.contains(table) {
83+
tables.push(table.clone());
84+
}
85+
}
86+
}
87+
Ok(())
88+
},
89+
invalid_optimize,
90+
);
91+
if !databases.is_empty() {
92+
filtered_db_names = Some(databases);
93+
}
94+
if !tables.is_empty() {
95+
filtered_table_names = Some(tables);
96+
}
97+
}
98+
if invalid_optimize {
99+
filtered_db_names = None;
100+
filtered_table_names = None;
101+
}
102+
55103
let tenant = ctx.get_tenant();
56104
let catalog = ctx.get_catalog(CATALOG_DEFAULT).await?;
57105
let indexes = catalog
58106
.list_indexes(ListIndexesReq::new(&tenant, None))
59107
.await?;
60108

61-
let table_index_tables = self.list_table_index_tables(ctx.clone()).await?;
109+
let table_index_tables = self
110+
.list_table_index_tables(
111+
ctx.clone(),
112+
filtered_db_names.as_deref(),
113+
filtered_table_names.as_deref(),
114+
)
115+
.await?;
62116

63117
let len = indexes.len() + table_index_tables.len();
64118
let mut names = Vec::with_capacity(len);
65119
let mut types = Vec::with_capacity(len);
120+
let mut databases = Vec::with_capacity(len);
121+
let mut tables = Vec::with_capacity(len);
66122
let mut originals = Vec::with_capacity(len);
67123
let mut defs = Vec::with_capacity(len);
68124
let mut created_on = Vec::with_capacity(len);
@@ -71,6 +127,8 @@ impl AsyncSystemTable for IndexesTable {
71127
for (_, name, index) in indexes {
72128
names.push(name.clone());
73129
types.push(index.index_type.to_string());
130+
databases.push(ctx.get_current_database());
131+
tables.push(catalog.get_table_name_by_id(index.table_id).await?);
74132
originals.push(index.original_query.clone());
75133
defs.push(index.query.clone());
76134
created_on.push(index.created_on.timestamp_micros());
@@ -81,6 +139,8 @@ impl AsyncSystemTable for IndexesTable {
81139
for (name, index) in &table.meta.indexes {
82140
names.push(name.clone());
83141
types.push(index.index_type.to_string());
142+
databases.push(table.database_name()?.to_string());
143+
tables.push(Some(table.name.to_string()));
84144
originals.push("".to_string());
85145

86146
let schema = table.schema();
@@ -111,6 +171,8 @@ impl AsyncSystemTable for IndexesTable {
111171
Ok(DataBlock::new_from_columns(vec![
112172
StringType::from_data(names),
113173
StringType::from_data(types),
174+
StringType::from_data(databases),
175+
StringType::from_opt_data(tables),
114176
StringType::from_data(originals),
115177
StringType::from_data(defs),
116178
TimestampType::from_data(created_on),
@@ -124,6 +186,11 @@ impl IndexesTable {
124186
let schema = TableSchemaRefExt::create(vec![
125187
TableField::new("name", TableDataType::String),
126188
TableField::new("type", TableDataType::String),
189+
TableField::new("database", TableDataType::String),
190+
TableField::new(
191+
"table",
192+
TableDataType::Nullable(Box::new(TableDataType::String)),
193+
),
127194
TableField::new("original", TableDataType::String),
128195
TableField::new("definition", TableDataType::String),
129196
TableField::new("created_on", TableDataType::Timestamp),
@@ -149,7 +216,12 @@ impl IndexesTable {
149216
AsyncOneBlockSystemTable::create(Self { table_info })
150217
}
151218

152-
async fn list_table_index_tables(&self, ctx: Arc<dyn TableContext>) -> Result<Vec<TableInfo>> {
219+
async fn list_table_index_tables(
220+
&self,
221+
ctx: Arc<dyn TableContext>,
222+
database_names: Option<&[String]>,
223+
table_names: Option<&[String]>,
224+
) -> Result<Vec<TableInfo>> {
153225
let tenant = ctx.get_tenant();
154226
let visibility_checker = ctx.get_visibility_checker(false).await?;
155227
let catalog = ctx.get_catalog(CATALOG_DEFAULT).await?;
@@ -180,20 +252,56 @@ impl IndexesTable {
180252
let db_id = db.get_db_info().database_id.db_id;
181253
let db_name = db.name();
182254

183-
let tables = match catalog.list_tables(&tenant, db_name).await {
184-
Ok(tables) => tables,
185-
Err(err) => {
186-
let msg = format!("Failed to list tables in database: {}, {}", db_name, err);
187-
warn!("{}", msg);
188-
ctx.push_warning(msg);
255+
if let Some(database_names) = database_names {
256+
if !database_names.iter().any(|name| name == db_name) {
189257
continue;
190258
}
259+
}
260+
let tables = match (
261+
table_names,
262+
table_names.iter().len() <= POINT_GET_TABLE_LIMIT,
263+
) {
264+
(Some(table_names), true) => {
265+
match try_join_all(table_names.iter().map(|table_name| async {
266+
db.get_table(table_name)
267+
.await
268+
.map_err(|err| (table_name.to_string(), err))
269+
}))
270+
.await
271+
{
272+
Ok(tables) => tables,
273+
Err((table_name, err)) => {
274+
let msg = format!(
275+
"Failed to get table: {} in database: {}, {}",
276+
table_name, db_name, err
277+
);
278+
warn!("{}", msg);
279+
ctx.push_warning(msg);
280+
continue;
281+
}
282+
}
283+
}
284+
_ => match catalog.list_tables(&tenant, db_name).await {
285+
Ok(tables) => tables,
286+
Err(err) => {
287+
let msg =
288+
format!("Failed to list tables in database: {}, {}", db_name, err);
289+
warn!("{}", msg);
290+
ctx.push_warning(msg);
291+
continue;
292+
}
293+
},
191294
};
192295
for table in tables {
193296
let table_info = table.get_table_info();
194297
if table_info.meta.indexes.is_empty() {
195298
continue;
196299
}
300+
if let Some(table_names) = table_names {
301+
if !table_names.contains(&table_info.name) {
302+
continue;
303+
}
304+
}
197305
if visibility_checker.check_table_visibility(
198306
&ctl_name,
199307
db_name,

tests/sqllogictests/suites/ee/01_ee_system/01_0000_system_indexes.test

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,12 @@ DROP TABLE IF EXISTS t1
2727
statement ok
2828
DROP TABLE IF EXISTS t2
2929

30+
statement ok
31+
DROP DATABASE IF EXISTS test_show_indexes_0
32+
33+
statement ok
34+
DROP DATABASE IF EXISTS test_show_indexes_1
35+
3036
statement ok
3137
DROP AGGREGATING INDEX IF EXISTS idx1
3238

@@ -81,3 +87,64 @@ USE default
8187

8288
statement ok
8389
DROP DATABASE IF EXISTS test_index_db
90+
91+
statement ok
92+
CREATE DATABASE test_show_indexes_0
93+
94+
statement ok
95+
USE test_show_indexes_0
96+
97+
statement ok
98+
CREATE TABLE t1 (id int, title string, content string)
99+
100+
statement ok
101+
CREATE INVERTED INDEX idx1 ON t1(content)
102+
103+
statement ok
104+
CREATE NGRAM INDEX idx2 on t1(title)
105+
106+
statement ok
107+
CREATE DATABASE test_show_indexes_1
108+
109+
statement ok
110+
USE test_show_indexes_1
111+
112+
statement ok
113+
CREATE TABLE t1 (id int, title string, content string)
114+
115+
statement ok
116+
CREATE INVERTED INDEX idx1 ON t1(content)
117+
118+
statement ok
119+
CREATE NGRAM INDEX idx2 on t1(title)
120+
121+
query TTTT
122+
select name, type, database, table from system.indexes where database = 'test_show_indexes_0' or database = 'test_show_indexes_1';
123+
----
124+
idx1 INVERTED test_show_indexes_0 t1
125+
idx2 NGRAM test_show_indexes_0 t1
126+
idx1 INVERTED test_show_indexes_1 t1
127+
idx2 NGRAM test_show_indexes_1 t1
128+
129+
130+
query TTTT
131+
select name, type, database, table from system.indexes where table = 't1';
132+
----
133+
idx1 INVERTED test_show_indexes_0 t1
134+
idx2 NGRAM test_show_indexes_0 t1
135+
idx1 INVERTED test_show_indexes_1 t1
136+
idx2 NGRAM test_show_indexes_1 t1
137+
138+
query TTTT
139+
select name, type, database, table from system.indexes where database = 'test_show_indexes_0' and table = 't1';
140+
----
141+
idx1 INVERTED test_show_indexes_0 t1
142+
idx2 NGRAM test_show_indexes_0 t1
143+
144+
query TTTT
145+
select name, type, database, table from system.indexes where database = 'test_show_indexes_0' or table = 't1';
146+
----
147+
idx1 INVERTED test_show_indexes_0 t1
148+
idx2 NGRAM test_show_indexes_0 t1
149+
idx1 INVERTED test_show_indexes_1 t1
150+
idx2 NGRAM test_show_indexes_1 t1

0 commit comments

Comments
 (0)