Skip to content

Commit 777370b

Browse files
Add S3Executor abstraction for pluggable parallel execution
Introduce S3Executor ABC with two implementations: - S3ThreadPoolExecutor: wraps ThreadPoolExecutor (default for sync) - S3AioExecutor: dispatches work via asyncio event loop S3Executor supports the context manager protocol for safe resource cleanup. S3FileSystem methods now use `with` statements instead of manual try/finally for short-lived executors. Replace hardcoded ThreadPoolExecutor usage in S3File and S3FileSystem with the S3Executor interface. This enables async-aware parallel operations without thread-in-thread nesting. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 424d564 commit 777370b

File tree

2 files changed

+120
-22
lines changed

2 files changed

+120
-22
lines changed

pyathena/filesystem/s3.py

Lines changed: 31 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
# -*- coding: utf-8 -*-
22
from __future__ import annotations
33

4-
import itertools
54
import logging
65
import mimetypes
76
import os.path
87
import re
98
from concurrent.futures import Future, as_completed
10-
from concurrent.futures.thread import ThreadPoolExecutor
119
from copy import deepcopy
1210
from datetime import datetime
1311
from multiprocessing import cpu_count
@@ -23,6 +21,7 @@
2321
from fsspec.utils import tokenize
2422

2523
import pyathena
24+
from pyathena.filesystem.s3_executor import S3Executor, S3ThreadPoolExecutor
2625
from pyathena.filesystem.s3_object import (
2726
S3CompleteMultipartUpload,
2827
S3MultipartUpload,
@@ -686,6 +685,20 @@ def _delete_object(
686685
**request,
687686
)
688687

688+
def _create_executor(self, max_workers: int) -> S3Executor:
    """Build the executor used for parallel S3 operations.

    The default strategy is a thread pool.  Subclasses may override this
    hook to plug in a different execution strategy — for example one that
    dispatches work onto an asyncio event loop.

    Args:
        max_workers: Upper bound on the number of parallel workers.

    Returns:
        A fresh S3Executor instance; the caller owns its lifecycle.
    """
    executor: S3Executor = S3ThreadPoolExecutor(max_workers=max_workers)
    return executor
701+
689702
def _delete_objects(
690703
self, bucket: str, paths: List[str], max_workers: Optional[int] = None, **kwargs
691704
) -> None:
@@ -703,7 +716,7 @@ def _delete_objects(
703716
object_.update({"VersionId": version_id})
704717
delete_objects.append(object_)
705718

706-
with ThreadPoolExecutor(max_workers=max_workers) as executor:
719+
with self._create_executor(max_workers=max_workers) as executor:
707720
fs = []
708721
for delete in [
709722
delete_objects[i : i + self.DELETE_OBJECTS_MAX_KEYS]
@@ -861,7 +874,7 @@ def _copy_object_with_multipart_upload(
861874
**kwargs,
862875
)
863876
parts = []
864-
with ThreadPoolExecutor(max_workers=max_workers) as executor:
877+
with self._create_executor(max_workers=max_workers) as executor:
865878
fs = [
866879
executor.submit(
867880
self._upload_part_copy,
@@ -1106,6 +1119,7 @@ def _open(
11061119
mode,
11071120
version_id=None,
11081121
max_workers=max_workers,
1122+
executor=self._create_executor(max_workers=max_workers),
11091123
block_size=block_size,
11101124
cache_type=cache_type,
11111125
autocommit=autocommit,
@@ -1256,6 +1270,7 @@ def __init__(
12561270
mode: str = "rb",
12571271
version_id: Optional[str] = None,
12581272
max_workers: int = (cpu_count() or 1) * 5,
1273+
executor: Optional[S3Executor] = None,
12591274
block_size: int = S3FileSystem.DEFAULT_BLOCK_SIZE,
12601275
cache_type: str = "bytes",
12611276
autocommit: bool = True,
@@ -1265,7 +1280,7 @@ def __init__(
12651280
**kwargs,
12661281
) -> None:
12671282
self.max_workers = max_workers
1268-
self._executor = ThreadPoolExecutor(max_workers=max_workers)
1283+
self._executor: S3Executor = executor or S3ThreadPoolExecutor(max_workers=max_workers)
12691284
self.s3_additional_kwargs = s3_additional_kwargs if s3_additional_kwargs else {}
12701285

12711286
super().__init__(
@@ -1481,24 +1496,18 @@ def _fetch_range(self, start: int, end: int) -> bytes:
14811496
start, end, max_workers=self.max_workers, worker_block_size=self.blocksize
14821497
)
14831498
if len(ranges) > 1:
1484-
object_ = self._merge_objects(
1485-
list(
1486-
self._executor.map(
1487-
lambda bucket, key, ranges, version_id, kwargs: self.fs._get_object(
1488-
bucket=bucket,
1489-
key=key,
1490-
ranges=ranges,
1491-
version_id=version_id,
1492-
**kwargs,
1493-
),
1494-
itertools.repeat(self.bucket),
1495-
itertools.repeat(self.key),
1496-
ranges,
1497-
itertools.repeat(self.version_id),
1498-
itertools.repeat(self.s3_additional_kwargs),
1499-
)
1499+
futures = [
1500+
self._executor.submit(
1501+
self.fs._get_object,
1502+
bucket=self.bucket,
1503+
key=self.key,
1504+
ranges=r,
1505+
version_id=self.version_id,
1506+
**self.s3_additional_kwargs,
15001507
)
1501-
)
1508+
for r in ranges
1509+
]
1510+
object_ = self._merge_objects([f.result() for f in as_completed(futures)])
15021511
else:
15031512
object_ = self.fs._get_object(
15041513
self.bucket,

pyathena/filesystem/s3_executor.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
# -*- coding: utf-8 -*-
2+
from __future__ import annotations
3+
4+
import asyncio
5+
from abc import ABCMeta, abstractmethod
6+
from concurrent.futures import Future
7+
from concurrent.futures.thread import ThreadPoolExecutor
8+
from typing import Any, Callable, Optional, TypeVar
9+
10+
T = TypeVar("T")
11+
12+
13+
class S3Executor(metaclass=ABCMeta):
    """Strategy interface for running S3 operations in parallel.

    ``S3File`` and ``S3FileSystem`` hand units of work to an executor through
    :meth:`submit` and release its resources through :meth:`shutdown`.  The
    two methods deliberately mirror ``concurrent.futures.Executor`` so that
    standard helpers such as ``as_completed()`` and ``Future.cancel()`` keep
    working with any implementation.
    """

    @abstractmethod
    def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
        """Schedule *fn* for execution and return a Future for its result."""

    @abstractmethod
    def shutdown(self, wait: bool = True) -> None:
        """Release executor resources, blocking until done when *wait* is true."""

    def __enter__(self) -> "S3Executor":
        # Context entry exposes the executor itself.
        return self

    def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        # Always wait for in-flight work so cleanup is deterministic.
        self.shutdown(wait=True)
37+
38+
39+
class S3ThreadPoolExecutor(S3Executor):
    """Thread-pool-backed :class:`S3Executor`.

    The default strategy for synchronous parallel operations in ``S3File``
    and ``S3FileSystem``; every call is forwarded to an underlying
    ``ThreadPoolExecutor``.
    """

    def __init__(self, max_workers: int) -> None:
        # Own the pool so shutdown() can release its worker threads.
        self._executor = ThreadPoolExecutor(max_workers=max_workers)

    def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
        return self._executor.submit(fn, *args, **kwargs)

    def shutdown(self, wait: bool = True) -> None:
        self._executor.shutdown(wait=wait)
54+
55+
56+
class S3AioExecutor(S3Executor):
    """:class:`S3Executor` that hands work to an asyncio event loop.

    Each submitted callable is wrapped in ``asyncio.to_thread`` and scheduled
    on the loop via ``asyncio.run_coroutine_threadsafe``, which yields
    ``concurrent.futures.Future`` objects compatible with ``as_completed()``
    and ``Future.cancel()``.

    This sidesteps thread-in-thread nesting when ``S3File`` is used from
    within ``asyncio.to_thread()`` calls (the pattern used by
    ``AioS3FileSystem``).

    Args:
        loop: A running asyncio event loop.

    Raises:
        RuntimeError: From ``submit`` when no running event loop is available.
    """

    def __init__(self, loop: Optional[asyncio.AbstractEventLoop] = None) -> None:
        self._loop = loop

    def submit(self, fn: Callable[..., T], *args: Any, **kwargs: Any) -> Future[T]:
        loop = self._loop
        # Guard first: the coroutine is only created once we know it can be
        # scheduled, so no "coroutine was never awaited" warning is possible.
        if loop is None or not loop.is_running():
            raise RuntimeError(
                "S3AioExecutor requires a running event loop. "
                "Use S3ThreadPoolExecutor for synchronous usage."
            )
        # to_thread() runs the blocking call on the loop's default thread pool;
        # run_coroutine_threadsafe bridges it back to a concurrent Future.
        return asyncio.run_coroutine_threadsafe(
            asyncio.to_thread(fn, *args, **kwargs), loop
        )

    def shutdown(self, wait: bool = True) -> None:
        # Nothing to free: the event loop owns all execution resources.
        pass

0 commit comments

Comments
 (0)