Skip to content

Commit bec7b44

Browse files
Add AioS3FileSystem for native asyncio S3 operations
Implement AioS3FileSystem as an fsspec AsyncFileSystem that composes a sync S3FileSystem internally and dispatches operations via asyncio.to_thread and asyncio.gather for parallelism. AioS3File is a minimal S3File subclass — all async behavior is provided by the injected S3AioExecutor, eliminating the need for method overrides. Wire AioS3FSCursor to use AioS3FileSystem and add pluggable filesystem_class parameter to AthenaS3FSResultSet. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent 777370b commit bec7b44

File tree

3 files changed

+370
-6
lines changed

3 files changed

+370
-6
lines changed

pyathena/aio/s3fs/cursor.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from pyathena.aio.common import WithAsyncFetch
99
from pyathena.common import CursorIterator
1010
from pyathena.error import OperationalError, ProgrammingError
11+
from pyathena.filesystem.s3_async import AioS3FileSystem
1112
from pyathena.model import AthenaQueryExecution
1213
from pyathena.s3fs.converter import DefaultS3FSTypeConverter
1314
from pyathena.s3fs.result_set import AthenaS3FSResultSet, CSVReaderType
@@ -16,11 +17,12 @@
1617

1718

1819
class AioS3FSCursor(WithAsyncFetch):
19-
"""Native asyncio cursor that reads CSV results via S3FileSystem.
20+
"""Native asyncio cursor that reads CSV results via AioS3FileSystem.
2021
21-
Uses ``asyncio.to_thread()`` for result set creation and fetch operations
22-
because ``AthenaS3FSResultSet`` lazily streams rows from S3 via a CSV
23-
reader, making fetch calls blocking I/O.
22+
Uses ``AioS3FileSystem`` for S3 operations, which replaces
23+
``ThreadPoolExecutor`` parallelism with ``asyncio.gather`` +
24+
``asyncio.to_thread``. Fetch operations are wrapped in
25+
``asyncio.to_thread()`` because CSV reading is blocking I/O.
2426
2527
Example:
2628
>>> async with await pyathena.aio_connect(...) as conn:
@@ -127,6 +129,7 @@ async def execute( # type: ignore[override]
127129
arraysize=self.arraysize,
128130
retry_config=self._retry_config,
129131
csv_reader=self._csv_reader,
132+
filesystem_class=AioS3FileSystem,
130133
**kwargs,
131134
)
132135
else:

pyathena/filesystem/s3_async.py

