Skip to content

Commit adfd7b5

Browse files
committed
Add tests for Concurrent.parDemuxScan
1 parent 0961c0a commit adfd7b5

2 files changed

Lines changed: 47 additions & 21 deletions

File tree

src/Streamly/Internal/Data/Scanl/Concurrent.hs

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -345,20 +345,6 @@ parDemuxScan cfg getKey getFold (Stream sstep state) =
345345
r@(chan, _) <- newChannelWithScan q db cfg (fmap (k,) fld)
346346
return (Map.insert k r keyToChan1, chan)
347347
Just (chan, _) -> return (keyToChan1, chan)
348-
-- XXX We might block forever if some folds are already
349-
-- done but we have not read the output queue yet. To
350-
-- avoid that we have to either (1) precheck if space
351-
-- is available in the input queues of all folds so
352-
-- that this does not block, or (2) we have to use a
353-
-- non-blocking read and track progress so that we can
354-
-- restart from where we left.
355-
--
356-
-- If there is no space available then we should block
357-
-- on doorbell db or inputSpaceDoorBell of the relevant
358-
-- channel. To avoid deadlock the output space can be
359-
-- kept unlimited. However, the blocking will delay the
360-
-- processing of outputs. We should yield the outputs
361-
-- before blocking.
362348
sendToWorker_ ch x
363349
return $ DemuxGo s q db keyToChan2
364350
Skip s ->

test/Streamly/Test/Data/Scanl/Concurrent.hs

Lines changed: 47 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ oddScan =
3939
Scanl.filtering odd
4040
& Scanl.lmapM (\x -> threadDelay 100 >> pure x)
4141

42-
parDistributeScanTestScanEnd :: (Stream.Config -> Stream.Config) -> IO ()
43-
parDistributeScanTestScanEnd concOpts = do
42+
parDistributeScan_ScanEnd :: (Stream.Config -> Stream.Config) -> IO ()
43+
parDistributeScan_ScanEnd concOpts = do
4444
let streamLen = 10000
4545
evenLen = 100
4646
ref <- newIORef [Scanl.take evenLen evenScan, oddScan]
@@ -54,8 +54,28 @@ parDistributeScanTestScanEnd concOpts = do
5454
& Stream.fold Fold.toList
5555
sort res1 `shouldBe` [1..evenLen] ++ filter odd [(evenLen+1)..streamLen]
5656

57-
parDistributeScanTestStreamEnd :: (Stream.Config -> Stream.Config) -> IO ()
58-
parDistributeScanTestStreamEnd concOpts = do
57+
parDemuxScan_ScanEnd :: (Stream.Config -> Stream.Config) -> IO ()
58+
parDemuxScan_ScanEnd concOpts = do
59+
let streamLen = 10000
60+
evenLen = 100
61+
demuxer i = even (i :: Int)
62+
ref <- newIORef (Scanl.take evenLen $ Scanl.mkScanl1 (\_ x -> x))
63+
let gen True =
64+
atomicModifyIORef' ref (\xs -> (fmap (const Nothing) Scanl.drain, xs))
65+
gen False = pure $ Scanl.mkScanl1 (\_ x -> x)
66+
inpList = [1..streamLen]
67+
inpStream = Stream.fromList inpList
68+
res <-
69+
Scanl.parDemuxScan concOpts demuxer gen inpStream
70+
& Stream.concatMap Stream.fromList
71+
& fmap (\x -> (fst x,) <$> snd x)
72+
& Stream.catMaybes
73+
& Stream.fold Fold.toList
74+
map snd (filter fst res) `shouldBe` take evenLen [2, 4 ..]
75+
map snd (filter (not . fst) res) `shouldBe` filter odd [1..streamLen]
76+
77+
parDistributeScan_StreamEnd :: (Stream.Config -> Stream.Config) -> IO ()
78+
parDistributeScan_StreamEnd concOpts = do
5979
let streamLen = 10000
6080
ref <- newIORef [evenScan, oddScan]
6181
let gen = atomicModifyIORef' ref (\xs -> ([], xs))
@@ -68,6 +88,22 @@ parDistributeScanTestStreamEnd concOpts = do
6888
& Stream.fold Fold.toList
6989
sort res1 `shouldBe` inpList
7090

91+
parDemuxScan_StreamEnd :: (Stream.Config -> Stream.Config) -> IO ()
92+
parDemuxScan_StreamEnd concOpts = do
93+
let streamLen = 10000
94+
demuxer i = even (i :: Int)
95+
gen _ = pure $ Scanl.mkScanl1 (\_ x -> x)
96+
inpList = [1..streamLen]
97+
inpStream = Stream.fromList inpList
98+
res <-
99+
Scanl.parDemuxScan concOpts demuxer gen inpStream
100+
& Stream.concatMap Stream.fromList
101+
& fmap (\x -> (fst x,) <$> snd x)
102+
& Stream.catMaybes
103+
& Stream.fold Fold.toList
104+
map snd (filter fst res) `shouldBe` filter even [1..streamLen]
105+
map snd (filter (not . fst) res) `shouldBe` filter odd [1..streamLen]
106+
71107
main :: IO ()
72108
main = hspec
73109
$ H.parallel
@@ -76,6 +112,10 @@ main = hspec
76112
#endif
77113
$ describe moduleName $ do
78114
it "parDistributeScan (stream end) (maxBuffer 1)"
79-
$ parDistributeScanTestStreamEnd (Stream.maxBuffer 1)
80-
it "parDistributeScan (scan end)"
81-
$ parDistributeScanTestScanEnd (Stream.maxBuffer 1)
115+
$ parDistributeScan_StreamEnd (Stream.maxBuffer 1)
116+
it "parDistributeScan (scan end) (maxBuffer 1)"
117+
$ parDistributeScan_ScanEnd (Stream.maxBuffer 1)
118+
it "parDemuxScan (stream end) (maxBuffer 1)"
119+
$ parDemuxScan_StreamEnd (Stream.maxBuffer 1)
120+
it "parDemuxScan (scan end) (maxBuffer 1)"
121+
$ parDemuxScan_ScanEnd (Stream.maxBuffer 1)

0 commit comments

Comments
 (0)