Skip to content

Commit 8293494

Browse files
obbardcNikratio
authored andcommitted
Add poll notification support
Expose libfuse low-level poll support through pyfuse3 so that filesystems can implement readiness notifications for poll(2), select(2) and epoll_wait(2). Add bindings for struct fuse_pollhandle, fuse_reply_poll(), fuse_lowlevel_notify_poll() and fuse_pollhandle_destroy(). Introduce a Python PollHandle wrapper with a notify() method to allow a filesystem to retain the poll handle provided by Operations.poll() and notify it later when readiness changes. Wire the low-level FUSE poll callback into Operations.poll(), returning the current readiness mask to the kernel. The default implementation continues to raise -ENOSYS so existing filesystems keep the previous fallback behaviour unless they opt-in. This is needed by filesystems that emulate pollable kernel interfaces, such as sysfs GPIO value files, where edge events must wake userspace processes waiting for POLLPRI. Fixes: #139 Signed-off-by: Christopher Obbard <christopher.obbard@linaro.org>
1 parent 9e9a1f7 commit 8293494

8 files changed

Lines changed: 252 additions & 3 deletions

File tree

Include/fuse_common.pxd

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,11 @@ cdef extern from * nogil: # fuse_common.h should not be included
4242
struct fuse_chan:
4343
pass
4444

45+
struct fuse_pollhandle:
46+
pass
47+
48+
void fuse_pollhandle_destroy(fuse_pollhandle *ph)
49+
4550
struct fuse_loop_config:
4651
int clone_fd
4752
unsigned max_idle_threads

Include/fuse_lowlevel.pxd

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,8 @@ cdef extern from "<fuse_lowlevel.h>" nogil:
119119
off_t offset, off_t length, fuse_file_info *fi) except *
120120
void (*readdirplus) (fuse_req_t req, fuse_ino_t ino, size_t size, off_t off,
121121
fuse_file_info *fi) except *
122+
void (*poll) (fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi,
123+
fuse_pollhandle *ph) except *
122124

123125

124126
# Reply functions
@@ -137,6 +139,7 @@ cdef extern from "<fuse_lowlevel.h>" nogil:
137139
fuse_buf_copy_flags flags)
138140
int fuse_reply_statfs(fuse_req_t req, statvfs *stbuf)
139141
int fuse_reply_xattr(fuse_req_t req, size_t count)
142+
int fuse_reply_poll(fuse_req_t req, unsigned revents)
140143

141144
size_t fuse_add_direntry(fuse_req_t req, const_char *buf, size_t bufsize,
142145
const_char *name, struct_stat *stbuf,
@@ -157,6 +160,7 @@ cdef extern from "<fuse_lowlevel.h>" nogil:
157160
fuse_buf_copy_flags flags)
158161
int fuse_lowlevel_notify_retrieve(fuse_session *se, fuse_ino_t ino,
159162
size_t size, off_t offset, void *cookie)
163+
int fuse_lowlevel_notify_poll(fuse_pollhandle *ph)
160164

161165
# Utility functions
162166
void *fuse_req_userdata(fuse_req_t req)

src/pyfuse3/__init__.pyi

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,10 @@ class FUSEError(Exception):
129129
def __init__(self, errno: int) -> None: ...
130130
def __str__(self) -> str: ...
131131

132+
class PollHandle:
133+
def __getstate__(self) -> None: ...
134+
def notify(self) -> None: ...
135+
132136
def listdir(path: str) -> List[str]: ...
133137
def syncfs(path: str) -> str: ...
134138
def setxattr(path: str, name: str, value: bytes, namespace: NamespaceT = ...) -> None: ...

src/pyfuse3/__init__.pyx

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,78 @@ cdef class FUSEError(Exception):
504504
return strerror(self.errno_)
505505

506506

