Skip to content

Commit 0aff0ec

Browse files
Implement additional methods for gfal3 development (#114)
* return additional stat info on ls * pre-commit fix * feat: add mv, chmod, checksum; enrich info/ls dicts; handle ls on file path - Add synchronous mv() using _myclient.mv() directly to avoid async event-loop issues and NotImplementedError from the inherited copy+rm path - Add async _chmod() / sync chmod = sync_wrapper(_chmod) via _myclient.chmod() - Add async _checksum() / sync checksum = sync_wrapper(_checksum) using QueryCode.CHECKSUM; returns (algorithm, value) tuple - _info() and _ls() now include uid, gid, nlink, atime, ctime alongside the existing mtime and mode fields so callers get a complete POSIX-like stat dict - _ls() falls back to [_info(path)] when the server reports "not a directory", "unable to open directory", or "no such file or directory" — consistent with how ls() on a plain file path behaves on other filesystems - Add tests: test_mv, test_mv_directory, test_chmod, test_checksum, test_ls_on_file, test_info_dict_fields * fix tests: use fsspec.open in test_mv, skip test_checksum when server unsupported * fix issue with checksum * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --------- Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
1 parent 5ee3768 commit 0aff0ec

2 files changed

Lines changed: 247 additions & 44 deletions

File tree

src/fsspec_xrootd/xrootd.py

Lines changed: 121 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import io
55
import logging
66
import os.path
7+
import stat as _stat
78
import time
89
import warnings
910
import weakref
@@ -220,6 +221,36 @@ async def _open(self, url: str, timeout: int) -> Any: # client.File
220221
return handle
221222

222223

224+
def _flags_to_mode(flags: StatInfoFlags) -> int:
225+
"""Convert XRootD ``StatInfoFlags`` to a POSIX ``st_mode`` integer.
226+
227+
XRootD exposes three relevant bits: ``IS_DIR``, ``IS_READABLE``, and
228+
``IS_WRITABLE``. The mapping used here follows the convention applied by
229+
gfal2-util and the EOS storage system:
230+
231+
* Readable directory → ``dr-xr-xr-x`` (0o40555)
232+
* Readable regular file → ``-r--r--r--`` (0o100444)
233+
* Writable adds user-write (0o200) on top of the above.
234+
* Files/directories that are neither readable nor writable → mode 0.
235+
"""
236+
if flags & StatInfoFlags.IS_DIR:
237+
ftype = _stat.S_IFDIR
238+
# Directories need the execute bit to be traversable.
239+
perms = (0o555 if flags & StatInfoFlags.IS_READABLE else 0) | (
240+
0o200 if flags & StatInfoFlags.IS_WRITABLE else 0
241+
)
242+
elif flags & StatInfoFlags.OTHER:
243+
# Symlinks and other special files — preserve type, no permissions.
244+
ftype = _stat.S_IFLNK
245+
perms = 0
246+
else:
247+
ftype = _stat.S_IFREG
248+
perms = (0o444 if flags & StatInfoFlags.IS_READABLE else 0) | (
249+
0o200 if flags & StatInfoFlags.IS_WRITABLE else 0
250+
)
251+
return ftype | perms
252+
253+
223254
class XRootDFileSystem(AsyncFileSystem): # type: ignore[misc]
224255
protocol = "root"
225256
root_marker = "/"
@@ -345,6 +376,21 @@ async def _makedirs(self, path: str, exist_ok: bool = False) -> None:
345376
if not status.ok and not (status.code == ErrorCodes.INVALID_PATH and exist_ok):
346377
raise OSError(f"Directory not made properly: {status.message}")
347378

379+
def mv(
380+
self,
381+
path1: str,
382+
path2: str,
383+
recursive: bool = False,
384+
maxdepth: int | None = None,
385+
**kwargs: Any,
386+
) -> None:
387+
"""Move a file or directory using the native XRootD client."""
388+
status, _ = self._myclient.mv(path1, path2, timeout=self.timeout)
389+
if not status.ok:
390+
raise OSError(f"Move operation failed: {status.message}")
391+
self.invalidate_cache(os.path.dirname(path1))
392+
self.invalidate_cache(os.path.dirname(path2))
393+
348394
async def _rm(
349395
self,
350396
path: str,
@@ -394,12 +440,39 @@ async def _touch(self, path: str, truncate: bool = False, **kwargs: Any) -> None
394440

395441
touch = sync_wrapper(_touch)
396442

443+
async def _chmod(self, path: str, mode: int) -> None:
444+
"""Change file permissions (POSIX mode integer)."""
445+
status, _ = await _async_wrap(self._myclient.chmod)(path, mode, self.timeout)
446+
if not status.ok:
447+
raise OSError(f"chmod failed: {status.message}")
448+
449+
chmod = sync_wrapper(_chmod)
450+
397451
async def _modified(self, path: str) -> Any:
398452
status, statInfo = await _async_wrap(self._myclient.stat)(path, self.timeout)
399453
return statInfo.modtime
400454

401455
modified = sync_wrapper(_modified)
402456

457+
async def _checksum(self, path: str, algorithm: str = "adler32") -> tuple[str, str]:
458+
"""Query the server for the checksum of a file.
459+
460+
Returns a (algorithm, value) tuple. Raises OSError if the server
461+
does not support checksums or if the file does not exist.
462+
"""
463+
status, response = await _async_wrap(self._myclient.query)(
464+
QueryCode.CHECKSUM, path, self.timeout
465+
)
466+
if not status.ok:
467+
raise OSError(f"Checksum query failed: {status.message}")
468+
text = response.decode() if isinstance(response, bytes) else response
469+
parts = text.strip("\x00").strip().split()
470+
if len(parts) < 2:
471+
raise OSError(f"Unexpected checksum response: {text!r}")
472+
return parts[0], parts[1]
473+
474+
checksum = sync_wrapper(_checksum)
475+
403476
async def _exists(self, path: str, **kwargs: Any) -> bool:
404477
if path in self.dircache:
405478
return True
@@ -417,35 +490,31 @@ async def _info(self, path: str, **kwargs: Any) -> dict[str, Any]:
417490
if deet is not None and len(deet) != 0:
418491
for item in deet:
419492
if item["name"] == path:
420-
return {
421-
"name": path,
422-
"size": item["size"],
423-
"type": item["type"],
424-
}
493+
# Return the full cached entry (including mtime/mode if present).
494+
return cast(dict[str, Any], item)
425495
raise OSError("_ls_from_cache() failed to function")
426496
else:
427497
status, deet = await _async_wrap(self._myclient.stat)(path, self.timeout)
428498
if not status.ok:
429499
raise OSError(f"File stat request failed: {status.message}")
430500
if deet.flags & StatInfoFlags.IS_DIR:
431-
ret = {
432-
"name": path,
433-
"size": deet.size,
434-
"type": "directory",
435-
}
501+
ftype = "directory"
436502
elif deet.flags & StatInfoFlags.OTHER:
437-
ret = {
438-
"name": path,
439-
"size": deet.size,
440-
"type": "other",
441-
}
503+
ftype = "other"
442504
else:
443-
ret = {
444-
"name": path,
445-
"size": deet.size,
446-
"type": "file",
447-
}
448-
return ret
505+
ftype = "file"
506+
return {
507+
"name": path,
508+
"size": deet.size,
509+
"type": ftype,
510+
"mtime": deet.modtime,
511+
"mode": _flags_to_mode(deet.flags),
512+
"uid": 0,
513+
"gid": 0,
514+
"nlink": 1,
515+
"atime": deet.modtime,
516+
"ctime": deet.modtime,
517+
}
449518

450519
async def _ls(self, path: str, detail: bool = True, **kwargs: Any) -> list[Any]:
451520
listing = []
@@ -462,34 +531,42 @@ async def _ls(self, path: str, detail: bool = True, **kwargs: Any) -> list[Any]:
462531
path, DirListFlags.STAT, self.timeout
463532
)
464533
if not status.ok:
534+
msg = status.message.lower()
535+
not_dir = "not a directory" in msg
536+
no_open = "unable to open directory" in msg
537+
no_entry = "no such file or directory" in msg
538+
if not_dir or no_open or no_entry:
539+
info = await self._info(path)
540+
return (
541+
[info]
542+
if detail
543+
else [os.path.basename(info["name"].rstrip("/"))]
544+
)
465545
raise OSError(
466546
f"Server failed to provide directory info: {status.message}"
467547
)
468548
for item in deets:
469-
if item.statinfo.flags & StatInfoFlags.IS_DIR:
470-
listing.append(
471-
{
472-
"name": path + "/" + item.name,
473-
"size": item.statinfo.size,
474-
"type": "directory",
475-
}
476-
)
477-
elif item.statinfo.flags & StatInfoFlags.OTHER:
478-
listing.append(
479-
{
480-
"name": path + "/" + item.name,
481-
"size": item.statinfo.size,
482-
"type": "other",
483-
}
484-
)
549+
flags = item.statinfo.flags
550+
if flags & StatInfoFlags.IS_DIR:
551+
ftype = "directory"
552+
elif flags & StatInfoFlags.OTHER:
553+
ftype = "other"
485554
else:
486-
listing.append(
487-
{
488-
"name": path + "/" + item.name,
489-
"size": item.statinfo.size,
490-
"type": "file",
491-
}
492-
)
555+
ftype = "file"
556+
listing.append(
557+
{
558+
"name": path + "/" + item.name,
559+
"size": item.statinfo.size,
560+
"type": ftype,
561+
"mtime": item.statinfo.modtime,
562+
"mode": _flags_to_mode(flags),
563+
"uid": 0,
564+
"gid": 0,
565+
"nlink": 1,
566+
"atime": item.statinfo.modtime,
567+
"ctime": item.statinfo.modtime,
568+
}
569+
)
493570
self.dircache[path] = listing
494571
if detail:
495572
return listing

tests/test_basicio.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -493,3 +493,129 @@ async def run():
493493
assert read_start < close_stop < read_stop
494494

495495
asyncio.run(run())
496+
497+
498+
def test_mv(localserver, clear_server):
499+
"""mv() should move a file and invalidate the cache."""
500+
remoteurl, localpath = localserver
501+
with open(localpath + "/src.txt", "w") as fout:
502+
fout.write(TESTDATA1)
503+
504+
fs, _, (prefix,) = fsspec.get_fs_token_paths(remoteurl)
505+
src = prefix + "/src.txt"
506+
dst = prefix + "/dst.txt"
507+
508+
fs.mv(src, dst)
509+
time.sleep(sleep_time)
510+
511+
assert not fs.exists(src)
512+
assert fs.exists(dst)
513+
# Use open() rather than cat() to avoid the _cat_file start/end signature issue
514+
with fsspec.open(remoteurl + "/dst.txt", "rb") as f:
515+
assert f.read() == TESTDATA1.encode()
516+
517+
518+
def test_mv_directory(localserver, clear_server):
519+
"""mv() should move a directory."""
520+
remoteurl, localpath = localserver
521+
os.makedirs(localpath + "/srcdir")
522+
with open(localpath + "/srcdir/file.txt", "w") as fout:
523+
fout.write(TESTDATA1)
524+
525+
fs, _, (prefix,) = fsspec.get_fs_token_paths(remoteurl)
526+
src = prefix + "/srcdir"
527+
dst = prefix + "/dstdir"
528+
529+
fs.mv(src, dst)
530+
time.sleep(sleep_time)
531+
532+
assert not fs.exists(src)
533+
assert fs.exists(dst)
534+
assert fs.exists(dst + "/file.txt")
535+
536+
537+
def test_chmod(localserver, clear_server):
538+
"""chmod() should change the mode of a file."""
539+
remoteurl, localpath = localserver
540+
with open(localpath + "/testfile.txt", "w") as fout:
541+
fout.write(TESTDATA1)
542+
543+
fs, _, (prefix,) = fsspec.get_fs_token_paths(remoteurl)
544+
path = prefix + "/testfile.txt"
545+
546+
fs.chmod(path, 0o644)
547+
info = fs.info(path)
548+
# The server returns mode bits; check at least that the call succeeded
549+
# and that the info dict contains a 'mode' key.
550+
assert "mode" in info
551+
552+
553+
def test_checksum(localserver, clear_server):
554+
"""checksum() should return a (algorithm, value) tuple.
555+
556+
Skipped when the server does not support checksum queries (e.g. the
557+
minimal test server started by the fixture).
558+
"""
559+
remoteurl, localpath = localserver
560+
with open(localpath + "/testfile.txt", "w") as fout:
561+
fout.write(TESTDATA1)
562+
563+
fs, _, (prefix,) = fsspec.get_fs_token_paths(remoteurl)
564+
path = prefix + "/testfile.txt"
565+
566+
try:
567+
alg, value = fs.checksum(path, "adler32")
568+
except OSError as e:
569+
if "not supported" in str(e).lower():
570+
pytest.skip(f"XRootD server does not support checksum queries: {e}")
571+
raise
572+
573+
assert alg.lower() == "adler32"
574+
assert len(value) > 0
575+
# Value should be a valid hex string
576+
int(value, 16)
577+
578+
579+
def test_ls_on_file(localserver, clear_server):
580+
"""ls() called on a file path should return a single-entry list, not raise."""
581+
remoteurl, localpath = localserver
582+
with open(localpath + "/testfile.txt", "w") as fout:
583+
fout.write(TESTDATA1)
584+
585+
fs, _, (prefix,) = fsspec.get_fs_token_paths(remoteurl)
586+
path = prefix + "/testfile.txt"
587+
588+
# detail=True
589+
entries = fs.ls(path, detail=True)
590+
assert len(entries) == 1
591+
assert entries[0]["type"] == "file"
592+
593+
# detail=False
594+
names = fs.ls(path, detail=False)
595+
assert len(names) == 1
596+
597+
598+
def test_info_dict_fields(localserver, clear_server):
599+
"""info() and ls() entries should include mtime, mode, uid, gid, nlink."""
600+
remoteurl, localpath = localserver
601+
with open(localpath + "/testfile.txt", "w") as fout:
602+
fout.write(TESTDATA1)
603+
604+
fs, _, (prefix,) = fsspec.get_fs_token_paths(remoteurl)
605+
path = prefix + "/testfile.txt"
606+
607+
info = fs.info(path)
608+
for field in ("mtime", "mode", "uid", "gid", "nlink"):
609+
assert field in info, f"Missing field: {field}"
610+
611+
assert isinstance(info["mtime"], (int, float))
612+
assert info["mtime"] > 0
613+
assert isinstance(info["mode"], int)
614+
assert info["mode"] > 0
615+
616+
# ls() entries should also carry these fields
617+
ls_entries = fs.ls(prefix, detail=True)
618+
file_entry = next(e for e in ls_entries if e["name"].endswith("testfile.txt"))
619+
for field in ("mtime", "mode", "uid", "gid", "nlink"):
620+
assert field in file_entry, f"ls entry missing field: {field}"
621+
assert file_entry["mtime"] > 0

0 commit comments

Comments
 (0)