(usage)=
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row")
print(cursor.description)
print(cursor.fetchall())When using a workgroup with managed query result storage enabled, you don't need to specify an S3 staging directory.
from pyathena import connect
cursor = connect(work_group="YOUR_MANAGED_WORK_GROUP",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row")
print(cursor.fetchall())If the AWS_ATHENA_S3_STAGING_DIR environment variable is set, pass s3_staging_dir=""
to explicitly disable the fallback. Otherwise the API will reject the request because
ResultConfiguration and ManagedQueryResultsConfiguration cannot be set together.
cursor = connect(work_group="YOUR_MANAGED_WORK_GROUP",
s3_staging_dir="",
region_name="us-west-2").cursor()With managed query result storage, query results are retrieved via the `GetQueryResults` API
(1000 rows per request) instead of reading S3 files directly. This may be slower for large
result sets. For large datasets, consider using customer-managed storage or the `UNLOAD` statement.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM many_rows LIMIT 10")
for row in cursor:
print(row)Supported DB API paramstyle is only PyFormat.
PyFormat only supports named placeholders with old % operator style and parameters specify dictionary format.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("""
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s
""",
{"param": "a string"})
print(cursor.fetchall())if % character is contained in your query, it must be escaped with %% like the following:
SELECT col_string FROM one_row_complex
WHERE col_string = %(param)s OR col_string LIKE 'a%%'If you want to use Athena's parameterized queries, you can do so by changing the paramstyle to qmark as follows.
from pyathena import connect
pyathena.paramstyle = "qmark"
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("""
SELECT col_string FROM one_row_complex
WHERE col_string = ?
""",
["'a string'"])
print(cursor.fetchall())You can also specify the paramstyle using the execute method when executing a query.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("""
SELECT col_string FROM one_row_complex
WHERE col_string = ?
""",
["'a string'"],
paramstyle="qmark")
print(cursor.fetchall())You can find more information about the considerations and limitations of parameterized queries in the official documentation.
Athena engine version 3 allows you to reuse the results of previous queries.
It is available by specifying the arguments result_reuse_enable and result_reuse_minutes in the connection object.
from pyathena import connect
conn = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
work_group="YOUR_WORK_GROUP",
result_reuse_enable=True,
result_reuse_minutes=60)You can also specify result_reuse_enable and result_reuse_minutes when executing a query.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row",
work_group="YOUR_WORK_GROUP",
result_reuse_enable=True,
result_reuse_minutes=60)If the following error occurs, please use a workgroup configured with Athena engine version 3.
pyathena.error.DatabaseError: An error occurred (InvalidRequestException) when calling the StartQueryExecution operation: This functionality is not enabled in the selected engine version. Please check the engine version settings or contact AWS support for further assistance.
If for some reason you cannot use the reuse feature of Athena engine version 3, please use the {ref}Cache configuration <cache-configuration> implemented by PyAthena.
:::{note} Semantic-equivalence matching (November 2025)
As of November 2025, Athena's result reuse no longer requires byte-for-byte identical query text. Result reuse is now triggered when a query is semantically equivalent to a previously executed query — cosmetic differences such as whitespace, comments, or keyword casing no longer prevent a cache hit.
This behavior change happens server-side; PyAthena does not need to be updated, and existing code using result_reuse_enable continues to work unchanged. Users who relied on exact-string matching for cache invalidation should be aware that more queries may now reuse prior results than before.
See the Athena documentation for the latest rules on what counts as semantically equivalent. :::
(cache-configuration)=
Please use the Result reuse configuration.
You can attempt to re-use the results from a previously executed query to help save time and money in the cases where your underlying data isn't changing.
Set the cache_size or cache_expiration_time parameter of cursor.execute() to a number larger than 0 to enable caching.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row") # run once
print(cursor.query_id)
cursor.execute("SELECT * FROM one_row", cache_size=10) # re-use earlier results
print(cursor.query_id) # You should expect to see the same Query IDThe unit of expiration_time is seconds. To use the results of queries executed up to one hour ago, specify like the following.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row", cache_expiration_time=3600) # Use queries executed within 1 hour as cache.If cache_size is not specified, the value of sys.maxsize will be automatically set and all query results executed up to one hour ago will be checked.
Therefore, it is recommended to specify cache_expiration_time together with cache_size like the following.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("SELECT * FROM one_row", cache_size=100, cache_expiration_time=3600) # Use the last 100 queries within 1 hour as cache.Results will only be re-used if the query strings match exactly,
and the query was a DML statement (the assumption being that you always want to re-run queries like CREATE TABLE and DROP TABLE).
The S3 staging directory is not checked, so it's possible that the location of the results is not in your provided s3_staging_dir.
Athena's federated query feature lets you query data in external sources (PostgreSQL, MySQL, Snowflake, etc.) through Lambda-based connectors. By default Athena's planner translates your SQL into the source dialect, but for complex queries — or when you need a feature only the source supports — you can use passthrough queries to push native SQL directly to the source.
Passthrough queries use the TABLE(<connector>.system.query(query => '...')) syntax. Because PyAthena passes the SQL string through to Athena unchanged, no client-side changes are required — these queries work out of the box.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute("""
SELECT * FROM TABLE(
my_postgres_connector.system.query(
query => 'SELECT id, name FROM public.users WHERE created_at > NOW() - INTERVAL ''7 days'''
)
)
""")
print(cursor.fetchall())The connector (my_postgres_connector above) must be registered in Athena as a data source connector first; see the Athena documentation for the connector setup steps and the list of connectors that support passthrough.
Result columns from a passthrough query come back with the source system's types, which Athena maps to its own type system. When using `PandasCursor` or `ArrowCursor`, verify that the resulting dtypes match your expectations — connector-specific types (e.g., PostgreSQL `json`, MySQL `geometry`) may be returned as strings.
(query-execution-callback)=
PyAthena provides a callback mechanism that allows you to get immediate access to the query ID
as soon as the start_query_execution API call is made, before waiting for query completion.
This is useful for monitoring, logging, or cancelling long-running queries from another thread.
The on_start_query_execution callback can be configured at both the connection level and
the execute level. When both are set, both callbacks will be invoked.
You can set a default callback for all queries executed through a connection:
from pyathena import connect
def query_callback(query_id):
print(f"Query started with ID: {query_id}")
# You can use query_id for monitoring or cancellation
cursor = connect(
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
on_start_query_execution=query_callback
).cursor()
cursor.execute("SELECT * FROM many_rows") # Callback will be invokedYou can also specify a callback for individual query executions:
from pyathena import connect
def specific_callback(query_id):
print(f"Specific query started: {query_id}")
cursor = connect(
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2"
).cursor()
cursor.execute(
"SELECT * FROM many_rows",
on_start_query_execution=specific_callback
)A common use case is to cancel long-running analytical queries after a timeout:
import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError
from pyathena import connect
def cancel_long_running_query():
"""Example: Cancel a complex analytical query after 10 minutes."""
def track_query_start(query_id):
print(f"Long-running analysis started: {query_id}")
return query_id
def monitor_and_cancel(cursor, timeout_minutes):
"""Monitor query and cancel if it exceeds timeout."""
time.sleep(timeout_minutes * 60) # Convert to seconds
try:
cursor.cancel()
print(f"Query cancelled after {timeout_minutes} minutes timeout")
except Exception as e:
print(f"Cancellation failed: {e}")
cursor = connect(
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
on_start_query_execution=track_query_start
).cursor()
# Complex analytical query that might run for a long time
long_query = """
WITH daily_metrics AS (
SELECT
date_trunc('day', timestamp_col) as day,
user_id,
COUNT(*) as events,
AVG(duration) as avg_duration
FROM large_events_table
WHERE timestamp_col >= current_date - interval '1' year
GROUP BY 1, 2
),
user_segments AS (
SELECT
user_id,
CASE
WHEN AVG(events) > 100 THEN 'high_activity'
WHEN AVG(events) > 10 THEN 'medium_activity'
ELSE 'low_activity'
END as segment
FROM daily_metrics
GROUP BY user_id
)
SELECT
segment,
COUNT(DISTINCT user_id) as users,
AVG(events) as avg_daily_events
FROM daily_metrics dm
JOIN user_segments us ON dm.user_id = us.user_id
GROUP BY segment
ORDER BY avg_daily_events DESC
"""
# Use ThreadPoolExecutor for timeout management
with ThreadPoolExecutor(max_workers=1) as executor:
# Start timeout monitor (cancel after 10 minutes)
timeout_future = executor.submit(monitor_and_cancel, cursor, 10)
try:
print("Starting complex analytical query (10-minute timeout)...")
cursor.execute(long_query)
# Process results
results = cursor.fetchall()
print(f"Analysis completed successfully: {len(results)} segments found")
for row in results:
print(f" {row[0]}: {row[1]} users, {row[2]:.1f} avg events")
except Exception as e:
print(f"Query failed or was cancelled: {e}")
finally:
# Clean up timeout monitor
try:
timeout_future.result(timeout=1)
except TimeoutError:
pass # Monitor is still running, which is fine
# Run the example
cancel_long_running_query()When both connection-level and execute-level callbacks are specified, both callbacks will be invoked:
from pyathena import connect
def connection_callback(query_id):
print(f"Connection callback: {query_id}")
# Log to monitoring system
def execute_callback(query_id):
print(f"Execute callback: {query_id}")
# Store for cancellation if needed
cursor = connect(
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2",
on_start_query_execution=connection_callback
).cursor()
# This will invoke both connection_callback and execute_callback
cursor.execute(
"SELECT 1",
on_start_query_execution=execute_callback
)The on_start_query_execution callback is supported by the following cursor types:
Cursor(default cursor)DictCursorArrowCursorPandasCursorPolarsCursorS3FSCursor
Note: AsyncCursor and its variants do not support this callback as they already
return the query ID immediately through their different execution model.
New in version 3.30.0.
The Athena API does not return element-level type information for complex types (array, map, row/struct). PyAthena parses the string representation returned by Athena, but without type metadata the converter can only apply heuristics — which may produce incorrect Python types for nested values (e.g. integers left as strings inside a struct).
The result_set_type_hints parameter solves this by letting you provide Athena DDL
type signatures for specific columns. The converter then uses precise, recursive
type-aware conversion instead of heuristics.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()
cursor.execute(
"SELECT col_array, col_map, col_struct FROM one_row_complex",
result_set_type_hints={
"col_array": "array(integer)",
"col_map": "map(integer, integer)",
"col_struct": "row(a integer, b integer)",
},
)
row = cursor.fetchone()
# col_struct values are now integers, not strings:
# {"a": 1, "b": 2} instead of {"a": "1", "b": "2"}Column name matching is case-insensitive. Type hints support arbitrarily nested types:
cursor.execute(
"""
SELECT CAST(
ROW(ROW('2024-01-01', 123), 4.736, 0.583)
AS ROW(header ROW(stamp VARCHAR, seq INTEGER), x DOUBLE, y DOUBLE)
) AS positions
""",
result_set_type_hints={
"positions": "row(header row(stamp varchar, seq integer), x double, y double)",
},
)
row = cursor.fetchone()
positions = row[0]
# positions["header"]["seq"] == 123 (int, not "123")
# positions["x"] == 4.736 (float, not "4.736")You can paste type signatures from Hive DDL or DESCRIBE TABLE output directly.
Hive-style angle brackets and colons are automatically converted to Trino-style syntax:
# Both are equivalent:
result_set_type_hints={"col": "array(struct(a integer, b varchar))"} # Trino
result_set_type_hints={"col": "array<struct<a:int,b:varchar>>"} # HiveThe int alias is also supported and resolves to integer.
When a query produces columns with the same alias (e.g. SELECT a AS x, b AS x),
name-based hints cannot distinguish between them. Use integer keys to specify hints
by zero-based column position:
cursor.execute(
"SELECT a AS x, b AS x FROM my_table",
result_set_type_hints={
0: "array(integer)", # first "x" column
1: "map(varchar, integer)", # second "x" column
},
)Integer (index-based) hints take priority over string (name-based) hints for the same column. You can mix both styles in the same dictionary.
- Nested arrays in native format — Athena's native (non-JSON) string representation
does not clearly delimit nested arrays. If your query returns nested arrays
(e.g.
array(array(integer))), useCAST(... AS JSON)in your query to get JSON-formatted output, which is parsed reliably. - Arrow, Pandas, and Polars cursors — These cursors accept
result_set_type_hintsbut their converters do not currently use the hints because they rely on their own type systems. The parameter is passed through for forward compatibility and for result sets that fall back to the default conversion path.
Prior to 3.30.0, PyAthena attempted to infer Python types for scalar values inside
complex types using heuristics (e.g. "123" → 123). Starting with 3.30.0, values
inside complex types are kept as strings unless result_set_type_hints is provided.
This change avoids silent misconversion but means existing code that relied on the
heuristic behavior may see string values where it previously saw integers or floats.
To restore typed conversion, pass result_set_type_hints with the appropriate type
signatures for the affected columns.
Support Boto3 environment variables.
AWS_ATHENA_S3_STAGING_DIR
: The S3 location where Athena automatically stores the query results and metadata information. Required if you have not set up workgroups. Not required if a workgroup has been set up. When connecting to a workgroup with managed query result storage, pass s3_staging_dir="" to explicitly disable this environment variable fallback (see Managed query result storage).
AWS_ATHENA_WORK_GROUP : The setting of the workgroup to execute the query.
Support Boto3 credentials.
from pyathena import connect
cursor = connect(aws_access_key_id="YOUR_ACCESS_KEY_ID",
aws_secret_access_key="YOUR_SECRET_ACCESS_KEY",
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()from pyathena import connect
cursor = connect(aws_access_key_id="YOUR_ACCESS_KEY_ID",
aws_secret_access_key="YOUR_SECRET_ACCESS_KEY",
aws_session_token="YOUR_SESSION_TOKEN",
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()You will be prompted to enter the MFA code. The program execution will be blocked until the MFA code is entered.
from pyathena import connect
cursor = connect(duration_seconds=3600,
serial_number="arn:aws:iam::ACCOUNT_NUMBER_WITHOUT_HYPHENS:mfa/MFA_DEVICE_ID",
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()The shared credentials file has a default location of ~/.aws/credentials.
If you use the default profile, there is no need to specify credential information.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()You can also specify a profile other than the default.
from pyathena import connect
cursor = connect(profile_name="YOUR_PROFILE_NAME",
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()from pyathena import connect
cursor = connect(role_arn="YOUR_ASSUME_ROLE_ARN",
role_session_name="PyAthena-session",
duration_seconds=3600,
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()You will be prompted to enter the MFA code. The program execution will be blocked until the MFA code is entered.
from pyathena import connect
cursor = connect(role_arn="YOUR_ASSUME_ROLE_ARN",
role_session_name="PyAthena-session",
duration_seconds=3600,
serial_number="arn:aws:iam::ACCOUNT_NUMBER_WITHOUT_HYPHENS:mfa/MFA_DEVICE_ID",
s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()No need to specify credential information.
from pyathena import connect
cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
region_name="us-west-2").cursor()Amazon Athena supports JWT-based Trusted Identity Propagation (TIP) for the official JDBC and ODBC drivers, allowing enterprise SSO identities (Okta, Entra ID, etc.) to be propagated to Athena and Lake Formation for fine-grained access control.
PyAthena does not support JWT TIP, because this auth flow is not exposed through the AWS SDK (boto3 / botocore). PyAthena builds its Athena client via boto3 and therefore relies on standard IAM-based credentials.
If your environment requires JWT TIP, the options are:
- Use the Athena JDBC driver or ODBC driver directly.
- Use IAM Identity Center with role-based access (assume-role flow) — see the Assume role provider examples above. This is not byte-equivalent to TIP but satisfies most SSO-driven access-control requirements.
This is a limitation of the AWS SDK, not of PyAthena. If boto3/botocore adds JWT TIP support in the future, PyAthena will expose it via Connection.