507+
@cython.freelist(10)
508+
cdef class PollHandle:
509+
'''
510+
Opaque handle for delivering poll(2) readiness notifications.
511+
512+
Instances of this class are created by pyfuse3 and passed to
513+
`Operations.poll`. The filesystem may keep a reference and later
514+
call `PollHandle.notify` on the handle to wake up any process currently
515+
blocked in :manpage:`poll(2)`, :manpage:`select(2)` or
516+
:manpage:`epoll_wait(2)` for the corresponding file descriptor.
517+
518+
A single notification is sufficient to clear all pending waiters;
519+
filesystems should normally discard the handle after notifying.
520+
521+
The underlying ``fuse_pollhandle`` is automatically destroyed when
522+
the Python object is garbage collected, so filesystems should simply
523+
drop the reference when the notification is no longer needed.
524+
'''
525+
526+
cdef fuse_pollhandle *_ph
527+
528+
def __cinit__(self):
529+
self._ph = NULL
530+
531+
def __init__(self):
532+
raise TypeError('PollHandle cannot be instantiated directly')
533+
534+
@staticmethod
535+
cdef PollHandle from_ptr(fuse_pollhandle *ph):
536+
cdef PollHandle self
537+
538+
if ph == NULL:
539+
raise ValueError('NULL fuse_pollhandle')
540+
541+
self = PollHandle.__new__(PollHandle)
542+
self._ph = ph
543+
return self
544+
545+
def __dealloc__(self):
546+
if self._ph is not NULL:
547+
fuse_pollhandle_destroy(self._ph)
548+
self._ph = NULL
549+
550+
def __getstate__(self):
551+
raise PicklingError("PollHandle instances can't be pickled")
552+
553+
def notify(self):
554+
'''
555+
Notify IO readiness for this poll handle.
556+
557+
After this returns, any process waiting in :manpage:`poll(2)`,
558+
:manpage:`select(2)` or :manpage:`epoll_wait(2)` on the
559+
corresponding file descriptor will be woken so it can re-poll
560+
the filesystem for the current readiness mask.
561+
562+
Each `PollHandle` is intended for a single notification. After a
563+
successful call, the filesystem should not call `notify_poll` again on
564+
the same handle and should discard it.
565+
'''
566+
567+
cdef int ret
568+
569+
if self._ph == NULL:
570+
raise RuntimeError('PollHandle is no longer valid')
571+
572+
with nogil:
573+
ret = fuse_lowlevel_notify_poll(self._ph)
574+
575+
if ret != 0:
576+
raise OSError(-ret, 'fuse_lowlevel_notify_poll returned: ' + strerror(-ret))
577+
578+
507579
def listdir(path):
508580
'''Like `os.listdir`, but releases the GIL.
509581

src/pyfuse3/_pyfuse3.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
EntryAttributes,
3636
FileInfo,
3737
FUSEError,
38+
PollHandle,
3839
ReaddirToken,
3940
RequestContext,
4041
SetattrFields,
@@ -451,6 +452,44 @@ async def fsync(self, fh: FileHandleT, datasync: bool) -> None:
451452

452453
raise FUSEError(errno.ENOSYS)
453454

455+
async def poll(
456+
self,
457+
inode: InodeT,
458+
fh: FileHandleT,
459+
poll_handle: Optional["PollHandle"],
460+
ctx: "RequestContext",
461+
) -> int:
462+
'''Check IO readiness on an open file.
463+
464+
This method is called when a process performs :manpage:`poll(2)`,
465+
:manpage:`select(2)` or :manpage:`epoll_wait(2)` on a file descriptor
466+
backed by *fh* (returned by a prior `open` or `create` call). *inode*
467+
identifies the inode that *fh* refers to.
468+
469+
The method will return the bitwise-or of the currently active poll
470+
events, for example `select.POLLIN`, `select.POLLOUT` or
471+
`select.POLLPRI`. If no events are currently ready, it will return `0`.
472+
473+
If *poll_handle* is `None`, the kernel has not provided a notification
474+
handle for this request. The filesystem should only return the current
475+
readiness mask and must not attempt to store a handle or arrange a later
476+
`PollHandle.notify` call for this poll request.
477+
478+
If *poll_handle* is not `None`, the kernel has provided a notification
479+
handle that may be used to wake waiters if readiness changes after this
480+
method returns. The filesystem may store the handle and later call
481+
`PollHandle.notify` when a relevant event becomes available. Each
482+
`~Operations.poll` call produces a fresh handle; storing a new handle
483+
should replace any previously held one, allowing the old handle to be
484+
destroyed.
485+
486+
If this method raises `FUSEError(errno.ENOSYS)` (the default), the
487+
kernel will fall back to a default poll implementation and will not call
488+
this handler again for the lifetime of the mount.
489+
'''
490+
491+
raise FUSEError(errno.ENOSYS)
492+
454493
async def opendir(self, inode: InodeT, ctx: "RequestContext") -> FileHandleT:
455494
'''Open the directory with inode *inode*.
456495

