Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions core/docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,21 @@

### Enhancements

* Add several concurrent combinators for folds in `Streamly.Data.Fold.Prelude`.
* Split the `Fold` type in two, `Fold` and `Scanl`. `Streamly.Data.Scanl`
module is added for the new `Scanl` type.
* Add APIs for prompt cleanup of allocated resources as well as cleanup of
concurrent threads. You can choose to use guaranteed prompt cleanup over GC
based cleanup by using the new APIs.
* Add several new concurrent combinators for folds in
`Streamly.Data.Fold.Prelude`.
* Add the `Streamly.Data.Scanl` module for the new `Scanl` type. Composable
  scans can be used to split streams into multiple streams, process them
  independently (concurrently too), and merge the results. The `Fold` type has
  been split into two types, `Fold` and `Scanl`.
* Add a `Path` type and some type wrappers for flexibly typed file
  system paths; the following modules are added:
- Streamly.FileSystem.Path
- Streamly.FileSystem.Path.Node
- Streamly.FileSystem.Path.Seg
- Streamly.FileSystem.Path.SegNode
* Add `Streamly.FileSystem.DirIO` and `Streamly.FileSystem.FileIO`
modules. These new modules replace the `Streamly.FileSystem.Dir` and
`Streamly.FileSystem.File` modules which have been deprecated. The
Expand All @@ -17,12 +29,6 @@
representation. The DirIO module API takes an additional ReadOptions
argument to modify the behavior. Please note that the directory read
APIs in the new module do not follow symlinks by default.
* Add a `Path` type and some type wrappers for flexibly typed file
system paths, following modules are added:
- Streamly.FileSystem.Path
- Streamly.FileSystem.Path.Node
- Streamly.FileSystem.Path.Seg
- Streamly.FileSystem.Path.SegNode
* Remove the `Storable` constraint from the following functions:
- Streamly.Data.Stream.isInfixOf
- Streamly.Data.Array.writeLastN
Expand Down
46 changes: 46 additions & 0 deletions core/src/Streamly/Internal/Data/Stream/Exception.hs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ module Streamly.Internal.Data.Stream.Exception
, bracketUnsafe
, bracketIO3
, bracketIO
, cleanupIO
, cleanupEffectIO

