Skip to content

Commit bbc06cd

Browse files
Optimize readEitherByteChunks for better perf
1 parent acfb443 commit bbc06cd

1 file changed

Lines changed: 143 additions & 83 deletions

File tree

core/src/Streamly/Internal/FileSystem/Posix/ReadDir.hsc

Lines changed: 143 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,8 @@ readScanWith :: -- (MonadIO m, MonadCatch m) =>
131131
-> Stream m a
132132
readScanWith = undefined
133133

134+
-- NOTE: See https://www.manpagez.com/man/2/getattrlistbulk/ for BSD/macOS.
135+
134136
-- | Read with full metadata.
135137
{-# INLINE readPlusScanWith #-}
136138
readPlusScanWith :: -- (MonadIO m, MonadCatch m) =>
@@ -546,28 +548,40 @@ foreign import ccall unsafe "string.h memcpy" c_memcpy
546548
foreign import ccall unsafe "string.h strlen" c_strlen
547549
:: Ptr CChar -> IO CSize
548550

551+
-- Split a list in half.
552+
splitHalf :: [a] -> ([a], [a])
553+
splitHalf xxs = split xxs xxs
554+
555+
where
556+
557+
split (x:xs) (_:_:ys) =
558+
let (f, s) = split xs ys
559+
in (x:f, s)
560+
split xs _ = ([], xs)
561+
549562
{-# ANN type ChunkStreamByteState Fuse #-}
550563
data ChunkStreamByteState =
551-
ChunkStreamByteInit0
552-
| ChunkStreamByteInit [PosixPath] [PosixPath] Int MutByteArray Int
564+
ChunkStreamByteInit
565+
| ChunkStreamByteStop
553566
| ChunkStreamByteLoop
554567
PosixPath -- current dir path
555568
[PosixPath] -- remaining dirs
556-
(Ptr CDir) -- current dir
557-
[PosixPath] -- dirs buffered
558-
Int -- dir count
569+
(Ptr CDir) -- current dir stream
559570
MutByteArray
560571
Int
561-
| ChunkStreamByteLoopPending
572+
| ChunkStreamReallocBuf
562573
(Ptr CChar) -- pending item
563574
PosixPath -- current dir path
564575
[PosixPath] -- remaining dirs
565-
(Ptr CDir) -- current dir
576+
(Ptr CDir) -- current dir stream
577+
MutByteArray
578+
Int
579+
| ChunkStreamDrainBuf
566580
MutByteArray
567581
Int
568582

569-
-- XXX Add follow-symlinks option.
570-
-- XXX Detect cycles.
583+
-- XXX Detect cycles. ELOOP can be used to avoid cycles, but we can also detect
584+
-- them proactively.
571585

572586
-- XXX Since we are separating paths by newlines, it cannot support newlines in
573587
-- paths. Or we can return null separated paths as well. Provide a Mut array
@@ -579,15 +593,34 @@ data ChunkStreamByteState =
579593
--
580594
-- A fold may be useful to translate the output to whatever format we want, we
581595
-- can add a prefix or we can colorize it.
582-
583-
-- | Left is directories. Right is a buffer containing directories and files
584-
-- separated by newlines.
596+
--
597+
-- XXX Use bufSize, recursive traversal, split strategy, output entries
598+
-- separator as config options. When not using concurrently we do not need to
599+
-- split the work at all.
600+
--
601+
-- XXX Currently we are quite aggressive in splitting the work because we have
602+
-- no knowledge of whether we need to or not. But this leads to more overhead.
603+
-- Instead, we can measure the coarse monotonic and process cpu time after
604+
-- every n system calls or n iterations. If the cpu utilization is low then
605+
-- yield the dirs otherwise dont. We can use an async thread for computing cpu
606+
-- utilization periodically and all other threads can just read it from an
607+
-- IORef. So this can be shared across all such consumers.
608+
609+
-- | This function may not traverse all the directories supplied and it may
610+
-- traverse the directories recursively. Left contains those directories that
611+
-- were not traversed by this function, these my be the directories that were
612+
-- supplied as input as well as newly discovered directories during traversal.
613+
-- To traverse the entire tree we have to iterate this function on the Left
614+
-- output.
615+
--
616+
-- Right is a buffer containing directories and files separated by newlines.
617+
--
585618
{-# INLINE readEitherByteChunks #-}
586619
readEitherByteChunks :: MonadIO m =>
587620
(ReadOptions -> ReadOptions) ->
588621
[PosixPath] -> Stream m (Either [PosixPath] (Array Word8))
589622
readEitherByteChunks confMod alldirs =
590-
Stream step (ChunkStreamByteInit0)
623+
Stream step ChunkStreamByteInit
591624

592625
where
593626

@@ -605,7 +638,8 @@ readEitherByteChunks confMod alldirs =
605638
-- XXX Alternatively, we can distribute the dir stream over multiple
606639
-- concurrent folds and return (monadic output) a stream of arrays created
607640
-- from the output channel, then consume that stream by using a monad bind.
608-
bufSize = 4000
641+
642+
bufSize = 32000
609643

610644
copyToBuf dstArr pos dirPath name = do
611645
nameLen <- fmap fromIntegral (liftIO $ c_strlen name)
@@ -634,92 +668,118 @@ readEitherByteChunks confMod alldirs =
634668
return (Just (pos + byteCount))
635669
else return Nothing
636670

637-
step _ ChunkStreamByteInit0 = do
671+
step _ ChunkStreamByteInit = do
638672
mbarr <- liftIO $ MutByteArray.new' bufSize
639-
return $ Skip (ChunkStreamByteInit alldirs [] 0 mbarr 0)
640-
641-
step _ (ChunkStreamByteInit (x:xs) dirs ndirs mbarr pos) = do
642-
DirStream dirp <- liftIO $ openDirStream x
643-
return $ Skip (ChunkStreamByteLoop x xs dirp dirs ndirs mbarr pos)
644-
645-
step _ (ChunkStreamByteInit [] [] _ _ pos) | pos == 0 =
646-
return Stop
673+
case alldirs of
674+
(x:xs) -> do
675+
DirStream dirp <- liftIO $ openDirStream x
676+
return $ Skip $ ChunkStreamByteLoop x xs dirp mbarr 0
677+
[] -> return Stop
647678

648-
step _ (ChunkStreamByteInit [] [] _ mbarr pos) =
649-
return $ Yield (Right (Array mbarr 0 pos)) (ChunkStreamByteInit [] [] 0 mbarr 0)
679+
step _ ChunkStreamByteStop = return Stop
650680

651-
step _ (ChunkStreamByteInit [] dirs _ mbarr pos) =
652-
return $ Yield (Left dirs) (ChunkStreamByteInit [] [] 0 mbarr pos)
653-
654-
step _ (ChunkStreamByteLoopPending pending curdir xs dirp mbarr pos) = do
681+
step _ (ChunkStreamReallocBuf pending curdir xs dirp mbarr pos) = do
655682
mbarr1 <- liftIO $ MutByteArray.new' bufSize
656683
r1 <- copyToBuf mbarr1 0 curdir pending
657684
case r1 of
658685
Just pos2 ->
659686
return $ Yield (Right (Array mbarr 0 pos))
660687
-- When we come in this state we have emitted dirs
661-
(ChunkStreamByteLoop curdir xs dirp [] 0 mbarr1 pos2)
688+
(ChunkStreamByteLoop curdir xs dirp mbarr1 pos2)
662689
Nothing -> error "Dirname too big for bufSize"
663690

664-
step _ st@(ChunkStreamByteLoop curdir xs dirp dirs ndirs mbarr pos) = do
665-
liftIO resetErrno
666-
dentPtr <- liftIO $ c_readdir dirp
667-
if (dentPtr /= nullPtr)
668-
then do
669-
let dname = #{ptr struct dirent, d_name} dentPtr
670-
dtype :: #{type unsigned char} <-
671-
liftIO $ #{peek struct dirent, d_type} dentPtr
672-
673-
-- XXX Skips come around the entire loop, does that impact perf
674-
-- because it has a StreamK in the middle.
675-
-- Keep the file check first as it is more likely
676-
677-
etype <- liftIO $ getEntryType conf curdir dname dtype
678-
case etype of
679-
EntryIsNotDir -> do
691+
step _ (ChunkStreamDrainBuf mbarr pos) =
692+
if pos == 0
693+
then return Stop
694+
else return $ Yield (Right (Array mbarr 0 pos)) ChunkStreamByteStop
695+
696+
step _ (ChunkStreamByteLoop icurdir ixs idirp mbarr ipos) = do
697+
goOuter icurdir idirp ixs ipos
698+
699+
where
700+
701+
-- This is recursed only when we open the next dir
702+
-- Encapsulates curdir and dirp as static arguments
703+
goOuter curdir dirp = goInner
704+
705+
where
706+
707+
-- This is recursed each time we find a dir
708+
-- Encapsulates dirs as static argument
709+
goInner dirs = nextEntry
710+
711+
where
712+
713+
{-# INLINE nextEntry #-}
714+
nextEntry pos = do
715+
liftIO resetErrno
716+
dentPtr <- liftIO $ c_readdir dirp
717+
if dentPtr /= nullPtr
718+
then handleDentry pos dentPtr
719+
else handleErr pos
720+
721+
openNextDir pos =
722+
case dirs of
723+
(x:xs) -> do
724+
DirStream dirp1 <- liftIO $ openDirStream x
725+
goOuter x dirp1 xs pos
726+
[] ->
727+
if pos == 0
728+
then return Stop
729+
else return
730+
$ Yield
731+
(Right (Array mbarr 0 pos))
732+
ChunkStreamByteStop
733+
734+
handleErr pos = do
735+
errno <- liftIO getErrno
736+
if (errno /= eINTR)
737+
then do
738+
let (Errno n) = errno
739+
liftIO $ closeDirStream (DirStream dirp)
740+
if (n == 0)
741+
then openNextDir pos
742+
else liftIO $ throwErrno "readEitherByteChunks"
743+
else nextEntry pos
744+
745+
splitAndRealloc pos dname xs =
746+
case xs of
747+
[] ->
748+
return $ Skip
749+
(ChunkStreamReallocBuf dname curdir
750+
[] dirp mbarr pos)
751+
_ -> do
752+
let (h,t) = splitHalf xs
753+
return $ Yield (Left t)
754+
(ChunkStreamReallocBuf dname curdir
755+
h dirp mbarr pos)
756+
757+
{-# INLINE handleFileEnt #-}
758+
handleFileEnt pos dname = do
680759
r <- copyToBuf mbarr pos curdir dname
681760
case r of
682-
Just pos1 ->
683-
return $ Skip
684-
(ChunkStreamByteLoop curdir xs dirp dirs ndirs mbarr pos1)
685-
Nothing -> do
686-
-- XXX we do not need to yield the out dirs here
687-
-- XXX But we should yield if the number of dirs
688-
-- become more than a threshold.
689-
if ndirs > 0
690-
then
691-
return $ Yield (Left dirs)
692-
(ChunkStreamByteLoopPending dname curdir xs dirp mbarr pos)
693-
else
694-
return $ Skip
695-
(ChunkStreamByteLoopPending dname curdir xs dirp mbarr pos)
696-
EntryIsDir -> do
761+
Just pos1 -> nextEntry pos1
762+
Nothing -> splitAndRealloc pos dname dirs
763+
764+
{-# INLINE handleDirEnt #-}
765+
handleDirEnt pos dname = do
697766
path <- liftIO $ appendCString curdir dname
698767
let dirs1 = path : dirs
699-
ndirs1 = ndirs + 1
700768
r <- copyToBuf mbarr pos curdir dname
701769
case r of
702-
Just pos1 ->
703-
return $ Skip
704-
(ChunkStreamByteLoop curdir xs dirp dirs1 ndirs1 mbarr pos1)
705-
Nothing -> do
706-
-- We know dirs1 in not empty here
707-
-- XXX Yield only if dirs are more than a threshold
708-
-- otherwise skip.
709-
return $ Yield (Left dirs1)
710-
(ChunkStreamByteLoopPending dname curdir xs dirp mbarr pos)
711-
EntryIgnored -> return $ Skip st
712-
else do
713-
errno <- liftIO getErrno
714-
if (errno == eINTR)
715-
then return $ Skip st
716-
else do
717-
let (Errno n) = errno
718-
-- XXX Exception safety
719-
liftIO $ closeDirStream (DirStream dirp)
720-
if (n == 0)
721-
then return $ Skip (ChunkStreamByteInit xs dirs ndirs mbarr pos)
722-
else liftIO $ throwErrno "readEitherByteChunks"
770+
Just pos1 -> goInner dirs1 pos1
771+
Nothing -> splitAndRealloc pos dname dirs1
772+
773+
handleDentry pos dentPtr = do
774+
let dname = #{ptr struct dirent, d_name} dentPtr
775+
dtype :: #{type unsigned char} <-
776+
liftIO $ #{peek struct dirent, d_type} dentPtr
777+
778+
etype <- liftIO $ getEntryType conf curdir dname dtype
779+
case etype of
780+
EntryIsNotDir -> handleFileEnt pos dname
781+
EntryIsDir -> handleDirEnt pos dname
782+
EntryIgnored -> nextEntry pos
723783

724784
{-# ANN type ByteChunksAt Fuse #-}
725785
data ByteChunksAt =

0 commit comments

Comments
 (0)