src/pyfuse3/handlers.pxi

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,39 @@ async def fuse_access_async (_Container c):
836836

837837

838838

839+
cdef void fuse_poll (fuse_req_t req, fuse_ino_t ino, fuse_file_info *fi,
840+
fuse_pollhandle *ph):
841+
cdef _Container c = _Container()
842+
cdef object py_ph
843+
c.req = req
844+
c.ino = ino
845+
if fi is NULL:
846+
c.fh = 0
847+
else:
848+
c.fh = fi.fh
849+
if ph == NULL:
850+
py_ph = None
851+
else:
852+
py_ph = PollHandle.from_ptr(ph)
853+
save_retval(fuse_poll_async(c, py_ph))
854+
855+
async def fuse_poll_async (_Container c, object py_ph):
856+
cdef int ret
857+
cdef unsigned revents
858+
859+
ctx = get_request_context(c.req)
860+
try:
861+
result = await operations.poll(c.ino, c.fh, py_ph, ctx)
862+
except FUSEError as e:
863+
ret = fuse_reply_err(c.req, e.errno)
864+
else:
865+
revents = <unsigned> (result if result is not None else 0)
866+
ret = fuse_reply_poll(c.req, revents)
867+
868+
if ret != 0:
869+
log.error('fuse_poll(): fuse_reply_* failed with %s', strerror(-ret))
870+
871+
839872
cdef void fuse_create (fuse_req_t req, fuse_ino_t parent, const_char *name,
840873
mode_t mode, fuse_file_info *fi):
841874
cdef _Container c = _Container()

src/pyfuse3/internal.pxi

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ cdef void init_fuse_ops():
6969
fuse_ops.create = fuse_create
7070
fuse_ops.forget_multi = fuse_forget_multi
7171
fuse_ops.write_buf = fuse_write_buf
72+
fuse_ops.poll = fuse_poll
7273

7374
cdef make_fuse_args(args, fuse_args* f_args):
7475
cdef char* arg

test/test_fs.py

Lines changed: 94 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import logging
2222
import multiprocessing
2323
import os
24+
import select
2425
import stat
2526
import threading
2627
import time
@@ -34,6 +35,7 @@
3435
FileInfo,
3536
FUSEError,
3637
InodeT,
38+
PollHandle,
3739
ReaddirToken,
3840
RequestContext,
3941
)
@@ -59,11 +61,22 @@ def get_mp():
5961

6062
@pytest.fixture()
6163
def testfs(tmpdir):
64+
yield from _mount_fs(tmpdir, Fs)
65+
66+
67+
@pytest.fixture()
68+
def pollfs(tmpdir):
69+
yield from _mount_fs(tmpdir, PollTestFs)
70+
71+
72+
def _mount_fs(tmpdir, fs_class):
6273
mnt_dir = str(tmpdir)
6374
mp = get_mp()
6475
with mp.Manager() as mgr:
6576
cross_process = mgr.Namespace()
66-
mount_process = mp.Process(target=run_fs, args=(mnt_dir, cross_process))
77+
mount_process = mp.Process(
78+
target=run_fs, args=(mnt_dir, cross_process, fs_class)
79+
)
6780