-- * Exceptions
, onException
Expand All @@ -33,6 +35,8 @@ where
import Control.Monad.IO.Class (MonadIO(..))
import Control.Exception (Exception, SomeException, mask_)
import Control.Monad.Catch (MonadCatch)
import Data.Foldable (sequenceA_)
import Data.IORef
    (newIORef, readIORef, atomicModifyIORef, atomicModifyIORef')
import GHC.Exts (inline)
import Streamly.Internal.Data.IOFinalizer
(newIOFinalizer, runIOFinalizer, clearingIOFinalizer)
Expand Down Expand Up @@ -346,6 +350,27 @@ bracketIO3 bef aft onExc onGC =
onGC
(inline MC.try)

-- | Run a monadic effect supplying it with a function to register cleanup
-- actions that are automatically invoked on exception or after the effect
-- function is done.
--
-- Cleanup actions are run in LIFO order: the most recently registered action
-- runs first.
{-# INLINE cleanupEffectIO #-}
cleanupEffectIO :: (MonadIO m, MonadCatch m) =>
    ((IO () -> IO ()) -> m ()) -> m ()
cleanupEffectIO action = do
    ref <- liftIO $ newIORef []
    -- XXX use mask or MC.finally?
    action (register ref) `MC.onException` aft ref
    aft ref

    where

    -- Atomically take ownership of the registered actions before running
    -- them, so that running the cleanups is idempotent: a second invocation
    -- finds an empty list.
    aft ref = liftIO $ do
        xs <- atomicModifyIORef' ref (\xs -> ([], xs))
        sequenceA_ xs

    -- Use the strict modify to avoid building a chain of thunks in the
    -- IORef when many cleanup actions are registered.
    register ref f =
        atomicModifyIORef' ref (\xs -> (f : xs, ()))

-- | Run the alloc action @IO b@ with async exceptions disabled but keeping
-- blocking operations interruptible (see 'Control.Exception.mask'). Use the
-- output @b@ of the IO action as input to the function @b -> Stream m a@ to
Expand Down Expand Up @@ -380,6 +405,27 @@ bracketIO :: (MonadIO m, MonadCatch m)
=> IO b -> (b -> IO c) -> (b -> Stream m a) -> Stream m a
bracketIO bef aft = bracketIO3 bef aft aft aft

-- | Run a stream supplying it a function to register cleanup actions which are
-- automatically called on exception or when the stream is done.
--
-- Cleanup actions are run in LIFO order: the most recently registered action
-- runs first.
{-# INLINE cleanupIO #-}
cleanupIO :: (MonadIO m, MonadCatch m) =>
    ((IO () -> IO ()) -> Stream m a) -> Stream m a
cleanupIO action = bracketIO bef aft (\(_, reg) -> action reg)

    where

    -- Allocate the IORef holding the cleanup actions and pair it with the
    -- registration function handed to the stream. Note: bef and aft are
    -- already in IO (bracketIO's types), no lifting needed.
    bef = do
        ref <- newIORef []
        return (ref, register ref)

    -- Atomically take the actions out before running them so that running
    -- the cleanups is idempotent.
    aft (ref, _) = do
        xs <- atomicModifyIORef' ref (\xs -> ([], xs))
        sequenceA_ xs

    -- Use the strict modify to avoid building a chain of thunks in the
    -- IORef when many cleanup actions are registered.
    register ref f =
        atomicModifyIORef' ref (\xs -> (f : xs, ()))

data BracketState s v = BracketInit | BracketRun s v

-- | Alternate (custom) implementation of 'bracket'.
Expand Down
1 change: 1 addition & 0 deletions src/Streamly/Internal/Data/Channel/Types.hs
Original file line number Diff line number Diff line change
Expand Up @@ -475,3 +475,4 @@ cleanupSVar workerSet = do
Prelude.mapM_ (`throwTo` ThreadAbort)
-- (Prelude.filter (/= self) $ Set.toList workers)
(Set.toList workers)
writeIORef workerSet Set.empty
3 changes: 2 additions & 1 deletion src/Streamly/Internal/Data/Stream/Channel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ withChannelK modifier input evaluator = K.concatEffect action
action = do
chan <- newChannel modifier
toChannelK chan (evaluator chan input)
return $ fromChannelK chan
let cfg = modifier defaultConfig
return $ fromChannelK (getCleanup cfg) chan

-- | A wrapper over 'withChannelK', converts 'Stream' to 'StreamK' and invokes
-- 'withChannelK'.
Expand Down
6 changes: 4 additions & 2 deletions src/Streamly/Internal/Data/Stream/Channel/Dispatcher.hs
Original file line number Diff line number Diff line change
Expand Up @@ -82,12 +82,14 @@ forkWorker yieldMax sv = do
-- modifyThread which performs a toggle rather than adding or deleting.
--
-- XXX We can use addThread or modThread based on eager flag.
doFork (workLoop sv winfo) (svarMrun sv) exception >>= modThread
-- tid <- liftIO myThreadId
-- liftIO $ putStrLn $ "Dispatcher thread: " ++ show tid
doFork (workLoop sv winfo) (svarMrun sv) exHandler >>= modThread

where

modThread = modifyThread (workerThreads sv) (outputDoorBell sv)
exception = sendException (outputQueue sv) (outputDoorBell sv)
exHandler = sendException (outputQueue sv) (outputDoorBell sv)

-- | Determine the maximum number of workers required based on 'maxWorkerLimit'
-- and 'remainingWork'.
Expand Down
25 changes: 19 additions & 6 deletions src/Streamly/Internal/Data/Stream/Channel/Operations.hs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,8 @@ fromChannelRaw sv = K.MkStream $ \st yld sng stp -> do
ChildYield a -> yld a rest
ChildStopChannel -> do
liftIO (cleanupSVar (workerThreads sv))
-- XXX drain all threads before stopping?
-- XXX We can use a config option to drain or abort.
cleanup >> stp
ChildStop tid e -> do
accountThread sv tid
Expand All @@ -188,6 +190,8 @@ fromChannelRaw sv = K.MkStream $ \st yld sng stp -> do
-- K.foldStream st yld sng stp rest
Nothing -> do
liftIO (cleanupSVar (workerThreads sv))
-- XXX Should we wait for all threads to abort
-- before throwing the exception?
cleanup >> throwM ex

#ifdef INSPECTION
Expand All @@ -214,17 +218,21 @@ inspect $ hasNoTypeClassesExcept 'fromChannelRaw
-- XXX Add an option to block the consumer rather than stopping the stream if
-- the work queue gets over.

-- | Stop all worker threads of the channel. The given reason string is
-- printed along with the channel dump when inspect mode is on and the
-- channel has not already stopped normally.
chanCleanup :: String -> Channel m a -> IO ()
chanCleanup reason chan = do
    when (svarInspectMode chan) $ do
        -- Dump the channel only if it has not already stopped normally.
        -- Already in IO, no liftIO required.
        r <- readIORef (svarStopTime (svarStats chan))
        when (isNothing r) $
            printSVar (dumpChannel chan) reason
    cleanupSVar (workerThreads chan)
    -- If there are any other channels referenced by this channel a GC will
    -- prompt them to be cleaned up quickly.
    when (svarInspectMode chan) performMajorGC

-- | Cleanup handler to be invoked when the channel is garbage collected
-- without having been released promptly.
chanCleanupOnGc :: Channel m a -> IO ()
chanCleanupOnGc = chanCleanup "Channel Garbage Collected"

-- | Draw a stream from a concurrent channel. The stream consists of the
-- evaluated values from the input streams that were enqueued on the channel
-- using 'toChannelK'.
Expand All @@ -247,11 +255,15 @@ chanCleanupOnGc chan = do
--
-- CAUTION! This API must not be called more than once on a channel.
{-# INLINE fromChannelK #-}
fromChannelK :: MonadAsync m => Channel m a -> K.StreamK m a
fromChannelK chan =
fromChannelK :: MonadAsync m => Maybe (IO () -> IO ()) -> Channel m a -> K.StreamK m a
fromChannelK register chan =
K.mkStream $ \st yld sng stp -> do
ref <- liftIO $ newIORef ()
_ <- liftIO $ mkWeakIORef ref (chanCleanupOnGc chan)
let msg = "Channel cleanup via registered handler"
case register of
Nothing -> return ()
Just f -> liftIO $ f (chanCleanup msg chan)

startChannel chan
-- We pass a copy of sv to fromStreamVar, so that we know that it has
Expand All @@ -263,7 +275,8 @@ fromChannelK chan =
-- | A wrapper over 'fromChannelK' for the 'Stream' type. No cleanup
-- registration function is supplied ('Nothing'), so the channel relies on
-- the GC based cleanup installed by 'fromChannelK'.
{-# INLINE fromChannel #-}
fromChannel :: MonadAsync m => Channel m a -> Stream m a
-- XXX Pass the cleanup registration function to fromChannelK
fromChannel chan = Stream.fromStreamK (fromChannelK Nothing chan)

#if __GLASGOW_HASKELL__ >= 810
type FromSVarState :: Type -> (Type -> Type) -> Type -> Type
Expand Down
31 changes: 31 additions & 0 deletions src/Streamly/Internal/Data/Stream/Channel/Type.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ module Streamly.Internal.Data.Stream.Channel.Type
-- *** Diagnostics
, inspect

-- *** Resource cleanup
, addCleanup
, clearCleanup

-- *** Get config
, getMaxBuffer
, getMaxThreads
Expand All @@ -54,6 +58,7 @@ module Streamly.Internal.Data.Stream.Channel.Type
, getOrdered
, getStopWhen
, getInterleaved
, getCleanup

-- ** Sending Worker Events
, yieldWith
Expand Down Expand Up @@ -314,6 +319,18 @@ data Config = Config
, _ordered :: Bool
, _interleaved :: Bool
, _bound :: Bool

-- XXX We can also use resource-t to release the channel. But that will
-- require a MonadResource constraint. It is a bigger change, we can plan
-- in future. With MonadResource, runResourceT will have to be called to
-- create a scope. Here we have an option to use prompt release or GC
-- release, but there are chances of missing a prompt release when the
-- option is provided to the programmer instead of always enforcing it.
--
-- We could store Channel m a, here instead of a deallocation function, if
-- we make the Config type as "Config m a". That way we can also share
-- channels across multiple computations.
, _release :: Maybe (IO () -> IO ())
}

-------------------------------------------------------------------------------
Expand Down Expand Up @@ -342,6 +359,7 @@ defaultConfig = Config
, _ordered = False
, _interleaved = False
, _bound = False
, _release = Nothing
}

-------------------------------------------------------------------------------
Expand Down Expand Up @@ -535,6 +553,19 @@ boundThreads flag st = st { _bound = flag }
_getBound :: Config -> Bool
_getBound = _bound

-- | Specify a function that registers a resource release function. The
-- resource release function can be called on exception or when the stream
-- pipeline has finished, to promptly release the channel instead of waiting
-- for GC.
addCleanup :: (IO () -> IO ()) -> Config -> Config
addCleanup reg cfg = cfg { _release = Just reg }

-- | Remove any previously registered resource release function from the
-- configuration.
clearCleanup :: Config -> Config
clearCleanup config = config { _release = Nothing }

-- | Get the resource release registration function, if one was set with
-- 'addCleanup'.
getCleanup :: Config -> Maybe (IO () -> IO ())
getCleanup = _release

-------------------------------------------------------------------------------
-- Initialization
-------------------------------------------------------------------------------
Expand Down
Loading
Loading