Skip to content

Commit ddc401e

Browse files
Remove on_start_query_execution callback from all aio cursors
Native asyncio cursors provide direct control flow via await, making the on_start_query_execution callback unnecessary. This aligns with AsyncCursor which also omits this callback. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 5627385 commit ddc401e

File tree

7 files changed

+4
-63
lines changed

7 files changed

+4
-63
lines changed

pyathena/aio/arrow/cursor.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import asyncio
55
import logging
6-
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union, cast
6+
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast
77

88
from pyathena.aio.common import WithAsyncFetch
99
from pyathena.arrow.converter import (
@@ -49,7 +49,6 @@ def __init__(
4949
unload: bool = False,
5050
result_reuse_enable: bool = False,
5151
result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
52-
on_start_query_execution: Optional[Callable[[str], None]] = None,
5352
connect_timeout: Optional[float] = None,
5453
request_timeout: Optional[float] = None,
5554
**kwargs,
@@ -65,7 +64,6 @@ def __init__(
6564
kill_on_interrupt=kill_on_interrupt,
6665
result_reuse_enable=result_reuse_enable,
6766
result_reuse_minutes=result_reuse_minutes,
68-
on_start_query_execution=on_start_query_execution,
6967
**kwargs,
7068
)
7169
self._unload = unload
@@ -92,7 +90,6 @@ async def execute( # type: ignore[override]
9290
result_reuse_enable: Optional[bool] = None,
9391
result_reuse_minutes: Optional[int] = None,
9492
paramstyle: Optional[str] = None,
95-
on_start_query_execution: Optional[Callable[[str], None]] = None,
9693
**kwargs,
9794
) -> "AioArrowCursor":
9895
"""Execute a SQL query asynchronously and return results as Arrow Tables.
@@ -107,7 +104,6 @@ async def execute( # type: ignore[override]
107104
result_reuse_enable: Enable Athena result reuse for this query.
108105
result_reuse_minutes: Minutes to reuse cached results.
109106
paramstyle: Parameter style ('qmark' or 'pyformat').
110-
on_start_query_execution: Callback called when query starts.
111107
**kwargs: Additional execution parameters.
112108
113109
Returns:
@@ -138,10 +134,6 @@ async def execute( # type: ignore[override]
138134
paramstyle=paramstyle,
139135
)
140136

141-
if self._on_start_query_execution:
142-
self._on_start_query_execution(self.query_id)
143-
if on_start_query_execution:
144-
on_start_query_execution(self.query_id)
145137
query_execution = await self._poll(self.query_id)
146138
if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED:
147139
self.result_set = await asyncio.to_thread(

pyathena/aio/common.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,6 @@ def __init__(self, **kwargs) -> None:
365365
super().__init__(**kwargs)
366366
self._query_id: Optional[str] = None
367367
self._result_set: Optional[AthenaResultSet] = None
368-
self._on_start_query_execution = kwargs.get("on_start_query_execution")
369368

370369
@property
371370
def arraysize(self) -> int:

pyathena/aio/cursor.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
from __future__ import annotations
33

44
import logging
5-
from typing import Any, Callable, Dict, List, Optional, Union, cast
5+
from typing import Any, Dict, List, Optional, Union, cast
66

77
from pyathena.aio.common import WithAsyncFetch
88
from pyathena.aio.result_set import AthenaAioDictResultSet, AthenaAioResultSet
@@ -39,7 +39,6 @@ def __init__(
3939
kill_on_interrupt: bool = True,
4040
result_reuse_enable: bool = False,
4141
result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
42-
on_start_query_execution: Optional[Callable[[str], None]] = None,
4342
**kwargs,
4443
) -> None:
4544
super().__init__(
@@ -53,7 +52,6 @@ def __init__(
5352
kill_on_interrupt=kill_on_interrupt,
5453
result_reuse_enable=result_reuse_enable,
5554
result_reuse_minutes=result_reuse_minutes,
56-
on_start_query_execution=on_start_query_execution,
5755
**kwargs,
5856
)
5957
self._result_set: Optional[AthenaAioResultSet] = None
@@ -82,7 +80,6 @@ async def execute( # type: ignore[override]
8280
result_reuse_enable: Optional[bool] = None,
8381
result_reuse_minutes: Optional[int] = None,
8482
paramstyle: Optional[str] = None,
85-
on_start_query_execution: Optional[Callable[[str], None]] = None,
8683
**kwargs,
8784
) -> "AioCursor":
8885
"""Execute a SQL query asynchronously.
@@ -97,7 +94,6 @@ async def execute( # type: ignore[override]
9794
result_reuse_enable: Enable result reuse (optional).
9895
result_reuse_minutes: Result reuse duration in minutes (optional).
9996
paramstyle: Parameter style to use (optional).
100-
on_start_query_execution: Callback invoked with query_id after submission.
10197
**kwargs: Additional execution parameters.
10298
10399
Returns:
@@ -116,11 +112,6 @@ async def execute( # type: ignore[override]
116112
paramstyle=paramstyle,
117113
)
118114

119-
if self._on_start_query_execution:
120-
self._on_start_query_execution(self.query_id)
121-
if on_start_query_execution:
122-
on_start_query_execution(self.query_id)
123-
124115
query_execution = await self._poll(self.query_id)
125116
if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED:
126117
self.result_set = await self._result_set_class.create(

pyathena/aio/pandas/cursor.py

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
from typing import (
88
TYPE_CHECKING,
99
Any,
10-
Callable,
1110
Dict,
1211
Iterable,
1312
List,
@@ -65,7 +64,6 @@ def __init__(
6564
result_reuse_enable: bool = False,
6665
result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
6766
auto_optimize_chunksize: bool = False,
68-
on_start_query_execution: Optional[Callable[[str], None]] = None,
6967
**kwargs,
7068
) -> None:
7169
super().__init__(
@@ -79,7 +77,6 @@ def __init__(
7977
kill_on_interrupt=kill_on_interrupt,
8078
result_reuse_enable=result_reuse_enable,
8179
result_reuse_minutes=result_reuse_minutes,
82-
on_start_query_execution=on_start_query_execution,
8380
**kwargs,
8481
)
8582
self._unload = unload
@@ -113,7 +110,6 @@ async def execute( # type: ignore[override]
113110
keep_default_na: bool = False,
114111
na_values: Optional[Iterable[str]] = ("",),
115112
quoting: int = 1,
116-
on_start_query_execution: Optional[Callable[[str], None]] = None,
117113
**kwargs,
118114
) -> "AioPandasCursor":
119115
"""Execute a SQL query asynchronously and return results as pandas DataFrames.
@@ -131,7 +127,6 @@ async def execute( # type: ignore[override]
131127
keep_default_na: Whether to keep default pandas NA values.
132128
na_values: Additional values to treat as NA.
133129
quoting: CSV quoting behavior (pandas csv.QUOTE_* constants).
134-
on_start_query_execution: Callback called when query starts.
135130
**kwargs: Additional pandas read_csv/read_parquet parameters.
136131
137132
Returns:
@@ -162,10 +157,6 @@ async def execute( # type: ignore[override]
162157
paramstyle=paramstyle,
163158
)
164159

165-
if self._on_start_query_execution:
166-
self._on_start_query_execution(self.query_id)
167-
if on_start_query_execution:
168-
on_start_query_execution(self.query_id)
169160
query_execution = await self._poll(self.query_id)
170161
if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED:
171162
self.result_set = await asyncio.to_thread(

pyathena/aio/polars/cursor.py

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
import asyncio
55
import logging
66
from multiprocessing import cpu_count
7-
from typing import TYPE_CHECKING, Any, Callable, Dict, List, Optional, Union, cast
7+
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast
88

99
from pyathena.aio.common import WithAsyncFetch
1010
from pyathena.common import CursorIterator
@@ -50,7 +50,6 @@ def __init__(
5050
unload: bool = False,
5151
result_reuse_enable: bool = False,
5252
result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
53-
on_start_query_execution: Optional[Callable[[str], None]] = None,
5453
block_size: Optional[int] = None,
5554
cache_type: Optional[str] = None,
5655
max_workers: int = (cpu_count() or 1) * 5,
@@ -68,7 +67,6 @@ def __init__(
6867
kill_on_interrupt=kill_on_interrupt,
6968
result_reuse_enable=result_reuse_enable,
7069
result_reuse_minutes=result_reuse_minutes,
71-
on_start_query_execution=on_start_query_execution,
7270
**kwargs,
7371
)
7472
self._unload = unload
@@ -97,7 +95,6 @@ async def execute( # type: ignore[override]
9795
result_reuse_enable: Optional[bool] = None,
9896
result_reuse_minutes: Optional[int] = None,
9997
paramstyle: Optional[str] = None,
100-
on_start_query_execution: Optional[Callable[[str], None]] = None,
10198
**kwargs,
10299
) -> "AioPolarsCursor":
103100
"""Execute a SQL query asynchronously and return results as Polars DataFrames.
@@ -112,7 +109,6 @@ async def execute( # type: ignore[override]
112109
result_reuse_enable: Enable Athena result reuse for this query.
113110
result_reuse_minutes: Minutes to reuse cached results.
114111
paramstyle: Parameter style ('qmark' or 'pyformat').
115-
on_start_query_execution: Callback called when query starts.
116112
**kwargs: Additional execution parameters passed to Polars read functions.
117113
118114
Returns:
@@ -143,10 +139,6 @@ async def execute( # type: ignore[override]
143139
paramstyle=paramstyle,
144140
)
145141

146-
if self._on_start_query_execution:
147-
self._on_start_query_execution(self.query_id)
148-
if on_start_query_execution:
149-
on_start_query_execution(self.query_id)
150142
query_execution = await self._poll(self.query_id)
151143
if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED:
152144
self.result_set = await asyncio.to_thread(

pyathena/aio/s3fs/cursor.py

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
import asyncio
55
import logging
6-
from typing import Any, Callable, Dict, List, Optional, Tuple, Union, cast
6+
from typing import Any, Dict, List, Optional, Tuple, Union, cast
77

88
from pyathena.aio.common import WithAsyncFetch
99
from pyathena.common import CursorIterator
@@ -41,7 +41,6 @@ def __init__(
4141
kill_on_interrupt: bool = True,
4242
result_reuse_enable: bool = False,
4343
result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
44-
on_start_query_execution: Optional[Callable[[str], None]] = None,
4544
csv_reader: Optional[CSVReaderType] = None,
4645
**kwargs,
4746
) -> None:
@@ -56,7 +55,6 @@ def __init__(
5655
kill_on_interrupt=kill_on_interrupt,
5756
result_reuse_enable=result_reuse_enable,
5857
result_reuse_minutes=result_reuse_minutes,
59-
on_start_query_execution=on_start_query_execution,
6058
**kwargs,
6159
)
6260
self._csv_reader = csv_reader
@@ -87,7 +85,6 @@ async def execute( # type: ignore[override]
8785
result_reuse_enable: Optional[bool] = None,
8886
result_reuse_minutes: Optional[int] = None,
8987
paramstyle: Optional[str] = None,
90-
on_start_query_execution: Optional[Callable[[str], None]] = None,
9188
**kwargs,
9289
) -> "AioS3FSCursor":
9390
"""Execute a SQL query asynchronously via S3FileSystem CSV reader.
@@ -102,7 +99,6 @@ async def execute( # type: ignore[override]
10299
result_reuse_enable: Enable Athena result reuse for this query.
103100
result_reuse_minutes: Minutes to reuse cached results.
104101
paramstyle: Parameter style ('qmark' or 'pyformat').
105-
on_start_query_execution: Callback called when query starts.
106102
**kwargs: Additional execution parameters.
107103
108104
Returns:
@@ -121,11 +117,6 @@ async def execute( # type: ignore[override]
121117
paramstyle=paramstyle,
122118
)
123119

124-
if self._on_start_query_execution:
125-
self._on_start_query_execution(self.query_id)
126-
if on_start_query_execution:
127-
on_start_query_execution(self.query_id)
128-
129120
query_execution = await self._poll(self.query_id)
130121
if query_execution.state == AthenaQueryExecution.STATE_SUCCEEDED:
131122
self.result_set = await asyncio.to_thread(

tests/pyathena/aio/test_cursor.py

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -146,21 +146,6 @@ async def test_executemany_fetch(self, aio_cursor):
146146
with pytest.raises(ProgrammingError):
147147
await aio_cursor.fetchone()
148148

149-
async def test_execute_with_callback(self, aio_cursor):
150-
callback_results = []
151-
152-
def on_start(query_id):
153-
assert query_id is not None
154-
assert len(query_id) > 0
155-
callback_results.append(query_id)
156-
157-
result = await aio_cursor.execute("SELECT 1", on_start_query_execution=on_start)
158-
159-
assert len(callback_results) == 1
160-
assert callback_results[0] == aio_cursor.query_id
161-
assert result is aio_cursor
162-
assert await aio_cursor.fetchone() == (1,)
163-
164149
async def test_context_manager(self):
165150
conn = await _aio_connect(schema_name=ENV.schema)
166151
try:

0 commit comments

Comments
 (0)