Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 164 additions & 0 deletions src/Streamly/Internal/Data/Fold.hs
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ module Streamly.Internal.Data.Fold
, foldChunks
, duplicate

-- * Exceptions
, handle
, onException
, bracket
, before
, after
, finally

-- * Running Folds
, initialize
, runStep
Expand All @@ -209,8 +217,12 @@ module Streamly.Internal.Data.Fold
)
where

import Control.Exception (mask_)
import Control.Monad (void)
import Control.Monad.Catch (Exception, MonadCatch)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (liftBaseOp_)
import Data.Functor (($>))
import Data.Functor.Identity (Identity(..))
import Data.Int (Int64)
import Data.Map.Strict (Map)
Expand All @@ -223,9 +235,11 @@ import Prelude
scanl, scanl1, replicate, concatMap, mconcat, foldMap, unzip,
span, splitAt, break, mapM)

import qualified Control.Monad.Catch as MC
import qualified Data.Map.Strict as Map
import qualified Prelude

import Streamly.Internal.Data.IORef (newFinalizedIORef, runIORefFinalizer)
import Streamly.Internal.Data.Pipe.Types (Pipe (..), PipeState(..))
import Streamly.Internal.Data.Fold.Types
import Streamly.Internal.Data.Strict
Expand Down Expand Up @@ -1605,6 +1619,156 @@ lchunksInRange low high (Fold step1 initial1 extract1)
(Fold step2 initial2 extract2) = undefined
-}

------------------------------------------------------------------------------
-- Exceptions
------------------------------------------------------------------------------

-- | Exception handling states of a fold
data HandleExc s f1 f2 = InitDone !s | InitFailed !f1 | StepFailed !f2

