Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 13 additions & 23 deletions obstore/python/obstore/fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@
integration.
"""

# ruff: noqa: ANN401, EM102, PTH123, FBT001, FBT002
# ruff: noqa: ANN401, EM102, FBT001, FBT002

from __future__ import annotations

import asyncio
import warnings
from collections import defaultdict
from functools import lru_cache
from functools import cached_property, lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Literal, overload
from urllib.parse import urlparse
Expand Down Expand Up @@ -271,6 +271,10 @@ def __init__( # noqa: PLR0913
batch_size=batch_size,
)

@cached_property
def _local_store(self) -> FsspecStore:
return FsspecStore("file")

def _split_path(self, path: str) -> tuple[str, str]:
"""Split bucket and file path.

Expand Down Expand Up @@ -462,39 +466,25 @@ async def _put_file(
mode: str = "overwrite", # noqa: ARG002
**_kwargs: Any,
) -> None:
# TODO: We shouldn't use blocking file operations in async functions
if not Path(lpath).is_file(): # noqa: ASYNC240
err_msg = f"File {lpath} not found in local"
raise FileNotFoundError(err_msg)

# TODO: convert to use async file system methods using LocalStore
# Async functions should not open files with blocking methods like `open`
if not await self._local_store._exists(lpath): # noqa: SLF001
raise FileNotFoundError(lpath)

rbucket, rpath = self._split_path(rpath)

# Should construct the store instance by rbucket, which is the target path
store = self._construct_store(rbucket)

with open(lpath, "rb") as f: # noqa: ASYNC230
await store.put_async(rpath, f)
await store.put_async(rpath, Path(lpath))

async def _get_file(self, rpath: str, lpath: str, **_kwargs: Any) -> None:
res = urlparse(lpath)
# TODO: We shouldn't use blocking file operations in async functions
if res.scheme or Path(lpath).is_dir(): # noqa: ASYNC240
if res.scheme or await self._local_store._isdir(lpath): # noqa: SLF001
# lpath need to be local file and cannot contain scheme
return

# TODO: convert to use async file system methods using LocalStore
# Async functions should not open files with blocking methods like `open`
rbucket, rpath = self._split_path(rpath)

# Should construct the store instance by rbucket, which is the target path
store = self._construct_store(rbucket)

with open(lpath, "wb") as f: # noqa: ASYNC230
resp = await store.get_async(rpath)
async for buffer in resp.stream():
f.write(buffer)
resp = await self._cat_file(rpath)
await self._local_store._pipe_file(lpath, resp) # noqa: SLF001

async def _info(self, path: str, **_kwargs: Any) -> dict[str, Any]:
bucket, path_no_bucket = self._split_path(path)
Expand Down
Loading