Skip to content

Commit b6fd22f

Browse files
Make concurrent channel cleanup robust and reliable
1 parent e469dbc commit b6fd22f

11 files changed

Lines changed: 202 additions & 121 deletions

File tree

benchmark/bench-runner/Main.hs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ rtsOpts exeName benchName0 = unwords [general, exeSpecific, benchSpecific]
101101
"-M64M"
102102

103103
| "Data.Stream.ConcurrentEager/o-n-heap.monad-outer-product.toNullAp"
104-
`isPrefixOf` benchName = "-M1024M"
104+
`isPrefixOf` benchName = "-M1500M"
105105
| "Data.Stream.ConcurrentEager/o-1-space."
106106
`isPrefixOf` benchName = "-M128M"
107107

src/Streamly/Internal/Control/ForkLifted.hs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@ where
1818
import Control.Concurrent (ThreadId, forkIO, forkOS)
1919
import Control.Exception (SomeException(..), catch, mask)
2020
import Data.Functor (void)
21-
import Streamly.Internal.Control.Concurrent (MonadRunInIO, RunInIO(..), withRunInIO, withRunInIONoRestore)
21+
import Streamly.Internal.Control.Concurrent
22+
(MonadRunInIO, RunInIO(..), withRunInIO, withRunInIONoRestore)
2223
import Streamly.Internal.Control.ForkIO (rawForkIO, forkManagedWith)
2324

2425
-- | Fork a thread to run the given computation, installing the provided
@@ -47,9 +48,8 @@ doForkWith :: MonadRunInIO m
4748
doForkWith bound action (RunInIO mrun) exHandler =
4849
withRunInIO $ \run ->
4950
mask $ \restore -> do
50-
tid <- (if bound then forkOS else rawForkIO) $
51-
catch (restore $ void $ mrun action)
52-
exHandler
51+
let frk = if bound then forkOS else rawForkIO
52+
tid <- frk $ catch (restore $ void $ mrun action) exHandler
5353
run (return tid)
5454

5555
-- | 'fork' lifted to any monad with 'MonadBaseControl IO m' capability.

