Commit df3b21f

Move Query Execution Callback docs from cursor.rst to usage.rst
Relocated the Query Execution Callback documentation to the usage section for better organization and discoverability, positioning it before the Environment variables section to maintain logical flow.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
1 parent 38cbd33 commit df3b21f

2 files changed

Lines changed: 191 additions & 190 deletions

docs/cursor.rst

Lines changed: 0 additions & 190 deletions
@@ -298,196 +298,6 @@ If you want to change the dictionary type (e.g., use OrderedDict), you can speci
     cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                      region_name="us-west-2").cursor(cursor=AsyncDictCursor, dict_type=OrderedDict)
 
-.. _query-execution-callback:
-
-Query Execution Callback
--------------------------
-
-PyAthena provides a callback mechanism that allows you to get immediate access to the query ID
-as soon as the ``start_query_execution`` API call is made, before waiting for query completion.
-This is useful for monitoring, logging, or cancelling long-running queries from another thread.
-
-The ``on_start_query_execution`` callback can be configured at both the connection level and
-the execute level. When both are set, both callbacks will be invoked.
-
-Connection-level callback
-~~~~~~~~~~~~~~~~~~~~~~~~~
-
-You can set a default callback for all queries executed through a connection:
-
-.. code:: python
-
-    from pyathena import connect
-
-    def query_callback(query_id):
-        print(f"Query started with ID: {query_id}")
-        # You can use query_id for monitoring or cancellation
-
-    cursor = connect(
-        s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
-        region_name="us-west-2",
-        on_start_query_execution=query_callback
-    ).cursor()
-
-    cursor.execute("SELECT * FROM many_rows")  # Callback will be invoked
-
-Execute-level callback
-~~~~~~~~~~~~~~~~~~~~~~
-
-You can also specify a callback for individual query executions:
-
-.. code:: python
-
-    from pyathena import connect
-
-    def specific_callback(query_id):
-        print(f"Specific query started: {query_id}")
-
-    cursor = connect(
-        s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
-        region_name="us-west-2"
-    ).cursor()
-
-    cursor.execute(
-        "SELECT * FROM many_rows",
-        on_start_query_execution=specific_callback
-    )
-
-Query cancellation example
-~~~~~~~~~~~~~~~~~~~~~~~~~~
-
-A common use case is to cancel long-running analytical queries after a timeout:
-
-.. code:: python
-
-    import time
-    from concurrent.futures import ThreadPoolExecutor, TimeoutError
-    from pyathena import connect
-
-    def cancel_long_running_query():
-        """Example: Cancel a complex analytical query after 10 minutes."""
-
-        def track_query_start(query_id):
-            print(f"Long-running analysis started: {query_id}")
-            return query_id
-
-        def monitor_and_cancel(cursor, timeout_minutes):
-            """Monitor query and cancel if it exceeds timeout."""
-            time.sleep(timeout_minutes * 60)  # Convert to seconds
-            try:
-                cursor.cancel()
-                print(f"Query cancelled after {timeout_minutes} minutes timeout")
-            except Exception as e:
-                print(f"Cancellation failed: {e}")
-
-        cursor = connect(
-            s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
-            region_name="us-west-2",
-            on_start_query_execution=track_query_start
-        ).cursor()
-
-        # Complex analytical query that might run for a long time
-        long_query = """
-        WITH daily_metrics AS (
-            SELECT
-                date_trunc('day', timestamp_col) as day,
-                user_id,
-                COUNT(*) as events,
-                AVG(duration) as avg_duration
-            FROM large_events_table
-            WHERE timestamp_col >= current_date - interval '1' year
-            GROUP BY 1, 2
-        ),
-        user_segments AS (
-            SELECT
-                user_id,
-                CASE
-                    WHEN AVG(events) > 100 THEN 'high_activity'
-                    WHEN AVG(events) > 10 THEN 'medium_activity'
-                    ELSE 'low_activity'
-                END as segment
-            FROM daily_metrics
-            GROUP BY user_id
-        )
-        SELECT
-            segment,
-            COUNT(DISTINCT user_id) as users,
-            AVG(events) as avg_daily_events
-        FROM daily_metrics dm
-        JOIN user_segments us ON dm.user_id = us.user_id
-        GROUP BY segment
-        ORDER BY avg_daily_events DESC
-        """
-
-        # Use ThreadPoolExecutor for timeout management
-        with ThreadPoolExecutor(max_workers=1) as executor:
-            # Start timeout monitor (cancel after 10 minutes)
-            timeout_future = executor.submit(monitor_and_cancel, cursor, 10)
-
-            try:
-                print("Starting complex analytical query (10-minute timeout)...")
-                cursor.execute(long_query)
-
-                # Process results
-                results = cursor.fetchall()
-                print(f"Analysis completed successfully: {len(results)} segments found")
-                for row in results:
-                    print(f" {row[0]}: {row[1]} users, {row[2]:.1f} avg events")
-
-            except Exception as e:
-                print(f"Query failed or was cancelled: {e}")
-            finally:
-                # Clean up timeout monitor
-                try:
-                    timeout_future.result(timeout=1)
-                except TimeoutError:
-                    pass  # Monitor is still running, which is fine
-
-    # Run the example
-    cancel_long_running_query()
-
-Multiple callbacks
-~~~~~~~~~~~~~~~~~~~
-
-When both connection-level and execute-level callbacks are specified,
-both callbacks will be invoked:
-
-.. code:: python
-
-    from pyathena import connect
-
-    def connection_callback(query_id):
-        print(f"Connection callback: {query_id}")
-        # Log to monitoring system
-
-    def execute_callback(query_id):
-        print(f"Execute callback: {query_id}")
-        # Store for cancellation if needed
-
-    cursor = connect(
-        s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
-        region_name="us-west-2",
-        on_start_query_execution=connection_callback
-    ).cursor()
-
-    # This will invoke both connection_callback and execute_callback
-    cursor.execute(
-        "SELECT 1",
-        on_start_query_execution=execute_callback
-    )
-
-Supported cursor types
-~~~~~~~~~~~~~~~~~~~~~~
-
-The ``on_start_query_execution`` callback is supported by the following cursor types:
-
-* ``Cursor`` (default cursor)
-* ``DictCursor``
-* ``ArrowCursor``
-* ``PandasCursor``
-
-Note: ``AsyncCursor`` and its variants do not support this callback as they already
-return the query ID immediately through their different execution model.
 
 PandasCursor
 ------------

docs/usage.rst

Lines changed: 191 additions & 0 deletions
@@ -178,6 +178,197 @@ and the query was a DML statement (the assumption being that you always want to
 
 The S3 staging directory is not checked, so it's possible that the location of the results is not in your provided ``s3_staging_dir``.
 
+.. _query-execution-callback:
+
+Query Execution Callback
+-------------------------
+
+PyAthena provides a callback mechanism that allows you to get immediate access to the query ID
+as soon as the ``start_query_execution`` API call is made, before waiting for query completion.
+This is useful for monitoring, logging, or cancelling long-running queries from another thread.
+
+The ``on_start_query_execution`` callback can be configured at both the connection level and
+the execute level. When both are set, both callbacks will be invoked.
+
+Connection-level callback
+~~~~~~~~~~~~~~~~~~~~~~~~~
+
+You can set a default callback for all queries executed through a connection:
+
+.. code:: python
+
+    from pyathena import connect
+
+    def query_callback(query_id):
+        print(f"Query started with ID: {query_id}")
+        # You can use query_id for monitoring or cancellation
+
+    cursor = connect(
+        s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
+        region_name="us-west-2",
+        on_start_query_execution=query_callback
+    ).cursor()
+
+    cursor.execute("SELECT * FROM many_rows")  # Callback will be invoked
+
+Execute-level callback
+~~~~~~~~~~~~~~~~~~~~~~
+
+You can also specify a callback for individual query executions:
+
+.. code:: python
+
+    from pyathena import connect
+
+    def specific_callback(query_id):
+        print(f"Specific query started: {query_id}")
+
+    cursor = connect(
+        s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
+        region_name="us-west-2"
+    ).cursor()
+
+    cursor.execute(
+        "SELECT * FROM many_rows",
+        on_start_query_execution=specific_callback
+    )
+
+Query cancellation example
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+A common use case is to cancel long-running analytical queries after a timeout:
+
+.. code:: python
+
+    import time
+    from concurrent.futures import ThreadPoolExecutor, TimeoutError
+    from pyathena import connect
+
+    def cancel_long_running_query():
+        """Example: Cancel a complex analytical query after 10 minutes."""
+
+        def track_query_start(query_id):
+            print(f"Long-running analysis started: {query_id}")
+            return query_id
+
+        def monitor_and_cancel(cursor, timeout_minutes):
+            """Monitor query and cancel if it exceeds timeout."""
+            time.sleep(timeout_minutes * 60)  # Convert to seconds
+            try:
+                cursor.cancel()
+                print(f"Query cancelled after {timeout_minutes} minutes timeout")
+            except Exception as e:
+                print(f"Cancellation failed: {e}")
+
+        cursor = connect(
+            s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
+            region_name="us-west-2",
+            on_start_query_execution=track_query_start
+        ).cursor()
+
+        # Complex analytical query that might run for a long time
+        long_query = """
+        WITH daily_metrics AS (
+            SELECT
+                date_trunc('day', timestamp_col) as day,
+                user_id,
+                COUNT(*) as events,
+                AVG(duration) as avg_duration
+            FROM large_events_table
+            WHERE timestamp_col >= current_date - interval '1' year
+            GROUP BY 1, 2
+        ),
+        user_segments AS (
+            SELECT
+                user_id,
+                CASE
+                    WHEN AVG(events) > 100 THEN 'high_activity'
+                    WHEN AVG(events) > 10 THEN 'medium_activity'
+                    ELSE 'low_activity'
+                END as segment
+            FROM daily_metrics
+            GROUP BY user_id
+        )
+        SELECT
+            segment,
+            COUNT(DISTINCT user_id) as users,
+            AVG(events) as avg_daily_events
+        FROM daily_metrics dm
+        JOIN user_segments us ON dm.user_id = us.user_id
+        GROUP BY segment
+        ORDER BY avg_daily_events DESC
+        """
+
+        # Use ThreadPoolExecutor for timeout management
+        with ThreadPoolExecutor(max_workers=1) as executor:
+            # Start timeout monitor (cancel after 10 minutes)
+            timeout_future = executor.submit(monitor_and_cancel, cursor, 10)
+
+            try:
+                print("Starting complex analytical query (10-minute timeout)...")
+                cursor.execute(long_query)
+
+                # Process results
+                results = cursor.fetchall()
+                print(f"Analysis completed successfully: {len(results)} segments found")
+                for row in results:
+                    print(f" {row[0]}: {row[1]} users, {row[2]:.1f} avg events")
+
+            except Exception as e:
+                print(f"Query failed or was cancelled: {e}")
+            finally:
+                # Clean up timeout monitor
+                try:
+                    timeout_future.result(timeout=1)
+                except TimeoutError:
+                    pass  # Monitor is still running, which is fine
+
+    # Run the example
+    cancel_long_running_query()
+
+Multiple callbacks
+~~~~~~~~~~~~~~~~~~~
+
+When both connection-level and execute-level callbacks are specified,
+both callbacks will be invoked:
+
+.. code:: python
+
+    from pyathena import connect
+
+    def connection_callback(query_id):
+        print(f"Connection callback: {query_id}")
+        # Log to monitoring system
+
+    def execute_callback(query_id):
+        print(f"Execute callback: {query_id}")
+        # Store for cancellation if needed
+
+    cursor = connect(
+        s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
+        region_name="us-west-2",
+        on_start_query_execution=connection_callback
+    ).cursor()
+
+    # This will invoke both connection_callback and execute_callback
+    cursor.execute(
+        "SELECT 1",
+        on_start_query_execution=execute_callback
+    )
+
+Supported cursor types
+~~~~~~~~~~~~~~~~~~~~~~
+
+The ``on_start_query_execution`` callback is supported by the following cursor types:
+
+* ``Cursor`` (default cursor)
+* ``DictCursor``
+* ``ArrowCursor``
+* ``PandasCursor``
+
+Note: ``AsyncCursor`` and its variants do not support this callback as they already
+return the query ID immediately through their different execution model.
+
 Environment variables
 ---------------------

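Note on the moved "Query cancellation example": as written, `monitor_and_cancel` sleeps for the full timeout, and leaving the `with ThreadPoolExecutor(...)` block waits for that worker to finish, so a query that completes early would still appear to hold the function until the timeout elapses and then call `cursor.cancel()` anyway. Below is a minimal sketch of one possible alternative, assuming a `threading.Timer` watchdog that is stopped as soon as `execute` returns. `StubCursor` and `execute_with_timeout` are hypothetical names introduced here so the sketch runs without AWS access. Only the `execute()` and `cancel()` method names mirror the cursor calls used in the documented example. The stub's behaviour is an assumption and not PyAthena's.

```python
import threading


class StubCursor:
    """Hypothetical stand-in for a cursor; not part of PyAthena."""

    def __init__(self, run_seconds):
        self.run_seconds = run_seconds
        self.cancelled = threading.Event()

    def execute(self, sql):
        # Simulate a blocking query that stops early once cancel() is called.
        if self.cancelled.wait(timeout=self.run_seconds):
            raise RuntimeError("query was cancelled")
        return self

    def cancel(self):
        self.cancelled.set()


def execute_with_timeout(cursor, sql, timeout_seconds):
    """Run sql and call cursor.cancel() if it is still running at the timeout."""
    watchdog = threading.Timer(timeout_seconds, cursor.cancel)
    watchdog.daemon = True
    watchdog.start()
    try:
        return cursor.execute(sql)
    finally:
        # Stop the watchdog so a finished query does not wait out the timeout.
        watchdog.cancel()


# A query that finishes well inside its timeout: the watchdog never fires.
fast = StubCursor(run_seconds=0.05)
execute_with_timeout(fast, "SELECT 1", timeout_seconds=5)
print(fast.cancelled.is_set())

# A query that outlives its timeout: the watchdog cancels it.
slow = StubCursor(run_seconds=5)
try:
    execute_with_timeout(slow, "SELECT 1", timeout_seconds=0.05)
except RuntimeError as exc:
    print(f"Query failed or was cancelled: {exc}")
```

With a real cursor, the same wrapper would be passed the object returned by `connect(...).cursor()`. How `cancel()` behaves on a query that has already finished is not covered by this sketch.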