Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 165 additions & 106 deletions core/src/Streamly/Internal/FileSystem/Posix/ReadDir.hsc
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,8 @@ readScanWith :: -- (MonadIO m, MonadCatch m) =>
-> Stream m a
readScanWith = undefined

-- NOTE: See https://www.manpagez.com/man/2/getattrlistbulk/ for BSD/macOS.

-- | Read with full metadata.
{-# INLINE readPlusScanWith #-}
readPlusScanWith :: -- (MonadIO m, MonadCatch m) =>
Expand Down Expand Up @@ -546,28 +548,40 @@ foreign import ccall unsafe "string.h memcpy" c_memcpy
foreign import ccall unsafe "string.h strlen" c_strlen
:: Ptr CChar -> IO CSize

-- | Split a list into two halves, preserving the order of the elements.
--
-- The first component holds the leading @length xs `div` 2@ elements and the
-- second component holds the remainder. For an odd-length list the second
-- half is therefore one element longer, and a singleton list ends up entirely
-- in the second half:
--
-- * @splitHalf [1,2,3,4]@ is @([1,2],[3,4])@
-- * @splitHalf [1,2,3]@ is @([1],[2,3])@
-- * @splitHalf [1]@ is @([],[1])@
-- * @splitHalf []@ is @([],[])@
--
-- The list is walked with two cursors, one advancing a single element per
-- step and the other advancing two, so the length is never computed up front.
splitHalf :: [a] -> ([a], [a])
splitHalf xxs = split xxs xxs

    where

    -- The first argument is the slow cursor and the second is the fast
    -- cursor. As long as the fast cursor can still advance by two elements,
    -- the element under the slow cursor belongs to the first half.
    split (x:xs) (_:_:ys) =
        let (f, s) = split xs ys
         in (x:f, s)
    -- The fast cursor is exhausted: whatever remains at the slow cursor is
    -- the second half.
    split xs _ = ([], xs)

{-# ANN type ChunkStreamByteState Fuse #-}
data ChunkStreamByteState =
ChunkStreamByteInit0
| ChunkStreamByteInit [PosixPath] [PosixPath] Int MutByteArray Int
ChunkStreamByteInit
| ChunkStreamByteStop
| ChunkStreamByteLoop
PosixPath -- current dir path
[PosixPath] -- remaining dirs
(Ptr CDir) -- current dir
[PosixPath] -- dirs buffered
Int -- dir count
(Ptr CDir) -- current dir stream
MutByteArray
Int
| ChunkStreamByteLoopPending
| ChunkStreamReallocBuf
(Ptr CChar) -- pending item
PosixPath -- current dir path
[PosixPath] -- remaining dirs
(Ptr CDir) -- current dir
(Ptr CDir) -- current dir stream
MutByteArray
Int
| ChunkStreamDrainBuf
MutByteArray
Int

-- XXX Add follow-symlinks option.
-- XXX Detect cycles.
-- XXX Detect cycles. ELOOP can be used to avoid cycles, but we can also detect
-- them proactively.

-- XXX Since we separate paths by newlines, paths that contain newlines cannot
-- be represented. We could emit NUL-separated paths too. Provide a Mut array
Expand All @@ -579,15 +593,34 @@ data ChunkStreamByteState =
--
-- A fold may be useful to translate the output to whatever format we want, we
-- can add a prefix or we can colorize it.

-- | Left is directories. Right is a buffer containing directories and files
-- separated by newlines.
--
-- XXX Make bufSize, recursive traversal, split strategy, and the output entry
-- separator config options. When not running concurrently we do not need to
-- split the work at all.
--
-- XXX Currently we are quite aggressive in splitting the work because we have
-- no knowledge of whether we need to or not. But this leads to more overhead.
-- Instead, we can measure the coarse monotonic and process cpu time after
-- every n system calls or n iterations. If the cpu utilization is low then
-- yield the dirs, otherwise don't. We can use an async thread for computing cpu
-- utilization periodically and all other threads can just read it from an
-- IORef. So this can be shared across all such consumers.

-- | This function may not traverse all the directories supplied and it may
-- traverse the directories recursively. Left contains those directories that
-- were not traversed by this function; these may be the directories that were
-- supplied as input as well as newly discovered directories during traversal.
-- To traverse the entire tree we have to iterate this function on the Left
-- output.
--
-- Right is a buffer containing directories and files separated by newlines.
--
{-# INLINE readEitherByteChunks #-}
readEitherByteChunks :: MonadIO m =>
(ReadOptions -> ReadOptions) ->
[PosixPath] -> Stream m (Either [PosixPath] (Array Word8))
readEitherByteChunks confMod alldirs =
Stream step (ChunkStreamByteInit0)
Stream step ChunkStreamByteInit

where

Expand All @@ -605,121 +638,147 @@ readEitherByteChunks confMod alldirs =
-- XXX Alternatively, we can distribute the dir stream over multiple
-- concurrent folds and return (monadic output) a stream of arrays created
-- from the output channel, then consume that stream by using a monad bind.
bufSize = 4000

bufSize = 32000

copyToBuf dstArr pos dirPath name = do
nameLen <- fmap fromIntegral (liftIO $ c_strlen name)
let PosixPath (Array dirArr start end) = dirPath
dirLen = end - start
-- We know it is already pinned.
MutByteArray.unsafeAsPtr dstArr (\ptr -> liftIO $ do
-- XXX We may need to decode and encode the path if the
-- output encoding differs from fs encoding.
--
-- Account for separator and newline bytes.
byteCount = dirLen + nameLen + 2
if pos + byteCount <= bufSize
then do
-- XXX append a path separator to a dir path
-- We know it is already pinned.
MutByteArray.unsafeAsPtr dstArr (\ptr -> liftIO $ do
MutByteArray.unsafePutSlice dirArr start dstArr pos dirLen
let ptr1 = ptr `plusPtr` (pos + dirLen)
separator = 47 :: Word8
poke ptr1 separator
let ptr2 = ptr1 `plusPtr` 1
_ <- c_memcpy ptr2 (castPtr name) (fromIntegral nameLen)
let ptr3 = ptr2 `plusPtr` nameLen
newline = 10 :: Word8
poke ptr3 newline
)
return (Just (pos + byteCount))
else return Nothing

step _ ChunkStreamByteInit0 = do
let PosixPath (Array dirArr start end) = dirPath
dirLen = end - start
endDir = pos + dirLen
endPos = endDir + nameLen + 2 -- sep + newline
sepOff = ptr `plusPtr` endDir -- separator offset
nameOff = sepOff `plusPtr` 1 -- file name offset
nlOff = nameOff `plusPtr` nameLen -- newline offset
separator = 47 :: Word8
newline = 10 :: Word8
if (endPos < bufSize)
then do
-- XXX We can keep a trailing separator on the dir itself.
MutByteArray.unsafePutSlice dirArr start dstArr pos dirLen
poke sepOff separator
_ <- c_memcpy nameOff (castPtr name) (fromIntegral nameLen)
poke nlOff newline
return (Just endPos)
else return Nothing
)

step _ ChunkStreamByteInit = do
mbarr <- liftIO $ MutByteArray.new' bufSize
return $ Skip (ChunkStreamByteInit alldirs [] 0 mbarr 0)

step _ (ChunkStreamByteInit (x:xs) dirs ndirs mbarr pos) = do
DirStream dirp <- liftIO $ openDirStream x
return $ Skip (ChunkStreamByteLoop x xs dirp dirs ndirs mbarr pos)

step _ (ChunkStreamByteInit [] [] _ _ pos) | pos == 0 =
return Stop

step _ (ChunkStreamByteInit [] [] _ mbarr pos) =
return $ Yield (Right (Array mbarr 0 pos)) (ChunkStreamByteInit [] [] 0 mbarr 0)
case alldirs of
(x:xs) -> do
DirStream dirp <- liftIO $ openDirStream x
return $ Skip $ ChunkStreamByteLoop x xs dirp mbarr 0
[] -> return Stop

step _ (ChunkStreamByteInit [] dirs _ mbarr pos) =
return $ Yield (Left dirs) (ChunkStreamByteInit [] [] 0 mbarr pos)
step _ ChunkStreamByteStop = return Stop

step _ (ChunkStreamByteLoopPending pending curdir xs dirp mbarr pos) = do
step _ (ChunkStreamReallocBuf pending curdir xs dirp mbarr pos) = do
mbarr1 <- liftIO $ MutByteArray.new' bufSize
r1 <- copyToBuf mbarr1 0 curdir pending
case r1 of
Just pos2 ->
return $ Yield (Right (Array mbarr 0 pos))
-- When we come in this state we have emitted dirs
(ChunkStreamByteLoop curdir xs dirp [] 0 mbarr1 pos2)
(ChunkStreamByteLoop curdir xs dirp mbarr1 pos2)
Nothing -> error "Dirname too big for bufSize"

step _ st@(ChunkStreamByteLoop curdir xs dirp dirs ndirs mbarr pos) = do
liftIO resetErrno
dentPtr <- liftIO $ c_readdir dirp
if (dentPtr /= nullPtr)
then do
let dname = #{ptr struct dirent, d_name} dentPtr
dtype :: #{type unsigned char} <-
liftIO $ #{peek struct dirent, d_type} dentPtr

-- XXX Skips come around the entire loop, does that impact perf
-- because it has a StreamK in the middle.
-- Keep the file check first as it is more likely

etype <- liftIO $ getEntryType conf curdir dname dtype
case etype of
EntryIsNotDir -> do
step _ (ChunkStreamDrainBuf mbarr pos) =
if pos == 0
then return Stop
else return $ Yield (Right (Array mbarr 0 pos)) ChunkStreamByteStop

step _ (ChunkStreamByteLoop icurdir ixs idirp mbarr ipos) = do
goOuter icurdir idirp ixs ipos

where

-- This is recursed only when we open the next dir
-- Encapsulates curdir and dirp as static arguments
goOuter curdir dirp = goInner

where

-- This is recursed each time we find a dir
-- Encapsulates dirs as static argument
goInner dirs = nextEntry

where

{-# INLINE nextEntry #-}
nextEntry pos = do
liftIO resetErrno
dentPtr <- liftIO $ c_readdir dirp
if dentPtr /= nullPtr
then handleDentry pos dentPtr
else handleErr pos

openNextDir pos =
case dirs of
(x:xs) -> do
DirStream dirp1 <- liftIO $ openDirStream x
goOuter x dirp1 xs pos
[] ->
if pos == 0
then return Stop
else return
$ Yield
(Right (Array mbarr 0 pos))
ChunkStreamByteStop

handleErr pos = do
errno <- liftIO getErrno
if (errno /= eINTR)
then do
let (Errno n) = errno
liftIO $ closeDirStream (DirStream dirp)
if (n == 0)
then openNextDir pos
else liftIO $ throwErrno "readEitherByteChunks"
else nextEntry pos

splitAndRealloc pos dname xs =
case xs of
[] ->
return $ Skip
(ChunkStreamReallocBuf dname curdir
[] dirp mbarr pos)
_ -> do
let (h,t) = splitHalf xs
return $ Yield (Left t)
(ChunkStreamReallocBuf dname curdir
h dirp mbarr pos)

{-# INLINE handleFileEnt #-}
handleFileEnt pos dname = do
r <- copyToBuf mbarr pos curdir dname
case r of
Just pos1 ->
return $ Skip
(ChunkStreamByteLoop curdir xs dirp dirs ndirs mbarr pos1)
Nothing -> do
-- XXX we do not need to yield the out dirs here
-- XXX But we should yield if the number of dirs
-- become more than a threshold.
if ndirs > 0
then
return $ Yield (Left dirs)
(ChunkStreamByteLoopPending dname curdir xs dirp mbarr pos)
else
return $ Skip
(ChunkStreamByteLoopPending dname curdir xs dirp mbarr pos)
EntryIsDir -> do
Just pos1 -> nextEntry pos1
Nothing -> splitAndRealloc pos dname dirs

{-# INLINE handleDirEnt #-}
handleDirEnt pos dname = do
path <- liftIO $ appendCString curdir dname
let dirs1 = path : dirs
ndirs1 = ndirs + 1
r <- copyToBuf mbarr pos curdir dname
case r of
Just pos1 ->
return $ Skip
(ChunkStreamByteLoop curdir xs dirp dirs1 ndirs1 mbarr pos1)
Nothing -> do
-- We know dirs1 in not empty here
-- XXX Yield only if dirs are more than a threshold
-- otherwise skip.
return $ Yield (Left dirs1)
(ChunkStreamByteLoopPending dname curdir xs dirp mbarr pos)
EntryIgnored -> return $ Skip st
else do
errno <- liftIO getErrno
if (errno == eINTR)
then return $ Skip st
else do
let (Errno n) = errno
-- XXX Exception safety
liftIO $ closeDirStream (DirStream dirp)
if (n == 0)
then return $ Skip (ChunkStreamByteInit xs dirs ndirs mbarr pos)
else liftIO $ throwErrno "readEitherByteChunks"
Just pos1 -> goInner dirs1 pos1
Nothing -> splitAndRealloc pos dname dirs1

handleDentry pos dentPtr = do
let dname = #{ptr struct dirent, d_name} dentPtr
dtype :: #{type unsigned char} <-
liftIO $ #{peek struct dirent, d_type} dentPtr

etype <- liftIO $ getEntryType conf curdir dname dtype
case etype of
EntryIsNotDir -> handleFileEnt pos dname
EntryIsDir -> handleDirEnt pos dname
EntryIgnored -> nextEntry pos

{-# ANN type ByteChunksAt Fuse #-}
data ByteChunksAt =
Expand Down
Loading