Skip to content

Commit e166fd8

Browse files
committed
Introduce FoldEOF and tackle the cleanup after the scan terminates
1 parent 0a7d7c6 commit e166fd8

2 files changed

Lines changed: 24 additions & 9 deletions

File tree

src/Streamly/Internal/Data/Fold/Channel/Type.hs

Lines changed: 17 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ data OutEvent b =
6666
FoldException ThreadId SomeException
6767
| FoldPartial b
6868
| FoldDone ThreadId b
69+
| FoldEOF ThreadId
6970

7071
-- | The fold driver thread queues the input of the fold in the 'inputQueue'
7172
-- The driver rings the doorbell when the queue transitions from empty to
@@ -212,6 +213,11 @@ sendPartialToDriver :: MonadIO m => Channel m a b -> b -> m ()
212213
sendPartialToDriver sv res = liftIO $ do
213214
void $ sendToDriver sv (FoldPartial res)
214215

216+
sendEOFToDriver :: MonadIO m => Channel m a b -> m ()
217+
sendEOFToDriver sv = liftIO $ do
218+
tid <- myThreadId
219+
void $ sendToDriver sv (FoldEOF tid)
220+
215221
{-# NOINLINE sendExceptionToDriver #-}
216222
sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
217223
sendExceptionToDriver sv e = do
@@ -331,9 +337,9 @@ newChannelWith outq outqDBell modifier f = do
331337
in D.fold f1 $ fromInputQueue chan
332338

333339
{-# INLINE scanToChannel #-}
334-
scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Scanl m a ()
340+
scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Fold m a ()
335341
scanToChannel chan (Scanl step initial extract final) =
336-
Scanl step1 initial1 extract1 final1
342+
Fold step1 initial1 extract1 final1
337343

338344
where
339345

@@ -344,8 +350,7 @@ scanToChannel chan (Scanl step initial extract final) =
344350
b <- extract s
345351
void $ sendPartialToDriver chan b
346352
return $ Fold.Partial s
347-
Fold.Done b ->
348-
Fold.Done <$> void (sendYieldToDriver chan b)
353+
Fold.Done b -> Fold.Done <$> sendYieldToDriver chan b
349354

350355
step1 st x = do
351356
r <- step st x
@@ -354,13 +359,12 @@ scanToChannel chan (Scanl step initial extract final) =
354359
b <- extract s
355360
void $ sendPartialToDriver chan b
356361
return $ Fold.Partial s
357-
Fold.Done b ->
358-
Fold.Done <$> void (sendYieldToDriver chan b)
362+
Fold.Done b -> Fold.Done <$> sendYieldToDriver chan b
359363

360-
extract1 _ = return ()
364+
extract1 _ = pure ()
361365

362366
-- XXX Should we not discard the result?
363-
final1 st = void (final st)
367+
final1 st = void $ final st
364368

365369
{-# INLINABLE newChannelWithScan #-}
366370
{-# SPECIALIZE newChannelWithScan ::
@@ -386,7 +390,10 @@ newChannelWithScan outq outqDBell modifier f = do
386390
where
387391

388392
{-# NOINLINE work #-}
389-
work chan = D.drain $ D.scanl (scanToChannel chan f) $ fromInputQueue chan
393+
work chan = do
394+
(_, next) <- D.foldBreak (scanToChannel chan f) (fromInputQueue chan)
395+
sendEOFToDriver chan
396+
D.drain next
390397

391398
{-# INLINABLE newChannel #-}
392399
{-# SPECIALIZE newChannel ::
@@ -441,6 +448,7 @@ checkFoldStatus sv = do
441448
case ev of
442449
FoldException _ e -> throwM e
443450
FoldDone _ b -> return (Just b)
451+
FoldEOF _ -> return Nothing
444452
FoldPartial _ -> undefined
445453

446454
{-# INLINE isBufferAvailable #-}

src/Streamly/Internal/Data/Scanl/Concurrent.hs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,13 @@ parDistributeScan cfg getFolds (Stream sstep state) =
190190
Stream.fold (spanChans tid) (Stream.fromList chans)
191191
Prelude.mapM_ (finalize . fst) chToClose
192192
processOutputs ch xs (b:done)
193+
FoldEOF tid -> do
194+
-- We have to send ChildStopChannel to all the folds
195+
-- that are done to stop the manager fold.
196+
(ch, chToClose) <-
197+
Stream.fold (spanChans tid) (Stream.fromList chans)
198+
Prelude.mapM_ (finalize . fst) chToClose
199+
processOutputs ch xs done
193200
FoldPartial b ->
194201
processOutputs chans xs (b:done)
195202

0 commit comments

Comments
 (0)