Skip to content

Commit 52f6cbb

Browse files
committed
Add a way to stop the drain fold
1 parent f6110b9 commit 52f6cbb

2 files changed

Lines changed: 16 additions & 11 deletions

File tree

src/Streamly/Internal/Data/Fold/Channel/Type.hs

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -428,12 +428,8 @@ scanToChannelRaw chan (Scanl step initial extract final) =
428428
-- We don't end the fold here so it can drain any input in the
429429
-- input buffer.
430430
--
431-
-- As a consequence this thread is kept alive until the stream
432-
-- ends.
433-
--
434-
-- One way to schedule a stop is: Producer can choose to send a
435-
-- special event to stop this fold. That way the responsibility
436-
-- of the deadlock is on the producer.
431+
-- The caller should send a ChildStopChannel to the fold again
432+
-- after the fold is done to complete this scan.
437433
--
438434
void (sendYieldToDriver chan b)
439435
pure $ Fold.Partial SCRDrain
@@ -444,6 +440,7 @@ scanToChannelRaw chan (Scanl step initial extract final) =
444440
Fold.Partial s -> extract s
445441
Fold.Done b0 -> pure b0
446442
Fold.Done <$> void (sendYieldToDriver chan b)
443+
step1 SCRDrain ChildStopChannel = pure $ Fold.Done ()
447444
step1 SCRDrain _ = pure $ Fold.Partial SCRDrain
448445
step1 _ _ = error "scanToChannelRaw: Unsupported constructor"
449446

@@ -481,7 +478,7 @@ newChannelWithScan outq outqDBell modifier f = do
481478

482479
{-# NOINLINE work #-}
483480
work chan =
484-
D.drain $ D.scanl (scanToChannelRaw chan f) $ fromInputQueueRaw chan
481+
D.drain $ D.scanl (scanToChannelRaw chan f) $ fromInputQueueRaw chan
485482

486483
{-# INLINABLE newChannel #-}
487484
{-# SPECIALIZE newChannel ::

src/Streamly/Internal/Data/Scanl/Concurrent.hs

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ import Streamly.Internal.Data.Scanl (Scanl(..))
2828
import Streamly.Internal.Data.Stream (Stream(..), Step(..))
2929
import Streamly.Internal.Data.SVar.Type (adaptState)
3030
import Streamly.Internal.Data.Tuple.Strict (Tuple3'(..))
31-
31+
import qualified Streamly.Data.Fold as Fold
32+
import qualified Streamly.Data.Stream as Stream
3233
import qualified Data.Map.Strict as Map
3334

3435
import Streamly.Internal.Data.Fold.Channel.Type
@@ -166,6 +167,11 @@ parDistributeScan cfg getFolds (Stream sstep state) =
166167

167168
where
168169

170+
spanChans tid =
171+
Fold.tee
172+
(Fold.filter (\(_, t) -> t /= tid) Fold.toList)
173+
(Fold.filter (\(_, t) -> t == tid) Fold.toList)
174+
169175
-- XXX can be written as a fold
170176
processOutputs chans events done = do
171177
case events of
@@ -177,9 +183,11 @@ parDistributeScan cfg getFolds (Stream sstep state) =
177183
liftIO $ mapM_ (`throwTo` ThreadAbort) (fmap snd chans)
178184
mapM_ cleanup (fmap fst chans)
179185
liftIO $ throwM ex
180-
FoldDone tid b ->
181-
let ch = filter (\(_, t) -> t /= tid) chans
182-
in processOutputs ch xs (b:done)
186+
FoldDone tid b -> do
187+
(ch, chToClose) <-
188+
Stream.fold (spanChans tid) (Stream.fromList chans)
189+
Prelude.mapM_ (finalize . fst) chToClose
190+
processOutputs ch xs (b:done)
183191
FoldPartial b ->
184192
processOutputs chans xs (b:done)
185193

0 commit comments

Comments
 (0)