Skip to content

Commit f07de1f

Browse files
Add functionality for prompt resource cleanup
1 parent 528bb2a commit f07de1f

6 files changed

Lines changed: 102 additions & 9 deletions

File tree

core/src/Streamly/Internal/Data/Stream/Exception.hs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@ module Streamly.Internal.Data.Stream.Exception
2020
, bracketUnsafe
2121
, bracketIO3
2222
, bracketIO
23+
, cleanupIO
24+
, cleanupEffectIO
2325

2426
-- * Exceptions
2527
, onException
@@ -33,6 +35,8 @@ where
3335
import Control.Monad.IO.Class (MonadIO(..))
3436
import Control.Exception (Exception, SomeException, mask_)
3537
import Control.Monad.Catch (MonadCatch)
38+
import Data.Foldable (sequenceA_)
39+
import Data.IORef (newIORef, readIORef, atomicModifyIORef)
3640
import GHC.Exts (inline)
3741
import Streamly.Internal.Data.IOFinalizer
3842
(newIOFinalizer, runIOFinalizer, clearingIOFinalizer)
@@ -346,6 +350,27 @@ bracketIO3 bef aft onExc onGC =
346350
onGC
347351
(inline MC.try)
348352

353+
-- | Run a monadic effect supplying it with a function to register cleanup
354+
-- actions that are automatically invoked on exception or after the effect
355+
-- function is done.
356+
{-# INLINE cleanupEffectIO #-}
357+
cleanupEffectIO :: (MonadIO m, MonadCatch m) =>
358+
((IO () -> IO ()) -> m ()) -> m ()
359+
cleanupEffectIO action = do
360+
ref <- liftIO $ newIORef []
361+
-- XXX use mask or MC.finally?
362+
action (register ref) `MC.onException` aft ref
363+
aft ref
364+
365+
where
366+
367+
aft ref = liftIO $ do
368+
xs <- readIORef ref
369+
sequenceA_ xs
370+
371+
register ref f =
372+
atomicModifyIORef ref (\xs -> (f : xs, ()))
373+
349374
-- | Run the alloc action @IO b@ with async exceptions disabled but keeping
350375
-- blocking operations interruptible (see 'Control.Exception.mask'). Use the
351376
-- output @b@ of the IO action as input to the function @b -> Stream m a@ to
@@ -380,6 +405,27 @@ bracketIO :: (MonadIO m, MonadCatch m)
380405
=> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
381406
bracketIO bef aft = bracketIO3 bef aft aft aft
382407

408+
-- | Run a stream supplying it a function to register cleanup actions which are
409+
-- automatically called on exception or when the stream is done.
410+
{-# INLINE cleanupIO #-}
411+
cleanupIO :: (MonadIO m, MonadCatch m) =>
412+
((IO () -> IO ()) -> Stream m a) -> Stream m a
413+
cleanupIO action = do
414+
bracketIO bef aft (\(_, reg) -> action reg)
415+
416+
where
417+
418+
bef = do
419+
ref <- liftIO $ newIORef []
420+
return (ref, register ref)
421+
422+
aft (ref, _) = liftIO $ do
423+
xs <- readIORef ref
424+
sequenceA_ xs
425+
426+
register ref f =
427+
atomicModifyIORef ref (\xs -> (f : xs, ()))
428+
383429
data BracketState s v = BracketInit | BracketRun s v
384430

385431
-- | Alternate (custom) implementation of 'bracket'.

src/Streamly/Internal/Data/Channel/Types.hs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -475,3 +475,4 @@ cleanupSVar workerSet = do
475475
Prelude.mapM_ (`throwTo` ThreadAbort)
476476
-- (Prelude.filter (/= self) $ Set.toList workers)
477477
(Set.toList workers)
478+
writeIORef workerSet Set.empty

src/Streamly/Internal/Data/Stream/Channel.hs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,8 @@ withChannelK modifier input evaluator = K.concatEffect action
7676
action = do
7777
chan <- newChannel modifier
7878
toChannelK chan (evaluator chan input)
79-
return $ fromChannelK chan
79+
let cfg = modifier defaultConfig
80+
return $ fromChannelK (getCleanup cfg) chan
8081

8182
-- | A wrapper over 'withChannelK', converts 'Stream' to 'StreamK' and invokes
8283
-- 'withChannelK'.

src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,14 @@ forkWorker yieldMax sv = do
8282
-- modifyThread which performs a toggle rather than adding or deleting.
8383
--
8484
-- XXX We can use addThread or modThread based on eager flag.
85-
doFork (workLoop sv winfo) (svarMrun sv) exception >>= modThread
85+
-- tid <- liftIO myThreadId
86+
-- liftIO $ putStrLn $ "Dispatcher thread: " ++ show tid
87+
doFork (workLoop sv winfo) (svarMrun sv) exHandler >>= modThread
8688

8789
where
8890

8991
modThread = modifyThread (workerThreads sv) (outputDoorBell sv)
90-
exception = sendException (outputQueue sv) (outputDoorBell sv)
92+
exHandler = sendException (outputQueue sv) (outputDoorBell sv)
9193

9294
-- | Determine the maximum number of workers required based on 'maxWorkerLimit'
9395
-- and 'remainingWork'.

src/Streamly/Internal/Data/Stream/Channel/Operations.hs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,8 @@ fromChannelRaw sv = K.MkStream $ \st yld sng stp -> do
171171
ChildYield a -> yld a rest
172172
ChildStopChannel -> do
173173
liftIO (cleanupSVar (workerThreads sv))
174+
-- XXX drain all threads before stopping?
175+
-- XXX We can use a config option to drain or abort.
174176
cleanup >> stp
175177
ChildStop tid e -> do
176178
accountThread sv tid
@@ -188,6 +190,8 @@ fromChannelRaw sv = K.MkStream $ \st yld sng stp -> do
188190
-- K.foldStream st yld sng stp rest
189191
Nothing -> do
190192
liftIO (cleanupSVar (workerThreads sv))
193+
-- XXX Should we wait for all threads to abort
194+
-- before throwing the exception?
191195
cleanup >> throwM ex
192196

193197
#ifdef INSPECTION
@@ -214,17 +218,20 @@ inspect $ hasNoTypeClassesExcept 'fromChannelRaw
214218
-- XXX Add an option to block the consumer rather than stopping the stream if
215219
-- the work queue gets over.
216220

217-
chanCleanupOnGc :: Channel m a -> IO ()
218-
chanCleanupOnGc chan = do
221+
chanCleanup :: String -> Channel m a -> IO ()
222+
chanCleanup reason chan = do
219223
when (svarInspectMode chan) $ do
220224
r <- liftIO $ readIORef (svarStopTime (svarStats chan))
221225
when (isNothing r) $
222-
printSVar (dumpChannel chan) "Channel Garbage Collected"
226+
printSVar (dumpChannel chan) reason
223227
cleanupSVar (workerThreads chan)
224228
-- If there are any other channels referenced by this channel a GC will
225229
-- prompt them to be cleaned up quickly.
226230
when (svarInspectMode chan) performMajorGC
227231

232+
chanCleanupOnGc :: Channel m a -> IO ()
233+
chanCleanupOnGc = chanCleanup "Channel Garbage Collected"
234+
228235
-- | Draw a stream from a concurrent channel. The stream consists of the
229236
-- evaluated values from the input streams that were enqueued on the channel
230237
-- using 'toChannelK'.
@@ -247,11 +254,15 @@ chanCleanupOnGc chan = do
247254
--
248255
-- CAUTION! This API must not be called more than once on a channel.
249256
{-# INLINE fromChannelK #-}
250-
fromChannelK :: MonadAsync m => Channel m a -> K.StreamK m a
251-
fromChannelK chan =
257+
fromChannelK :: MonadAsync m => Maybe (IO () -> IO ()) -> Channel m a -> K.StreamK m a
258+
fromChannelK register chan =
252259
K.mkStream $ \st yld sng stp -> do
253260
ref <- liftIO $ newIORef ()
254261
_ <- liftIO $ mkWeakIORef ref (chanCleanupOnGc chan)
262+
let msg = "Channel cleanup via registered handler"
263+
case register of
264+
Nothing -> return ()
265+
Just f -> liftIO $ f (chanCleanup msg chan)
255266

256267
startChannel chan
257268
-- We pass a copy of sv to fromStreamVar, so that we know that it has
@@ -263,7 +274,8 @@ fromChannelK chan =
263274
-- | A wrapper over 'fromChannelK' for 'Stream' type.
264275
{-# INLINE fromChannel #-}
265276
fromChannel :: MonadAsync m => Channel m a -> Stream m a
266-
fromChannel = Stream.fromStreamK . fromChannelK
277+
-- XXX Pass the cleanup registration function to fromChannelK
278+
fromChannel = Stream.fromStreamK . fromChannelK Nothing
267279

268280
#if __GLASGOW_HASKELL__ >= 810
269281
type FromSVarState :: Type -> (Type -> Type) -> Type -> Type

src/Streamly/Internal/Data/Stream/Channel/Type.hs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,10 @@ module Streamly.Internal.Data.Stream.Channel.Type
4444
-- *** Diagnostics
4545
, inspect
4646

47+
-- *** Resource cleanup
48+
, addCleanup
49+
, clearCleanup
50+
4751
-- *** Get config
4852
, getMaxBuffer
4953
, getMaxThreads
@@ -54,6 +58,7 @@ module Streamly.Internal.Data.Stream.Channel.Type
5458
, getOrdered
5559
, getStopWhen
5660
, getInterleaved
61+
, getCleanup
5762

5863
-- ** Sending Worker Events
5964
, yieldWith
@@ -314,6 +319,18 @@ data Config = Config
314319
, _ordered :: Bool
315320
, _interleaved :: Bool
316321
, _bound :: Bool
322+
323+
-- XXX We can also use resource-t to release the channel. But that will
324+
-- require a MonadResource constraint. It is a bigger change, we can plan
325+
-- in future. With MonadResource, runResourceT will have to be called to
326+
-- create a scope. Here we have an option to use prompt release or GC
327+
-- release, but there are chances of missing a prompt release when the
328+
-- option is provided to the programmer instead of always enforcing it.
329+
--
330+
-- We could store Channel m a, here instead of a deallocation function, if
331+
-- we make the Config type as "Config m a". That way we can also share
332+
-- channels across multiple computations.
333+
, _release :: Maybe (IO () -> IO ())
317334
}
318335

319336
-------------------------------------------------------------------------------
@@ -342,6 +359,7 @@ defaultConfig = Config
342359
, _ordered = False
343360
, _interleaved = False
344361
, _bound = False
362+
, _release = Nothing
345363
}
346364

347365
-------------------------------------------------------------------------------
@@ -535,6 +553,19 @@ boundThreads flag st = st { _bound = flag }
535553
_getBound :: Config -> Bool
536554
_getBound = _bound
537555

556+
-- | Specify a function that registers a resource relase function. The resource
557+
-- release function can be called on exception or when the stream pipeline has
558+
-- finished to promptly release the channel instead of waiting for GC.
559+
addCleanup :: (IO () -> IO ()) -> Config -> Config
560+
addCleanup ref cfg = cfg { _release = Just ref }
561+
562+
-- | Clear the resource release registration function.
563+
clearCleanup :: Config -> Config
564+
clearCleanup cfg = cfg { _release = Nothing }
565+
566+
getCleanup :: Config -> Maybe (IO () -> IO ())
567+
getCleanup = _release
568+
538569
-------------------------------------------------------------------------------
539570
-- Initialization
540571
-------------------------------------------------------------------------------

0 commit comments

Comments
 (0)