(aio)=

# Native Asyncio Cursors

PyAthena provides native asyncio cursor implementations under `pyathena.aio`.
These cursors use `asyncio.sleep` for polling and `asyncio.to_thread` for boto3 calls,
keeping the event loop free without relying on thread pools for concurrency.
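That polling pattern can be sketched with the stdlib alone. A minimal illustration, assuming nothing from PyAthena (`get_query_state` is a stand-in for a blocking boto3 call, not a real API):

```python
import asyncio
import time


def get_query_state() -> str:
    """Stand-in for a blocking boto3 call such as get_query_execution."""
    time.sleep(0.05)
    return "SUCCEEDED"


async def wait_for_query() -> str:
    # Offload each blocking poll to a worker thread, then sleep on the
    # event loop between polls so other coroutines keep running.
    while True:
        state = await asyncio.to_thread(get_query_state)
        if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
            return state
        await asyncio.sleep(0.5)


print(asyncio.run(wait_for_query()))  # SUCCEEDED
```

The cursors apply the same two ingredients: `asyncio.to_thread` keeps boto3's blocking I/O off the loop, and `asyncio.sleep` yields control between status checks.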

## Why native asyncio?

PyAthena has two families of async cursors:

| | AsyncCursor | AioCursor |
|---|---|---|
| **Concurrency model** | `concurrent.futures.ThreadPoolExecutor` | Native `asyncio` (`await` / `async for`) |
| **Event loop** | Blocks a thread per query | Non-blocking |
| **Connection** | `connect()` (sync) | `aconnect()` (async) |
| **execute() returns** | `(query_id, Future)` | Awaitable cursor (self) |
| **Fetch methods** | Sync (via `Future.result()`) | `await cursor.fetchone()` for streaming cursors |
| **Iteration** | `for row in result_set` | `async for row in cursor` |
| **Context manager** | `with conn.cursor() as cursor` | `async with conn.cursor() as cursor` |
| **Best for** | Adding concurrency to sync code | Async frameworks (FastAPI, aiohttp, etc.) |

Choose `AioCursor` when your application already uses `asyncio` (e.g., web frameworks,
async pipelines). Choose `AsyncCursor` when you want simple parallel query execution
from synchronous code.
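The event-loop difference is easy to demonstrate with plain `asyncio`: awaiting several slow operations concurrently takes about as long as the slowest one. A self-contained sketch, where `fake_query` stands in for `await cursor.execute(...)`:

```python
import asyncio
import time


async def fake_query(name: str) -> str:
    # Stand-in for an awaited Athena query: while one "query" waits,
    # the event loop runs the others.
    await asyncio.sleep(0.1)
    return name


async def main() -> list[str]:
    start = time.monotonic()
    results = await asyncio.gather(*(fake_query(f"q{i}") for i in range(3)))
    # Three 0.1s "queries" overlap, so the total is ~0.1s, not 0.3s.
    assert time.monotonic() - start < 0.3
    return results


print(asyncio.run(main()))  # ['q0', 'q1', 'q2']
```

With `AsyncCursor`, the same overlap costs one pool thread per in-flight query; with `AioCursor`, the queries share the event loop.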

(aio-connection)=

## Connection

Use the `aconnect()` function to create an async connection.
It returns an `AioConnection` that produces `AioCursor` instances by default.

```python
from pyathena import aconnect

conn = await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                      region_name="us-west-2")
```

The connection supports the async context manager protocol:

```python
from pyathena import aconnect

async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                          region_name="us-west-2") as conn:
    cursor = conn.cursor()
    await cursor.execute("SELECT 1")
    print(await cursor.fetchone())
```

(aio-cursor)=

## AioCursor

AioCursor is a native asyncio cursor that uses `await` for query execution and result fetching.
It follows the DB API 2.0 interface, adapted for async usage.

```python
from pyathena import aconnect
from pyathena.aio.cursor import AioCursor

async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                          region_name="us-west-2") as conn:
    cursor = conn.cursor(AioCursor)
    await cursor.execute("SELECT * FROM many_rows")
    print(await cursor.fetchone())
    print(await cursor.fetchmany(10))
    print(await cursor.fetchall())
```

The cursor supports the `async with` context manager:

```python
from pyathena import aconnect

async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                          region_name="us-west-2") as conn:
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT * FROM many_rows")
        rows = await cursor.fetchall()
```

You can iterate over results with `async for`:

```python
from pyathena import aconnect

async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                          region_name="us-west-2") as conn:
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT * FROM many_rows")
        async for row in cursor:
            print(row)
```

Query execution metadata is available as cursor attributes after `execute()` returns:

```python
from pyathena import aconnect

async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                          region_name="us-west-2") as conn:
    async with conn.cursor() as cursor:
        await cursor.execute("SELECT * FROM many_rows")
        print(cursor.state)
        print(cursor.state_change_reason)
        print(cursor.completion_date_time)
        print(cursor.submission_date_time)
        print(cursor.data_scanned_in_bytes)
        print(cursor.engine_execution_time_in_millis)
        print(cursor.query_queue_time_in_millis)
        print(cursor.total_execution_time_in_millis)
        print(cursor.query_planning_time_in_millis)
        print(cursor.service_processing_time_in_millis)
        print(cursor.output_location)
```
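Because Athena bills by data scanned, `data_scanned_in_bytes` is useful for cost tracking. A small sketch; the $5-per-TB rate is an assumption here, so check current Athena pricing for your region:

```python
PRICE_PER_TB_USD = 5.0  # assumed rate; verify against current Athena pricing


def estimate_query_cost(data_scanned_in_bytes: int) -> float:
    """Rough query cost from the cursor's data_scanned_in_bytes attribute."""
    tb_scanned = data_scanned_in_bytes / 1024**4
    return tb_scanned * PRICE_PER_TB_USD


# 10 GiB scanned costs roughly half a cent at the assumed rate.
print(round(estimate_query_cost(10 * 1024**3), 4))  # 0.0488
```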

To cancel a running query, start `execute()` as a task so the event loop is free
to issue `cancel()` while the query is still in flight (awaiting `execute()` first
would mean the query has already finished by the time you cancel):

```python
import asyncio

from pyathena import aconnect

async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                          region_name="us-west-2") as conn:
    async with conn.cursor() as cursor:
        task = asyncio.create_task(cursor.execute("SELECT * FROM many_rows"))
        await asyncio.sleep(1)  # give the query time to start
        await cursor.cancel()
```

(aio-dict-cursor)=

## AioDictCursor

AioDictCursor is an AioCursor that returns rows as dictionaries with column names as keys.

```python
from pyathena import aconnect
from pyathena.aio.cursor import AioDictCursor

async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                          region_name="us-west-2") as conn:
    cursor = conn.cursor(AioDictCursor)
    await cursor.execute("SELECT * FROM many_rows LIMIT 10")
    async for row in cursor:
        print(row["a"])
```

If you want to change the dictionary type (e.g., use `OrderedDict`):

```python
from collections import OrderedDict

from pyathena import aconnect
from pyathena.aio.cursor import AioDictCursor

async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                          region_name="us-west-2") as conn:
    cursor = conn.cursor(AioDictCursor, dict_type=OrderedDict)
    await cursor.execute("SELECT * FROM many_rows LIMIT 10")
    async for row in cursor:
        print(row)
```

## Specialized Aio Cursors

Native asyncio versions are available for all cursor types:

| Cursor | Module | Result format |
|--------|--------|---------------|
| {ref}`AioPandasCursor <aio-pandas-cursor>` | `pyathena.aio.pandas.cursor` | pandas DataFrame |
| {ref}`AioArrowCursor <aio-arrow-cursor>` | `pyathena.aio.arrow.cursor` | pyarrow Table |
| {ref}`AioPolarsCursor <aio-polars-cursor>` | `pyathena.aio.polars.cursor` | polars DataFrame |
| {ref}`AioS3FSCursor <aio-s3fs-cursor>` | `pyathena.aio.s3fs.cursor` | Row tuples (lightweight) |
| {ref}`AioSparkCursor <aio-spark-cursor>` | `pyathena.aio.spark.cursor` | PySpark execution |

### Fetch behavior

For **AioPandasCursor**, **AioArrowCursor**, and **AioPolarsCursor**, the S3 download
(CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`.
By the time `execute()` returns, all data is already loaded into memory.
Therefore `fetchone()`, `fetchall()`, `as_pandas()`, `as_arrow()`, and `as_polars()`
are synchronous (in-memory only) and do not need `await`:

```python
# Pandas, Arrow, Polars — S3 download completes during execute()
await cursor.execute("SELECT * FROM many_rows")  # Downloads data here
row = cursor.fetchone()   # No await — data already in memory
rows = cursor.fetchall()  # No await
df = cursor.as_pandas()   # No await
```

The exceptions are **AioCursor** and **AioS3FSCursor**, which stream rows lazily from S3.
Their fetch methods perform I/O and require `await`:

```python
# AioCursor, AioS3FSCursor — fetch reads from S3 lazily
await cursor.execute("SELECT * FROM many_rows")
row = await cursor.fetchone()   # Await required — reads from S3
rows = await cursor.fetchall()  # Await required
```

```{note}
When using AioPandasCursor or AioPolarsCursor with the `chunksize` option,
`execute()` creates a lazy reader (e.g., pandas `TextFileReader`) instead of
loading all data at once. Subsequent iteration via `as_pandas()`, `fetchone()`,
or `async for` triggers chunk-by-chunk S3 reads that are **not** wrapped in
`asyncio.to_thread()` and will block the event loop. If you need chunked
processing in an async application, consider wrapping the iteration in
`asyncio.to_thread()` yourself, or use the default non-chunked mode.
```
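The wrap-it-yourself workaround can be sketched with the stdlib: push the entire blocking iteration into one worker thread via `asyncio.to_thread()`. A plain iterator stands in for the lazy reader here; the function names are illustrative, not PyAthena APIs:

```python
import asyncio


def consume_chunks(reader) -> list[int]:
    # Runs in a worker thread: the blocking chunk-by-chunk reads
    # never touch the event loop.
    return [sum(chunk) for chunk in reader]


async def main() -> list[int]:
    # Stand-in for the lazy reader produced by execute(..., chunksize=...).
    reader = iter([[1, 2], [3], [4, 5, 6]])
    # One to_thread call covers the whole iteration, so other
    # coroutines keep running while the chunks are processed.
    return await asyncio.to_thread(consume_chunks, reader)


print(asyncio.run(main()))  # [3, 3, 15]
```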

See each cursor's documentation page for detailed usage examples.