Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
377 changes: 0 additions & 377 deletions src/Streamly/Internal/Data/Channel/Types.hs

Large diffs are not rendered by default.

89 changes: 89 additions & 0 deletions src/Streamly/Internal/Data/Fold/Channel/Type.hs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ module Streamly.Internal.Data.Fold.Channel.Type

-- ** Configuration
, Config
, defaultConfig
, maxBuffer
, boundThreads
, inspect
Expand Down Expand Up @@ -141,6 +142,94 @@ data Channel m a b = Channel
, svarCreator :: ThreadId
}

-------------------------------------------------------------------------------
-- Config
-------------------------------------------------------------------------------

-- | An abstract type for specifying the configuration parameters of a
-- 'Channel'. Use @Config -> Config@ modifier functions to modify the default
-- configuration. See the individual modifier documentation for default values.
--
data Config = Config
{
_bufferHigh :: Limit
, _inspect :: Bool
, _bound :: Bool
}

-------------------------------------------------------------------------------
-- State defaults and reset
-------------------------------------------------------------------------------

defaultMaxBuffer :: Limit
defaultMaxBuffer = Limited magicMaxBuffer

-- | The fields prefixed by an _ are not to be accessed or updated directly but
-- via smart accessor APIs. Use get/set routines instead of directly accessing
-- the Config fields
defaultConfig :: Config
defaultConfig = Config
{
_bufferHigh = defaultMaxBuffer
, _inspect = False
, _bound = False
}

-------------------------------------------------------------------------------
-- Smart get/set routines for State
-------------------------------------------------------------------------------

-- | Specify the maximum size of the buffer for storing the results from
-- concurrent computations. If the buffer becomes full we stop spawning more
-- concurrent tasks until there is space in the buffer.
-- A value of 0 resets the buffer size to default, a negative value means
-- there is no limit. The default value is 1500.
--
-- CAUTION! using an unbounded 'maxBuffer' value (i.e. a negative value)
-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in
-- presence of infinite streams, or very large streams. Especially, it must
-- not be used when 'pure' is used in 'ZipAsyncM' streams as 'pure' in
-- applicative zip streams generates an infinite stream causing unbounded
-- concurrent generation with no limit on the buffer or threads.
--
maxBuffer :: Int -> Config -> Config
maxBuffer n st =
st { _bufferHigh =
if n < 0
then Unlimited
else if n == 0
then defaultMaxBuffer
else Limited (fromIntegral n)
}

getMaxBuffer :: Config -> Limit
getMaxBuffer = _bufferHigh

-- | Print debug information about the 'Channel' when the stream ends. When the
-- stream does not end normally, the channel debug information is printed when
-- the channel is garbage collected. If you are expecting but not seeing the
-- debug info try adding a 'performMajorGC' before the program ends.
--
inspect :: Bool -> Config -> Config
inspect flag st = st { _inspect = flag }

getInspectMode :: Config -> Bool
getInspectMode = _inspect

-- | Spawn bound threads (i.e., spawn threads using 'forkOS' instead of
-- 'forkIO'). The default value is 'False'.
--
-- Currently, this only takes effect only for concurrent folds.
boundThreads :: Bool -> Config -> Config
boundThreads flag st = st { _bound = flag }

getBound :: Config -> Bool
getBound = _bound

-------------------------------------------------------------------------------
-- Inspection
-------------------------------------------------------------------------------

-- | Dump the channel stats for diagnostics. Used when 'inspect' option is
-- enabled.
{-# NOINLINE dumpChannel #-}
Expand Down
4 changes: 4 additions & 0 deletions src/Streamly/Internal/Data/Stream/Channel.hs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ import Streamly.Internal.Data.Stream.Channel.Interleave
import Streamly.Internal.Data.Stream.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Channel.Consumer

-------------------------------------------------------------------------------
-- Channel allocation
-------------------------------------------------------------------------------

-- | Create a new concurrent stream evaluation channel. The monad
-- state used to run the stream actions is captured from the call site of
-- newChannel.
Expand Down
2 changes: 1 addition & 1 deletion src/Streamly/Internal/Data/Stream/Channel/Operations.hs
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ import qualified Streamly.Internal.Data.Stream as Stream
import qualified Streamly.Internal.Data.Stream as D
import qualified Streamly.Internal.Data.StreamK as K

import Streamly.Internal.Data.Channel.Types hiding (inspect)
import Streamly.Internal.Data.Channel.Types
import Streamly.Internal.Data.Stream.Channel.Dispatcher
import Streamly.Internal.Data.Stream.Channel.Type hiding (inspect)

Expand Down
Loading
Loading