Commit 7a05d4f

Merge pull request #598 from laughingman7743/feature/pandas-memory-optimization

feat: enhance PandasCursor with memory optimization and chunked processing

2 parents 35531a7 + 657c8e5, commit 7a05d4f

4 files changed: 474 additions, 38 deletions

docs/pandas.rst (116 additions, 0 deletions)
@@ -397,6 +397,27 @@ This object has exactly the same interface as the ``TextFileReader`` object and
     print(df.describe())
     print(df.head())
 
+**Memory-efficient iteration with iter_chunks()**
+
+PandasCursor provides an ``iter_chunks()`` method for convenient chunked processing:
+
+.. code:: python
+
+    from pyathena import connect
+    from pyathena.pandas.cursor import PandasCursor
+
+    cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
+                     region_name="us-west-2",
+                     cursor_class=PandasCursor).cursor()
+
+    # Process large dataset in chunks
+    cursor.execute("SELECT * FROM large_table", chunksize=50_000)
+    for chunk in cursor.iter_chunks():
+        # Process each chunk
+        processed = chunk.groupby('category').sum()
+        # Memory can be freed after each chunk
+        del chunk
+
 You can also concatenate them into a single `pandas.DataFrame object`_ using `pandas.concat`_.
 
 .. code:: python
@@ -427,6 +448,101 @@ When all rows have been read, calling the ``get_chunk`` method will raise ``Stop
     df_iter.get_chunk(10)
     df_iter.get_chunk(10)  # raise StopIteration
 
+**Auto-optimization of chunksize**
+
+When enabled, PandasCursor can automatically determine an optimal chunksize based on the size of the result file:
+
+.. code:: python
+
+    from pyathena import connect
+    from pyathena.pandas.cursor import PandasCursor
+
+    # Enable auto-optimization (chunksize will be determined automatically for large files)
+    cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
+                     region_name="us-west-2",
+                     cursor_class=PandasCursor).cursor(auto_optimize_chunksize=True)
+
+    # For large files, chunksize will be set automatically based on file size
+    cursor.execute("SELECT * FROM very_large_table")
+    for chunk in cursor.iter_chunks():
+        process_chunk(chunk)
+
+**Priority of chunksize settings:**
+
+1. **Explicit chunksize** (highest priority): Always respected
+2. **auto_optimize_chunksize=True**: Automatic determination for large files
+3. **auto_optimize_chunksize=False** (default): No chunking; load the entire DataFrame
+
+.. code:: python
+
+    # Explicit chunksize always takes precedence
+    cursor = connection.cursor(PandasCursor, chunksize=50_000, auto_optimize_chunksize=True)
+    # Will use chunksize=50_000; auto-optimization is ignored
+
+    # Auto-optimization applies only when chunksize is not specified
+    cursor = connection.cursor(PandasCursor, auto_optimize_chunksize=True)
+    # Will determine chunksize automatically for large files
+
+    # Default behavior - no chunking
+    cursor = connection.cursor(PandasCursor)
+    # Will load the entire DataFrame regardless of file size
+
+You can customize the automatic chunksize determination by modifying class attributes:
+
+.. code:: python
+
+    from pyathena.pandas.result_set import AthenaPandasResultSet
+
+    # Customize thresholds and chunk sizes for your use case
+    AthenaPandasResultSet.LARGE_FILE_THRESHOLD_BYTES = 100 * 1024 * 1024  # 100MB
+    AthenaPandasResultSet.AUTO_CHUNK_SIZE_LARGE = 200_000  # Larger chunks
+    AthenaPandasResultSet.AUTO_CHUNK_SIZE_MEDIUM = 100_000
+
+**Performance tuning options**
+
+PandasCursor accepts additional ``pandas.read_csv()`` options for performance optimization:
+
+.. code:: python
+
+    from pyathena import connect
+    from pyathena.pandas.cursor import PandasCursor
+
+    cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
+                     region_name="us-west-2",
+                     cursor_class=PandasCursor).cursor()
+
+    # High-performance reading with the PyArrow engine
+    cursor.execute("SELECT * FROM large_table",
+                   engine="pyarrow",
+                   chunksize=100_000,
+                   use_threads=True)
+
+    # Memory-conscious reading with the Python engine
+    cursor.execute("SELECT * FROM huge_table",
+                   engine="python",
+                   chunksize=25_000,
+                   low_memory=True)
+
+    # Fine-tuned C engine with a custom buffer
+    cursor.execute("SELECT * FROM data_table",
+                   engine="c",
+                   chunksize=50_000,
+                   buffer_lines=100_000)
+
+    # Explicit data types for better performance
+    cursor.execute("SELECT * FROM typed_table",
+                   dtype={'col1': 'int64', 'col2': 'float32'},
+                   parse_dates=['timestamp_col'])
+
+Common performance options:
+
+- ``engine``: CSV parsing engine ('c', 'python', 'pyarrow')
+- ``use_threads``: Enable threading for the PyArrow engine
+- ``low_memory``: Use low-memory mode for the Python engine
+- ``buffer_lines``: Buffer size for the C engine
+- ``dtype``: Explicit column data types
+- ``parse_dates``: Columns to parse as dates
+
 Unload options
 ~~~~~~~~~~~~~~

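The docs above expose three tuning attributes (``LARGE_FILE_THRESHOLD_BYTES``, ``AUTO_CHUNK_SIZE_LARGE``, ``AUTO_CHUNK_SIZE_MEDIUM``) but leave the selection rule implicit. The following is a minimal sketch of how such a threshold-based choice could work; ``pick_chunksize`` is a hypothetical helper (the actual logic lives in ``AthenaPandasResultSet``), and the medium-file threshold is an assumed value, not taken from this commit:

```python
from typing import Optional

# Hypothetical sketch of threshold-based chunksize selection. The names
# LARGE_FILE_THRESHOLD_BYTES and AUTO_CHUNK_SIZE_* mirror the class attributes
# documented above; MEDIUM_FILE_THRESHOLD_BYTES is an assumed extra tier.
LARGE_FILE_THRESHOLD_BYTES = 100 * 1024 * 1024   # 100MB, as in the docs example
MEDIUM_FILE_THRESHOLD_BYTES = 10 * 1024 * 1024   # assumed 10MB medium tier
AUTO_CHUNK_SIZE_LARGE = 200_000
AUTO_CHUNK_SIZE_MEDIUM = 100_000


def pick_chunksize(file_size_bytes: int) -> Optional[int]:
    """Return an auto-selected chunksize, or None to load the whole DataFrame."""
    if file_size_bytes >= LARGE_FILE_THRESHOLD_BYTES:
        return AUTO_CHUNK_SIZE_LARGE
    if file_size_bytes >= MEDIUM_FILE_THRESHOLD_BYTES:
        return AUTO_CHUNK_SIZE_MEDIUM
    return None  # small file: no chunking needed
```

Raising the thresholds trades memory for fewer parse passes; the priority rules above still apply, since an explicit ``chunksize`` bypasses this selection entirely.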
pyathena/pandas/cursor.py (98 additions, 2 deletions)
@@ -8,6 +8,7 @@
     Any,
     Callable,
     Dict,
+    Generator,
     Iterable,
     List,
     Optional,
@@ -24,7 +25,7 @@
     DefaultPandasTypeConverter,
     DefaultPandasUnloadTypeConverter,
 )
-from pyathena.pandas.result_set import AthenaPandasResultSet
+from pyathena.pandas.result_set import AthenaPandasResultSet, DataFrameIterator
 from pyathena.result_set import WithResultSet
 
 if TYPE_CHECKING:
@@ -34,6 +35,12 @@
 
 
 class PandasCursor(BaseCursor, CursorIterator, WithResultSet):
+    """Cursor for handling pandas DataFrame results from Athena queries.
+
+    This cursor provides memory-efficient DataFrame processing with chunking support
+    and automatic chunksize optimization for large result sets.
+    """
+
     def __init__(
         self,
         s3_staging_dir: Optional[str] = None,
@@ -52,9 +59,36 @@ def __init__(
         max_workers: int = (cpu_count() or 1) * 5,
         result_reuse_enable: bool = False,
         result_reuse_minutes: int = CursorIterator.DEFAULT_RESULT_REUSE_MINUTES,
+        auto_optimize_chunksize: bool = False,
         on_start_query_execution: Optional[Callable[[str], None]] = None,
         **kwargs,
     ) -> None:
+        """Initialize PandasCursor with configuration options.
+
+        Args:
+            s3_staging_dir: S3 directory for query result staging.
+            schema_name: Default schema name for queries.
+            catalog_name: Default catalog name for queries.
+            work_group: Athena workgroup name.
+            poll_interval: Query polling interval in seconds.
+            encryption_option: S3 encryption option.
+            kms_key: KMS key for encryption.
+            kill_on_interrupt: Cancel query on interrupt signal.
+            unload: Use UNLOAD statement for faster result retrieval.
+            engine: CSV parsing engine ('auto', 'c', 'python', 'pyarrow').
+            chunksize: Number of rows per chunk for memory-efficient processing.
+                If specified, takes precedence over auto_optimize_chunksize.
+            block_size: S3 read block size.
+            cache_type: S3 caching strategy.
+            max_workers: Maximum worker threads for parallel processing.
+            result_reuse_enable: Enable query result reuse.
+            result_reuse_minutes: Result reuse duration in minutes.
+            auto_optimize_chunksize: Enable automatic chunksize determination for
+                large files. Only effective when chunksize is None.
+                Default: False (no automatic chunking).
+            on_start_query_execution: Callback for query start events.
+            **kwargs: Additional arguments passed to pandas.read_csv.
+        """
         super().__init__(
             s3_staging_dir=s3_staging_dir,
             schema_name=schema_name,
@@ -74,6 +108,7 @@ def __init__(
         self._block_size = block_size
         self._cache_type = cache_type
         self._max_workers = max_workers
+        self._auto_optimize_chunksize = auto_optimize_chunksize
         self._on_start_query_execution = on_start_query_execution
         self._query_id: Optional[str] = None
         self._result_set: Optional[AthenaPandasResultSet] = None
@@ -185,10 +220,12 @@ def execute(
                 block_size=kwargs.pop("block_size", self._block_size),
                 cache_type=kwargs.pop("cache_type", self._cache_type),
                 max_workers=kwargs.pop("max_workers", self._max_workers),
+                auto_optimize_chunksize=self._auto_optimize_chunksize,
                 **kwargs,
             )
         else:
             raise OperationalError(query_execution.state_change_reason)
+
         return self
 
     def executemany(
@@ -231,8 +268,67 @@ def fetchall(
         result_set = cast(AthenaPandasResultSet, self.result_set)
         return result_set.fetchall()
 
-    def as_pandas(self) -> "DataFrame":
+    def as_pandas(self) -> Union["DataFrame", DataFrameIterator]:
+        """Return a DataFrame or DataFrameIterator based on the chunksize setting.
+
+        Returns:
+            DataFrame when chunksize is None, DataFrameIterator when chunksize is set.
+        """
         if not self.has_result_set:
             raise ProgrammingError("No result set.")
         result_set = cast(AthenaPandasResultSet, self.result_set)
         return result_set.as_pandas()
+
+    def iter_chunks(self) -> Generator["DataFrame", None, None]:
+        """Iterate over DataFrame chunks for memory-efficient processing.
+
+        This method provides an iterator interface for processing large result sets
+        in chunks, preventing memory exhaustion when working with datasets that are
+        too large to fit in memory as a single DataFrame.
+
+        Chunking behavior:
+        - If chunksize is explicitly set, uses that value
+        - If auto_optimize_chunksize=True and chunksize=None, automatically determines
+          optimal chunksize based on file size
+        - If auto_optimize_chunksize=False and chunksize=None, yields the entire DataFrame
+
+        Yields:
+            DataFrame: Individual chunks of the result set when chunking is enabled,
+                or the entire DataFrame as a single chunk when chunking is disabled.
+
+        Examples:
+            # Explicit chunksize
+            cursor = connection.cursor(PandasCursor, chunksize=50000)
+            cursor.execute("SELECT * FROM large_table")
+            for chunk in cursor.iter_chunks():
+                process_chunk(chunk)
+
+            # Auto-optimization enabled
+            cursor = connection.cursor(PandasCursor, auto_optimize_chunksize=True)
+            cursor.execute("SELECT * FROM large_table")
+            for chunk in cursor.iter_chunks():
+                process_chunk(chunk)  # Chunks determined automatically for large files
+
+            # No chunking (default behavior)
+            cursor = connection.cursor(PandasCursor)
+            cursor.execute("SELECT * FROM large_table")
+            for chunk in cursor.iter_chunks():
+                process_chunk(chunk)  # Single DataFrame regardless of size
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+
+        result = self.as_pandas()
+        if isinstance(result, DataFrameIterator):
+            # It's an iterator (chunked mode)
+            import gc
+
+            for chunk_count, chunk in enumerate(result, 1):
+                yield chunk
+
+                # Suggest garbage collection every 10 chunks for large datasets
+                if chunk_count % 10 == 0:
+                    gc.collect()
+        else:
+            # Single DataFrame - yield as one chunk
+            yield result

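The ``iter_chunks()`` API added above pairs naturally with partial aggregation: compute a per-chunk groupby, then combine the partial results with ``pandas.concat``, so only small intermediate Series stay in memory. A minimal pure-pandas sketch of the pattern (small in-memory DataFrames stand in for the cursor's chunks; no Athena connection or ``aggregate_chunks`` helper exists in this commit):

```python
import pandas as pd


def aggregate_chunks(chunks):
    """Sum 'value' per 'category' across an iterable of DataFrame chunks."""
    # Per-chunk partial aggregation keeps only small Series in memory.
    partials = [chunk.groupby("category")["value"].sum() for chunk in chunks]
    # Combine the partial sums into the final per-category totals.
    return pd.concat(partials).groupby(level=0).sum()


# Two small chunks standing in for cursor.iter_chunks() output.
chunks = [
    pd.DataFrame({"category": ["a", "b"], "value": [1, 2]}),
    pd.DataFrame({"category": ["a", "b"], "value": [3, 4]}),
]
totals = aggregate_chunks(chunks)  # a -> 4, b -> 6
```

In real use, ``cursor.iter_chunks()`` would replace the ``chunks`` list; the combine step works because summing partial sums is associative, which is what makes chunked aggregation safe here.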