Skip to content

Commit c6983a2

Browse files
committed
plan union in single integration query
1 parent fbc4315 commit c6983a2

2 files changed

Lines changed: 74 additions & 50 deletions

File tree

mindsdb_sql/planner/query_planner.py

Lines changed: 58 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ def find_objects(node, is_table, **kwargs):
231231
query_traversal(query, find_objects)
232232

233233
# cte names are not mdb objects
234-
if query.cte:
234+
if isinstance(query, Select) and query.cte:
235235
cte_names = [
236236
cte.name.parts[-1]
237237
for cte in query.cte
@@ -271,21 +271,21 @@ def find_selects(node, **kwargs):
271271
return find_selects
272272

273273
def plan_select_identifier(self, query):
274-
query_info = self.get_query_info(query)
275-
276-
if len(query_info['integrations']) == 0 and len(query_info['predictors']) >= 1:
277-
# select from predictor
278-
return self.plan_select_from_predictor(query)
279-
elif (
280-
len(query_info['integrations']) == 1
281-
and len(query_info['mdb_entities']) == 0
282-
and len(query_info['user_functions']) == 0
283-
):
284-
285-
int_name = list(query_info['integrations'])[0]
286-
if self.integrations.get(int_name, {}).get('class_type') != 'api':
287-
# one integration without predictors, send all query to integration
288-
return self.plan_integration_select(query)
274+
# query_info = self.get_query_info(query)
275+
#
276+
# if len(query_info['integrations']) == 0 and len(query_info['predictors']) >= 1:
277+
# # select from predictor
278+
# return self.plan_select_from_predictor(query)
279+
# elif (
280+
# len(query_info['integrations']) == 1
281+
# and len(query_info['mdb_entities']) == 0
282+
# and len(query_info['user_functions']) == 0
283+
# ):
284+
#
285+
# int_name = list(query_info['integrations'])[0]
286+
# if self.integrations.get(int_name, {}).get('class_type') != 'api':
287+
# # one integration without predictors, send all query to integration
288+
# return self.plan_integration_select(query)
289289

290290
# find subselects
291291
main_integration, _ = self.resolve_database_table(query.from_table)
@@ -380,21 +380,21 @@ def plan_api_db_select(self, query):
380380

381381
def plan_nested_select(self, select):
382382

383-
query_info = self.get_query_info(select)
384-
# get all predictors
385-
386-
if (
387-
len(query_info['mdb_entities']) == 0
388-
and len(query_info['integrations']) == 1
389-
and len(query_info['user_functions']) == 0
390-
and 'files' not in query_info['integrations']
391-
and 'views' not in query_info['integrations']
392-
):
393-
int_name = list(query_info['integrations'])[0]
394-
if self.integrations.get(int_name, {}).get('class_type') != 'api':
395-
396-
# if no predictor inside = run as is
397-
return self.plan_integration_nested_select(select, int_name)
383+
# query_info = self.get_query_info(select)
384+
# # get all predictors
385+
#
386+
# if (
387+
# len(query_info['mdb_entities']) == 0
388+
# and len(query_info['integrations']) == 1
389+
# and len(query_info['user_functions']) == 0
390+
# and 'files' not in query_info['integrations']
391+
# and 'views' not in query_info['integrations']
392+
# ):
393+
# int_name = list(query_info['integrations'])[0]
394+
# if self.integrations.get(int_name, {}).get('class_type') != 'api':
395+
#
396+
# # if no predictor inside = run as is
397+
# return self.plan_integration_nested_select(select, int_name)
398398

399399
return self.plan_mdb_nested_select(select)
400400

@@ -685,22 +685,38 @@ def plan_delete(self, query: Delete):
685685
))
686686

687687
def plan_cte(self, query):
688-
query_info = self.get_query_info(query)
689-
690-
if (
691-
len(query_info['integrations']) == 1
692-
and len(query_info['mdb_entities']) == 0
693-
and len(query_info['user_functions']) == 0
694-
):
695-
# single integration, will be planned later
696-
return
697688

698689
for cte in query.cte:
699690
step = self.plan_select(cte.query)
700691
name = cte.name.parts[-1]
701692
self.cte_results[name] = step.result
702693

694+
def check_single_integration(self, query):
695+
query_info = self.get_query_info(query)
696+
697+
# can we send all query to integration?
698+
699+
# one integration and not mindsdb objects in query
700+
if (
701+
len(query_info['mdb_entities']) == 0
702+
and len(query_info['integrations']) == 1
703+
and 'files' not in query_info['integrations']
704+
and 'views' not in query_info['integrations']
705+
and len(query_info['user_functions']) == 0
706+
):
707+
708+
int_name = list(query_info['integrations'])[0]
709+
# if is sql database
710+
if self.integrations.get(int_name, {}).get('class_type') != 'api':
711+
712+
# send to this integration
713+
self.prepare_integration_select(int_name, query)
714+
715+
last_step = self.plan.add_step(FetchDataframeStep(integration=int_name, query=query))
716+
return last_step
717+
703718
def plan_select(self, query, integration=None):
719+
704720
if isinstance(query, (Union, Except, Intersect)):
705721
return self.plan_union(query, integration=integration)
706722

@@ -775,6 +791,8 @@ def from_query(self, query=None):
775791
query = self.query
776792

777793
if isinstance(query, (Select, Union, Except, Intersect)):
794+
if self.check_single_integration(query):
795+
return self.plan
778796
self.plan_select(query)
779797
elif isinstance(query, CreateTable):
780798
self.plan_create_table(query)

tests/test_planner/test_integration_select.py

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ def test_integration_select_subquery_in_from(self):
290290
steps=[
291291
FetchDataframeStep(integration='int',
292292
query=Select(
293-
targets=[Identifier('column1')],
293+
targets=[Identifier('column1', alias=Identifier('column1'))],
294294
from_table=Select(
295295
targets=[Identifier('column1', alias=Identifier('column1'))],
296296
from_table=Identifier('tab'),
@@ -378,7 +378,7 @@ def test_integration_select_default_namespace_subquery_in_from(self):
378378
steps=[
379379
FetchDataframeStep(integration='int',
380380
query=Select(
381-
targets=[Identifier('column1')],
381+
targets=[Identifier('column1', alias=Identifier('column1')),],
382382
from_table=Select(
383383
targets=[Identifier('column1', alias=Identifier('column1'))],
384384
from_table=Identifier('tab'),
@@ -588,20 +588,26 @@ def test_select_from_single_integration(self):
588588
with tab2 as (
589589
select * from int1.tabl2
590590
)
591-
select x from tab2
592-
join int1.tab1 on 0=0
593-
where x1 in (select id from int1.tab1)
594-
limit 1
591+
select a from (
592+
select x from tab2
593+
union
594+
select y from int1.tab1
595+
where x1 in (select id from int1.tab1)
596+
limit 1
597+
)
595598
'''
596599

597600
sql_integration = '''
598601
with tab2 as (
599602
select * from tabl2
600603
)
601-
select x from tab2
602-
join tab1 on 0=0
603-
where x1 in (select id as id from tab1)
604-
limit 1
604+
select a as a from (
605+
select x as x from tab2
606+
union
607+
select y as y from tab1
608+
where x1 in (select id as id from tab1)
609+
limit 1
610+
)
605611
'''
606612
query = parse_sql(sql_parsed, dialect='mindsdb')
607613

0 commit comments

Comments
 (0)