src/Streamly/Internal/Data/Channel/Dispatcher.hs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ import Control.Exception (assert)
3232
import Control.Monad (when, void)
3333
import Control.Monad.IO.Class (MonadIO(liftIO))
3434
import Data.IORef (IORef, modifyIORef, readIORef, writeIORef)
35-
import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS, writeBarrier)
35+
import Streamly.Internal.Data.Atomics
36+
(atomicModifyIORefCAS, writeBarrier, atomicModifyIORefCAS_)
3637
import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime)
3738
import Streamly.Internal.Data.Time.Units
3839
( AbsTime, NanoSecond64(..), diffAbsTime64, showNanoSecond64
@@ -214,8 +215,8 @@ dumpSVarStats inspecting rateInfo ss = do
214215
stopTime <- readIORef (svarStopTime ss)
215216
let stopReason =
216217
case stopTime of
217-
Nothing -> "on GC"
218-
Just _ -> "normal"
218+
Nothing -> "stream abandoned"
219+
Just _ -> "stream finished"
219220
(svarCnt, svarGainLossCnt, svarLat, interval) <- case rateInfo of
220221
Nothing -> return (0, 0, 0, 0)
221222
Just yinfo -> do
@@ -277,15 +278,15 @@ dumpSVarStats inspecting rateInfo ss = do
277278
{-# NOINLINE addThread #-}
278279
addThread :: MonadIO m => IORef (Set ThreadId) -> ThreadId -> m ()
279280
addThread workerSet tid =
280-
liftIO $ modifyIORef workerSet (S.insert tid)
281+
liftIO $ atomicModifyIORefCAS_ workerSet (S.insert tid)
281282

282283
-- This is cheaper than modifyThread because we do not have to send an
283284
-- outputDoorBell. This can make a difference when more workers are being
284285
-- dispatched.
285286
{-# INLINE delThread #-}
286287
delThread :: MonadIO m => IORef (Set ThreadId) -> ThreadId -> m ()
287288
delThread workerSet tid =
288-
liftIO $ modifyIORef workerSet (S.delete tid)
289+
liftIO $ atomicModifyIORefCAS_ workerSet (S.delete tid)
289290

290291
-- If present then delete else add. This takes care of out of order add and
291292
-- delete i.e. a delete arriving before we even added a thread.

src/Streamly/Internal/Data/Channel/Types.hs

Lines changed: 1 addition & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,6 @@ module Streamly.Internal.Data.Channel.Types
6262
, StopWhen (..)
6363
, magicMaxBuffer
6464

65-
-- ** Cleanup
66-
, cleanupSVar
67-
6865
-- ** Diagnostics
6966
, dumpCreator
7067
, dumpOutputQ
@@ -78,22 +75,19 @@ module Streamly.Internal.Data.Channel.Types
7875
)
7976
where
8077

81-
import Control.Concurrent (ThreadId, throwTo, MVar, tryReadMVar)
78+
import Control.Concurrent (ThreadId, MVar, tryReadMVar)
8279
import Control.Concurrent.MVar (tryPutMVar)
8380
import Control.Exception
8481
( SomeException(..), Exception, catches, throwIO, Handler(..)
8582
, BlockedIndefinitelyOnMVar(..), BlockedIndefinitelyOnSTM(..))
8683
import Control.Monad (void, when)
8784
import Data.Int (Int64)
8885
import Data.IORef (IORef, newIORef, readIORef, writeIORef)
89-
import Data.Set (Set)
9086
import Streamly.Internal.Data.Atomics
9187
(atomicModifyIORefCAS, atomicModifyIORefCAS_, storeLoadBarrier)
9288
import Streamly.Internal.Data.Time.Units (AbsTime, NanoSecond64(..))
9389
import System.IO (hPutStrLn, stderr)
9490

95-
import qualified Data.Set as Set
96-
9791
------------------------------------------------------------------------------
9892
-- Common types
9993
------------------------------------------------------------------------------
@@ -462,17 +456,3 @@ printSVar :: IO String -> String -> IO ()
462456
printSVar dump how = do
463457
svInfo <- dump
464458
hPutStrLn stderr $ "\n" <> how <> "\n" <> svInfo
465-
466-
-------------------------------------------------------------------------------
467-
-- Cleanup
468-
-------------------------------------------------------------------------------
469-
470-
-- | Never called from a worker thread.
471-
cleanupSVar :: IORef (Set ThreadId) -> IO ()
472-
cleanupSVar workerSet = do
473-
workers <- readIORef workerSet
474-
-- self <- myThreadId
475-
Prelude.mapM_ (`throwTo` ThreadAbort)
476-
-- (Prelude.filter (/= self) $ Set.toList workers)
477-
(Set.toList workers)
478-
writeIORef workerSet Set.empty

src/Streamly/Internal/Data/Channel/Worker.hs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -423,18 +423,20 @@ sendStop workerCount rateInfo q bell workerInfo = do
423423
workerStopUpdate winfo rinfo
424424
_ ->
425425
return ()
426-
myThreadId >>= \tid ->
427-
void $ sendEvent q bell (ChildStop tid Nothing)
426+
tid <- myThreadId
427+
void $ sendEvent q bell (ChildStop tid Nothing)
428428

429429
-- XXX Shouldn't we perform a workerStopUpdate even in this case?
430430

431431
-- | Add a 'ChildStop' event with exception to the channel's output queue.
432432
{-# NOINLINE sendException #-}
433433
sendException ::
434-
IORef ([ChildEvent a], Int) -- ^ Queue where the exception event is added
434+
IORef Int -- ^ Channel's current worker count
435+
-> IORef ([ChildEvent a], Int) -- ^ Queue where the event is added
435436
-> MVar () -- ^ Door bell to ring
436437
-> SomeException -- ^ The exception to send
437438
-> IO ()
438-
sendException q bell e = do
439+
sendException workerCount q bell e = do
440+
atomicModifyIORefCAS_ workerCount $ \n -> n - 1
439441
tid <- myThreadId
440442
void $ sendEvent q bell (ChildStop tid (Just e))

src/Streamly/Internal/Data/Stream/Channel/Append.hs

Lines changed: 15 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ workLoopLIFO qref sv winfo = run
8484
run = do
8585
work <- dequeue qref
8686
case work of
87-
Nothing -> liftIO $ stopWith winfo sv
87+
Nothing -> return ()
8888
Just (RunInIO runin, m) -> process runin m
8989

9090
process runin m = do
@@ -105,7 +105,7 @@ workLoopLIFO qref sv winfo = run
105105
res <- restoreM r
106106
case res of
107107
Continue -> run
108-
Suspend -> liftIO $ stopWith winfo sv
108+
Suspend -> return ()
109109

110110
where
111111

@@ -143,8 +143,9 @@ workLoopLIFOLimited qref sv winfo = run
143143

144144
run = do
145145
work <- dequeue qref
146+
{- HLINT ignore "Use forM_" -}
146147
case work of
147-
Nothing -> liftIO $ stopWith winfo sv
148+
Nothing -> return ()
148149
Just item -> process item
149150

150151
process item@(RunInIO runin, m) = do
@@ -167,13 +168,12 @@ workLoopLIFOLimited qref sv winfo = run
167168
res <- restoreM r
168169
case res of
169170
Continue -> run
170-
Suspend -> liftIO $ stopWith winfo sv
171+
Suspend -> return ()
171172
-- Avoid any side effects, undo the yield limit decrement if we
172173
-- never yielded anything.
173174
else liftIO $ do
174175
enqueueLIFO sv qref item
175176
incrementYieldLimit (remainingWork sv)
176-
stopWith winfo sv
177177

178178
where
179179

@@ -497,11 +497,6 @@ preStopCheck sv heap =
497497
if beyondRate then stopping else continue
498498
else stopping
499499

500-
abortExecution :: Channel m a -> Maybe WorkerInfo -> IO ()
501-
abortExecution sv winfo = do
502-
incrementYieldLimit (remainingWork sv)
503-
stopWith winfo sv
504-
505500
-- XXX In absence of a "noyield" primitive (i.e. do not pre-empt inside a
506501
-- critical section) from GHC RTS, we have a difficult problem. Assume we have
507502
-- a 100,000 threads producing output and queuing it to the heap for
@@ -580,32 +575,31 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
580575
liftIO $ do
581576
requeueOnHeapTop heap ent seqNo
582577
incrementYieldLimit (remainingWork sv)
583-
stopWith winfo sv
584578

585579
processWorkQueue prevSeqNo = do
586580
yieldLimitOk <- liftIO $ decrementYieldLimit (remainingWork sv)
587581
if yieldLimitOk
588582
then do
589583
work <- dequeueAhead q
590584
case work of
591-
Nothing -> liftIO $ stopWith winfo sv
585+
Nothing -> return ()
592586
Just (m, seqNo) -> do
593587
if seqNo == prevSeqNo + 1
594588
then processWithToken q heap sv winfo m seqNo
595589
else processWithoutToken q heap sv winfo m seqNo
596-
else liftIO $ abortExecution sv winfo
590+
else liftIO $ incrementYieldLimit (remainingWork sv)
597591

598592
nextHeap prevSeqNo = do
599593
res <- liftIO $ dequeueFromHeapSeq heap (prevSeqNo + 1)
600594
case res of
601595
Ready (Entry seqNo hent) -> loopHeap seqNo hent
602-
Clearing -> liftIO $ stopWith winfo sv
596+
Clearing -> return ()
603597
Waiting _ ->
604598
if stopping
605599
then do
606600
r <- liftIO $ preStopCheck sv heap
607601
if r
608-
then liftIO $ stopWith winfo sv
602+
then return ()
609603
else processWorkQueue prevSeqNo
610604
else inline processWorkQueue prevSeqNo
611605

@@ -643,7 +637,6 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
643637
then liftIO $ do
644638
-- put the entry back in the heap and stop
645639
requeueOnHeapTop heap (Entry seqNo ent) seqNo
646-
stopWith winfo sv
647640
else go
648641
else go
649642
AheadEntryStream (RunInIO runin, Nothing, r) -> do
@@ -660,7 +653,6 @@ processHeap q heap sv winfo entry sno stopping = loopHeap sno entry
660653
then liftIO $ do
661654
-- put the entry back in the heap and stop
662655
requeueOnHeapTop heap (Entry seqNo ent) seqNo
663-
stopWith winfo sv
664656
else go
665657
else go
666658

@@ -677,7 +669,7 @@ drainHeap q heap sv winfo = do
677669
case r of
678670
Ready (Entry seqNo hent) ->
679671
processHeap q heap sv winfo hent seqNo True
680-
_ -> liftIO $ stopWith winfo sv
672+
_ -> return ()
681673

682674
data HeapStatus = HContinue | HStop
683675

@@ -902,7 +894,7 @@ workLoopAhead q heap sv winfo = do
902894
case r of
903895
Ready (Entry seqNo hent) ->
904896
processHeap q heap sv winfo hent seqNo False
905-
Clearing -> liftIO $ stopWith winfo sv
897+
Clearing -> return ()
906898
Waiting _ -> do
907899
-- Before we execute the next item from the work queue we check
908900
-- if we are beyond the yield limit. It is better to check the
@@ -925,12 +917,12 @@ workLoopAhead q heap sv winfo = do
925917
then do
926918
work <- dequeueAhead q
927919
case work of
928-
Nothing -> liftIO $ stopWith winfo sv
920+
Nothing -> return ()
929921
Just (m, seqNo) -> do
930922
if seqNo == 0
931923
then processWithToken q heap sv winfo m seqNo
932924
else processWithoutToken q heap sv winfo m seqNo
933-
else liftIO $ abortExecution sv winfo
925+
else liftIO $ incrementYieldLimit (remainingWork sv)
934926

935927
-------------------------------------------------------------------------------
936928
-- SVar creation
@@ -967,6 +959,7 @@ getLifoSVar mrun cfg = do
967959
case getYieldLimit cfg of
968960
Nothing -> return Nothing
969961
Just x -> Just <$> newIORef x
962+
stopRef <- newIORef False
970963
rateInfo <- newRateInfo cfg
971964

972965
stats <- newSVarStats
@@ -1015,6 +1008,7 @@ getLifoSVar mrun cfg = do
10151008
if inOrder
10161009
then workLoopAhead aheadQ outH sv
10171010
else wloop q sv
1011+
, channelStopped = stopRef
10181012
, enqueue =
10191013
if inOrder
10201014
then enqueueAhead sv aheadQ

src/Streamly/Internal/Data/Stream/Channel/Consumer.hs

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ where
2020
import Control.Monad (when, void)
2121
import Control.Monad.IO.Class (MonadIO(liftIO))
2222
import Data.IORef (readIORef)
23-
import Streamly.Internal.Control.Concurrent (MonadRunInIO)
2423

2524
import Streamly.Internal.Data.Channel.Dispatcher
2625
import Streamly.Internal.Data.Channel.Types
@@ -42,7 +41,7 @@ readOutputQChan sv = do
4241
-- there is at least one outstanding worker.
4342
--
4443
-- To be used as 'readOutputQ' function for the channel.
45-
readOutputQBounded :: MonadRunInIO m => Bool -> Channel m a -> m [ChildEvent a]
44+
readOutputQBounded :: MonadIO m => Bool -> Channel m a -> m [ChildEvent a]
4645
readOutputQBounded eagerEval sv = do
4746
(list, len) <- liftIO $ readOutputQChan sv
4847
-- When there is no output seen we dispatch more workers to help
@@ -75,7 +74,7 @@ readOutputQBounded eagerEval sv = do
7574
--
7675
-- To be used as 'readOutputQ' function for the channel when rate control is
7776
-- on.
78-
readOutputQPaced :: MonadRunInIO m => Channel m a -> m [ChildEvent a]
77+
readOutputQPaced :: MonadIO m => Channel m a -> m [ChildEvent a]
7978
readOutputQPaced sv = do
8079
(list, len) <- liftIO $ readOutputQChan sv
8180
if len <= 0
@@ -98,7 +97,7 @@ readOutputQPaced sv = do
9897
--
9998
-- To be used as 'postProcess' function for the channel when rate control is
10099
-- enabled.
101-
postProcessPaced :: MonadRunInIO m => Channel m a -> m Bool
100+
postProcessPaced :: MonadIO m => Channel m a -> m Bool
102101
postProcessPaced sv = do
103102
workersDone <- allThreadsDone (workerThreads sv)
104103
-- XXX If during consumption we figure out we are getting delayed then we
@@ -120,7 +119,7 @@ postProcessPaced sv = do
120119
-- | If there is work to do ensure that we have at least one worker dispatched.
121120
--
122121
-- To be used as 'postProcess' function for the channel.
123-
postProcessBounded :: MonadRunInIO m => Channel m a -> m Bool
122+
postProcessBounded :: MonadIO m => Channel m a -> m Bool
124123
postProcessBounded sv = do
125124
workersDone <- allThreadsDone (workerThreads sv)
126125
-- There may still be work pending even if there are no workers pending

0 commit comments

Comments
 (0)