Skip to content

Commit 61c016f

Browse files
feat: Fsspec: Convert async methods that open sync file handles to use LocalStore (#656)
* feat: fix async method using sync interactions with local fs * feat: fix async method using sync interactions with local fs * feat: fix async method using sync interactions with local fs
1 parent 96c2a75 commit 61c016f

1 file changed

Lines changed: 13 additions & 23 deletions

File tree

obstore/python/obstore/fsspec.py

Lines changed: 13 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -28,14 +28,14 @@
2828
integration.
2929
"""
3030

31-
# ruff: noqa: ANN401, EM102, PTH123, FBT001, FBT002
31+
# ruff: noqa: ANN401, EM102, FBT001, FBT002
3232

3333
from __future__ import annotations
3434

3535
import asyncio
3636
import warnings
3737
from collections import defaultdict
38-
from functools import lru_cache
38+
from functools import cached_property, lru_cache
3939
from pathlib import Path
4040
from typing import TYPE_CHECKING, Literal, overload
4141
from urllib.parse import urlparse
@@ -271,6 +271,10 @@ def __init__( # noqa: PLR0913
271271
batch_size=batch_size,
272272
)
273273

274+
@cached_property
275+
def _local_store(self) -> FsspecStore:
276+
return FsspecStore("file")
277+
274278
def _split_path(self, path: str) -> tuple[str, str]:
275279
"""Split bucket and file path.
276280
@@ -462,39 +466,25 @@ async def _put_file(
462466
mode: str = "overwrite", # noqa: ARG002
463467
**_kwargs: Any,
464468
) -> None:
465-
# TODO: We shouldn't use blocking file operations in async functions
466-
if not Path(lpath).is_file(): # noqa: ASYNC240
467-
err_msg = f"File {lpath} not found in local"
468-
raise FileNotFoundError(err_msg)
469469

470-
# TODO: convert to use async file system methods using LocalStore
471-
# Async functions should not open files with blocking methods like `open`
470+
if not await self._local_store._exists(lpath): # noqa: SLF001
471+
raise FileNotFoundError(lpath)
472+
472473
rbucket, rpath = self._split_path(rpath)
473474

474475
# Should construct the store instance by rbucket, which is the target path
475476
store = self._construct_store(rbucket)
476477

477-
with open(lpath, "rb") as f: # noqa: ASYNC230
478-
await store.put_async(rpath, f)
478+
await store.put_async(rpath, Path(lpath))
479479

480480
async def _get_file(self, rpath: str, lpath: str, **_kwargs: Any) -> None:
481481
res = urlparse(lpath)
482-
# TODO: We shouldn't use blocking file operations in async functions
483-
if res.scheme or Path(lpath).is_dir(): # noqa: ASYNC240
482+
if res.scheme or await self._local_store._isdir(lpath): # noqa: SLF001
484483
# lpath need to be local file and cannot contain scheme
485484
return
486485

487-
# TODO: convert to use async file system methods using LocalStore
488-
# Async functions should not open files with blocking methods like `open`
489-
rbucket, rpath = self._split_path(rpath)
490-
491-
# Should construct the store instance by rbucket, which is the target path
492-
store = self._construct_store(rbucket)
493-
494-
with open(lpath, "wb") as f: # noqa: ASYNC230
495-
resp = await store.get_async(rpath)
496-
async for buffer in resp.stream():
497-
f.write(buffer)
486+
resp = await self._cat_file(rpath)
487+
await self._local_store._pipe_file(lpath, resp) # noqa: SLF001
498488

499489
async def _info(self, path: str, **_kwargs: Any) -> dict[str, Any]:
500490
bucket, path_no_bucket = self._split_path(path)

0 commit comments

Comments
 (0)