From 24e7ee8cc8a40f398bedf22b4d8eb5e492ed4a0a Mon Sep 17 00:00:00 2001 From: Adithya Kumar Date: Sat, 22 Mar 2025 03:40:38 +0530 Subject: [PATCH 1/3] Add a test-suite for concurrent scans --- streamly.cabal | 1 + test/Streamly/Test/Data/Scanl/Concurrent.hs | 81 +++++++++++++++++++++ test/streamly-tests.cabal | 8 ++ 3 files changed, 90 insertions(+) create mode 100644 test/Streamly/Test/Data/Scanl/Concurrent.hs diff --git a/streamly.cabal b/streamly.cabal index 3b86be46f3..f544f5d20a 100644 --- a/streamly.cabal +++ b/streamly.cabal @@ -136,6 +136,7 @@ extra-source-files: test/Streamly/Test/Prelude/*.hs test/Streamly/Test/Unicode/*.hs test/Streamly/Test/Serialize/*.hs + test/Streamly/Test/Data/Scanl/*.hs test/Streamly/Test/Data/Fold/*.hs test/lib/Streamly/Test/Common.hs test/lib/Streamly/Test/Prelude/Common.hs diff --git a/test/Streamly/Test/Data/Scanl/Concurrent.hs b/test/Streamly/Test/Data/Scanl/Concurrent.hs new file mode 100644 index 0000000000..36217d2bb8 --- /dev/null +++ b/test/Streamly/Test/Data/Scanl/Concurrent.hs @@ -0,0 +1,81 @@ +-- | +-- Module : Streamly.Test.Data.Scanl.Concurrent +-- Copyright : (c) 2020 Composewell Technologies +-- +-- License : BSD-3-Clause +-- Maintainer : streamly@composewell.com +-- Stability : experimental +-- Portability : GHC + +module Streamly.Test.Data.Scanl.Concurrent (main) where + +import Control.Concurrent (threadDelay) +import Data.Function ( (&) ) +import Data.IORef (newIORef, atomicModifyIORef') +import Data.List (sort) +import Streamly.Data.Scanl (Scanl) +import Test.Hspec as H + +import qualified Streamly.Data.Fold as Fold +import qualified Streamly.Data.Stream as Stream +import qualified Streamly.Data.Stream.Prelude as Stream +import qualified Streamly.Internal.Data.Scanl as Scanl +import qualified Streamly.Internal.Data.Scanl.Prelude as Scanl + +moduleName :: String +moduleName = "Data.Scanl.Concurrent" + +--------------------------------------------------------------------------- +-- Main +--------------------------------------------------------------------------- + +evenScan :: Scanl IO Int (Maybe Int) +evenScan = + Scanl.filtering even + & Scanl.lmapM (\x -> threadDelay 100 >> pure x) + +oddScan :: Scanl IO Int (Maybe Int) +oddScan = + Scanl.filtering odd + & Scanl.lmapM (\x -> threadDelay 100 >> pure x) + +parDistributeScanTestScanEnd :: (Stream.Config -> Stream.Config) -> IO () +parDistributeScanTestScanEnd concOpts = do + let streamLen = 10000 + evenLen = 100 + ref <- newIORef [Scanl.take evenLen evenScan, oddScan] + let gen = atomicModifyIORef' ref (\xs -> ([], xs)) + inpList = [1..streamLen] + inpStream = Stream.fromList inpList + res1 <- + Scanl.parDistributeScan concOpts gen inpStream + & Stream.concatMap Stream.fromList + & Stream.catMaybes + & Stream.fold Fold.toList + sort res1 `shouldBe` [1..evenLen] ++ filter odd [(evenLen+1)..streamLen] + +parDistributeScanTestStreamEnd :: (Stream.Config -> Stream.Config) -> IO () +parDistributeScanTestStreamEnd concOpts = do + let streamLen = 10000 + ref <- newIORef [evenScan, oddScan] + let gen = atomicModifyIORef' ref (\xs -> ([], xs)) + inpList = [1..streamLen] + inpStream = Stream.fromList inpList + res1 <- + Scanl.parDistributeScan concOpts gen inpStream + & Stream.concatMap Stream.fromList + & Stream.catMaybes + & Stream.fold Fold.toList + sort res1 `shouldBe` inpList + +main :: IO () +main = hspec + $ H.parallel +#ifdef COVERAGE_BUILD + $ modifyMaxSuccess (const 10) +#endif + $ describe moduleName $ do + it "parDistributeScan (stream end) (maxBuffer 1)" + $ parDistributeScanTestStreamEnd (Stream.maxBuffer 1) + it "parDistributeScan (scan end)" + $ parDistributeScanTestScanEnd (Stream.maxBuffer 1) diff --git a/test/streamly-tests.cabal b/test/streamly-tests.cabal index 470955f4ef..74ce136143 100644 --- a/test/streamly-tests.cabal +++ b/test/streamly-tests.cabal @@ -304,6 +304,14 @@ test-suite Data.RingArray main-is: Streamly/Test/Data/RingArray.hs ghc-options: -main-is Streamly.Test.Data.RingArray.main +test-suite Data.Scanl.Concurrent + import: test-options + type: exitcode-stdio-1.0 + main-is: Streamly/Test/Data/Scanl/Concurrent.hs + ghc-options: -main-is Streamly.Test.Data.Scanl.Concurrent.main + if flag(use-streamly-core) + buildable: False + -- XXX Rename to MutByteArray test-suite Data.Serialize import: test-options From 0961c0aa5b0bda43b63338c86f2e840dd1fa8e94 Mon Sep 17 00:00:00 2001 From: Adithya Kumar Date: Mon, 24 Mar 2025 00:50:57 +0530 Subject: [PATCH 2/3] Change the edge-case behaviour of parDistributeScan - Add a new type of concurrent fold output indicating EOF - Add a reference in the channel: closedForInput - Check on closedForInput in sendToWorker_ --- .../Internal/Data/Fold/Channel/Type.hs | 53 ++++++++++++++----- src/Streamly/Internal/Data/Fold/Concurrent.hs | 11 +++- .../Internal/Data/Scanl/Concurrent.hs | 42 ++++++++------- 3 files changed, 71 insertions(+), 35 deletions(-) diff --git a/src/Streamly/Internal/Data/Fold/Channel/Type.hs b/src/Streamly/Internal/Data/Fold/Channel/Type.hs index 7a6c2875be..2d4a23690b 100644 --- a/src/Streamly/Internal/Data/Fold/Channel/Type.hs +++ b/src/Streamly/Internal/Data/Fold/Channel/Type.hs @@ -45,6 +45,7 @@ import Data.List (intersperse) import Streamly.Internal.Control.Concurrent (MonadAsync, MonadRunInIO, askRunInIO) import Streamly.Internal.Control.ForkLifted (doForkWith) +import Streamly.Internal.Data.Atomics (writeBarrier) import Streamly.Internal.Data.Fold (Fold(..)) import Streamly.Internal.Data.Scanl (Scanl(..)) import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats) @@ -66,6 +67,7 @@ data OutEvent b = FoldException ThreadId SomeException | FoldPartial b | FoldDone ThreadId b + | FoldEOF ThreadId -- | The fold driver thread queues the input of the fold in the 'inputQueue' -- The driver rings the doorbell when the queue transitions from empty to @@ -107,6 +109,7 @@ data Channel m a b = Channel -- -- [LOCKING] Infrequent, MVar. , inputItemDoorBell :: MVar () + , closedForInput :: IORef Bool -- | Doorbell to tell the driver that there is now space available in the -- 'inputQueue' and more items can be queued. @@ -212,6 +215,11 @@ sendPartialToDriver :: MonadIO m => Channel m a b -> b -> m () sendPartialToDriver sv res = liftIO $ do void $ sendToDriver sv (FoldPartial res) +sendEOFToDriver :: MonadIO m => Channel m a b -> m () +sendEOFToDriver sv = liftIO $ do + tid <- myThreadId + void $ sendToDriver sv (FoldEOF tid) + {-# NOINLINE sendExceptionToDriver #-} sendExceptionToDriver :: Channel m a b -> SomeException -> IO () sendExceptionToDriver sv e = do @@ -281,6 +289,7 @@ mkNewChannelWith outQRev outQMvRev cfg = do outQ <- newIORef ([], 0) outQMv <- newEmptyMVar bufferMv <- newEmptyMVar + ref <- newIORef False stats <- newSVarStats tid <- myThreadId @@ -292,6 +301,7 @@ mkNewChannelWith outQRev outQMvRev cfg = do , outputQueue = outQRev , outputDoorBell = outQMvRev , inputSpaceDoorBell = bufferMv + , closedForInput = ref , maxInputBuffer = getMaxBuffer cfg , readInputQ = liftIO $ fmap fst (readInputQWithDB sv) , svarRef = Nothing @@ -330,10 +340,12 @@ newChannelWith outq outqDBell modifier f = do let f1 = Fold.rmapM (void . sendYieldToDriver chan) f in D.fold f1 $ fromInputQueue chan +-- | Returns True if the fold terminated due to completion and False when due +-- to end-of-stream. {-# INLINE scanToChannel #-} -scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Scanl m a () +scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Fold m a Bool scanToChannel chan (Scanl step initial extract final) = - Scanl step1 initial1 extract1 final1 + Fold step1 initial1 extract1 final1 where @@ -344,8 +356,9 @@ scanToChannel chan (Scanl step initial extract final) = b <- extract s void $ sendPartialToDriver chan b return $ Fold.Partial s - Fold.Done b -> - Fold.Done <$> void (sendYieldToDriver chan b) + Fold.Done b -> do + sendYieldToDriver chan b + return $ Fold.Done True step1 st x = do r <- step st x @@ -354,13 +367,16 @@ scanToChannel chan (Scanl step initial extract final) = b <- extract s void $ sendPartialToDriver chan b return $ Fold.Partial s - Fold.Done b -> - Fold.Done <$> void (sendYieldToDriver chan b) + Fold.Done b -> do + sendYieldToDriver chan b + return $ Fold.Done True - extract1 _ = return () + extract1 _ = error "extract: not supported by folds" -- XXX Should we not discard the result? - final1 st = void (final st) + final1 st = do + void (final st) + return False {-# INLINABLE newChannelWithScan #-} {-# SPECIALIZE newChannelWithScan :: @@ -386,7 +402,15 @@ newChannelWithScan outq outqDBell modifier f = do where {-# NOINLINE work #-} - work chan = D.drain $ D.scanl (scanToChannel chan f) $ fromInputQueue chan + work chan = do + completed <- D.fold (scanToChannel chan f) (fromInputQueue chan) + -- We check for only one item in the outputqueue, for example in + -- parTeeWith, multiple messages can make that complicated. Therefore, + -- we first check if we already sent a FoldDone. + when (not completed) $ sendEOFToDriver chan + liftIO $ writeIORef (closedForInput chan) True + liftIO writeBarrier + void $ liftIO $ tryPutMVar (inputSpaceDoorBell chan) () {-# INLINABLE newChannel #-} {-# SPECIALIZE newChannel :: @@ -441,7 +465,10 @@ checkFoldStatus sv = do case ev of FoldException _ e -> throwM e FoldDone _ b -> return (Just b) - FoldPartial _ -> undefined + FoldPartial _ -> + error "checkFoldStatus: FoldPartial can occur only for scans" + FoldEOF _ -> + error "checkFoldStatus: FoldEOF can occur only for scans" {-# INLINE isBufferAvailable #-} isBufferAvailable :: MonadIO m => Channel m a b -> m Bool @@ -510,10 +537,10 @@ sendToWorker_ chan a = go (inputItemDoorBell chan) (ChildYield a) else do - error "sendToWorker_: No space available in the buffer" -- Block for space - -- () <- liftIO $ takeMVar (inputSpaceDoorBell chan) - -- go + () <- liftIO $ takeMVar (inputSpaceDoorBell chan) + closed <- liftIO $ readIORef (closedForInput chan) + when (not closed) go -- XXX Cleanup the fold if the stream is interrupted. Add a GC hook. diff --git a/src/Streamly/Internal/Data/Fold/Concurrent.hs b/src/Streamly/Internal/Data/Fold/Concurrent.hs index 2dd6fb7d13..589f87d8cf 100644 --- a/src/Streamly/Internal/Data/Fold/Concurrent.hs +++ b/src/Streamly/Internal/Data/Fold/Concurrent.hs @@ -340,7 +340,11 @@ parDistributeScan cfg getFolds (Stream sstep state) = FoldDone tid b -> let ch = filter (\(_, t) -> t /= tid) chans in processOutputs ch xs (b:done) - FoldPartial _ -> undefined + FoldPartial _ -> + error "parDistributeScan: cannot occur for folds" + FoldEOF _ -> + error + "parDistributeScan: FoldEOF cannot occur for folds" collectOutputs qref chans = do (_, n) <- liftIO $ readIORef qref @@ -464,7 +468,10 @@ parDemuxScan cfg getKey getFold (Stream sstep state) = FoldDone _tid o@(k, _) -> let ch = Map.delete k keyToChan in processOutputs ch xs (o:done) - FoldPartial _ -> undefined + FoldPartial _ -> + error "parDemuxScan: cannot occur for folds" + FoldEOF _ -> + error "parDemuxScan: FoldEOF cannot occur for folds" collectOutputs qref keyToChan = do (_, n) <- liftIO $ readIORef qref diff --git a/src/Streamly/Internal/Data/Scanl/Concurrent.hs b/src/Streamly/Internal/Data/Scanl/Concurrent.hs index 6692ffe2cc..6c35deeb38 100644 --- a/src/Streamly/Internal/Data/Scanl/Concurrent.hs +++ b/src/Streamly/Internal/Data/Scanl/Concurrent.hs @@ -89,6 +89,7 @@ parTeeWith cfg f c1 c2 = Scanl step initial extract final liftIO $ throwM ex FoldDone _tid b -> return (Left b) FoldPartial b -> return (Right b) + FoldEOF _ -> error "parTeeWith: FoldEOF cannot occur here" _ -> error "parTeeWith: not expecting more than one msg in q" processResponses ch1 ch2 r1 r2 = @@ -145,12 +146,18 @@ data ScanState s q db f = -- XXX We can use a one way mailbox type abstraction instead of using an IORef -- for adding new folds dynamically. --- | Evaluate a stream and scan its outputs using zero or more dynamically --- generated parallel scans. It checks for any new folds at each input --- generation step. Any new fold is added to the list of folds which are --- currently running. If there are no folds available, the input is discarded. --- If a fold completes its output is emitted in the output of the scan. The --- outputs of the parallel scans are merged in the output stream. +-- | Evaluate a stream and scan its outputs using zero or more parallel scans, +-- which can be generated dynamically. It takes an action for producing new +-- scans which is run before processing each input. The list of scans produced +-- is added to the currently running scans. If you do not want the same scan +-- added every time then the action should generate it only once (see the +-- example below). If there are no scans available, the input is discarded. The +-- outputs of all the scans are merged in the output stream. +-- +-- If the input buffer (see maxBuffer) is limited then a scan may block until +-- space becomes available in the input buffer. If a scan blocks then input is +-- not provided to any of the scans, input is distributed to scans only when +-- all scans have input buffer available. -- -- >>> import Data.IORef -- >>> ref <- newIORef [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int] @@ -180,6 +187,9 @@ parDistributeScan cfg getFolds (Stream sstep state) = FoldDone tid b -> let ch = filter (\(_, t) -> t /= tid) chans in processOutputs ch xs (b:done) + FoldEOF tid -> do + let ch = filter (\(_, t) -> t /= tid) chans + in processOutputs ch xs done FoldPartial b -> processOutputs chans xs (b:done) @@ -209,20 +219,8 @@ parDistributeScan cfg getFolds (Stream sstep state) = res <- sstep (adaptState gst) st next <- case res of Yield x s -> do - -- XXX We might block forever if some folds are already - -- done but we have not read the output queue yet. To - -- avoid that we have to either (1) precheck if space - -- is available in the input queues of all folds so - -- that this does not block, or (2) we have to use a - -- non-blocking read and track progress so that we can - -- restart from where we left. - -- - -- If there is no space available then we should block - -- on doorbell db or inputSpaceDoorBell of the relevant - -- channel. To avoid deadlock the output space can be - -- kept unlimited. However, the blocking will delay the - -- processing of outputs. We should yield the outputs - -- before blocking. + -- XXX The blocking will delay the processing of outputs. + -- Should we yield the outputs before blocking? Prelude.mapM_ (`sendToWorker_` x) (fmap fst running) return $ ScanGo s q db running Skip s -> do @@ -305,6 +303,10 @@ parDemuxScan cfg getKey getFold (Stream sstep state) = FoldDone _tid o@(k, _) -> let ch = Map.delete k keyToChan in processOutputs ch xs (o:done) + FoldEOF tid -> + let chans = Map.toList keyToChan + ch = filter (\(_, (_, t)) -> t /= tid) chans + in processOutputs (Map.fromList ch) xs done FoldPartial b -> processOutputs keyToChan xs (b:done) From adfd7b539e32b7b8cb20b1751288eeaadc00a68b Mon Sep 17 00:00:00 2001 From: Adithya Kumar Date: Tue, 25 Mar 2025 20:42:18 +0530 Subject: [PATCH 3/3] Add tests for Concurrent.parDemuxScan --- .../Internal/Data/Scanl/Concurrent.hs | 14 ----- test/Streamly/Test/Data/Scanl/Concurrent.hs | 54 ++++++++++++++++--- 2 files changed, 47 insertions(+), 21 deletions(-) diff --git a/src/Streamly/Internal/Data/Scanl/Concurrent.hs b/src/Streamly/Internal/Data/Scanl/Concurrent.hs index 6c35deeb38..bc02456ba5 100644 --- a/src/Streamly/Internal/Data/Scanl/Concurrent.hs +++ b/src/Streamly/Internal/Data/Scanl/Concurrent.hs @@ -345,20 +345,6 @@ parDemuxScan cfg getKey getFold (Stream sstep state) = r@(chan, _) <- newChannelWithScan q db cfg (fmap (k,) fld) return (Map.insert k r keyToChan1, chan) Just (chan, _) -> return (keyToChan1, chan) - -- XXX We might block forever if some folds are already - -- done but we have not read the output queue yet. To - -- avoid that we have to either (1) precheck if space - -- is available in the input queues of all folds so - -- that this does not block, or (2) we have to use a - -- non-blocking read and track progress so that we can - -- restart from where we left. - -- - -- If there is no space available then we should block - -- on doorbell db or inputSpaceDoorBell of the relevant - -- channel. To avoid deadlock the output space can be - -- kept unlimited. However, the blocking will delay the - -- processing of outputs. We should yield the outputs - -- before blocking. sendToWorker_ ch x return $ DemuxGo s q db keyToChan2 Skip s -> diff --git a/test/Streamly/Test/Data/Scanl/Concurrent.hs b/test/Streamly/Test/Data/Scanl/Concurrent.hs index 36217d2bb8..8657b9444c 100644 --- a/test/Streamly/Test/Data/Scanl/Concurrent.hs +++ b/test/Streamly/Test/Data/Scanl/Concurrent.hs @@ -39,8 +39,8 @@ oddScan = Scanl.filtering odd & Scanl.lmapM (\x -> threadDelay 100 >> pure x) -parDistributeScanTestScanEnd :: (Stream.Config -> Stream.Config) -> IO () -parDistributeScanTestScanEnd concOpts = do +parDistributeScan_ScanEnd :: (Stream.Config -> Stream.Config) -> IO () +parDistributeScan_ScanEnd concOpts = do let streamLen = 10000 evenLen = 100 ref <- newIORef [Scanl.take evenLen evenScan, oddScan] @@ -54,8 +54,28 @@ parDistributeScanTestScanEnd concOpts = do & Stream.fold Fold.toList sort res1 `shouldBe` [1..evenLen] ++ filter odd [(evenLen+1)..streamLen] -parDistributeScanTestStreamEnd :: (Stream.Config -> Stream.Config) -> IO () -parDistributeScanTestStreamEnd concOpts = do +parDemuxScan_ScanEnd :: (Stream.Config -> Stream.Config) -> IO () +parDemuxScan_ScanEnd concOpts = do + let streamLen = 10000 + evenLen = 100 + demuxer i = even (i :: Int) + ref <- newIORef (Scanl.take evenLen $ Scanl.mkScanl1 (\_ x -> x)) + let gen True = + atomicModifyIORef' ref (\xs -> (fmap (const Nothing) Scanl.drain, xs)) + gen False = pure $ Scanl.mkScanl1 (\_ x -> x) + inpList = [1..streamLen] + inpStream = Stream.fromList inpList + res <- + Scanl.parDemuxScan concOpts demuxer gen inpStream + & Stream.concatMap Stream.fromList + & fmap (\x -> (fst x,) <$> snd x) + & Stream.catMaybes + & Stream.fold Fold.toList + map snd (filter fst res) `shouldBe` take evenLen [2, 4 ..] + map snd (filter (not . fst) res) `shouldBe` filter odd [1..streamLen] + +parDistributeScan_StreamEnd :: (Stream.Config -> Stream.Config) -> IO () +parDistributeScan_StreamEnd concOpts = do let streamLen = 10000 ref <- newIORef [evenScan, oddScan] let gen = atomicModifyIORef' ref (\xs -> ([], xs)) @@ -68,6 +88,22 @@ parDistributeScanTestStreamEnd concOpts = do & Stream.fold Fold.toList sort res1 `shouldBe` inpList +parDemuxScan_StreamEnd :: (Stream.Config -> Stream.Config) -> IO () +parDemuxScan_StreamEnd concOpts = do + let streamLen = 10000 + demuxer i = even (i :: Int) + gen _ = pure $ Scanl.mkScanl1 (\_ x -> x) + inpList = [1..streamLen] + inpStream = Stream.fromList inpList + res <- + Scanl.parDemuxScan concOpts demuxer gen inpStream + & Stream.concatMap Stream.fromList + & fmap (\x -> (fst x,) <$> snd x) + & Stream.catMaybes + & Stream.fold Fold.toList + map snd (filter fst res) `shouldBe` filter even [1..streamLen] + map snd (filter (not . fst) res) `shouldBe` filter odd [1..streamLen] + main :: IO () main = hspec $ H.parallel @@ -76,6 +112,10 @@ main = hspec #endif $ describe moduleName $ do it "parDistributeScan (stream end) (maxBuffer 1)" - $ parDistributeScanTestStreamEnd (Stream.maxBuffer 1) - it "parDistributeScan (scan end)" - $ parDistributeScanTestScanEnd (Stream.maxBuffer 1) + $ parDistributeScan_StreamEnd (Stream.maxBuffer 1) + it "parDistributeScan (scan end) (maxBuffer 1)" + $ parDistributeScan_ScanEnd (Stream.maxBuffer 1) + it "parDemuxScan (stream end) (maxBuffer 1)" + $ parDemuxScan_StreamEnd (Stream.maxBuffer 1) + it "parDemuxScan (scan end) (maxBuffer 1)" + $ parDemuxScan_ScanEnd (Stream.maxBuffer 1)