Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 40 additions & 13 deletions src/Streamly/Internal/Data/Fold/Channel/Type.hs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import Data.List (intersperse)
import Streamly.Internal.Control.Concurrent
(MonadAsync, MonadRunInIO, askRunInIO)
import Streamly.Internal.Control.ForkLifted (doForkWith)
import Streamly.Internal.Data.Atomics (writeBarrier)
import Streamly.Internal.Data.Fold (Fold(..))
import Streamly.Internal.Data.Scanl (Scanl(..))
import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats)
Expand All @@ -66,6 +67,7 @@ data OutEvent b =
FoldException ThreadId SomeException
| FoldPartial b
| FoldDone ThreadId b
| FoldEOF ThreadId

-- | The fold driver thread queues the input of the fold in the 'inputQueue'
-- The driver rings the doorbell when the queue transitions from empty to
Expand Down Expand Up @@ -107,6 +109,7 @@ data Channel m a b = Channel
--
-- [LOCKING] Infrequent, MVar.
, inputItemDoorBell :: MVar ()
, closedForInput :: IORef Bool

-- | Doorbell to tell the driver that there is now space available in the
-- 'inputQueue' and more items can be queued.
Expand Down Expand Up @@ -212,6 +215,11 @@ sendPartialToDriver :: MonadIO m => Channel m a b -> b -> m ()
sendPartialToDriver sv res = liftIO $ do
void $ sendToDriver sv (FoldPartial res)

-- | Tell the fold driver that this worker reached the end of the input
-- stream without the fold terminating on its own. The event carries the
-- worker's thread id so the driver can identify which fold finished.
sendEOFToDriver :: MonadIO m => Channel m a b -> m ()
sendEOFToDriver sv =
    liftIO $ myThreadId >>= void . sendToDriver sv . FoldEOF

