2727import logging
2828import os
2929import stat
30- from argparse import ArgumentParser
30+ from argparse import ArgumentParser , Namespace
3131from typing import cast
3232
3333import pyfuse3
3434import pyfuse3 .asyncio
35- from pyfuse3 import FileHandleT , FileInfo , InodeT
35+ from pyfuse3 import EntryAttributes , FileHandleT , FileInfo , InodeT , ReaddirToken , RequestContext
3636
3737try :
3838 import faulthandler
4646
4747
4848class TestFs (pyfuse3 .Operations ):
49- def __init__ (self ):
49+ def __init__ (self ) -> None :
5050 super (TestFs , self ).__init__ ()
5151 self .hello_name = b"message"
5252 self .hello_inode = cast (InodeT , pyfuse3 .ROOT_INODE + 1 )
5353 self .hello_data = b"hello world\n "
5454
55- async def getattr (self , inode , ctx = None ):
56- entry = pyfuse3 . EntryAttributes ()
55+ async def getattr (self , inode : InodeT , ctx : RequestContext | None = None ) -> EntryAttributes :
56+ entry = EntryAttributes ()
5757 if inode == pyfuse3 .ROOT_INODE :
5858 entry .st_mode = stat .S_IFDIR | 0o755
5959 entry .st_size = 0
@@ -73,26 +73,28 @@ async def getattr(self, inode, ctx=None):
7373
7474 return entry
7575
76- async def lookup (self , parent_inode , name , ctx = None ):
76+ async def lookup (
77+ self , parent_inode : InodeT , name : bytes , ctx : RequestContext
78+ ) -> EntryAttributes :
7779 if parent_inode != pyfuse3 .ROOT_INODE or name != self .hello_name :
7880 raise pyfuse3 .FUSEError (errno .ENOENT )
79- return await self .getattr (self .hello_inode )
81+ return await self .getattr (self .hello_inode , ctx )
8082
81- async def opendir (self , inode , ctx ) :
83+ async def opendir (self , inode : InodeT , ctx : RequestContext ) -> FileHandleT :
8284 if inode != pyfuse3 .ROOT_INODE :
8385 raise pyfuse3 .FUSEError (errno .ENOENT )
8486 # For simplicity, we use the inode as file handle
8587 return FileHandleT (inode )
8688
87- async def readdir (self , fh , start_id , token ) :
89+ async def readdir (self , fh : FileHandleT , start_id : int , token : ReaddirToken ) -> None :
8890 assert fh == pyfuse3 .ROOT_INODE
8991
9092 # only one entry
9193 if start_id == 0 :
9294 pyfuse3 .readdir_reply (token , self .hello_name , await self .getattr (self .hello_inode ), 1 )
9395 return
9496
95- async def setxattr (self , inode , name , value , ctx ) :
97+ async def setxattr (self , inode : InodeT , name : bytes , value : bytes , ctx : RequestContext ) -> None :
9698 if inode != pyfuse3 .ROOT_INODE or name != b'command' :
9799 raise pyfuse3 .FUSEError (errno .ENOTSUP )
98100
@@ -101,20 +103,20 @@ async def setxattr(self, inode, name, value, ctx):
101103 else :
102104 raise pyfuse3 .FUSEError (errno .EINVAL )
103105
104- async def open (self , inode , flags , ctx ) :
106+ async def open (self , inode : InodeT , flags : int , ctx : RequestContext ) -> FileInfo :
105107 if inode != self .hello_inode :
106108 raise pyfuse3 .FUSEError (errno .ENOENT )
107109 if flags & os .O_RDWR or flags & os .O_WRONLY :
108110 raise pyfuse3 .FUSEError (errno .EACCES )
109111 # For simplicity, we use the inode as file handle
110112 return FileInfo (fh = FileHandleT (inode ))
111113
112- async def read (self , fh , off , size ) :
114+ async def read (self , fh : FileHandleT , off : int , size : int ) -> bytes :
113115 assert fh == self .hello_inode
114116 return self .hello_data [off : off + size ]
115117
116118
117- def init_logging (debug = False ):
119+ def init_logging (debug : bool = False ) -> None :
118120 formatter = logging .Formatter (
119121 '%(asctime)s.%(msecs)03d %(threadName)s: [%(name)s] %(message)s' ,
120122 datefmt = "%Y-%m-%d %H:%M:%S" ,
@@ -131,7 +133,7 @@ def init_logging(debug=False):
131133 root_logger .addHandler (handler )
132134
133135
134- def parse_args ():
136+ def parse_args () -> Namespace :
135137 '''Parse command line'''
136138
137139 parser = ArgumentParser ()
@@ -146,7 +148,7 @@ def parse_args():
146148 return parser .parse_args ()
147149
148150
149- def main ():
151+ def main () -> None :
150152 options = parse_args ()
151153 init_logging (options .debug )
152154
0 commit comments