Lines changed: 357 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,357 @@
1+
# -*- coding: utf-8 -*-
2+
from __future__ import annotations
3+
4+
import asyncio
5+
import logging
6+
from multiprocessing import cpu_count
7+
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
8+
9+
from fsspec.asyn import AsyncFileSystem
10+
from fsspec.callbacks import _DEFAULT_CALLBACK
11+
12+
from pyathena.filesystem.s3 import S3File, S3FileSystem
13+
from pyathena.filesystem.s3_executor import S3AioExecutor
14+
from pyathena.filesystem.s3_object import S3Object
15+
16+
if TYPE_CHECKING:
17+
from datetime import datetime
18+
19+
from pyathena.connection import Connection
20+
21+
_logger = logging.getLogger(__name__)
22+
23+
24+
class AioS3FileSystem(AsyncFileSystem):
    """An async filesystem interface for Amazon S3 using fsspec's AsyncFileSystem.

    This class wraps ``S3FileSystem`` to provide native asyncio support. Instead of
    using ``ThreadPoolExecutor`` for parallel operations, it uses ``asyncio.gather``
    with ``asyncio.to_thread`` for natural integration with the asyncio event loop.

    The implementation uses composition: an internal ``S3FileSystem`` instance handles
    all boto3 calls, while this class delegates to it via ``asyncio.to_thread()``.
    This avoids diamond inheritance issues and keeps all boto3 logic in one place.

    File handles created by ``_open`` use ``S3AioExecutor`` so that parallel
    operations (range reads, multipart uploads) are dispatched via the event loop
    instead of spawning additional threads.

    Attributes:
        _sync_fs: The internal synchronous S3FileSystem instance.

    Example:
        >>> from pyathena.filesystem.s3_async import AioS3FileSystem
        >>> fs = AioS3FileSystem(asynchronous=True)
        >>>
        >>> # Use in async context
        >>> files = await fs._ls('s3://my-bucket/data/')
        >>>
        >>> # Sync wrappers also available (auto-generated by fsspec)
        >>> files = fs.ls('s3://my-bucket/data/')
    """

    # https://docs.aws.amazon.com/AmazonS3/latest/API/API_DeleteObjects.html
    DELETE_OBJECTS_MAX_KEYS: int = 1000

    protocol = ("s3", "s3a")
    mirror_sync_methods = True
    async_impl = True
    _extra_tokenize_attributes = ("default_block_size",)

    def __init__(
        self,
        connection: Optional["Connection[Any]"] = None,
        default_block_size: Optional[int] = None,
        default_cache_type: Optional[str] = None,
        max_workers: int = (cpu_count() or 1) * 5,
        s3_additional_kwargs: Optional[Dict[str, Any]] = None,
        asynchronous: bool = False,
        loop: Optional[Any] = None,
        batch_size: Optional[int] = None,
        **kwargs,
    ) -> None:
        """Initialize the async filesystem and its internal sync delegate.

        Args:
            connection: Optional pyathena connection forwarded to ``S3FileSystem``.
            default_block_size: Default block size for files opened via ``_open``.
            default_cache_type: Default fsspec cache type for opened files.
            max_workers: Parallelism hint forwarded to the sync filesystem.
            s3_additional_kwargs: Extra parameters merged into S3 API calls.
            asynchronous: True when used directly from a running event loop.
            loop: Event loop used by fsspec's auto-generated sync wrappers.
            batch_size: fsspec batch size for bulk async operations.
            **kwargs: Forwarded to both ``AsyncFileSystem`` and ``S3FileSystem``.
        """
        super().__init__(
            asynchronous=asynchronous,
            loop=loop,
            batch_size=batch_size,
            **kwargs,
        )
        self._sync_fs = S3FileSystem(
            connection=connection,
            default_block_size=default_block_size,
            default_cache_type=default_cache_type,
            max_workers=max_workers,
            s3_additional_kwargs=s3_additional_kwargs,
            **kwargs,
        )
        # Share dircache for cache coherence between async and sync instances.
        self.dircache = self._sync_fs.dircache

    @staticmethod
    def parse_path(path: str) -> Tuple[str, Optional[str], Optional[str]]:
        """Split an S3 path into (bucket, key, version_id) via the sync parser."""
        return S3FileSystem.parse_path(path)

    async def _info(self, path: str, **kwargs) -> S3Object:
        """Return object/prefix metadata for *path*."""
        return await asyncio.to_thread(self._sync_fs.info, path, **kwargs)

    async def _ls(
        self, path: str, detail: bool = False, **kwargs
    ) -> Union[List[S3Object], List[str]]:
        """List *path*; detailed ``S3Object`` entries when ``detail`` is True."""
        return await asyncio.to_thread(self._sync_fs.ls, path, detail=detail, **kwargs)

    async def _cat_file(
        self, path: str, start: Optional[int] = None, end: Optional[int] = None, **kwargs
    ) -> bytes:
        """Read the contents (optionally a byte range) of a single object."""
        return await asyncio.to_thread(self._sync_fs.cat_file, path, start=start, end=end, **kwargs)

    async def _exists(self, path: str, **kwargs) -> bool:
        """Return True if *path* exists as an object or prefix."""
        return await asyncio.to_thread(self._sync_fs.exists, path, **kwargs)

    async def _rm_file(self, path: str, **kwargs) -> None:
        """Delete a single object."""
        await asyncio.to_thread(self._sync_fs.rm_file, path, **kwargs)

    async def _pipe_file(self, path: str, value: bytes, **kwargs) -> None:
        """Write *value* bytes to *path*."""
        await asyncio.to_thread(self._sync_fs.pipe_file, path, value, **kwargs)

    async def _put_file(self, lpath: str, rpath: str, callback=_DEFAULT_CALLBACK, **kwargs) -> None:
        """Upload the local file *lpath* to the remote path *rpath*."""
        await asyncio.to_thread(self._sync_fs.put_file, lpath, rpath, callback=callback, **kwargs)

    async def _get_file(self, rpath: str, lpath: str, callback=_DEFAULT_CALLBACK, **kwargs) -> None:
        """Download the remote path *rpath* to the local file *lpath*."""
        await asyncio.to_thread(self._sync_fs.get_file, rpath, lpath, callback=callback, **kwargs)

    async def _mkdir(self, path: str, create_parents: bool = True, **kwargs) -> None:
        """Create a directory (bucket/prefix) at *path*."""
        await asyncio.to_thread(self._sync_fs.mkdir, path, create_parents=create_parents, **kwargs)

    async def _makedirs(self, path: str, exist_ok: bool = False) -> None:
        """Recursively create directories up to *path*."""
        await asyncio.to_thread(self._sync_fs.makedirs, path, exist_ok=exist_ok)

    async def _rm(self, path: Union[str, List[str]], recursive: bool = False, **kwargs) -> None:
        """Remove files or directories using async parallel batch deletion.

        For multiple paths, chunks into batches of 1000 (S3 API limit) and uses
        ``asyncio.gather`` with ``asyncio.to_thread`` instead of ThreadPoolExecutor.

        NOTE(review): all paths are assumed to live in the bucket of the first
        path — the DeleteObjects request uses only ``path[0]``'s bucket.
        """
        if isinstance(path, str):
            path = [path]

        bucket, _, _ = self.parse_path(path[0])

        expand_paths: List[str] = []
        for p in path:
            expanded = await asyncio.to_thread(self._sync_fs.expand_path, p, recursive=recursive)
            expand_paths.extend(expanded)

        if not expand_paths:
            return

        quiet = kwargs.pop("Quiet", True)
        delete_objects: List[Dict[str, Any]] = []
        for p in expand_paths:
            _, key, version_id = self.parse_path(p)
            if key:
                object_: Dict[str, Any] = {"Key": key}
                if version_id:
                    object_["VersionId"] = version_id
                delete_objects.append(object_)

        if not delete_objects:
            return

        # DeleteObjects accepts at most DELETE_OBJECTS_MAX_KEYS keys per request.
        chunks = [
            delete_objects[i : i + self.DELETE_OBJECTS_MAX_KEYS]
            for i in range(0, len(delete_objects), self.DELETE_OBJECTS_MAX_KEYS)
        ]

        async def _delete_chunk(chunk: List[Dict[str, Any]]) -> None:
            request = {
                "Bucket": bucket,
                "Delete": {
                    "Objects": chunk,
                    "Quiet": quiet,
                },
            }
            await asyncio.to_thread(
                self._sync_fs._call, self._sync_fs._client.delete_objects, **request
            )

        await asyncio.gather(*[_delete_chunk(chunk) for chunk in chunks])

        for p in expand_paths:
            self._sync_fs.invalidate_cache(p)

    async def _cp_file(self, path1: str, path2: str, **kwargs) -> None:
        """Copy an S3 object, using async parallel multipart upload for large files.

        Raises:
            ValueError: If the destination is versioned or either path is a bare bucket.
        """
        kwargs.pop("onerror", None)
        bucket1, key1, version_id1 = self.parse_path(path1)
        bucket2, key2, version_id2 = self.parse_path(path2)
        if version_id2:
            raise ValueError("Cannot copy to a versioned file.")
        if not key1 or not key2:
            raise ValueError("Cannot copy buckets.")

        info1 = await self._info(path1)
        size1 = info1.get("size", 0)
        if size1 <= S3FileSystem.MULTIPART_UPLOAD_MAX_PART_SIZE:
            # Small enough for a single server-side CopyObject call.
            await asyncio.to_thread(
                self._sync_fs._copy_object,
                bucket1=bucket1,
                key1=key1,
                version_id1=version_id1,
                bucket2=bucket2,
                key2=key2,
                **kwargs,
            )
        else:
            await self._copy_object_with_multipart_upload(
                bucket1=bucket1,
                key1=key1,
                version_id1=version_id1,
                size1=size1,
                bucket2=bucket2,
                key2=key2,
                **kwargs,
            )
        self._sync_fs.invalidate_cache(path2)

    async def _copy_object_with_multipart_upload(
        self,
        bucket1: str,
        key1: str,
        size1: int,
        bucket2: str,
        key2: str,
        block_size: Optional[int] = None,
        version_id1: Optional[str] = None,
        **kwargs,
    ) -> None:
        """Copy a large object via UploadPartCopy, running parts concurrently.

        Raises:
            ValueError: If ``block_size`` is outside the S3 multipart part-size limits.
        """
        block_size = block_size if block_size else S3FileSystem.MULTIPART_UPLOAD_MAX_PART_SIZE
        if (
            block_size < S3FileSystem.MULTIPART_UPLOAD_MIN_PART_SIZE
            or block_size > S3FileSystem.MULTIPART_UPLOAD_MAX_PART_SIZE
        ):
            raise ValueError("Block size must be greater than 5MiB and less than 5GiB.")

        copy_source: Dict[str, Any] = {
            "Bucket": bucket1,
            "Key": key1,
        }
        if version_id1:
            copy_source["VersionId"] = version_id1

        ranges = S3File._get_ranges(
            0,
            size1,
            self._sync_fs.max_workers,
            block_size,
        )
        multipart_upload = await asyncio.to_thread(
            self._sync_fs._create_multipart_upload,
            bucket=bucket2,
            key=key2,
            **kwargs,
        )

        async def _upload_part(i: int, range_: Tuple[int, int]) -> Dict[str, Any]:
            result = await asyncio.to_thread(
                self._sync_fs._upload_part_copy,
                bucket=bucket2,
                key=key2,
                copy_source=copy_source,
                upload_id=cast(str, multipart_upload.upload_id),
                part_number=i + 1,
                copy_source_ranges=range_,
            )
            return {
                "ETag": result.etag,
                "PartNumber": result.part_number,
            }

        parts = await asyncio.gather(*[_upload_part(i, r) for i, r in enumerate(ranges)])
        # CompleteMultipartUpload requires parts in ascending PartNumber order.
        parts_list = sorted(parts, key=lambda x: x["PartNumber"])

        await asyncio.to_thread(
            self._sync_fs._complete_multipart_upload,
            bucket=bucket2,
            key=key2,
            upload_id=cast(str, multipart_upload.upload_id),
            parts=parts_list,
        )

    async def _find(
        self,
        path: str,
        maxdepth: Optional[int] = None,
        withdirs: bool = False,
        **kwargs,
    ) -> Union[Dict[str, S3Object], List[str]]:
        """Recursively list files under *path*; a name->object dict when ``detail``."""
        detail = kwargs.pop("detail", False)
        files = await asyncio.to_thread(
            self._sync_fs._find, path, maxdepth=maxdepth, withdirs=withdirs, **kwargs
        )
        if detail:
            return {f.name: f for f in files}
        return [f.name for f in files]

    def _open(
        self,
        path: str,
        mode: str = "rb",
        block_size: Optional[int] = None,
        cache_type: Optional[str] = None,
        autocommit: bool = True,
        cache_options: Optional[Dict[Any, Any]] = None,
        **kwargs,
    ) -> "AioS3File":
        """Open *path* and return an ``AioS3File`` backed by ``S3AioExecutor``.

        Defaults for ``block_size``/``cache_type`` come from the sync filesystem.
        """
        if block_size is None:
            block_size = self._sync_fs.default_block_size
        if cache_type is None:
            cache_type = self._sync_fs.default_cache_type
        # Bug fix: the pop key was "max_worker" (singular), which never matched
        # the public "max_workers" kwarg — a caller-supplied max_workers stayed
        # in kwargs and collided with the explicit keyword below (TypeError).
        max_workers = kwargs.pop("max_workers", self._sync_fs.max_workers)
        s3_additional_kwargs = kwargs.pop("s3_additional_kwargs", {})
        # NOTE(review): instance-level kwargs override per-call ones here —
        # mirrors the update order used elsewhere; confirm this is intended.
        s3_additional_kwargs.update(self._sync_fs.s3_additional_kwargs)

        return AioS3File(
            self._sync_fs,
            path,
            mode,
            version_id=None,
            max_workers=max_workers,
            # S3AioExecutor dispatches parallel work on the event loop
            # instead of spawning threads.
            executor=S3AioExecutor(loop=self._loop),
            block_size=block_size,
            cache_type=cache_type,
            autocommit=autocommit,
            cache_options=cache_options,
            s3_additional_kwargs=s3_additional_kwargs,
            **kwargs,
        )

    def sign(self, path: str, expiration: int = 3600, **kwargs) -> str:
        """Return a presigned URL for *path* valid for ``expiration`` seconds."""
        return cast(str, self._sync_fs.sign(path, expiration=expiration, **kwargs))

    def checksum(self, path: str, **kwargs) -> int:
        """Return the sync filesystem's checksum for *path*."""
        return cast(int, self._sync_fs.checksum(path, **kwargs))

    def created(self, path: str) -> "datetime":
        """Return the creation time reported by the sync filesystem."""
        return self._sync_fs.created(path)

    def modified(self, path: str) -> "datetime":
        """Return the last-modified time reported by the sync filesystem."""
        return self._sync_fs.modified(path)

    def invalidate_cache(self, path: Optional[str] = None) -> None:
        """Invalidate the (shared) directory cache for *path*, or entirely."""
        self._sync_fs.invalidate_cache(path)

    async def _touch(self, path: str, truncate: bool = True, **kwargs) -> None:
        """Create an empty object at *path* (or truncate an existing one)."""
        await asyncio.to_thread(self._sync_fs.touch, path, truncate=truncate, **kwargs)
345+
346+
347+
class AioS3File(S3File):
    """S3 file handle whose parallel work runs on the asyncio event loop.

    Behaviorally identical to ``S3File``: every parallel operation
    (range reads, multipart uploads) is dispatched through the injected
    ``S3Executor`` interface. ``AioS3FileSystem`` injects an
    ``S3AioExecutor``, so that dispatch uses the event loop rather than
    worker threads. The subclass exists as a distinct type for
    ``isinstance`` checks and to document the async execution model.
    """

0 commit comments

Comments
 (0)