{-# NOINLINE sendExceptionToDriver #-}
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
sendExceptionToDriver sv e = do
Expand Down Expand Up @@ -281,6 +289,7 @@ mkNewChannelWith outQRev outQMvRev cfg = do
outQ <- newIORef ([], 0)
outQMv <- newEmptyMVar
bufferMv <- newEmptyMVar
ref <- newIORef False

stats <- newSVarStats
tid <- myThreadId
Expand All @@ -292,6 +301,7 @@ mkNewChannelWith outQRev outQMvRev cfg = do
, outputQueue = outQRev
, outputDoorBell = outQMvRev
, inputSpaceDoorBell = bufferMv
, closedForInput = ref
, maxInputBuffer = getMaxBuffer cfg
, readInputQ = liftIO $ fmap fst (readInputQWithDB sv)
, svarRef = Nothing
Expand Down Expand Up @@ -330,10 +340,12 @@ newChannelWith outq outqDBell modifier f = do
let f1 = Fold.rmapM (void . sendYieldToDriver chan) f
in D.fold f1 $ fromInputQueue chan

-- | Returns 'True' if the fold terminated because it completed on its own,
-- and 'False' if it terminated because the input stream ended.
{-# INLINE scanToChannel #-}
scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Scanl m a ()
scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Fold m a Bool
scanToChannel chan (Scanl step initial extract final) =
Scanl step1 initial1 extract1 final1
Fold step1 initial1 extract1 final1

where

Expand All @@ -344,8 +356,9 @@ scanToChannel chan (Scanl step initial extract final) =
b <- extract s
void $ sendPartialToDriver chan b
return $ Fold.Partial s
Fold.Done b ->
Fold.Done <$> void (sendYieldToDriver chan b)
Fold.Done b -> do
sendYieldToDriver chan b
return $ Fold.Done True

step1 st x = do
r <- step st x
Expand All @@ -354,13 +367,16 @@ scanToChannel chan (Scanl step initial extract final) =
b <- extract s
void $ sendPartialToDriver chan b
return $ Fold.Partial s
Fold.Done b ->
Fold.Done <$> void (sendYieldToDriver chan b)
Fold.Done b -> do
sendYieldToDriver chan b
return $ Fold.Done True

extract1 _ = return ()
extract1 _ = error "extract: not supported by folds"

-- XXX Should we not discard the result?
final1 st = void (final st)
final1 st = do
void (final st)
return False

{-# INLINABLE newChannelWithScan #-}
{-# SPECIALIZE newChannelWithScan ::
Expand All @@ -386,7 +402,15 @@ newChannelWithScan outq outqDBell modifier f = do
where

-- Worker action: drain the channel's input queue through the scan, then
-- close the channel for input and wake any producer blocked on buffer space.
{-# NOINLINE work #-}
work chan = do
    completed <- D.fold (scanToChannel chan f) (fromInputQueue chan)
    -- If the scan already terminated, scanToChannel has sent a FoldDone.
    -- Sending a FoldEOF as well would put two messages in the output queue;
    -- consumers such as parTeeWith check for only one item in the output
    -- queue and multiple messages would complicate that. So send FoldEOF
    -- only when no FoldDone was sent.
    when (not completed) $ sendEOFToDriver chan
    -- Set the closed flag before ringing the space doorbell; writeBarrier
    -- orders the two so a producer woken from takeMVar (inputSpaceDoorBell)
    -- observes closedForInput as True and stops retrying.
    liftIO $ writeIORef (closedForInput chan) True
    liftIO writeBarrier
    void $ liftIO $ tryPutMVar (inputSpaceDoorBell chan) ()

{-# INLINABLE newChannel #-}
{-# SPECIALIZE newChannel ::
Expand Down Expand Up @@ -441,7 +465,10 @@ checkFoldStatus sv = do
case ev of
FoldException _ e -> throwM e
FoldDone _ b -> return (Just b)
FoldPartial _ -> undefined
FoldPartial _ ->
error "checkFoldStatus: FoldPartial can occur only for scans"
FoldEOF _ ->
error "checkFoldStatus: FoldEOF can occur only for scans"

{-# INLINE isBufferAvailable #-}
isBufferAvailable :: MonadIO m => Channel m a b -> m Bool
Expand Down Expand Up @@ -510,10 +537,10 @@ sendToWorker_ chan a = go
(inputItemDoorBell chan)
(ChildYield a)
else do
error "sendToWorker_: No space available in the buffer"
-- Block for space
-- () <- liftIO $ takeMVar (inputSpaceDoorBell chan)
-- go
() <- liftIO $ takeMVar (inputSpaceDoorBell chan)
closed <- liftIO $ readIORef (closedForInput chan)
when (not closed) go

-- XXX Cleanup the fold if the stream is interrupted. Add a GC hook.

Expand Down
11 changes: 9 additions & 2 deletions src/Streamly/Internal/Data/Fold/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -340,7 +340,11 @@ parDistributeScan cfg getFolds (Stream sstep state) =
FoldDone tid b ->
let ch = filter (\(_, t) -> t /= tid) chans
in processOutputs ch xs (b:done)
FoldPartial _ -> undefined
FoldPartial _ ->
error "parDistributeScan: cannot occur for folds"
FoldEOF _ ->
error
"parDistributeScan: FoldEOF cannot occur for folds"

collectOutputs qref chans = do
(_, n) <- liftIO $ readIORef qref
Expand Down Expand Up @@ -464,7 +468,10 @@ parDemuxScan cfg getKey getFold (Stream sstep state) =
FoldDone _tid o@(k, _) ->
let ch = Map.delete k keyToChan
in processOutputs ch xs (o:done)
FoldPartial _ -> undefined
FoldPartial _ ->
error "parDemuxScan: cannot occur for folds"
FoldEOF _ ->
error "parDemuxScan: FoldEOF cannot occur for folds"

collectOutputs qref keyToChan = do
(_, n) <- liftIO $ readIORef qref
Expand Down
56 changes: 22 additions & 34 deletions src/Streamly/Internal/Data/Scanl/Concurrent.hs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ parTeeWith cfg f c1 c2 = Scanl step initial extract final
liftIO $ throwM ex
FoldDone _tid b -> return (Left b)
FoldPartial b -> return (Right b)
FoldEOF _ -> error "parTeeWith: FoldEOF cannot occur here"
_ -> error "parTeeWith: not expecting more than one msg in q"

processResponses ch1 ch2 r1 r2 =
Expand Down Expand Up @@ -145,12 +146,18 @@ data ScanState s q db f =
-- XXX We can use a one way mailbox type abstraction instead of using an IORef
-- for adding new folds dynamically.

-- | Evaluate a stream and scan its outputs using zero or more dynamically
-- generated parallel scans. It checks for any new folds at each input
-- generation step. Any new fold is added to the list of folds which are
-- currently running. If there are no folds available, the input is discarded.
-- If a fold completes its output is emitted in the output of the scan. The
-- outputs of the parallel scans are merged in the output stream.
-- | Evaluate a stream and scan its outputs using zero or more parallel scans,
-- which can be generated dynamically. It takes an action for producing new
-- scans which is run before processing each input. The list of scans produced
-- is added to the currently running scans. If you do not want the same scan
-- added every time then the action should generate it only once (see the
-- example below). If there are no scans available, the input is discarded. The
-- outputs of all the scans are merged in the output stream.
--
-- If the input buffer (see maxBuffer) is limited then a scan may block until
-- space becomes available in the input buffer. If a scan blocks then input is
-- not provided to any of the scans, input is distributed to scans only when
-- all scans have input buffer available.
--
-- >>> import Data.IORef
-- >>> ref <- newIORef [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int]
Expand Down Expand Up @@ -180,6 +187,9 @@ parDistributeScan cfg getFolds (Stream sstep state) =
FoldDone tid b ->
let ch = filter (\(_, t) -> t /= tid) chans
in processOutputs ch xs (b:done)
FoldEOF tid -> do
let ch = filter (\(_, t) -> t /= tid) chans
in processOutputs ch xs done
FoldPartial b ->
processOutputs chans xs (b:done)

Expand Down Expand Up @@ -209,20 +219,8 @@ parDistributeScan cfg getFolds (Stream sstep state) =
res <- sstep (adaptState gst) st
next <- case res of
Yield x s -> do
-- XXX We might block forever if some folds are already
-- done but we have not read the output queue yet. To
-- avoid that we have to either (1) precheck if space
-- is available in the input queues of all folds so
-- that this does not block, or (2) we have to use a
-- non-blocking read and track progress so that we can
-- restart from where we left.
--
-- If there is no space available then we should block
-- on doorbell db or inputSpaceDoorBell of the relevant
-- channel. To avoid deadlock the output space can be
-- kept unlimited. However, the blocking will delay the
-- processing of outputs. We should yield the outputs
-- before blocking.
-- XXX The blocking will delay the processing of outputs.
-- Should we yield the outputs before blocking?
Prelude.mapM_ (`sendToWorker_` x) (fmap fst running)
return $ ScanGo s q db running
Skip s -> do
Expand Down Expand Up @@ -305,6 +303,10 @@ parDemuxScan cfg getKey getFold (Stream sstep state) =
FoldDone _tid o@(k, _) ->
let ch = Map.delete k keyToChan
in processOutputs ch xs (o:done)
FoldEOF tid ->
let chans = Map.toList keyToChan
ch = filter (\(_, (_, t)) -> t /= tid) chans
in processOutputs (Map.fromList ch) xs done
FoldPartial b ->
processOutputs keyToChan xs (b:done)

Expand Down Expand Up @@ -343,20 +345,6 @@ parDemuxScan cfg getKey getFold (Stream sstep state) =
r@(chan, _) <- newChannelWithScan q db cfg (fmap (k,) fld)
return (Map.insert k r keyToChan1, chan)
Just (chan, _) -> return (keyToChan1, chan)
-- XXX We might block forever if some folds are already
-- done but we have not read the output queue yet. To
-- avoid that we have to either (1) precheck if space
-- is available in the input queues of all folds so
-- that this does not block, or (2) we have to use a
-- non-blocking read and track progress so that we can
-- restart from where we left.
--
-- If there is no space available then we should block
-- on doorbell db or inputSpaceDoorBell of the relevant
-- channel. To avoid deadlock the output space can be
-- kept unlimited. However, the blocking will delay the
-- processing of outputs. We should yield the outputs
-- before blocking.
sendToWorker_ ch x
return $ DemuxGo s q db keyToChan2
Skip s ->
Expand Down
1 change: 1 addition & 0 deletions streamly.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ extra-source-files:
test/Streamly/Test/Prelude/*.hs
test/Streamly/Test/Unicode/*.hs
test/Streamly/Test/Serialize/*.hs
test/Streamly/Test/Data/Scanl/*.hs
test/Streamly/Test/Data/Fold/*.hs
test/lib/Streamly/Test/Common.hs
test/lib/Streamly/Test/Prelude/Common.hs
Expand Down
121 changes: 121 additions & 0 deletions test/Streamly/Test/Data/Scanl/Concurrent.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
-- |
-- Module : Streamly.Test.Data.Scanl.Concurrent
-- Copyright : (c) 2020 Composewell Technologies
--
-- License : BSD-3-Clause
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC

module Streamly.Test.Data.Scanl.Concurrent (main) where

import Control.Concurrent (threadDelay)
import Data.Function ( (&) )
import Data.IORef (newIORef, atomicModifyIORef')
import Data.List (sort)
import Streamly.Data.Scanl (Scanl)
import Test.Hspec as H

import qualified Streamly.Data.Fold as Fold
import qualified Streamly.Data.Stream as Stream
import qualified Streamly.Data.Stream.Prelude as Stream
import qualified Streamly.Internal.Data.Scanl as Scanl
import qualified Streamly.Internal.Data.Scanl.Prelude as Scanl

-- | Label used for the top-level hspec 'describe' group.
moduleName :: String
moduleName = "Data.Scanl.Concurrent"

---------------------------------------------------------------------------
-- Main
---------------------------------------------------------------------------

-- | Keep only even inputs; every input is first delayed by 100 microseconds
-- so the scan consumes slowly relative to the producer.
evenScan :: Scanl IO Int (Maybe Int)
evenScan = Scanl.lmapM slowly (Scanl.filtering even)

    where

    slowly x = threadDelay 100 >> pure x

-- | Keep only odd inputs; every input is first delayed by 100 microseconds
-- so the scan consumes slowly relative to the producer.
oddScan :: Scanl IO Int (Maybe Int)
oddScan = Scanl.lmapM slowly (Scanl.filtering odd)

    where

    slowly x = threadDelay 100 >> pure x

-- | One scan terminates on its own (via 'Scanl.take') while the other runs
-- until the input stream ends; the merged outputs of both must cover the
-- whole input as expected.
parDistributeScan_ScanEnd :: (Stream.Config -> Stream.Config) -> IO ()
parDistributeScan_ScanEnd concOpts = do
    let total = 10000
        takeN = 100
    scansRef <- newIORef [Scanl.take takeN evenScan, oddScan]
    -- Hand out the scans exactly once; subsequent calls return [].
    let produce = atomicModifyIORef' scansRef (\scans -> ([], scans))
        input = Stream.fromList [1 .. total]
    collected <-
        Stream.fold Fold.toList
            $ Stream.catMaybes
            $ Stream.concatMap Stream.fromList
            $ Scanl.parDistributeScan concOpts produce input
    sort collected `shouldBe` [1 .. takeN] ++ filter odd [takeN + 1 .. total]

-- | The scan for the 'True' (even) key terminates early via 'Scanl.take';
-- the scan for the 'False' (odd) key runs until the stream ends. Verify the
-- per-key outputs.
parDemuxScan_ScanEnd :: (Stream.Config -> Stream.Config) -> IO ()
parDemuxScan_ScanEnd concOpts = do
    let total = 10000
        takeN = 100
        keyOf i = even (i :: Int)
    scanRef <- newIORef (Scanl.take takeN $ Scanl.mkScanl1 (\_ x -> x))
    -- First request for the even key gets the taking scan; any later
    -- request gets a drain that emits Nothing.
    let mkScan True =
            atomicModifyIORef'
                scanRef (\s -> (fmap (const Nothing) Scanl.drain, s))
        mkScan False = pure $ Scanl.mkScanl1 (\_ x -> x)
        input = Stream.fromList [1 .. total]
    pairs <-
        Stream.fold Fold.toList
            $ Stream.catMaybes
            $ fmap (\(k, mv) -> fmap ((,) k) mv)
            $ Stream.concatMap Stream.fromList
            $ Scanl.parDemuxScan concOpts keyOf mkScan input
    map snd (filter fst pairs) `shouldBe` take takeN [2, 4 ..]
    map snd (filter (not . fst) pairs) `shouldBe` filter odd [1 .. total]

-- | Neither scan terminates on its own; both end when the input stream
-- ends. Together the even and odd scans must reproduce the whole input.
parDistributeScan_StreamEnd :: (Stream.Config -> Stream.Config) -> IO ()
parDistributeScan_StreamEnd concOpts = do
    let total = 10000
    scansRef <- newIORef [evenScan, oddScan]
    -- Hand out the scans exactly once; subsequent calls return [].
    let produce = atomicModifyIORef' scansRef (\scans -> ([], scans))
        input = Stream.fromList [1 .. total]
    collected <-
        Stream.fold Fold.toList
            $ Stream.catMaybes
            $ Stream.concatMap Stream.fromList
            $ Scanl.parDistributeScan concOpts produce input
    sort collected `shouldBe` [1 .. total]

-- | Both per-key scans run until the input stream ends; each key must see
-- exactly the inputs routed to it, in order.
parDemuxScan_StreamEnd :: (Stream.Config -> Stream.Config) -> IO ()
parDemuxScan_StreamEnd concOpts = do
    let total = 10000
        keyOf i = even (i :: Int)
        -- Every key gets a latest-value scan that never terminates early.
        mkScan _ = pure $ Scanl.mkScanl1 (\_ x -> x)
        input = Stream.fromList [1 .. total]
    pairs <-
        Stream.fold Fold.toList
            $ Stream.catMaybes
            $ fmap (\(k, mv) -> fmap ((,) k) mv)
            $ Stream.concatMap Stream.fromList
            $ Scanl.parDemuxScan concOpts keyOf mkScan input
    map snd (filter fst pairs) `shouldBe` filter even [1 .. total]
    map snd (filter (not . fst) pairs) `shouldBe` filter odd [1 .. total]

-- | Run all concurrent scan tests in parallel. Each test is run with
-- maxBuffer 1 to exercise the bounded input-buffer (blocking/wakeup) path.
main :: IO ()
main = hspec
    $ H.parallel
#ifdef COVERAGE_BUILD
    $ modifyMaxSuccess (const 10)
#endif
    $ describe moduleName $ do
        it "parDistributeScan (stream end) (maxBuffer 1)"
            $ parDistributeScan_StreamEnd (Stream.maxBuffer 1)
        it "parDistributeScan (scan end) (maxBuffer 1)"
            $ parDistributeScan_ScanEnd (Stream.maxBuffer 1)
        it "parDemuxScan (stream end) (maxBuffer 1)"
            $ parDemuxScan_StreamEnd (Stream.maxBuffer 1)
        it "parDemuxScan (scan end) (maxBuffer 1)"
            $ parDemuxScan_ScanEnd (Stream.maxBuffer 1)
Loading
Loading