From 378d553adabbccfa008f0ba427f1c81e16ca3a60 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 19 May 2025 17:21:42 +0530 Subject: [PATCH 1/4] Rearrange, update changelog, add more description --- core/docs/Changelog.md | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/core/docs/Changelog.md b/core/docs/Changelog.md index c9cdeab162..36fcdb3934 100644 --- a/core/docs/Changelog.md +++ b/core/docs/Changelog.md @@ -6,9 +6,18 @@ ### Enhancements -* Add several concurrent combinators for folds in `Streamly.Data.Fold.Prelude`. -* Split the `Fold` type in two, `Fold` and `Scanl`. `Streamly.Data.Scanl` - module is added for the new `Scanl` type. +* Add several new concurrent combinators for folds in + `Streamly.Data.Fold.Prelude`. +* Add `Streamly.Data.Scanl` module is added for the new `Scanl` type. Composable + scans can be used to split streams into multiple streams, process them + independently (concurrently too) and merge the results. The `Fold` type has + been split in two types, `Fold` and `Scanl`. +* Add a `Path` type and some type wrappers for flexibly typed file + system paths, following modules are added: + - Streamly.FileSystem.Path + - Streamly.FileSystem.Path.Node + - Streamly.FileSystem.Path.Seg + - Streamly.FileSystem.Path.SegNode * Add `Streamly.FileSystem.DirIO` and `Streamly.FileSystem.FileIO` modules. These new module replace `Streamly.FileSystem.Dir`, `Streamly.FileSystem.File` modules which have been deprecated. The @@ -17,12 +26,6 @@ representation. The DirIO module API takes an additional ReadOptions argument to modify the behavior. Please note that the directory read APIs in the new module do not follow symlinks by default. 
-* Add a `Path` type and some type wrappers for flexibly typed file - system paths, following modules are added: - - Streamly.FileSystem.Path - - Streamly.FileSystem.Path.Node - - Streamly.FileSystem.Path.Seg - - Streamly.FileSystem.Path.SegNode * Remove the `Storable` constraint from the following functions: - Streamly.Data.Stream.isInfixOf - Streamly.Data.Array.writeLastN From 2c1b0ec8b97c983b806060d4d558c8dd044ceb9a Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Tue, 20 May 2025 09:18:49 +0530 Subject: [PATCH 2/4] Add functionality for prompt resource cleanup --- .../Internal/Data/Stream/Exception.hs | 46 +++++++++++++++++++ src/Streamly/Internal/Data/Channel/Types.hs | 1 + src/Streamly/Internal/Data/Stream/Channel.hs | 3 +- .../Data/Stream/Channel/Dispatcher.hs | 6 ++- .../Data/Stream/Channel/Operations.hs | 25 +++++++--- .../Internal/Data/Stream/Channel/Type.hs | 31 +++++++++++++ 6 files changed, 103 insertions(+), 9 deletions(-) diff --git a/core/src/Streamly/Internal/Data/Stream/Exception.hs b/core/src/Streamly/Internal/Data/Stream/Exception.hs index c8c20d955c..328309389f 100644 --- a/core/src/Streamly/Internal/Data/Stream/Exception.hs +++ b/core/src/Streamly/Internal/Data/Stream/Exception.hs @@ -20,6 +20,8 @@ module Streamly.Internal.Data.Stream.Exception , bracketUnsafe , bracketIO3 , bracketIO + , cleanupIO + , cleanupEffectIO -- * Exceptions , onException @@ -33,6 +35,8 @@ where import Control.Monad.IO.Class (MonadIO(..)) import Control.Exception (Exception, SomeException, mask_) import Control.Monad.Catch (MonadCatch) +import Data.Foldable (sequenceA_) +import Data.IORef (newIORef, readIORef, atomicModifyIORef) import GHC.Exts (inline) import Streamly.Internal.Data.IOFinalizer (newIOFinalizer, runIOFinalizer, clearingIOFinalizer) @@ -346,6 +350,27 @@ bracketIO3 bef aft onExc onGC = onGC (inline MC.try) +-- | Run a monadic effect supplying it with a function to register cleanup +-- actions that are automatically invoked on exception or after the 
effect +-- function is done. +{-# INLINE cleanupEffectIO #-} +cleanupEffectIO :: (MonadIO m, MonadCatch m) => + ((IO () -> IO ()) -> m ()) -> m () +cleanupEffectIO action = do + ref <- liftIO $ newIORef [] + -- XXX use mask or MC.finally? + action (register ref) `MC.onException` aft ref + aft ref + + where + + aft ref = liftIO $ do + xs <- readIORef ref + sequenceA_ xs + + register ref f = + atomicModifyIORef ref (\xs -> (f : xs, ())) + -- | Run the alloc action @IO b@ with async exceptions disabled but keeping -- blocking operations interruptible (see 'Control.Exception.mask'). Use the -- output @b@ of the IO action as input to the function @b -> Stream m a@ to @@ -380,6 +405,27 @@ bracketIO :: (MonadIO m, MonadCatch m) => IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a bracketIO bef aft = bracketIO3 bef aft aft aft +-- | Run a stream supplying it a function to register cleanup actions which are +-- automatically called on exception or when the stream is done. +{-# INLINE cleanupIO #-} +cleanupIO :: (MonadIO m, MonadCatch m) => + ((IO () -> IO ()) -> Stream m a) -> Stream m a +cleanupIO action = do + bracketIO bef aft (\(_, reg) -> action reg) + + where + + bef = do + ref <- liftIO $ newIORef [] + return (ref, register ref) + + aft (ref, _) = liftIO $ do + xs <- readIORef ref + sequenceA_ xs + + register ref f = + atomicModifyIORef ref (\xs -> (f : xs, ())) + data BracketState s v = BracketInit | BracketRun s v -- | Alternate (custom) implementation of 'bracket'. 
diff --git a/src/Streamly/Internal/Data/Channel/Types.hs b/src/Streamly/Internal/Data/Channel/Types.hs index 42eeafb0bb..11b3564d77 100644 --- a/src/Streamly/Internal/Data/Channel/Types.hs +++ b/src/Streamly/Internal/Data/Channel/Types.hs @@ -475,3 +475,4 @@ cleanupSVar workerSet = do Prelude.mapM_ (`throwTo` ThreadAbort) -- (Prelude.filter (/= self) $ Set.toList workers) (Set.toList workers) + writeIORef workerSet Set.empty diff --git a/src/Streamly/Internal/Data/Stream/Channel.hs b/src/Streamly/Internal/Data/Stream/Channel.hs index 577e774d41..ab96290506 100644 --- a/src/Streamly/Internal/Data/Stream/Channel.hs +++ b/src/Streamly/Internal/Data/Stream/Channel.hs @@ -76,7 +76,8 @@ withChannelK modifier input evaluator = K.concatEffect action action = do chan <- newChannel modifier toChannelK chan (evaluator chan input) - return $ fromChannelK chan + let cfg = modifier defaultConfig + return $ fromChannelK (getCleanup cfg) chan -- | A wrapper over 'withChannelK', converts 'Stream' to 'StreamK' and invokes -- 'withChannelK'. diff --git a/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs b/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs index e951b879e1..315f85d742 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs @@ -82,12 +82,14 @@ forkWorker yieldMax sv = do -- modifyThread which performs a toggle rather than adding or deleting. -- -- XXX We can use addThread or modThread based on eager flag. 
- doFork (workLoop sv winfo) (svarMrun sv) exception >>= modThread + -- tid <- liftIO myThreadId + -- liftIO $ putStrLn $ "Dispatcher thread: " ++ show tid + doFork (workLoop sv winfo) (svarMrun sv) exHandler >>= modThread where modThread = modifyThread (workerThreads sv) (outputDoorBell sv) - exception = sendException (outputQueue sv) (outputDoorBell sv) + exHandler = sendException (outputQueue sv) (outputDoorBell sv) -- | Determine the maximum number of workers required based on 'maxWorkerLimit' -- and 'remainingWork'. diff --git a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs index 58c671cf32..1de60eaa13 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs @@ -171,6 +171,8 @@ fromChannelRaw sv = K.MkStream $ \st yld sng stp -> do ChildYield a -> yld a rest ChildStopChannel -> do liftIO (cleanupSVar (workerThreads sv)) + -- XXX drain all threads before stopping? + -- XXX We can use a config option to drain or abort. cleanup >> stp ChildStop tid e -> do accountThread sv tid @@ -188,6 +190,8 @@ fromChannelRaw sv = K.MkStream $ \st yld sng stp -> do -- K.foldStream st yld sng stp rest Nothing -> do liftIO (cleanupSVar (workerThreads sv)) + -- XXX Should we wait for all threads to abort + -- before throwing the exception? cleanup >> throwM ex #ifdef INSPECTION @@ -214,17 +218,21 @@ inspect $ hasNoTypeClassesExcept 'fromChannelRaw -- XXX Add an option to block the consumer rather than stopping the stream if -- the work queue gets over. 
-chanCleanupOnGc :: Channel m a -> IO () -chanCleanupOnGc chan = do +chanCleanup :: String -> Channel m a -> IO () +chanCleanup reason chan = do when (svarInspectMode chan) $ do r <- liftIO $ readIORef (svarStopTime (svarStats chan)) when (isNothing r) $ - printSVar (dumpChannel chan) "Channel Garbage Collected" + printSVar (dumpChannel chan) reason cleanupSVar (workerThreads chan) -- If there are any other channels referenced by this channel a GC will -- prompt them to be cleaned up quickly. when (svarInspectMode chan) performMajorGC +chanCleanupOnGc :: Channel m a -> IO () +chanCleanupOnGc chan = + chanCleanup "Channel Garbage Collected" chan + -- | Draw a stream from a concurrent channel. The stream consists of the -- evaluated values from the input streams that were enqueued on the channel -- using 'toChannelK'. @@ -247,11 +255,15 @@ chanCleanupOnGc chan = do -- -- CAUTION! This API must not be called more than once on a channel. {-# INLINE fromChannelK #-} -fromChannelK :: MonadAsync m => Channel m a -> K.StreamK m a -fromChannelK chan = +fromChannelK :: MonadAsync m => Maybe (IO () -> IO ()) -> Channel m a -> K.StreamK m a +fromChannelK register chan = K.mkStream $ \st yld sng stp -> do ref <- liftIO $ newIORef () _ <- liftIO $ mkWeakIORef ref (chanCleanupOnGc chan) + let msg = "Channel cleanup via registered handler" + case register of + Nothing -> return () + Just f -> liftIO $ f (chanCleanup msg chan) startChannel chan -- We pass a copy of sv to fromStreamVar, so that we know that it has @@ -263,7 +275,8 @@ fromChannelK chan = -- | A wrapper over 'fromChannelK' for 'Stream' type. {-# INLINE fromChannel #-} fromChannel :: MonadAsync m => Channel m a -> Stream m a -fromChannel = Stream.fromStreamK . fromChannelK +-- XXX Pass the cleanup registration function to fromChannelK +fromChannel = Stream.fromStreamK . 
fromChannelK Nothing #if __GLASGOW_HASKELL__ >= 810 type FromSVarState :: Type -> (Type -> Type) -> Type -> Type diff --git a/src/Streamly/Internal/Data/Stream/Channel/Type.hs b/src/Streamly/Internal/Data/Stream/Channel/Type.hs index d10ec718ef..40aafcd6b6 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Type.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Type.hs @@ -44,6 +44,10 @@ module Streamly.Internal.Data.Stream.Channel.Type -- *** Diagnostics , inspect + -- *** Resource cleanup + , addCleanup + , clearCleanup + -- *** Get config , getMaxBuffer , getMaxThreads @@ -54,6 +58,7 @@ module Streamly.Internal.Data.Stream.Channel.Type , getOrdered , getStopWhen , getInterleaved + , getCleanup -- ** Sending Worker Events , yieldWith @@ -314,6 +319,18 @@ data Config = Config , _ordered :: Bool , _interleaved :: Bool , _bound :: Bool + + -- XXX We can also use resource-t to release the channel. But that will + -- require a MonadResource constraint. It is a bigger change, we can plan + -- in future. With MonadResource, runResourceT will have to be called to + -- create a scope. Here we have an option to use prompt release or GC + -- release, but there are chances of missing a prompt release when the + -- option is provided to the programmer instead of always enforcing it. + -- + -- We could store Channel m a, here instead of a deallocation function, if + -- we make the Config type as "Config m a". That way we can also share + -- channels across multiple computations. 
+ , _release :: Maybe (IO () -> IO ()) } ------------------------------------------------------------------------------- @@ -342,6 +359,7 @@ defaultConfig = Config , _ordered = False , _interleaved = False , _bound = False + , _release = Nothing } ------------------------------------------------------------------------------- @@ -535,6 +553,19 @@ boundThreads flag st = st { _bound = flag } _getBound :: Config -> Bool _getBound = _bound +-- | Specify a function that registers a resource release function. The resource +-- release function can be called on exception or when the stream pipeline has +-- finished to promptly release the channel instead of waiting for GC. +addCleanup :: (IO () -> IO ()) -> Config -> Config +addCleanup ref cfg = cfg { _release = Just ref } + +-- | Clear the resource release registration function. +clearCleanup :: Config -> Config +clearCleanup cfg = cfg { _release = Nothing } + +getCleanup :: Config -> Maybe (IO () -> IO ()) +getCleanup = _release + ------------------------------------------------------------------------------- -- Initialization ------------------------------------------------------------------------------- From 6a2f850de2ec047a80c76fda23094227f5dc2496 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Tue, 20 May 2025 09:07:01 +0530 Subject: [PATCH 3/4] Add a testsuite for resource cleanup tests --- test/Streamly/Test/Data/Stream/Exception.hs | 121 ++++++++++++++++++++ test/streamly-tests.cabal | 6 + 2 files changed, 127 insertions(+) create mode 100644 test/Streamly/Test/Data/Stream/Exception.hs diff --git a/test/Streamly/Test/Data/Stream/Exception.hs b/test/Streamly/Test/Data/Stream/Exception.hs new file mode 100644 index 0000000000..770b15f76b --- /dev/null +++ b/test/Streamly/Test/Data/Stream/Exception.hs @@ -0,0 +1,121 @@ +module Streamly.Test.Data.Stream.Exception (main) + +where + +import Control.Concurrent (threadDelay) +import Control.Exception (SomeException, throw, catch, finally, bracket_) +import 
Control.Monad (when) +import Data.Foldable (sequenceA_) +import Data.Function ((&)) +import Data.IORef (IORef, newIORef, atomicModifyIORef, readIORef) +import System.Mem (performMajorGC) + +import qualified Streamly.Internal.Data.Stream.Prelude as Stream +import qualified Streamly.Internal.Data.Stream as Stream +import qualified Streamly.Data.Fold as Fold + +incr :: Num a => IORef a -> IO () +incr ref = do + -- tid <- myThreadId + -- putStrLn $ "Incrementing the counter: " ++ show tid + atomicModifyIORef ref (\x -> (x + 1, ())) + +decr :: Num a => IORef a -> IO () +decr ref = do + atomicModifyIORef ref (\x -> (x - 1, ())) + -- tid <- myThreadId + -- putStrLn $ "Decremented the counter: " ++ show tid + +handler :: SomeException -> IO b +handler (e :: SomeException) = do + -- tid <- myThreadId + -- putStrLn $ "Child: " ++ show tid ++ " " ++ show e + -- Rethrowing the exception is important, otherwise the thread will not + -- exit. + throw e + +run :: Num a => IORef a -> IO c -> IO c +run ref x = bracket_ (incr ref) (decr ref) (x `catch` handler) + +timeout :: Int +timeout = 1000000 + +takeCount :: Int +takeCount = 1 + +stream :: Num a => + IORef a -> (Stream.Config -> Stream.Config) -> Stream.Stream IO () +stream ref modifier = + Stream.enumerateFrom (1 :: Int) + & Stream.parMapM modifier + ( \x -> + -- somehow if all of them have same timeout then the chances of + -- failure are more. + run ref $ threadDelay (if x == 1 then 1000000 else timeout) + ) + & Stream.take takeCount + +finalAction :: (Show a, Eq a, Num a) => Bool -> IORef a -> Int -> IO () +finalAction gc ref t = do + -- We have initiated cleanup but we do not wait for the threads to + -- exit, therefore, we have to give them some time to be scheduled and + -- run the exception handler. 
+ when gc $ performMajorGC + threadDelay t + r <- readIORef ref + putStrLn $ "Pending computations: " ++ show r + when (r /= 0) $ error "Failed" + +cleanup :: Int -> (Stream.Config -> Stream.Config) -> IO () +cleanup t cfg = do + ref <- newIORef (0 :: Int) + (Stream.cleanupIO (\f -> stream ref (cfg . Stream.addCleanup f)) + & Stream.fold Fold.drain) `finally` finalAction False ref t + +cleanupEffect :: Int -> (Stream.Config -> Stream.Config) -> IO () +cleanupEffect t cfg = do + ref <- newIORef (0 :: Int) + Stream.cleanupEffectIO (\f -> stream ref (cfg . Stream.addCleanup f) + & Stream.fold Fold.drain) `finally` finalAction False ref t + +finallyGC :: Int -> (Stream.Config -> Stream.Config) -> IO () +finallyGC t cfg = do + ref <- newIORef (0 :: Int) + Stream.finallyIO (finalAction True ref t) (stream ref cfg) + & Stream.fold Fold.drain + +-- XXX Include rate as well +limits :: [(String, Stream.Config -> Stream.Config)] +limits = + [ ("default", id) + , ("maxBuffer 10", Stream.maxBuffer 10) + , ("maxThreads 10", Stream.maxThreads 10) + ] + +sched :: [(String, Stream.Config -> Stream.Config)] +sched = + [ ("default", id) + , ("eager", Stream.eager True) + , ("ordered", Stream.ordered True) + , ("interleaved", Stream.interleaved True) + ] + +funcs :: [(String, Int -> (Stream.Config -> Stream.Config) -> IO ())] +funcs = + [ ("cleanup", cleanup) + , ("cleanupEffect", cleanupEffect) + , ("finallyGC", finallyGC) + ] + +main :: IO () +main = do + let cfg = id -- Stream.inspect True + + -- TODO: Interrupt test + sequenceA_ + [ putStrLn ("Running: " ++ fst f ++ " " ++ fst x1 ++ " " ++ fst x2) + >> (snd f) + (if fst x1 == "default" then 100000 else 100000) + (snd x1 . snd x2 . 
cfg) + | f <- funcs, x1 <- limits, x2 <- sched + ] diff --git a/test/streamly-tests.cabal b/test/streamly-tests.cabal index 74ce136143..7297140e68 100644 --- a/test/streamly-tests.cabal +++ b/test/streamly-tests.cabal @@ -369,6 +369,12 @@ test-suite Data.Stream.Concurrent if flag(use-streamly-core) buildable: False +test-suite Data.Stream.Exception + import: test-options + type: exitcode-stdio-1.0 + main-is: Streamly/Test/Data/Stream/Exception.hs + ghc-options: -main-is Streamly.Test.Data.Stream.Exception.main + test-suite Data.Stream.Time import: test-options type: exitcode-stdio-1.0 From da6e9311be77b9bf208b80b4a656464c4fb1235b Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Mon, 19 May 2025 17:39:39 +0530 Subject: [PATCH 4/4] Update changelog for prompt cleanup --- core/docs/Changelog.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/core/docs/Changelog.md b/core/docs/Changelog.md index 36fcdb3934..95bb6c34f1 100644 --- a/core/docs/Changelog.md +++ b/core/docs/Changelog.md @@ -6,6 +6,9 @@ ### Enhancements +* Add APIs for prompt cleanup of allocated resources as well as cleanup of + concurrent threads. You can choose to use guaranteed prompt cleanup over GC + based cleanup by using the new APIs. * Add several new concurrent combinators for folds in `Streamly.Data.Fold.Prelude`. * Add `Streamly.Data.Scanl` module is added for the new `Scanl` type. Composable