Skip to content

Commit f6110b9

Browse files
committed
Make scanChannelRaw a non-terminating scan to remove deadlock
1 parent 19df218 commit f6110b9

3 files changed

Lines changed: 42 additions & 7 deletions

File tree

src/Streamly/Internal/Data/Fold/Channel/Type.hs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -394,6 +394,7 @@ _scanToChannel chan (Scanl step initial extract final) =
394394
data SendChannelRaw s a
395395
= SCREmptyBuffer s
396396
| SCRBuffered s a
397+
| SCRDrain
397398

398399
{-# INLINE scanToChannelRaw #-}
399400
scanToChannelRaw ::
@@ -423,15 +424,27 @@ scanToChannelRaw chan (Scanl step initial extract final) =
423424
b <- extract s
424425
void $ sendPartialToDriver chan b
425426
return $ Fold.Partial (SCRBuffered s (ChildYield x1))
426-
Fold.Done b ->
427-
Fold.Done <$> void (sendYieldToDriver chan b)
427+
Fold.Done b -> do
428+
-- We don't end the fold here so it can drain any input in the
429+
-- input buffer.
430+
--
431+
-- As a consequence this thread is kept alive until the stream
432+
-- ends.
433+
--
434+
-- One way to schedule a stop is: Producer can choose to send a
435+
-- special event to stop this fold. That way the responsibility
436+
-- of the deadlock is on the producer.
437+
--
438+
void (sendYieldToDriver chan b)
439+
pure $ Fold.Partial SCRDrain
428440
step1 (SCRBuffered st (ChildYield x)) ChildStopChannel = do
429441
r <- step st x
430442
b <-
431443
case r of
432444
Fold.Partial s -> extract s
433445
Fold.Done b0 -> pure b0
434446
Fold.Done <$> void (sendYieldToDriver chan b)
447+
step1 SCRDrain _ = pure $ Fold.Partial SCRDrain
435448
step1 _ _ = error "scanToChannelRaw: Unsupported constructor"
436449

437450
extract1 _ = return ()
@@ -441,6 +454,7 @@ scanToChannelRaw chan (Scanl step initial extract final) =
441454
-- XXX We are losing the input here.
442455
-- XXX Should we consume the input and finalize it instead?
443456
final1 (SCRBuffered st _val) = void (final st)
457+
final1 SCRDrain = pure ()
444458

445459
{-# INLINABLE newChannelWithScan #-}
446460
{-# SPECIALIZE newChannelWithScan ::

src/Streamly/Internal/Data/Fold/Concurrent.hs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -369,6 +369,9 @@ parDistributeScan cfg getFolds (Stream sstep state) =
369369
res <- sstep (adaptState gst) st
370370
next <- case res of
371371
Yield x s -> do
372+
-- UDPATE: forever block does not occur anymore as
373+
-- "scanToChannelRaw" is not a terminating fold anymore.
374+
--
372375
-- XXX We might block forever if some folds are already
373376
-- done but we have not read the output queue yet. To
374377
-- avoid that we have to either (1) precheck if space

test/Streamly/Test/Data/Scanl/Concurrent.hs

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,11 @@ oddScan =
3939
Scanl.filtering odd
4040
& Scanl.lmapM (\x -> threadDelay 100 >> pure x)
4141

42-
parDistributeScanTest :: (Stream.Config -> Stream.Config) -> IO ()
43-
parDistributeScanTest concOpts = do
44-
ref <- newIORef [evenScan, oddScan]
42+
parDistributeScanTestScanEnd :: (Stream.Config -> Stream.Config) -> IO ()
43+
parDistributeScanTestScanEnd concOpts = do
44+
let streamLen = 1000
45+
evenLen = 100
46+
ref <- newIORef [Scanl.take evenLen evenScan, oddScan]
4547
let gen = atomicModifyIORef' ref (\xs -> ([], xs))
4648
inpList = [1..1_000]
4749
inpStream = Stream.fromList inpList
@@ -50,6 +52,20 @@ parDistributeScanTest concOpts = do
5052
& Stream.concatMap Stream.fromList
5153
& Stream.catMaybes
5254
& Stream.fold Fold.toList
55+
sort res1 `shouldBe` [1..evenLen] ++ filter odd [(evenLen+1)..streamLen]
56+
57+
parDistributeScanTestStreamEnd :: (Stream.Config -> Stream.Config) -> IO ()
58+
parDistributeScanTestStreamEnd concOpts = do
59+
let streamLen = 1000
60+
ref <- newIORef [evenScan, oddScan]
61+
let gen = atomicModifyIORef' ref (\xs -> ([], xs))
62+
inpList = [1..streamLen]
63+
inpStream = Stream.fromList inpList
64+
res1 <-
65+
Scanl.parDistributeScan concOpts gen inpStream
66+
& Stream.concatMap Stream.fromList
67+
& Stream.catMaybes
68+
& Stream.fold Fold.toList
5369
sort res1 `shouldBe` inpList
5470

5571
main :: IO ()
@@ -59,5 +75,7 @@ main = hspec
5975
$ modifyMaxSuccess (const 10)
6076
#endif
6177
$ describe moduleName $ do
62-
it "parDistributeScan (maxBuffer 1)"
63-
$ parDistributeScanTest (Stream.maxBuffer 1)
78+
it "parDistributeScan (stream end) (maxBuffer 1)"
79+
$ parDistributeScanTestStreamEnd (Stream.maxBuffer 1)
80+
it "parDistributeScan (scan end)"
81+
$ parDistributeScanTestScanEnd (Stream.maxBuffer 1)

0 commit comments

Comments
 (0)