Skip to content

Commit c393dd2

Browse files
committed
rework bulk
1 parent 062def9 commit c393dd2

5 files changed

Lines changed: 151 additions & 129 deletions

File tree

src/firebolt/async_db/cursor.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -228,11 +228,16 @@ async def _do_execute(
228228

229229
try:
230230
statement_planner = StatementPlannerFactory.create_planner(
231-
paramstyle, self._formatter, bulk_insert
231+
paramstyle, self._formatter
232232
)
233233

234234
plan = statement_planner.create_execution_plan(
235-
raw_query, parameters, skip_parsing, async_execution, streaming
235+
raw_query,
236+
parameters,
237+
skip_parsing,
238+
async_execution,
239+
streaming,
240+
bulk_insert,
236241
)
237242
await self._execute_plan(plan, timeout)
238243
self._state = CursorState.DONE

src/firebolt/common/cursor/statement_planners.py

Lines changed: 86 additions & 97 deletions
Original file line numberDiff line numberDiff line change
@@ -33,8 +33,12 @@ class ExecutionPlan:
3333
streaming: bool = False
3434

3535

36-
class BulkInsertMixin:
37-
"""Mixin class for bulk insert functionality."""
36+
class BaseStatementPlanner(ABC):
37+
"""Base class for statement planning handlers."""
38+
39+
def __init__(self, formatter: StatementFormatter) -> None:
40+
"""Initialize statement planner with required dependencies."""
41+
self.formatter = formatter
3842

3943
def create_execution_plan(
4044
self,
@@ -43,11 +47,40 @@ def create_execution_plan(
4347
skip_parsing: bool = False,
4448
async_execution: bool = False,
4549
streaming: bool = False,
50+
bulk_insert: bool = False,
4651
) -> ExecutionPlan:
47-
"""Create execution plan for bulk insert operations."""
48-
return self._create_bulk_execution_plan(
49-
raw_query, parameters, async_execution, streaming
50-
)
52+
"""Create an execution plan for a given statement and parameters.
53+
54+
This method serves as a factory for creating an execution plan, which
55+
encapsulates the queries to be executed and the parameters for execution.
56+
It supports standard execution, as well as bulk insert, which is handled
57+
by a separate method.
58+
59+
Args:
60+
raw_query (str): The raw SQL query to be executed.
61+
parameters (Sequence[Sequence[ParameterType]]): A sequence of parameter
62+
sequences for the query.
63+
skip_parsing (bool): If True, the query will not be parsed, and all
64+
special features (e.g., multi-statement, parameterized queries) will
65+
be disabled. Defaults to False.
66+
async_execution (bool): If True, the query will be executed
67+
asynchronously. Defaults to False.
68+
streaming (bool): If True, the query results will be streamed.
69+
Defaults to False.
70+
bulk_insert (bool): If True, the query will be treated as a bulk insert
71+
operation. Defaults to False.
72+
73+
Returns:
74+
ExecutionPlan: An object representing the execution plan.
75+
"""
76+
if bulk_insert:
77+
return self._create_bulk_execution_plan(
78+
raw_query, parameters, async_execution, streaming
79+
)
80+
else:
81+
return self._create_standard_execution_plan(
82+
raw_query, parameters, skip_parsing, async_execution, streaming
83+
)
5184

5285
def _validate_bulk_insert_query(self, query: str) -> None:
5386
"""Validate that query is an INSERT statement for bulk_insert."""
@@ -68,51 +101,36 @@ def _create_bulk_execution_plan(
68101
async_execution: bool,
69102
streaming: bool,
70103
) -> ExecutionPlan:
71-
"""
72-
Create bulk execution plan by delegating to
73-
parameter-style specific methods.
74-
"""
104+
"""Create bulk execution plan using formatter logic."""
75105
# Validate bulk_insert requirements
76106
self._validate_bulk_insert_query(raw_query)
77107
if not parameters:
78108
raise ProgrammingError("bulk_insert requires at least one parameter set")
79109

80-
# Call the parameter-style specific bulk creation method
81110
return self._create_bulk_plan_impl(
82111
raw_query, parameters, async_execution, streaming
83112
)
84113

85-
def _create_bulk_plan_impl(
114+
@abstractmethod
115+
def _create_standard_execution_plan(
86116
self,
87117
raw_query: str,
88118
parameters: Sequence[Sequence[ParameterType]],
119+
skip_parsing: bool,
89120
async_execution: bool,
90121
streaming: bool,
91122
) -> ExecutionPlan:
92-
"""
93-
Override in subclasses to provide parameter-style
94-
specific bulk implementation.
95-
"""
96-
raise NotImplementedError("Subclass must implement _create_bulk_plan_impl")
97-
98-
99-
class BaseStatementPlanner(ABC):
100-
"""Base class for statement planning handlers."""
101-
102-
def __init__(self, formatter: StatementFormatter) -> None:
103-
"""Initialize statement planner with required dependencies."""
104-
self.formatter = formatter
123+
"""Create standard (non-bulk) execution plan."""
105124

106125
@abstractmethod
107-
def create_execution_plan(
126+
def _create_bulk_plan_impl(
108127
self,
109128
raw_query: str,
110129
parameters: Sequence[Sequence[ParameterType]],
111-
skip_parsing: bool = False,
112-
async_execution: bool = False,
113-
streaming: bool = False,
130+
async_execution: bool,
131+
streaming: bool,
114132
) -> ExecutionPlan:
115-
"""Create an execution plan for the given statement and parameters."""
133+
"""Create parameter-style specific bulk execution plan."""
116134

117135
@staticmethod
118136
def _get_output_format(streaming: bool) -> str:
@@ -125,13 +143,13 @@ def _get_output_format(streaming: bool) -> str:
125143
class FbNumericStatementPlanner(BaseStatementPlanner):
126144
"""Statement planner for fb_numeric parameter style."""
127145

128-
def create_execution_plan(
146+
def _create_standard_execution_plan(
129147
self,
130148
raw_query: str,
131149
parameters: Sequence[Sequence[ParameterType]],
132-
skip_parsing: bool = False,
133-
async_execution: bool = False,
134-
streaming: bool = False,
150+
skip_parsing: bool,
151+
async_execution: bool,
152+
streaming: bool,
135153
) -> ExecutionPlan:
136154
"""Create execution plan for fb_numeric parameter style."""
137155
query_params = self._build_fb_numeric_query_params(
@@ -146,6 +164,32 @@ def create_execution_plan(
146164
streaming=streaming,
147165
)
148166

167+
def _create_bulk_plan_impl(
168+
self,
169+
raw_query: str,
170+
parameters: Sequence[Sequence[ParameterType]],
171+
async_execution: bool,
172+
streaming: bool,
173+
) -> ExecutionPlan:
174+
"""Create bulk insert execution plan for fb_numeric parameter style."""
175+
# Prepare bulk insert query and parameters for fb_numeric
176+
processed_query, processed_params = self._prepare_fb_numeric_bulk_insert(
177+
raw_query, parameters
178+
)
179+
180+
# Build query parameters for bulk insert
181+
query_params = self._build_fb_numeric_query_params(
182+
processed_params, streaming, async_execution
183+
)
184+
185+
return ExecutionPlan(
186+
queries=[processed_query],
187+
query_params=query_params,
188+
is_multi_statement=False,
189+
async_execution=async_execution,
190+
streaming=streaming,
191+
)
192+
149193
def _build_fb_numeric_query_params(
150194
self,
151195
parameters: Sequence[Sequence[ParameterType]],
@@ -174,36 +218,6 @@ def _build_fb_numeric_query_params(
174218
query_params.update(extra_params)
175219
return query_params
176220

177-
178-
class FbNumericBulkStatementPlanner(BulkInsertMixin, FbNumericStatementPlanner):
179-
"""Statement planner for fb_numeric parameter style with bulk insert support."""
180-
181-
def _create_bulk_plan_impl(
182-
self,
183-
raw_query: str,
184-
parameters: Sequence[Sequence[ParameterType]],
185-
async_execution: bool,
186-
streaming: bool,
187-
) -> ExecutionPlan:
188-
"""Create bulk insert execution plan for fb_numeric parameter style."""
189-
# Prepare bulk insert query and parameters for fb_numeric
190-
processed_query, processed_params = self._prepare_fb_numeric_bulk_insert(
191-
raw_query, parameters
192-
)
193-
194-
# Build query parameters for bulk insert
195-
query_params = self._build_fb_numeric_query_params(
196-
processed_params, streaming, async_execution
197-
)
198-
199-
return ExecutionPlan(
200-
queries=[processed_query],
201-
query_params=query_params,
202-
is_multi_statement=False,
203-
async_execution=async_execution,
204-
streaming=streaming,
205-
)
206-
207221
def _prepare_fb_numeric_bulk_insert(
208222
self, query: str, parameters_seq: Sequence[Sequence[ParameterType]]
209223
) -> tuple[str, Sequence[Sequence[ParameterType]]]:
@@ -232,13 +246,13 @@ def _prepare_fb_numeric_bulk_insert(
232246
class QmarkStatementPlanner(BaseStatementPlanner):
233247
"""Statement planner for qmark parameter style."""
234248

235-
def create_execution_plan(
249+
def _create_standard_execution_plan(
236250
self,
237251
raw_query: str,
238252
parameters: Sequence[Sequence[ParameterType]],
239-
skip_parsing: bool = False,
240-
async_execution: bool = False,
241-
streaming: bool = False,
253+
skip_parsing: bool,
254+
async_execution: bool,
255+
streaming: bool,
242256
) -> ExecutionPlan:
243257
"""Create execution plan for qmark parameter style."""
244258
queries: List[Union[SetParameter, str]] = (
@@ -267,10 +281,6 @@ def create_execution_plan(
267281
streaming=streaming,
268282
)
269283

270-
271-
class QmarkBulkStatementPlanner(BulkInsertMixin, QmarkStatementPlanner):
272-
"""Statement planner for qmark parameter style with bulk insert support."""
273-
274284
def _create_bulk_plan_impl(
275285
self,
276286
raw_query: str,
@@ -279,20 +289,8 @@ def _create_bulk_plan_impl(
279289
streaming: bool,
280290
) -> ExecutionPlan:
281291
"""Create bulk insert execution plan for qmark parameter style."""
282-
# Import needed modules
283-
from sqlparse import parse as parse_sql # type: ignore
284-
285-
# Prepare bulk insert query for qmark style
286-
statements = parse_sql(raw_query)
287-
if not statements:
288-
raise ProgrammingError("Invalid SQL query for bulk insert")
289-
290-
formatted_queries = []
291-
for param_set in parameters:
292-
formatted_query = self.formatter.format_statement(statements[0], param_set)
293-
formatted_queries.append(formatted_query)
294-
295-
combined_query = "; ".join(formatted_queries)
292+
# Use formatter's bulk insert method to create combined query
293+
combined_query = self.formatter.format_bulk_insert(raw_query, parameters)
296294

297295
# Build query parameters for bulk insert
298296
query_params: Dict[str, Any] = {
@@ -318,32 +316,23 @@ class StatementPlannerFactory:
318316
"qmark": QmarkStatementPlanner,
319317
}
320318

321-
_BULK_PLANNER_CLASSES = {
322-
"fb_numeric": FbNumericBulkStatementPlanner,
323-
"qmark": QmarkBulkStatementPlanner,
324-
}
325-
326319
@classmethod
327320
def create_planner(
328-
cls, paramstyle: str, formatter: StatementFormatter, bulk_insert: bool = False
321+
cls, paramstyle: str, formatter: StatementFormatter
329322
) -> BaseStatementPlanner:
330323
"""Create a statement planner instance for the given paramstyle.
331324
332325
Args:
333326
paramstyle: The parameter style ('fb_numeric' or 'qmark')
334327
formatter: StatementFormatter instance for statement processing
335-
bulk_insert: Whether to create a bulk-capable planner
336328
337329
Returns:
338330
Appropriate statement planner instance
339331
340332
Raises:
341333
ProgrammingError: If paramstyle is not supported
342334
"""
343-
planner_classes = (
344-
cls._BULK_PLANNER_CLASSES if bulk_insert else cls._PLANNER_CLASSES
345-
)
346-
planner_class = planner_classes.get(paramstyle)
335+
planner_class = cls._PLANNER_CLASSES.get(paramstyle)
347336

348337
if planner_class is None:
349338
raise ProgrammingError(f"Unsupported paramstyle: {paramstyle}")

src/firebolt/common/statement_formatter.py

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -218,6 +218,30 @@ def split_format_sql(
218218
self.statement_to_set(st) or self.statement_to_sql(st) for st in statements
219219
]
220220

221+
def format_bulk_insert(
222+
self, query: str, parameters_seq: Sequence[Sequence[ParameterType]]
223+
) -> str:
224+
"""
225+
Format bulk insert operations by creating multiple INSERT statements.
226+
227+
Args:
228+
query: The base INSERT query template
229+
parameters_seq: Sequence of parameter sets for each INSERT
230+
231+
Returns:
232+
Combined SQL string with all INSERT statements
233+
"""
234+
statements = parse_sql(query)
235+
if not statements:
236+
raise DataError("Invalid SQL query for bulk insert")
237+
238+
formatted_queries = []
239+
for param_set in parameters_seq:
240+
formatted_query = self.format_statement(statements[0], param_set)
241+
formatted_queries.append(formatted_query)
242+
243+
return "; ".join(formatted_queries)
244+
221245

222246
def create_statement_formatter(version: int) -> StatementFormatter:
223247
if version == 1:

src/firebolt/db/cursor.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -234,11 +234,16 @@ def _do_execute(
234234

235235
try:
236236
statement_planner = StatementPlannerFactory.create_planner(
237-
paramstyle, self._formatter, bulk_insert
237+
paramstyle, self._formatter
238238
)
239239

240240
plan = statement_planner.create_execution_plan(
241-
raw_query, parameters, skip_parsing, async_execution, streaming
241+
raw_query,
242+
parameters,
243+
skip_parsing,
244+
async_execution,
245+
streaming,
246+
bulk_insert,
242247
)
243248
self._execute_plan(plan, timeout)
244249
self._state = CursorState.DONE

0 commit comments

Comments
 (0)