Skip to content

Commit 97b5c56

Browse files
committed
improve the change and rebase
1 parent 018b9bb commit 97b5c56

8 files changed

Lines changed: 284 additions & 117 deletions

File tree

docsrc/Connecting_and_queries.rst

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -469,8 +469,7 @@ Example with FB_NUMERIC parameter style::
469469
cursor.close()
470470

471471
When ``bulk_insert=True``, the SDK concatenates all INSERT statements into a single batch
472-
and sends them to the server with the ``merge_prepared_statement_batches=true`` parameter,
473-
allowing for optimized batch processing.
472+
and sends them to the server for optimized batch processing.
474473

475474

476475
Setting session parameters

src/firebolt/async_db/cursor.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,6 @@
4646
from firebolt.common.row_set.asynchronous.streaming import StreamingAsyncRowSet
4747
from firebolt.common.statement_formatter import create_statement_formatter
4848
from firebolt.utils.exception import (
49-
ConfigurationError,
5049
EngineNotRunningError,
5150
FireboltDatabaseError,
5251
FireboltError,
@@ -229,11 +228,11 @@ async def _do_execute(
229228

230229
try:
231230
statement_planner = StatementPlannerFactory.create_planner(
232-
paramstyle, self._formatter
231+
paramstyle, self._formatter, bulk_insert
233232
)
234233

235234
plan = statement_planner.create_execution_plan(
236-
raw_query, parameters, skip_parsing, async_execution, streaming, bulk_insert
235+
raw_query, parameters, skip_parsing, async_execution, streaming
237236
)
238237
await self._execute_plan(plan, timeout)
239238
self._state = CursorState.DONE
@@ -407,8 +406,7 @@ async def executemany(
407406
cursor object until it's closed. They can also be removed with
408407
`flush_parameters` method call.
409408
Bulk insert: When bulk_insert=True, multiple INSERT queries are
410-
concatenated and sent as a single batch with
411-
merge_prepared_statement_batches=true for improved performance.
409+
concatenated and sent as a single batch for improved performance.
412410
Only supported for INSERT statements.
413411
414412
Args:

src/firebolt/common/cursor/base_cursor.py

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
import logging
44
import re
55
from types import TracebackType
6-
from typing import Any, Dict, List, Optional, Sequence, Tuple, Type, Union
6+
from typing import Any, Dict, List, Optional, Tuple, Type, Union
77

88
from httpx import URL, Response
99

1010
from firebolt.client.auth.base import Auth
1111
from firebolt.client.client import AsyncClient, Client
12-
from firebolt.common._types import ParameterType, RawColType, SetParameter
12+
from firebolt.common._types import RawColType, SetParameter
1313
from firebolt.common.constants import (
1414
DISALLOWED_PARAMETER_LIST,
1515
IMMUTABLE_PARAMETER_LIST,
@@ -27,11 +27,7 @@
2727
SecureCacheKey,
2828
_firebolt_cache,
2929
)
30-
from firebolt.utils.exception import (
31-
ConfigurationError,
32-
FireboltError,
33-
ProgrammingError,
34-
)
30+
from firebolt.utils.exception import ConfigurationError, FireboltError
3531
from firebolt.utils.util import fix_url_schema
3632

3733
logger = logging.getLogger(__name__)
@@ -240,6 +236,7 @@ def _log_query(query: Union[str, SetParameter]) -> None:
240236
"aws_key_id|credentials", query, flags=re.IGNORECASE
241237
):
242238
logger.debug(f"Running query: {query}")
239+
243240
@property
244241
def engine_name(self) -> str:
245242
"""
@@ -320,44 +317,3 @@ def set_cache_record(self, record: ConnectionInfo) -> None:
320317
self._client.auth.secret,
321318
)
322319
_firebolt_cache.set(cache_key, record)
323-
324-
def _validate_bulk_insert_query(self, query: str) -> None:
325-
"""Validate that query is an INSERT statement for bulk_insert."""
326-
query_normalized = query.lstrip().lower()
327-
328-
if not query_normalized.startswith("insert"):
329-
raise ConfigurationError(
330-
"bulk_insert is only supported for INSERT statements"
331-
)
332-
333-
if ";" in query.strip().rstrip(";"):
334-
raise ProgrammingError(
335-
"bulk_insert does not support multi-statement queries"
336-
)
337-
338-
def _prepare_bulk_insert(
339-
self, query: str, parameters_seq: Sequence[Sequence[ParameterType]]
340-
) -> tuple[str, Sequence[Sequence[ParameterType]]]:
341-
"""Execute multiple INSERT queries as a single batch."""
342-
self._validate_bulk_insert_query(query)
343-
344-
if not parameters_seq:
345-
raise ProgrammingError("bulk_insert requires at least one parameter set")
346-
347-
# For bulk insert, we need to create unique parameter names for each INSERT
348-
# Example: ($1, $2); ($3, $4); ($5, $6) instead of ($1, $2); ($1, $2); ($1, $2)
349-
queries = []
350-
param_offset = 0
351-
for param_set in parameters_seq:
352-
# Replace parameter placeholders with unique numbers
353-
modified_query = query
354-
for i in range(len(param_set)):
355-
old_param = f"${i + 1}"
356-
new_param = f"${param_offset + i + 1}"
357-
modified_query = modified_query.replace(old_param, new_param)
358-
queries.append(modified_query)
359-
param_offset += len(param_set)
360-
361-
combined_query = "; ".join(queries)
362-
parameters = [param for param_set in parameters_seq for param in param_set]
363-
return combined_query, [parameters]

0 commit comments

Comments
 (0)