Skip to content

Commit 4f1ecc5

Browse files
Merge pull request #596 from laughingman7743/feature/early-query-id-callback
Add early query ID access via callback mechanism
2 parents d340159 + d130e0b commit 4f1ecc5

11 files changed

Lines changed: 683 additions & 3 deletions

File tree

docs/cursor.rst

Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -298,6 +298,197 @@ If you want to change the dictionary type (e.g., use OrderedDict), you can speci
298298
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
299299
region_name="us-west-2").cursor(cursor=AsyncDictCursor, dict_type=OrderedDict)
300300
301+
.. _query-execution-callback:
302+
303+
Query Execution Callback
304+
-------------------------
305+
306+
PyAthena provides a callback mechanism that allows you to get immediate access to the query ID
307+
as soon as the ``start_query_execution`` API call returns, without waiting for the query to complete.
308+
This is useful for monitoring, logging, or cancelling long-running queries from another thread.
309+
310+
The ``on_start_query_execution`` callback can be configured at both the connection level and
311+
the execute level. When both are set, both callbacks will be invoked.
312+
313+
Connection-level callback
314+
~~~~~~~~~~~~~~~~~~~~~~~~~
315+
316+
You can set a default callback for all queries executed through a connection:
317+
318+
.. code:: python
319+
320+
from pyathena import connect
321+
322+
def query_callback(query_id):
323+
print(f"Query started with ID: {query_id}")
324+
# You can use query_id for monitoring or cancellation
325+
326+
cursor = connect(
327+
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
328+
region_name="us-west-2",
329+
on_start_query_execution=query_callback
330+
).cursor()
331+
332+
cursor.execute("SELECT * FROM many_rows") # Callback will be invoked
333+
334+
Execute-level callback
335+
~~~~~~~~~~~~~~~~~~~~~~
336+
337+
You can also specify a callback for individual query executions:
338+
339+
.. code:: python
340+
341+
from pyathena import connect
342+
343+
def specific_callback(query_id):
344+
print(f"Specific query started: {query_id}")
345+
346+
cursor = connect(
347+
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
348+
region_name="us-west-2"
349+
).cursor()
350+
351+
cursor.execute(
352+
"SELECT * FROM many_rows",
353+
on_start_query_execution=specific_callback
354+
)
355+
356+
Query cancellation example
357+
~~~~~~~~~~~~~~~~~~~~~~~~~~
358+
359+
A common use case is to cancel long-running analytical queries after a timeout:
360+
361+
.. code:: python
362+
363+
import time
364+
from concurrent.futures import ThreadPoolExecutor, TimeoutError
365+
from pyathena import connect
366+
367+
def cancel_long_running_query():
368+
"""Example: Cancel a complex analytical query after 10 minutes."""
369+
370+
def track_query_start(query_id):
371+
print(f"Long-running analysis started: {query_id}")
372+
return query_id
373+
374+
def monitor_and_cancel(cursor, timeout_minutes):
375+
"""Monitor query and cancel if it exceeds timeout."""
376+
time.sleep(timeout_minutes * 60) # Convert to seconds
377+
try:
378+
cursor.cancel()
379+
print(f"Query cancelled after {timeout_minutes} minutes timeout")
380+
except Exception as e:
381+
print(f"Cancellation failed: {e}")
382+
383+
cursor = connect(
384+
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
385+
region_name="us-west-2",
386+
on_start_query_execution=track_query_start
387+
).cursor()
388+
389+
# Complex analytical query that might run for a long time
390+
long_query = """
391+
WITH daily_metrics AS (
392+
SELECT
393+
date_trunc('day', timestamp_col) as day,
394+
user_id,
395+
COUNT(*) as events,
396+
AVG(duration) as avg_duration
397+
FROM large_events_table
398+
WHERE timestamp_col >= current_date - interval '1' year
399+
GROUP BY 1, 2
400+
),
401+
user_segments AS (
402+
SELECT
403+
user_id,
404+
CASE
405+
WHEN AVG(events) > 100 THEN 'high_activity'
406+
WHEN AVG(events) > 10 THEN 'medium_activity'
407+
ELSE 'low_activity'
408+
END as segment
409+
FROM daily_metrics
410+
GROUP BY user_id
411+
)
412+
SELECT
413+
segment,
414+
COUNT(DISTINCT user_id) as users,
415+
AVG(events) as avg_daily_events
416+
FROM daily_metrics dm
417+
JOIN user_segments us ON dm.user_id = us.user_id
418+
GROUP BY segment
419+
ORDER BY avg_daily_events DESC
420+
"""
421+
422+
# Use ThreadPoolExecutor for timeout management
423+
with ThreadPoolExecutor(max_workers=1) as executor:
424+
# Start timeout monitor (cancel after 10 minutes)
425+
timeout_future = executor.submit(monitor_and_cancel, cursor, 10)
426+
427+
try:
428+
print("Starting complex analytical query (10-minute timeout)...")
429+
cursor.execute(long_query)
430+
431+
# Process results
432+
results = cursor.fetchall()
433+
print(f"Analysis completed successfully: {len(results)} segments found")
434+
for row in results:
435+
print(f" {row[0]}: {row[1]} users, {row[2]:.1f} avg events")
436+
437+
except Exception as e:
438+
print(f"Query failed or was cancelled: {e}")
439+
finally:
440+
# Clean up timeout monitor
441+
try:
442+
timeout_future.result(timeout=1)
443+
except TimeoutError:
444+
pass # Monitor is still sleeping; note that leaving the executor's with-block waits for it to finish
445+
446+
# Run the example
447+
cancel_long_running_query()
448+
449+
Multiple callbacks
450+
~~~~~~~~~~~~~~~~~~~
451+
452+
When both connection-level and execute-level callbacks are specified,
453+
both callbacks will be invoked:
454+
455+
.. code:: python
456+
457+
from pyathena import connect
458+
459+
def connection_callback(query_id):
460+
print(f"Connection callback: {query_id}")
461+
# Log to monitoring system
462+
463+
def execute_callback(query_id):
464+
print(f"Execute callback: {query_id}")
465+
# Store for cancellation if needed
466+
467+
cursor = connect(
468+
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
469+
region_name="us-west-2",
470+
on_start_query_execution=connection_callback
471+
).cursor()
472+
473+
# This will invoke both connection_callback and execute_callback
474+
cursor.execute(
475+
"SELECT 1",
476+
on_start_query_execution=execute_callback
477+
)
478+
479+
Supported cursor types
480+
~~~~~~~~~~~~~~~~~~~~~~
481+
482+
The ``on_start_query_execution`` callback is supported by the following cursor types:
483+
484+
* ``Cursor`` (default cursor)
485+
* ``DictCursor``
486+
* ``ArrowCursor``
487+
* ``PandasCursor``
488+
489+
Note: ``AsyncCursor`` and its variants do not support this callback as they already
490+
return the query ID immediately from ``execute`` as part of their asynchronous execution model.
491+
301492
PandasCursor
302493
------------
303494

0 commit comments

Comments
 (0)