@@ -45,6 +45,7 @@ import Data.List (intersperse)
4545import Streamly.Internal.Control.Concurrent
4646 (MonadAsync , MonadRunInIO , askRunInIO )
4747import Streamly.Internal.Control.ForkLifted (doForkWith )
48+ import Streamly.Internal.Data.Atomics (writeBarrier )
4849import Streamly.Internal.Data.Fold (Fold (.. ))
4950import Streamly.Internal.Data.Scanl (Scanl (.. ))
5051import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats )
@@ -66,6 +67,7 @@ data OutEvent b =
6667 FoldException ThreadId SomeException
6768 | FoldPartial b
6869 | FoldDone ThreadId b
70+ | FoldEOF ThreadId
6971
7072-- | The fold driver thread queues the input of the fold in the 'inputQueue'
7173-- The driver rings the doorbell when the queue transitions from empty to
@@ -107,6 +109,7 @@ data Channel m a b = Channel
107109 --
108110 -- [LOCKING] Infrequent, MVar.
109111 , inputItemDoorBell :: MVar ()
112+ , closedForInput :: IORef Bool
110113
111114 -- | Doorbell to tell the driver that there is now space available in the
112115 -- 'inputQueue' and more items can be queued.
@@ -212,6 +215,11 @@ sendPartialToDriver :: MonadIO m => Channel m a b -> b -> m ()
212215sendPartialToDriver sv res = liftIO $ do
213216 void $ sendToDriver sv (FoldPartial res)
214217
218+ sendEOFToDriver :: MonadIO m => Channel m a b -> m ()
219+ sendEOFToDriver sv = liftIO $ do
220+ tid <- myThreadId
221+ void $ sendToDriver sv (FoldEOF tid)
222+
215223{-# NOINLINE sendExceptionToDriver #-}
216224sendExceptionToDriver :: Channel m a b -> SomeException -> IO ()
217225sendExceptionToDriver sv e = do
@@ -281,6 +289,7 @@ mkNewChannelWith outQRev outQMvRev cfg = do
281289 outQ <- newIORef ([] , 0 )
282290 outQMv <- newEmptyMVar
283291 bufferMv <- newEmptyMVar
292+ ref <- newIORef False
284293
285294 stats <- newSVarStats
286295 tid <- myThreadId
@@ -292,6 +301,7 @@ mkNewChannelWith outQRev outQMvRev cfg = do
292301 , outputQueue = outQRev
293302 , outputDoorBell = outQMvRev
294303 , inputSpaceDoorBell = bufferMv
304+ , closedForInput = ref
295305 , maxInputBuffer = getMaxBuffer cfg
296306 , readInputQ = liftIO $ fmap fst (readInputQWithDB sv)
297307 , svarRef = Nothing
@@ -330,10 +340,12 @@ newChannelWith outq outqDBell modifier f = do
330340 let f1 = Fold. rmapM (void . sendYieldToDriver chan) f
331341 in D. fold f1 $ fromInputQueue chan
332342
343+ -- | Returns True if the fold terminated due to completion and False when due
344+ -- to end-of-stream.
333345{-# INLINE scanToChannel #-}
334- scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Scanl m a ()
346+ scanToChannel :: MonadIO m => Channel m a b -> Scanl m a b -> Fold m a Bool
335347scanToChannel chan (Scanl step initial extract final) =
336- Scanl step1 initial1 extract1 final1
348+ Fold step1 initial1 extract1 final1
337349
338350 where
339351
@@ -344,8 +356,9 @@ scanToChannel chan (Scanl step initial extract final) =
344356 b <- extract s
345357 void $ sendPartialToDriver chan b
346358 return $ Fold. Partial s
347- Fold. Done b ->
348- Fold. Done <$> void (sendYieldToDriver chan b)
359+ Fold. Done b -> do
360+ sendYieldToDriver chan b
361+ return $ Fold. Done True
349362
350363 step1 st x = do
351364 r <- step st x
@@ -354,13 +367,16 @@ scanToChannel chan (Scanl step initial extract final) =
354367 b <- extract s
355368 void $ sendPartialToDriver chan b
356369 return $ Fold. Partial s
357- Fold. Done b ->
358- Fold. Done <$> void (sendYieldToDriver chan b)
370+ Fold. Done b -> do
371+ sendYieldToDriver chan b
372+ return $ Fold. Done True
359373
360- extract1 _ = return ()
374+ extract1 _ = error " extract: not supported by folds "
361375
362376 -- XXX Should we not discard the result?
363- final1 st = void (final st)
377+ final1 st = do
378+ void (final st)
379+ return False
364380
365381{-# INLINABLE newChannelWithScan #-}
366382{-# SPECIALIZE newChannelWithScan ::
@@ -386,7 +402,15 @@ newChannelWithScan outq outqDBell modifier f = do
386402 where
387403
388404 {-# NOINLINE work #-}
389- work chan = D. drain $ D. scanl (scanToChannel chan f) $ fromInputQueue chan
405+ work chan = do
406+ completed <- D. fold (scanToChannel chan f) (fromInputQueue chan)
407+ -- We check for only one item in the outputqueue, for example in
408+ -- parTeeWith, multiple messages can make that complicated. Therefore,
409+ -- we first check if we already sent a FoldDone.
410+ when (not completed) $ sendEOFToDriver chan
411+ liftIO $ writeIORef (closedForInput chan) True
412+ liftIO writeBarrier
413+ void $ liftIO $ tryPutMVar (inputSpaceDoorBell chan) ()
390414
391415{-# INLINABLE newChannel #-}
392416{-# SPECIALIZE newChannel ::
@@ -441,7 +465,10 @@ checkFoldStatus sv = do
441465 case ev of
442466 FoldException _ e -> throwM e
443467 FoldDone _ b -> return (Just b)
444- FoldPartial _ -> undefined
468+ FoldPartial _ ->
469+ error " checkFoldStatus: FoldPartial can occur only for scans"
470+ FoldEOF _ ->
471+ error " checkFoldStatus: FoldEOF can occur only for scans"
445472
446473{-# INLINE isBufferAvailable #-}
447474isBufferAvailable :: MonadIO m => Channel m a b -> m Bool
@@ -510,10 +537,10 @@ sendToWorker_ chan a = go
510537 (inputItemDoorBell chan)
511538 (ChildYield a)
512539 else do
513- error " sendToWorker_: No space available in the buffer"
514540 -- Block for space
515- -- () <- liftIO $ takeMVar (inputSpaceDoorBell chan)
516- -- go
541+ () <- liftIO $ takeMVar (inputSpaceDoorBell chan)
542+ closed <- liftIO $ readIORef (closedForInput chan)
543+ when (not closed) go
517544
518545-- XXX Cleanup the fold if the stream is interrupted. Add a GC hook.
519546
0 commit comments