Skip to content

Commit 57ba416

Browse files
authored
Merge pull request #413 from mindsdb/cte-support
Cte support
2 parents 5525c2c + 82cae5a commit 57ba416

5 files changed

Lines changed: 56 additions & 12 deletions

File tree

mindsdb_sql/planner/plan_join.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,7 @@ def plan(self, query):
112112
query2 = copy.deepcopy(query)
113113
query2.from_table = None
114114
query2.using = None
115+
query2.cte = None
115116
sup_select = QueryStep(query2, from_table=join_step.result)
116117
self.planner.plan.add_step(sup_select)
117118
return sup_select
@@ -375,7 +376,9 @@ def process_subselect(self, item):
375376
self.step_stack.append(step2)
376377

377378
def process_table(self, item, query_in):
378-
query2 = Select(from_table=item.table, targets=[Star()])
379+
table = copy.deepcopy(item.table)
380+
table.parts.insert(0, item.integration)
381+
query2 = Select(from_table=table, targets=[Star()])
379382
# parts = tuple(map(str.lower, table_name.parts))
380383
conditions = item.conditions
381384
if 'or' in self.query_context['binary_ops']:
@@ -414,8 +417,7 @@ def process_table(self, item, query_in):
414417
else:
415418
query2.where = cond
416419

417-
# step = self.planner.get_integration_select_step(query2)
418-
step = FetchDataframeStep(integration=item.integration, query=query2)
420+
step = self.planner.get_integration_select_step(query2)
419421
self.tables_fetch_step[item.index] = step
420422

421423
self.add_plan_step(step)

mindsdb_sql/planner/query_planner.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ def __init__(self,
8686

8787
self.statement = None
8888

89+
self.cte_results = {}
90+
8991
def is_predictor(self, identifier):
9092
if not isinstance(identifier, Identifier):
9193
return False
@@ -158,6 +160,12 @@ def get_integration_select_step(self, select):
158160
else:
159161
integration_name, table = self.resolve_database_table(select.from_table)
160162

163+
# is it CTE?
164+
table_name = table.parts[-1]
165+
if integration_name == self.default_namespace and table_name in self.cte_results:
166+
select.from_table = None
167+
return SubSelectStep(select, self.cte_results[table_name], table_name=table_name)
168+
161169
fetch_df_select = copy.deepcopy(select)
162170
self.prepare_integration_select(integration_name, fetch_df_select)
163171

@@ -663,10 +671,19 @@ def plan_delete(self, query: Delete):
663671
where=query.where
664672
))
665673

674+
def plan_cte(self, query):
675+
for cte in query.cte:
676+
step = self.plan_select(cte.query)
677+
name = cte.name.parts[-1]
678+
self.cte_results[name] = step.result
679+
666680
def plan_select(self, query, integration=None):
667681
if isinstance(query, Union):
668682
return self.plan_union(query, integration=integration)
669683

684+
if query.cte is not None:
685+
self.plan_cte(query)
686+
670687
from_table = query.from_table
671688

672689
if isinstance(from_table, Identifier):

mindsdb_sql/planner/query_prepare.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,8 @@ def find_predictors(node, is_table, **kwargs):
348348

349349
elif column.name is not None:
350350
# is Identifier
351+
if isinstance(column.name, ast.Star):
352+
continue
351353
col_name = column.name.upper()
352354
if column.table is not None:
353355
table = column.table

tests/test_planner/test_join_predictor.py

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -142,14 +142,6 @@ def test_join_predictor_plan_limit(self):
142142
# plan_query(query, integrations=['postgres_90'], predictor_namespace='mindsdb', predictor_metadata={'hrp3': {}})
143143

144144
def test_join_predictor_plan_complex_query(self):
145-
query = Select(targets=[Identifier('tab.asset'), Identifier('tab.time'), Identifier('pred.predicted')],
146-
from_table=Join(left=Identifier('int.tab'),
147-
right=Identifier('mindsdb.pred'),
148-
join_type=JoinType.INNER_JOIN,
149-
implicit=True),
150-
group_by=[Identifier('tab.asset')],
151-
having=BinaryOperation('=', args=[Identifier('tab.asset'), Constant('bitcoin')])
152-
)
153145

154146
sql = """
155147
select t.asset, t.time, m.predicted

tests/test_planner/test_join_tables.py

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,4 +489,35 @@ def test_join_one_integration(self):
489489
)
490490
plan = plan_query(query, integrations=['int'], default_namespace='int')
491491

492-
assert plan.steps == expected_plan.steps
492+
assert plan.steps == expected_plan.steps
493+
494+
def test_cte(self):
495+
query = parse_sql('''
496+
with t1 as (
497+
select * from int1.tbl1
498+
)
499+
select t1.id, t2.* from t1
500+
join int2.tbl2 t2 on t1.id>t2.id
501+
''')
502+
503+
subquery = copy.deepcopy(query)
504+
subquery.from_table = None
505+
506+
plan = plan_query(query, integrations=['int1', 'int2'], default_namespace='mindsdb')
507+
508+
expected_plan = QueryPlan(
509+
steps=[
510+
FetchDataframeStep(integration='int1', query=parse_sql('select * from tbl1')),
511+
SubSelectStep(dataframe=Result(0), query=Select(targets=[Star()]), table_name='t1'),
512+
FetchDataframeStep(integration='int2', query=parse_sql('select * from tbl2 as t2')),
513+
JoinStep(left=Result(1),
514+
right=Result(2),
515+
query=Join(left=Identifier('tab1'),
516+
right=Identifier('tab2'),
517+
condition=BinaryOperation(op='>', args=[Identifier('t1.id'), Identifier('t2.id')]),
518+
join_type=JoinType.JOIN)),
519+
QueryStep(parse_sql('SELECT t1.`id`, t2.*'), from_table=Result(3)),
520+
]
521+
)
522+
523+
assert plan.steps == expected_plan.steps

0 commit comments

Comments
 (0)