diff --git a/core/src/Streamly/Internal/FileSystem/Posix/ReadDir.hsc b/core/src/Streamly/Internal/FileSystem/Posix/ReadDir.hsc index 9e5264c74e..7f9e7c24ab 100644 --- a/core/src/Streamly/Internal/FileSystem/Posix/ReadDir.hsc +++ b/core/src/Streamly/Internal/FileSystem/Posix/ReadDir.hsc @@ -131,6 +131,8 @@ readScanWith :: -- (MonadIO m, MonadCatch m) => -> Stream m a readScanWith = undefined +-- NOTE: See https://www.manpagez.com/man/2/getattrlistbulk/ for BSD/macOS. + -- | Read with full metadata. {-# INLINE readPlusScanWith #-} readPlusScanWith :: -- (MonadIO m, MonadCatch m) => @@ -546,28 +548,40 @@ foreign import ccall unsafe "string.h memcpy" c_memcpy foreign import ccall unsafe "string.h strlen" c_strlen :: Ptr CChar -> IO CSize +-- Split a list in half. +splitHalf :: [a] -> ([a], [a]) +splitHalf xxs = split xxs xxs + + where + + split (x:xs) (_:_:ys) = + let (f, s) = split xs ys + in (x:f, s) + split xs _ = ([], xs) + {-# ANN type ChunkStreamByteState Fuse #-} data ChunkStreamByteState = - ChunkStreamByteInit0 - | ChunkStreamByteInit [PosixPath] [PosixPath] Int MutByteArray Int + ChunkStreamByteInit + | ChunkStreamByteStop | ChunkStreamByteLoop PosixPath -- current dir path [PosixPath] -- remaining dirs - (Ptr CDir) -- current dir - [PosixPath] -- dirs buffered - Int -- dir count + (Ptr CDir) -- current dir stream MutByteArray Int - | ChunkStreamByteLoopPending + | ChunkStreamReallocBuf (Ptr CChar) -- pending item PosixPath -- current dir path [PosixPath] -- remaining dirs - (Ptr CDir) -- current dir + (Ptr CDir) -- current dir stream + MutByteArray + Int + | ChunkStreamDrainBuf MutByteArray Int --- XXX Add follow-symlinks option. --- XXX Detect cycles. +-- XXX Detect cycles. ELOOP can be used to avoid cycles, but we can also detect +-- them proactively. -- XXX Since we are separating paths by newlines, it cannot support newlines in -- paths. Or we can return null separated paths as well. Provide a Mut array @@ -579,15 +593,34 @@ data ChunkStreamByteState = -- -- A fold may be useful to translate the output to whatever format we want, we -- can add a prefix or we can colorize it. - --- | Left is directories. Right is a buffer containing directories and files --- separated by newlines. +-- +-- XXX Use bufSize, recursive traversal, split strategy, output entries +-- separator as config options. When not using concurrently we do not need to +-- split the work at all. +-- +-- XXX Currently we are quite aggressive in splitting the work because we have +-- no knowledge of whether we need to or not. But this leads to more overhead. +-- Instead, we can measure the coarse monotonic and process cpu time after +-- every n system calls or n iterations. If the cpu utilization is low then +-- yield the dirs otherwise dont. We can use an async thread for computing cpu +-- utilization periodically and all other threads can just read it from an +-- IORef. So this can be shared across all such consumers. + +-- | This function may not traverse all the directories supplied and it may +-- traverse the directories recursively. Left contains those directories that +-- were not traversed by this function, these my be the directories that were +-- supplied as input as well as newly discovered directories during traversal. +-- To traverse the entire tree we have to iterate this function on the Left +-- output. +-- +-- Right is a buffer containing directories and files separated by newlines. +-- {-# INLINE readEitherByteChunks #-} readEitherByteChunks :: MonadIO m => (ReadOptions -> ReadOptions) -> [PosixPath] -> Stream m (Either [PosixPath] (Array Word8)) readEitherByteChunks confMod alldirs = - Stream step (ChunkStreamByteInit0) + Stream step ChunkStreamByteInit where @@ -605,121 +638,147 @@ readEitherByteChunks confMod alldirs = -- XXX Alternatively, we can distribute the dir stream over multiple -- concurrent folds and return (monadic output) a stream of arrays created -- from the output channel, then consume that stream by using a monad bind. - bufSize = 4000 + + bufSize = 32000 copyToBuf dstArr pos dirPath name = do nameLen <- fmap fromIntegral (liftIO $ c_strlen name) - let PosixPath (Array dirArr start end) = dirPath - dirLen = end - start + -- We know it is already pinned. + MutByteArray.unsafeAsPtr dstArr (\ptr -> liftIO $ do -- XXX We may need to decode and encode the path if the -- output encoding differs from fs encoding. - -- - -- Account for separator and newline bytes. - byteCount = dirLen + nameLen + 2 - if pos + byteCount <= bufSize - then do - -- XXX append a path separator to a dir path - -- We know it is already pinned. - MutByteArray.unsafeAsPtr dstArr (\ptr -> liftIO $ do - MutByteArray.unsafePutSlice dirArr start dstArr pos dirLen - let ptr1 = ptr `plusPtr` (pos + dirLen) - separator = 47 :: Word8 - poke ptr1 separator - let ptr2 = ptr1 `plusPtr` 1 - _ <- c_memcpy ptr2 (castPtr name) (fromIntegral nameLen) - let ptr3 = ptr2 `plusPtr` nameLen - newline = 10 :: Word8 - poke ptr3 newline - ) - return (Just (pos + byteCount)) - else return Nothing - - step _ ChunkStreamByteInit0 = do + let PosixPath (Array dirArr start end) = dirPath + dirLen = end - start + endDir = pos + dirLen + endPos = endDir + nameLen + 2 -- sep + newline + sepOff = ptr `plusPtr` endDir -- separator offset + nameOff = sepOff `plusPtr` 1 -- file name offset + nlOff = nameOff `plusPtr` nameLen -- newline offset + separator = 47 :: Word8 + newline = 10 :: Word8 + if (endPos < bufSize) + then do + -- XXX We can keep a trailing separator on the dir itself. + MutByteArray.unsafePutSlice dirArr start dstArr pos dirLen + poke sepOff separator + _ <- c_memcpy nameOff (castPtr name) (fromIntegral nameLen) + poke nlOff newline + return (Just endPos) + else return Nothing + ) + + step _ ChunkStreamByteInit = do mbarr <- liftIO $ MutByteArray.new' bufSize - return $ Skip (ChunkStreamByteInit alldirs [] 0 mbarr 0) - - step _ (ChunkStreamByteInit (x:xs) dirs ndirs mbarr pos) = do - DirStream dirp <- liftIO $ openDirStream x - return $ Skip (ChunkStreamByteLoop x xs dirp dirs ndirs mbarr pos) - - step _ (ChunkStreamByteInit [] [] _ _ pos) | pos == 0 = - return Stop - - step _ (ChunkStreamByteInit [] [] _ mbarr pos) = - return $ Yield (Right (Array mbarr 0 pos)) (ChunkStreamByteInit [] [] 0 mbarr 0) + case alldirs of + (x:xs) -> do + DirStream dirp <- liftIO $ openDirStream x + return $ Skip $ ChunkStreamByteLoop x xs dirp mbarr 0 + [] -> return Stop - step _ (ChunkStreamByteInit [] dirs _ mbarr pos) = - return $ Yield (Left dirs) (ChunkStreamByteInit [] [] 0 mbarr pos) + step _ ChunkStreamByteStop = return Stop - step _ (ChunkStreamByteLoopPending pending curdir xs dirp mbarr pos) = do + step _ (ChunkStreamReallocBuf pending curdir xs dirp mbarr pos) = do mbarr1 <- liftIO $ MutByteArray.new' bufSize r1 <- copyToBuf mbarr1 0 curdir pending case r1 of Just pos2 -> return $ Yield (Right (Array mbarr 0 pos)) -- When we come in this state we have emitted dirs - (ChunkStreamByteLoop curdir xs dirp [] 0 mbarr1 pos2) + (ChunkStreamByteLoop curdir xs dirp mbarr1 pos2) Nothing -> error "Dirname too big for bufSize" - step _ st@(ChunkStreamByteLoop curdir xs dirp dirs ndirs mbarr pos) = do - liftIO resetErrno - dentPtr <- liftIO $ c_readdir dirp - if (dentPtr /= nullPtr) - then do - let dname = #{ptr struct dirent, d_name} dentPtr - dtype :: #{type unsigned char} <- - liftIO $ #{peek struct dirent, d_type} dentPtr - - -- XXX Skips come around the entire loop, does that impact perf - -- because it has a StreamK in the middle. - -- Keep the file check first as it is more likely - - etype <- liftIO $ getEntryType conf curdir dname dtype - case etype of - EntryIsNotDir -> do + step _ (ChunkStreamDrainBuf mbarr pos) = + if pos == 0 + then return Stop + else return $ Yield (Right (Array mbarr 0 pos)) ChunkStreamByteStop + + step _ (ChunkStreamByteLoop icurdir ixs idirp mbarr ipos) = do + goOuter icurdir idirp ixs ipos + + where + + -- This is recursed only when we open the next dir + -- Encapsulates curdir and dirp as static arguments + goOuter curdir dirp = goInner + + where + + -- This is recursed each time we find a dir + -- Encapsulates dirs as static argument + goInner dirs = nextEntry + + where + + {-# INLINE nextEntry #-} + nextEntry pos = do + liftIO resetErrno + dentPtr <- liftIO $ c_readdir dirp + if dentPtr /= nullPtr + then handleDentry pos dentPtr + else handleErr pos + + openNextDir pos = + case dirs of + (x:xs) -> do + DirStream dirp1 <- liftIO $ openDirStream x + goOuter x dirp1 xs pos + [] -> + if pos == 0 + then return Stop + else return + $ Yield + (Right (Array mbarr 0 pos)) + ChunkStreamByteStop + + handleErr pos = do + errno <- liftIO getErrno + if (errno /= eINTR) + then do + let (Errno n) = errno + liftIO $ closeDirStream (DirStream dirp) + if (n == 0) + then openNextDir pos + else liftIO $ throwErrno "readEitherByteChunks" + else nextEntry pos + + splitAndRealloc pos dname xs = + case xs of + [] -> + return $ Skip + (ChunkStreamReallocBuf dname curdir + [] dirp mbarr pos) + _ -> do + let (h,t) = splitHalf xs + return $ Yield (Left t) + (ChunkStreamReallocBuf dname curdir + h dirp mbarr pos) + + {-# INLINE handleFileEnt #-} + handleFileEnt pos dname = do r <- copyToBuf mbarr pos curdir dname case r of - Just pos1 -> - return $ Skip - (ChunkStreamByteLoop curdir xs dirp dirs ndirs mbarr pos1) - Nothing -> do - -- XXX we do not need to yield the out dirs here - -- XXX But we should yield if the number of dirs - -- become more than a threshold. - if ndirs > 0 - then - return $ Yield (Left dirs) - (ChunkStreamByteLoopPending dname curdir xs dirp mbarr pos) - else - return $ Skip - (ChunkStreamByteLoopPending dname curdir xs dirp mbarr pos) - EntryIsDir -> do + Just pos1 -> nextEntry pos1 + Nothing -> splitAndRealloc pos dname dirs + + {-# INLINE handleDirEnt #-} + handleDirEnt pos dname = do path <- liftIO $ appendCString curdir dname let dirs1 = path : dirs - ndirs1 = ndirs + 1 r <- copyToBuf mbarr pos curdir dname case r of - Just pos1 -> - return $ Skip - (ChunkStreamByteLoop curdir xs dirp dirs1 ndirs1 mbarr pos1) - Nothing -> do - -- We know dirs1 in not empty here - -- XXX Yield only if dirs are more than a threshold - -- otherwise skip. - return $ Yield (Left dirs1) - (ChunkStreamByteLoopPending dname curdir xs dirp mbarr pos) - EntryIgnored -> return $ Skip st - else do - errno <- liftIO getErrno - if (errno == eINTR) - then return $ Skip st - else do - let (Errno n) = errno - -- XXX Exception safety - liftIO $ closeDirStream (DirStream dirp) - if (n == 0) - then return $ Skip (ChunkStreamByteInit xs dirs ndirs mbarr pos) - else liftIO $ throwErrno "readEitherByteChunks" + Just pos1 -> goInner dirs1 pos1 + Nothing -> splitAndRealloc pos dname dirs1 + + handleDentry pos dentPtr = do + let dname = #{ptr struct dirent, d_name} dentPtr + dtype :: #{type unsigned char} <- + liftIO $ #{peek struct dirent, d_type} dentPtr + + etype <- liftIO $ getEntryType conf curdir dname dtype + case etype of + EntryIsNotDir -> handleFileEnt pos dname + EntryIsDir -> handleDirEnt pos dname + EntryIgnored -> nextEntry pos {-# ANN type ByteChunksAt Fuse #-} data ByteChunksAt =