1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15+ use std:: ops:: ControlFlow ;
1516use std:: sync:: Arc ;
1617
18+ use databend_common_ast:: Span ;
1719use databend_common_catalog:: catalog:: CATALOG_DEFAULT ;
1820use databend_common_catalog:: plan:: PushDownInfo ;
1921use databend_common_catalog:: table:: Table ;
2022use databend_common_exception:: Result ;
23+ use databend_common_expression:: types:: DataType ;
2124use databend_common_expression:: types:: StringType ;
2225use databend_common_expression:: types:: TimestampType ;
26+ use databend_common_expression:: visit_expr;
2327use databend_common_expression:: DataBlock ;
28+ use databend_common_expression:: Expr ;
2429use databend_common_expression:: FromData ;
30+ use databend_common_expression:: Scalar ;
2531use databend_common_expression:: TableDataType ;
2632use databend_common_expression:: TableField ;
2733use databend_common_expression:: TableSchemaRefExt ;
34+ use databend_common_functions:: BUILTIN_FUNCTIONS ;
2835use databend_common_meta_app:: schema:: ListIndexesReq ;
2936use databend_common_meta_app:: schema:: TableIdent ;
3037use databend_common_meta_app:: schema:: TableInfo ;
3138use databend_common_meta_app:: schema:: TableMeta ;
39+ use databend_common_storages_fuse:: index:: EqVisitor ;
40+ use databend_common_storages_fuse:: index:: ResultRewrite ;
41+ use databend_common_storages_fuse:: index:: Visitor ;
3242use databend_common_storages_fuse:: TableContext ;
3343use log:: warn;
3444
@@ -50,19 +60,40 @@ impl AsyncSystemTable for IndexesTable {
5060 async fn get_full_data (
5161 & self ,
5262 ctx : Arc < dyn TableContext > ,
53- _push_downs : Option < PushDownInfo > ,
63+ push_downs : Option < PushDownInfo > ,
5464 ) -> Result < DataBlock > {
65+ let mut database_names = None ;
66+ let mut table_names = None ;
67+
68+ if let Some ( filters) = push_downs. and_then ( |info| info. filters ) {
69+ let expr = filters. filter . as_expr ( & BUILTIN_FUNCTIONS ) ;
70+ let mut visitor = Visitor :: new ( TableFilterVisitor :: default ( ) ) ;
71+ visit_expr ( & expr, & mut visitor) ?;
72+
73+ let inner = visitor. inner ( ) ;
74+ database_names = ( !inner. database_names . is_empty ( ) ) . then_some ( inner. database_names ) ;
75+ table_names = ( !inner. table_names . is_empty ( ) ) . then_some ( inner. table_names ) ;
76+ }
77+
5578 let tenant = ctx. get_tenant ( ) ;
5679 let catalog = ctx. get_catalog ( CATALOG_DEFAULT ) . await ?;
5780 let indexes = catalog
5881 . list_indexes ( ListIndexesReq :: new ( & tenant, None ) )
5982 . await ?;
6083
61- let table_index_tables = self . list_table_index_tables ( ctx. clone ( ) ) . await ?;
84+ let table_index_tables = self
85+ . list_table_index_tables (
86+ ctx. clone ( ) ,
87+ database_names. as_deref ( ) ,
88+ table_names. as_deref ( ) ,
89+ )
90+ . await ?;
6291
6392 let len = indexes. len ( ) + table_index_tables. len ( ) ;
6493 let mut names = Vec :: with_capacity ( len) ;
6594 let mut types = Vec :: with_capacity ( len) ;
95+ let mut databases = Vec :: with_capacity ( len) ;
96+ let mut tables = Vec :: with_capacity ( len) ;
6697 let mut originals = Vec :: with_capacity ( len) ;
6798 let mut defs = Vec :: with_capacity ( len) ;
6899 let mut created_on = Vec :: with_capacity ( len) ;
@@ -71,6 +102,8 @@ impl AsyncSystemTable for IndexesTable {
71102 for ( _, name, index) in indexes {
72103 names. push ( name. clone ( ) ) ;
73104 types. push ( index. index_type . to_string ( ) ) ;
105+ databases. push ( ctx. get_current_database ( ) ) ;
106+ tables. push ( catalog. get_table_name_by_id ( index. table_id ) . await ?) ;
74107 originals. push ( index. original_query . clone ( ) ) ;
75108 defs. push ( index. query . clone ( ) ) ;
76109 created_on. push ( index. created_on . timestamp_micros ( ) ) ;
@@ -81,6 +114,8 @@ impl AsyncSystemTable for IndexesTable {
81114 for ( name, index) in & table. meta . indexes {
82115 names. push ( name. clone ( ) ) ;
83116 types. push ( index. index_type . to_string ( ) ) ;
117+ databases. push ( table. database_name ( ) ?. to_string ( ) ) ;
118+ tables. push ( Some ( table. name . to_string ( ) ) ) ;
84119 originals. push ( "" . to_string ( ) ) ;
85120
86121 let schema = table. schema ( ) ;
@@ -111,6 +146,8 @@ impl AsyncSystemTable for IndexesTable {
111146 Ok ( DataBlock :: new_from_columns ( vec ! [
112147 StringType :: from_data( names) ,
113148 StringType :: from_data( types) ,
149+ StringType :: from_data( databases) ,
150+ StringType :: from_opt_data( tables) ,
114151 StringType :: from_data( originals) ,
115152 StringType :: from_data( defs) ,
116153 TimestampType :: from_data( created_on) ,
@@ -124,6 +161,11 @@ impl IndexesTable {
124161 let schema = TableSchemaRefExt :: create ( vec ! [
125162 TableField :: new( "name" , TableDataType :: String ) ,
126163 TableField :: new( "type" , TableDataType :: String ) ,
164+ TableField :: new( "database" , TableDataType :: String ) ,
165+ TableField :: new(
166+ "table" ,
167+ TableDataType :: Nullable ( Box :: new( TableDataType :: String ) ) ,
168+ ) ,
127169 TableField :: new( "original" , TableDataType :: String ) ,
128170 TableField :: new( "definition" , TableDataType :: String ) ,
129171 TableField :: new( "created_on" , TableDataType :: Timestamp ) ,
@@ -149,7 +191,12 @@ impl IndexesTable {
149191 AsyncOneBlockSystemTable :: create ( Self { table_info } )
150192 }
151193
152- async fn list_table_index_tables ( & self , ctx : Arc < dyn TableContext > ) -> Result < Vec < TableInfo > > {
194+ async fn list_table_index_tables (
195+ & self ,
196+ ctx : Arc < dyn TableContext > ,
197+ database_names : Option < & [ String ] > ,
198+ table_names : Option < & [ String ] > ,
199+ ) -> Result < Vec < TableInfo > > {
153200 let tenant = ctx. get_tenant ( ) ;
154201 let visibility_checker = ctx. get_visibility_checker ( false ) . await ?;
155202 let catalog = ctx. get_catalog ( CATALOG_DEFAULT ) . await ?;
@@ -180,6 +227,11 @@ impl IndexesTable {
180227 let db_id = db. get_db_info ( ) . database_id . db_id ;
181228 let db_name = db. name ( ) ;
182229
230+ if let Some ( database_names) = database_names {
231+ if !database_names. iter ( ) . any ( |name| name == db_name) {
232+ continue ;
233+ }
234+ }
183235 let tables = match catalog. list_tables ( & tenant, db_name) . await {
184236 Ok ( tables) => tables,
185237 Err ( err) => {
@@ -194,6 +246,11 @@ impl IndexesTable {
194246 if table_info. meta . indexes . is_empty ( ) {
195247 continue ;
196248 }
249+ if let Some ( table_names) = table_names {
250+ if !table_names. contains ( & table_info. name ) {
251+ continue ;
252+ }
253+ }
197254 if visibility_checker. check_table_visibility (
198255 & ctl_name,
199256 db_name,
@@ -208,3 +265,48 @@ impl IndexesTable {
208265 Ok ( index_tables)
209266 }
210267}
268+
269+ #[ derive( Debug , Default ) ]
270+ struct TableFilterVisitor {
271+ database_names : Vec < String > ,
272+ table_names : Vec < String > ,
273+ }
274+
275+ impl EqVisitor for TableFilterVisitor {
276+ fn enter_target (
277+ & mut self ,
278+ _: Span ,
279+ col_name : & str ,
280+ scalar : & Scalar ,
281+ _: & DataType ,
282+ _: & DataType ,
283+ is_like : bool ,
284+ ) -> ResultRewrite {
285+ if is_like {
286+ return Ok ( ControlFlow :: Break ( None ) ) ;
287+ }
288+ let Some ( name) = scalar. as_string ( ) else {
289+ return Ok ( ControlFlow :: Break ( None ) ) ;
290+ } ;
291+ match col_name {
292+ "table" => & mut self . table_names ,
293+ "database" => & mut self . database_names ,
294+ _ => return Ok ( ControlFlow :: Break ( None ) ) ,
295+ }
296+ . push ( name. clone ( ) ) ;
297+
298+ Ok ( ControlFlow :: Break ( None ) )
299+ }
300+
301+ fn enter_map_column (
302+ & mut self ,
303+ _: Span ,
304+ _: & [ Expr < String > ] ,
305+ _: & Scalar ,
306+ _: & DataType ,
307+ _: & DataType ,
308+ _: bool ,
309+ ) -> ResultRewrite {
310+ Ok ( ControlFlow :: Continue ( None ) )
311+ }
312+ }
0 commit comments