6881
mount_process.start()
6982
try:
@@ -118,6 +131,38 @@ def test_notify_store(testfs):
118131
assert not fs_state.read_called
119132

120133

134+
def test_notify_poll(pollfs):
135+
(mnt_dir, fs_state) = pollfs
136+
path = os.path.join(mnt_dir, 'message')
137+
138+
with open(path, 'rb', buffering=0) as fh:
139+
poller = select.poll()
140+
poller.register(fh.fileno(), select.POLLPRI)
141+
142+
events = []
143+
144+
def poll_wait():
145+
events.extend(poller.poll(5000))
146+
147+
thread = threading.Thread(target=poll_wait)
148+
thread.start()
149+
150+
deadline = time.monotonic() + 5
151+
while time.monotonic() < deadline and not fs_state.poll_handle_received:
152+
time.sleep(0.01)
153+
154+
assert fs_state.poll_called
155+
assert fs_state.poll_handle_received
156+
assert not events
157+
158+
pyfuse3.setxattr(path, 'command', b'poll_ready')
159+
thread.join(5)
160+
assert not thread.is_alive()
161+
assert events
162+
assert events[0][0] == fh.fileno()
163+
assert events[0][1] & select.POLLPRI
164+
165+
121166
def test_entry_timeout(testfs):
122167
(mnt_dir, fs_state) = testfs
123168
fs_state.entry_timeout = 1
@@ -267,11 +312,57 @@ async def setxattr(self, inode, name, value, ctx):
267312

268313
elif value == b'terminate':
269314
pyfuse3.terminate()
315+
270316
else:
271317
raise FUSEError(errno.EINVAL)
272318

273319

274-
def run_fs(mountpoint, cross_process):
320+
class PollTestFs(Fs):
321+
def __init__(self, cross_process):
322+
super().__init__(cross_process)
323+
self.poll_handle: PollHandle | None = None
324+
self.status.poll_called = False
325+
self.status.poll_handle_received = False
326+
self.status.poll_ready = False
327+
328+
async def poll(
329+
self,
330+
inode: InodeT,
331+
fh: FileHandleT,
332+
poll_handle: PollHandle | None,
333+
ctx: RequestContext,
334+
) -> int:
335+
assert inode == self.hello_inode
336+
assert fh == self.hello_inode
337+
338+
self.status.poll_called = True
339+
340+
if poll_handle is not None:
341+
self.poll_handle = poll_handle
342+
self.status.poll_handle_received = True
343+
344+
if self.status.poll_ready:
345+
return select.POLLPRI
346+
347+
return 0
348+
349+
async def setxattr(self, inode, name, value, ctx):
350+
if value != b"poll_ready":
351+
return await super().setxattr(inode, name, value, ctx)
352+
353+
if inode != self.hello_inode or name != b"command":
354+
raise FUSEError(errno.ENOTSUP)
355+
356+
self.status.poll_ready = True
357+
358+
if self.poll_handle is None:
359+
raise FUSEError(errno.EINVAL)
360+
361+
self.poll_handle.notify()
362+
self.poll_handle = None
363+
364+
365+
def run_fs(mountpoint, cross_process, fs_class=Fs):
275366
# Logging (note that we run in a new process, so we can't
276367
# rely on direct log capture and instead print to stdout)
277368
root_logger = logging.getLogger()
@@ -285,7 +376,7 @@ def run_fs(mountpoint, cross_process):
285376
root_logger.addHandler(handler)
286377
root_logger.setLevel(logging.DEBUG)
287378

288-
testfs = Fs(cross_process)
379+
testfs = fs_class(cross_process)
289380
fuse_options = set(pyfuse3.default_options)
290381
fuse_options.add('fsname=pyfuse3_testfs')
291382
pyfuse3.init(testfs, mountpoint, fuse_options)

0 commit comments

Comments
 (0)