-- | @handle initHandler stepHandler fold@ produces a new fold from a given
-- fold. The new fold executes the original @fold@, if an exception occurs
-- when initializing the fold then @initHandler@ is executed and fold resulting
-- from that starts execution. If an exception occurs while executing the
-- @step@ function of a fold then the @stephandler@ is executed and we start
-- executing the fold resulting from that.
--
-- The exception is caught and handled, not rethrown. If the exception handler
-- itself throws an exception that exception is thrown.
--
-- /Internal/
--
{-# INLINE handle #-}
handle :: (MonadCatch m, Exception e)
=> (e -> m (Fold m a b))
-> (e -> Fold m a b -> m (Fold m a b))
-> Fold m a b
-> Fold m a b
handle initH stepH (Fold step1 initial1 extract1) = Fold step initial extract

where

initial = fmap InitDone initial1 `MC.catch` (fmap InitFailed . initH)

step (InitDone s) a =
let f = Fold step1 (return s) extract1
in fmap InitDone (step1 s a)
`MC.catch` (\e -> fmap StepFailed (stepH e f))
step (InitFailed (Fold step2 initial2 extract2)) a = do
s <- initial2
s1 <- step2 s a
return $ InitFailed $ Fold step2 (return s1) extract2
step (StepFailed (Fold step2 initial2 extract2)) a = do
s <- initial2
s1 <- step2 s a
return $ StepFailed $ Fold step2 (return s1) extract2

extract (InitDone s) = extract1 s
extract (InitFailed (Fold _ initial2 extract2)) = initial2 >>= extract2
extract (StepFailed (Fold _ initial2 extract2)) = initial2 >>= extract2

-- | @onException action fold@ runs @action@ whenever the fold throws an
-- exception. The action is executed on any exception whether it is in
-- initial, step or extract action of the fold.
--
-- The exception is not caught, simply rethrown. If the @action@ itself
-- throws an exception that exception is thrown instead of the original
-- exception.
--
-- /Internal/
--
{-# INLINE onException #-}
onException :: MonadCatch m => m x -> Fold m a b -> Fold m a b
onException action (Fold step1 initial1 extract1) = Fold step initial extract

where

initial = initial1 `MC.onException` action
step s a = step1 s a `MC.onException` action
extract s = extract1 s `MC.onException` action

-- XXX we cannot use a bracketed fold for scan, because extract would release
-- the resource. This can be fixed when we have terminating folds, we can
-- release the resource on Stop instead of on extract.
--
-- | @bracket before after between@ runs @before@ and invokes @between@ using
-- its output, then runs the fold generated by @between@. If the fold ends
-- normally, due to an exception or if it is garbage collected prematurely then
-- @after@ is run with the output of @before@ as argument.
--
-- If @before@ or @after@ throw an exception that exception is thrown.
--
-- /Internal/
--
{-# INLINE bracket #-}
bracket :: (MonadAsync m, MonadCatch m)
=> m x -> (x -> m c) -> (x -> m (Fold m a b)) -> Fold m a b
bracket bef aft bet = Fold step initial extract

where

initial = do
(r, ref) <- liftBaseOp_ mask_ $ do
r <- bef
ref <- newFinalizedIORef (aft r)
return (r, ref)
fld <- bet r
return $ Tuple' ref fld

step (Tuple' ref (Fold step1 initial1 extract1)) a = do
s <- initial1
s1 <- step1 s a `MC.onException` runIORefFinalizer ref
return $ Tuple' ref $ Fold step1 (return s1) extract1

extract (Tuple' ref (Fold _ initial1 extract1)) = do
runIORefFinalizer ref
initial1 >>= extract1

-- | Run a side effect whenever the fold stops normally, aborts due to an
-- exception or is garbage collected.
--
-- /Internal/
--
{-# INLINE finally #-}
finally :: (MonadAsync m, MonadCatch m) => m b -> Fold m a b -> Fold m a b
finally aft (Fold step1 initial1 extract1) = Fold step initial extract

where

initial = do
ref <- newFinalizedIORef aft
Tuple' ref <$> initial1

step (Tuple' ref s) a = do
s1 <- step1 s a `MC.onException` runIORefFinalizer ref
return $ Tuple' ref s1

extract (Tuple' ref s) = do
runIORefFinalizer ref
extract1 s

-- | Run a side effect before the fold consumes its first element.
--
-- /Internal/
--
{-# INLINE before #-}
before :: Monad m => m x -> Fold m a b -> Fold m a b
before effect (Fold step1 initial1 extract1) = Fold step1 initial extract1

where

initial = effect *> initial1

-- | Run a side effect after the fold stops normally. Please use 'finally' if
-- you need a guarantee that the action runs even if the fold is garbage
-- collected.
--
-- /Internal/
--
{-# INLINE after #-}
after :: Monad m => m x -> Fold m a b -> Fold m a b
after effect = mapM (effect $>)

------------------------------------------------------------------------------
-- Fold to a Parallel SVar
------------------------------------------------------------------------------
Expand Down
58 changes: 58 additions & 0 deletions src/Streamly/Internal/Data/IORef.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
{-# LANGUAGE FlexibleContexts #-}

-- |
-- Module : Streamly.Internal.Data.IORef
-- Copyright : (c) 2019 Composewell Technologies
-- License : BSD3
-- Maintainer : streamly@composewell.com
-- Stability : experimental
-- Portability : GHC
--
--
module Streamly.Internal.Data.IORef
(
newFinalizedIORef
, runIORefFinalizer
, clearIORefFinalizer
)
where

import Control.Monad (void)
import Control.Monad.IO.Class (MonadIO(..))
import Control.Monad.Trans.Control (MonadBaseControl)
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef, IORef)

import Streamly.Internal.Data.SVar

-- | Create an IORef holding a finalizer that is called automatically when the
-- IORef is garbage collected. The IORef can be written to with a 'Nothing'
-- value to deactivate the finalizer.
newFinalizedIORef :: (MonadIO m, MonadBaseControl IO m)
=> m a -> m (IORef (Maybe (IO ())))
newFinalizedIORef finalizer = do
mrun <- captureMonadState
ref <- liftIO $ newIORef $ Just $ liftIO $ void $ do
_ <- runInIO mrun finalizer
return ()
let finalizer1 = do
res <- readIORef ref
case res of
Nothing -> return ()
Just f -> f
_ <- liftIO $ mkWeakIORef ref finalizer1
return ref

-- | Run the finalizer stored in an IORef and deactivate it so that it is run
-- only once.
--
runIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m ()
runIORefFinalizer ref = liftIO $ do
res <- readIORef ref
case res of
Nothing -> return ()
Just f -> writeIORef ref Nothing >> f

-- | Deactivate the finalizer stored in an IORef without running it.
--
clearIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m ()
clearIORefFinalizer ref = liftIO $ writeIORef ref Nothing
43 changes: 6 additions & 37 deletions src/Streamly/Internal/Data/Stream/StreamD.hs
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,6 @@ module Streamly.Internal.Data.Stream.StreamD
, the

-- * Exceptions
, newFinalizedIORef
, runIORefFinalizer
, clearIORefFinalizer
, gbracket
, before
, after
Expand Down Expand Up @@ -335,7 +332,7 @@ import Control.Monad.Trans.Control (MonadBaseControl, liftBaseOp_)
import Data.Bits (shiftR, shiftL, (.|.), (.&.))
import Data.Functor.Identity (Identity(..))
import Data.Int (Int64)
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef, IORef)
import Data.IORef (newIORef, readIORef, mkWeakIORef, writeIORef)
import Data.Maybe (fromJust, isJust, isNothing)
import Data.Word (Word32)
import Foreign.Ptr (Ptr)
Expand Down Expand Up @@ -371,6 +368,7 @@ import Streamly.Internal.Data.Time.Units
import Streamly.Internal.Data.Unfold.Types (Unfold(..))
import Streamly.Internal.Data.Strict (Tuple3'(..))

import Streamly.Internal.Data.IORef
import Streamly.Internal.Data.Stream.StreamD.Type
import Streamly.Internal.Data.SVar
import Streamly.Internal.Data.Stream.SVar (fromConsumer, pushToFold)
Expand Down Expand Up @@ -3202,39 +3200,6 @@ gbracket bef exc aft fexc fnormal =
Skip s -> return $ Skip (GBracketException (Stream step1 s))
Stop -> return Stop

-- | Create an IORef holding a finalizer that is called automatically when the
-- IORef is garbage collected. The IORef can be written to with a 'Nothing'
-- value to deactivate the finalizer.
newFinalizedIORef :: (MonadIO m, MonadBaseControl IO m)
=> m a -> m (IORef (Maybe (IO ())))
newFinalizedIORef finalizer = do
mrun <- captureMonadState
ref <- liftIO $ newIORef $ Just $ liftIO $ void $ do
_ <- runInIO mrun finalizer
return ()
let finalizer1 = do
res <- readIORef ref
case res of
Nothing -> return ()
Just f -> f
_ <- liftIO $ mkWeakIORef ref finalizer1
return ref

-- | Run the finalizer stored in an IORef and deactivate it so that it is run
-- only once.
--
runIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m ()
runIORefFinalizer ref = liftIO $ do
res <- readIORef ref
case res of
Nothing -> return ()
Just f -> writeIORef ref Nothing >> f

-- | Deactivate the finalizer stored in an IORef without running it.
--
clearIORefFinalizer :: MonadIO m => IORef (Maybe (IO ())) -> m ()
clearIORefFinalizer ref = liftIO $ writeIORef ref Nothing

data GbracketIOState s1 s2 v wref
= GBracketIOInit
| GBracketIONormal s1 v wref
Expand Down Expand Up @@ -3301,6 +3266,8 @@ gbracketIO bef exc aft fexc fnormal =
Skip s -> return $ Skip (GBracketIOException (Stream step1 s))
Stop -> return Stop

-- Same as nilM action <> stream
--
-- | Run a side effect before the stream yields its first element.
{-# INLINE_NORMAL before #-}
before :: Monad m => m b -> Stream m a -> Stream m a
Expand All @@ -3318,6 +3285,8 @@ before action (Stream step state) = Stream step' Nothing
Skip s -> return $ Skip (Just s)
Stop -> return Stop

-- Same as stream <> nilM action
--
-- | Run a side effect whenever the stream stops normally.
{-# INLINE_NORMAL after #-}
after :: Monad m => m b -> Stream m a -> Stream m a
Expand Down
Loading