From b39cf4c6edb73b46d9871671e7af8c4b354eb637 Mon Sep 17 00:00:00 2001 From: Harendra Kumar Date: Sun, 18 May 2025 19:32:39 +0530 Subject: [PATCH] Use separate Config type for stream/fold channels --- src/Streamly/Internal/Data/Channel/Types.hs | 377 ------------------ .../Internal/Data/Fold/Channel/Type.hs | 89 +++++ src/Streamly/Internal/Data/Stream/Channel.hs | 4 + .../Data/Stream/Channel/Operations.hs | 2 +- .../Internal/Data/Stream/Channel/Type.hs | 367 ++++++++++++++++- .../Internal/Data/Stream/Concurrent.hs | 5 +- src/Streamly/Internal/Data/Stream/Time.hs | 3 +- test/Streamly/Test/Data/Scanl/Concurrent.hs | 17 +- 8 files changed, 471 insertions(+), 393 deletions(-) diff --git a/src/Streamly/Internal/Data/Channel/Types.hs b/src/Streamly/Internal/Data/Channel/Types.hs index cc2d520641..42eeafb0bb 100644 --- a/src/Streamly/Internal/Data/Channel/Types.hs +++ b/src/Streamly/Internal/Data/Channel/Types.hs @@ -47,7 +47,6 @@ module Streamly.Internal.Data.Channel.Types , WorkerInfo (..) , LatencyRange (..) , YieldRateInfo (..) - , newRateInfo -- ** Output queue , readOutputQRaw @@ -61,42 +60,7 @@ module Streamly.Internal.Data.Channel.Types -- ** Configuration , Rate (..) , StopWhen (..) 
- , Config - - -- *** Default config , magicMaxBuffer - , defaultConfig - - -- *** Set config - , maxThreads - , maxBuffer - , maxYields - , inspect - , eager - , stopWhen - , ordered - , interleaved - , boundThreads - - , rate - , avgRate - , minRate - , maxRate - , constRate - - -- *** Get config - , getMaxThreads - , getMaxBuffer - , getStreamRate - , getStreamLatency - , setStreamLatency - , getYieldLimit - , getInspectMode - , getEagerDispatch - , getStopWhen - , getOrdered - , getInterleaved - , getBound -- ** Cleanup , cleanupSVar @@ -125,7 +89,6 @@ import Data.IORef (IORef, newIORef, readIORef, writeIORef) import Data.Set (Set) import Streamly.Internal.Data.Atomics (atomicModifyIORefCAS, atomicModifyIORefCAS_, storeLoadBarrier) -import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime) import Streamly.Internal.Data.Time.Units (AbsTime, NanoSecond64(..)) import System.IO (hPutStrLn, stderr) @@ -319,41 +282,6 @@ data StopWhen = | AllStop -- ^ Stop when all the streams end. | AnyStops -- ^ Stop when any one stream ends. --- XXX we can put the resettable fields in a oneShotConfig field and others in --- a persistentConfig field. That way reset would be fast and scalable --- irrespective of the number of fields. --- --- XXX make all these Limited types and use phantom types to distinguish them - --- | An abstract type for specifying the configuration parameters of a --- 'Channel'. Use @Config -> Config@ modifier functions to modify the default --- configuration. See the individual modifier documentation for default values. --- -data Config = Config - { -- one shot configuration, automatically reset for each API call - -- streamVar :: Maybe (SVar t m a) - _yieldLimit :: Maybe Count - - -- persistent configuration, state that remains valid until changed by - -- an explicit setting via a combinator. 
- , _threadsHigh :: Limit - , _bufferHigh :: Limit - - -- XXX these two can be collapsed into a single type - , _streamLatency :: Maybe NanoSecond64 -- bootstrap latency - , _maxStreamRate :: Maybe Rate - , _inspect :: Bool - , _eagerDispatch :: Bool - , _stopWhen :: StopWhen - , _ordered :: Bool - , _interleaved :: Bool - , _bound :: Bool - } - -------------------------------------------------------------------------------- --- State defaults and reset -------------------------------------------------------------------------------- - -- | A magical value for the buffer size arrived at by running the smallest -- possible task and measuring the optimal value of the buffer for that. This -- is obviously dependent on hardware, this figure is based on a 2.2GHz intel @@ -361,263 +289,6 @@ data Config = Config magicMaxBuffer :: Word magicMaxBuffer = 1500 -defaultMaxThreads, defaultMaxBuffer :: Limit -defaultMaxThreads = Limited magicMaxBuffer -defaultMaxBuffer = Limited magicMaxBuffer - --- | The fields prefixed by an _ are not to be accessed or updated directly but --- via smart accessor APIs. Use get/set routines instead of directly accessing --- the Config fields -defaultConfig :: Config -defaultConfig = Config - { -- streamVar = Nothing - _yieldLimit = Nothing - , _threadsHigh = defaultMaxThreads - , _bufferHigh = defaultMaxBuffer - , _maxStreamRate = Nothing - , _streamLatency = Nothing - , _inspect = False - -- XXX Set it to True when Rate is not set? - , _eagerDispatch = False - , _stopWhen = AllStop - , _ordered = False - , _interleaved = False - , _bound = False - } - -------------------------------------------------------------------------------- --- Smart get/set routines for State -------------------------------------------------------------------------------- - --- | The maximum number of yields that this channel would produce. The Channel --- automatically stops after that. This could be used to limit the speculative --- execution beyond the limit. 
--- --- 'Nothing' means there is no limit. --- --- Keep in mind that checking this limit all the time has a performance --- overhead. --- --- Known Bugs: currently this works only when rate is specified. --- Known Bugs: for ordered streams sometimes the actual count is less than --- expected. -maxYields :: Maybe Int64 -> Config -> Config -maxYields lim st = - st { _yieldLimit = - case lim of - Nothing -> Nothing - Just n -> - if n <= 0 - then Just 0 - else Just (fromIntegral n) - } - -getYieldLimit :: Config -> Maybe Count -getYieldLimit = _yieldLimit - --- | Specify the maximum number of threads that can be spawned by the channel. --- A value of 0 resets the thread limit to default, a negative value means --- there is no limit. The default value is 1500. --- --- When the actions in a stream are IO bound, having blocking IO calls, this --- option can be used to control the maximum number of in-flight IO requests. --- When the actions are CPU bound this option can be used to control the amount --- of CPU used by the stream. --- -maxThreads :: Int -> Config -> Config -maxThreads n st = - st { _threadsHigh = - if n < 0 - then Unlimited - else if n == 0 - then defaultMaxThreads - else Limited (fromIntegral n) - } - -getMaxThreads :: Config -> Limit -getMaxThreads = _threadsHigh - --- | Specify the maximum size of the buffer for storing the results from --- concurrent computations. If the buffer becomes full we stop spawning more --- concurrent tasks until there is space in the buffer. --- A value of 0 resets the buffer size to default, a negative value means --- there is no limit. The default value is 1500. --- --- CAUTION! using an unbounded 'maxBuffer' value (i.e. a negative value) --- coupled with an unbounded 'maxThreads' value is a recipe for disaster in --- presence of infinite streams, or very large streams. 
Especially, it must --- not be used when 'pure' is used in 'ZipAsyncM' streams as 'pure' in --- applicative zip streams generates an infinite stream causing unbounded --- concurrent generation with no limit on the buffer or threads. --- -maxBuffer :: Int -> Config -> Config -maxBuffer n st = - st { _bufferHigh = - if n < 0 - then Unlimited - else if n == 0 - then defaultMaxBuffer - else Limited (fromIntegral n) - } - -getMaxBuffer :: Config -> Limit -getMaxBuffer = _bufferHigh - --- | Specify the stream evaluation rate of a channel. --- --- A 'Nothing' value means there is no smart rate control, concurrent execution --- blocks only if 'maxThreads' or 'maxBuffer' is reached, or there are no more --- concurrent tasks to execute. This is the default. --- --- When rate (throughput) is specified, concurrent production may be ramped --- up or down automatically to achieve the specified stream throughput. The --- specific behavior for different styles of 'Rate' specifications is --- documented under 'Rate'. The effective maximum production rate achieved by --- a channel is governed by: --- --- * The 'maxThreads' limit --- * The 'maxBuffer' limit --- * The maximum rate that the stream producer can achieve --- * The maximum rate that the stream consumer can achieve --- --- Maximum production rate is given by: --- --- \(rate = \frac{maxThreads}{latency}\) --- --- If we know the average latency of the tasks we can set 'maxThreads' --- accordingly. --- -rate :: Maybe Rate -> Config -> Config -rate r st = st { _maxStreamRate = r } - -getStreamRate :: Config -> Maybe Rate -getStreamRate = _maxStreamRate - -setStreamLatency :: Int -> Config -> Config -setStreamLatency n st = - st { _streamLatency = - if n <= 0 - then Nothing - else Just (fromIntegral n) - } - -getStreamLatency :: Config -> Maybe NanoSecond64 -getStreamLatency = _streamLatency - --- XXX Rename to "inspect" - --- | Print debug information about the 'Channel' when the stream ends. 
When the --- stream does not end normally, the channel debug information is printed when --- the channel is garbage collected. If you are expecting but not seeing the --- debug info try adding a 'performMajorGC' before the program ends. --- -inspect :: Bool -> Config -> Config -inspect flag st = st { _inspect = flag } - -getInspectMode :: Config -> Bool -getInspectMode = _inspect - --- | By default, processing of output from the worker threads is given priority --- over dispatching new workers. More workers are dispatched only when there is --- no output to process. When 'eager' is set to 'True', workers are dispatched --- aggresively as long as there is more work to do irrespective of whether --- there is output pending to be processed by the stream consumer. However, --- dispatching may stop if 'maxThreads' or 'maxBuffer' is reached. --- --- /Note:/ This option has no effect when rate has been specified. --- --- /Note:/ Not supported with 'interleaved'. --- -eager :: Bool -> Config -> Config -eager flag st = st { _eagerDispatch = flag } - -getEagerDispatch :: Config -> Bool -getEagerDispatch = _eagerDispatch - --- | Specify when the 'Channel' should stop. -stopWhen :: StopWhen -> Config -> Config -stopWhen cond st = st { _stopWhen = cond } - -getStopWhen :: Config -> StopWhen -getStopWhen = _stopWhen - --- | When enabled the streams may be evaluated cocnurrently but the results are --- produced in the same sequence as a serial evaluation would produce. --- --- /Note:/ Not supported with 'interleaved'. --- -ordered :: Bool -> Config -> Config -ordered flag st = st { _ordered = flag } - -getOrdered :: Config -> Bool -getOrdered = _ordered - --- | Interleave the streams fairly instead of prioritizing the left stream. --- This schedules all streams in a round robin fashion over limited number of --- threads. --- --- /Note:/ Can only be used on finite number of streams. --- --- /Note:/ Not supported with 'ordered'. 
--- -interleaved :: Bool -> Config -> Config -interleaved flag st = st { _interleaved = flag } - -getInterleaved :: Config -> Bool -getInterleaved = _interleaved - --- TODO: Make it consistently take effect everywhere. - --- | Spawn bound threads (i.e., spawn threads using 'forkOS' instead of --- 'forkIO'). The default value is 'False'. --- --- Currently, this only takes effect only for concurrent folds. -boundThreads :: Bool -> Config -> Config -boundThreads flag st = st { _bound = flag } - -getBound :: Config -> Bool -getBound = _bound - -------------------------------------------------------------------------------- --- Initialization -------------------------------------------------------------------------------- - -newRateInfo :: Config -> IO (Maybe YieldRateInfo) -newRateInfo st = do - -- convert rate in Hertz to latency in Nanoseconds - let rateToLatency r = if r <= 0 then maxBound else round $ 1.0e9 / r - case getStreamRate st of - Just (Rate low goal high buf) -> - let l = rateToLatency goal - minl = rateToLatency high - maxl = rateToLatency low - in mkYieldRateInfo l (LatencyRange minl maxl) buf - Nothing -> return Nothing - - where - - mkYieldRateInfo latency latRange buf = do - measured <- newIORef 0 - wcur <- newIORef (0,0,0) - wcol <- newIORef (0,0,0) - now <- getTime Monotonic - wlong <- newIORef (0,now) - period <- newIORef 1 - gainLoss <- newIORef (Count 0) - - return $ Just YieldRateInfo - { svarLatencyTarget = latency - , svarLatencyRange = latRange - , svarRateBuffer = buf - , svarGainedLostYields = gainLoss - , workerBootstrapLatency = getStreamLatency st - , workerPollingInterval = period - , workerMeasuredLatency = measured - , workerPendingLatency = wcur - , workerCollectedLatency = wcol - , svarAllTimeLatency = wlong - } - newSVarStats :: IO SVarStats newSVarStats = do disp <- newIORef 0 @@ -642,54 +313,6 @@ newSVarStats = do , svarStopTime = stpTime } -------------------------------------------------------------------------------- --- Rate 
-------------------------------------------------------------------------------- - --- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@ --- --- Specifies the average production rate of a stream in number of yields --- per second (i.e. @Hertz@). Concurrent production is ramped up or down --- automatically to achieve the specified average yield rate. The rate can --- go down to half of the specified rate on the lower side and double of --- the specified rate on the higher side. --- -avgRate :: Double -> Config -> Config -avgRate r = rate (Just $ Rate (r/2) r (2*r) maxBound) - --- | Same as @rate (Just $ Rate r r (2*r) maxBound)@ --- --- Specifies the minimum rate at which the stream should yield values. As --- far as possible the yield rate would never be allowed to go below the --- specified rate, even though it may possibly go above it at times, the --- upper limit is double of the specified rate. --- -minRate :: Double -> Config -> Config -minRate r = rate (Just $ Rate r r (2*r) maxBound) - --- | Same as @rate (Just $ Rate (r/2) r r maxBound)@ --- --- Specifies the maximum rate at which the stream should yield values. As --- far as possible the yield rate would never be allowed to go above the --- specified rate, even though it may possibly go below it at times, the --- lower limit is half of the specified rate. This can be useful in --- applications where certain resource usage must not be allowed to go --- beyond certain limits. --- -maxRate :: Double -> Config -> Config -maxRate r = rate (Just $ Rate (r/2) r r maxBound) - --- | Same as @rate (Just $ Rate r r r 0)@ --- --- Specifies a constant yield rate. If for some reason the actual rate --- goes above or below the specified rate we do not try to recover it by --- increasing or decreasing the rate in future. This can be useful in --- applications like graphics frame refresh where we need to maintain a --- constant refresh rate. 
--- -constRate :: Double -> Config -> Config -constRate r = rate (Just $ Rate r r r 0) - ------------------------------------------------------------------------------- -- Channel yield count ------------------------------------------------------------------------------- diff --git a/src/Streamly/Internal/Data/Fold/Channel/Type.hs b/src/Streamly/Internal/Data/Fold/Channel/Type.hs index 2d4a23690b..70fa41cb6d 100644 --- a/src/Streamly/Internal/Data/Fold/Channel/Type.hs +++ b/src/Streamly/Internal/Data/Fold/Channel/Type.hs @@ -14,6 +14,7 @@ module Streamly.Internal.Data.Fold.Channel.Type -- ** Configuration , Config + , defaultConfig , maxBuffer , boundThreads , inspect @@ -141,6 +142,94 @@ data Channel m a b = Channel , svarCreator :: ThreadId } +------------------------------------------------------------------------------- +-- Config +------------------------------------------------------------------------------- + +-- | An abstract type for specifying the configuration parameters of a +-- 'Channel'. Use @Config -> Config@ modifier functions to modify the default +-- configuration. See the individual modifier documentation for default values. +-- +data Config = Config + { + _bufferHigh :: Limit + , _inspect :: Bool + , _bound :: Bool + } + +------------------------------------------------------------------------------- +-- State defaults and reset +------------------------------------------------------------------------------- + +defaultMaxBuffer :: Limit +defaultMaxBuffer = Limited magicMaxBuffer + +-- | The fields prefixed by an _ are not to be accessed or updated directly but +-- via smart accessor APIs. 
Use get/set routines instead of directly accessing +-- the Config fields +defaultConfig :: Config +defaultConfig = Config + { + _bufferHigh = defaultMaxBuffer + , _inspect = False + , _bound = False + } + +------------------------------------------------------------------------------- +-- Smart get/set routines for State +------------------------------------------------------------------------------- + +-- | Specify the maximum size of the buffer for storing the results from +-- concurrent computations. If the buffer becomes full we stop spawning more +-- concurrent tasks until there is space in the buffer. +-- A value of 0 resets the buffer size to default, a negative value means +-- there is no limit. The default value is 1500. +-- +-- CAUTION! using an unbounded 'maxBuffer' value (i.e. a negative value) +-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in +-- presence of infinite streams, or very large streams. Especially, it must +-- not be used when 'pure' is used in 'ZipAsyncM' streams as 'pure' in +-- applicative zip streams generates an infinite stream causing unbounded +-- concurrent generation with no limit on the buffer or threads. +-- +maxBuffer :: Int -> Config -> Config +maxBuffer n st = + st { _bufferHigh = + if n < 0 + then Unlimited + else if n == 0 + then defaultMaxBuffer + else Limited (fromIntegral n) + } + +getMaxBuffer :: Config -> Limit +getMaxBuffer = _bufferHigh + +-- | Print debug information about the 'Channel' when the stream ends. When the +-- stream does not end normally, the channel debug information is printed when +-- the channel is garbage collected. If you are expecting but not seeing the +-- debug info try adding a 'performMajorGC' before the program ends. +-- +inspect :: Bool -> Config -> Config +inspect flag st = st { _inspect = flag } + +getInspectMode :: Config -> Bool +getInspectMode = _inspect + +-- | Spawn bound threads (i.e., spawn threads using 'forkOS' instead of +-- 'forkIO'). 
The default value is 'False'.
+--
+-- Currently, this takes effect only for concurrent folds.
+boundThreads :: Bool -> Config -> Config
+boundThreads flag st = st { _bound = flag }
+
+getBound :: Config -> Bool
+getBound = _bound
+
+-------------------------------------------------------------------------------
+-- Inspection
+-------------------------------------------------------------------------------
+
 -- | Dump the channel stats for diagnostics. Used when 'inspect' option is
 -- enabled.
 {-# NOINLINE dumpChannel #-}
diff --git a/src/Streamly/Internal/Data/Stream/Channel.hs b/src/Streamly/Internal/Data/Stream/Channel.hs
index 14897c0db3..577e774d41 100644
--- a/src/Streamly/Internal/Data/Stream/Channel.hs
+++ b/src/Streamly/Internal/Data/Stream/Channel.hs
@@ -44,6 +44,10 @@ import Streamly.Internal.Data.Stream.Channel.Interleave
 import Streamly.Internal.Data.Stream.Channel.Dispatcher
 import Streamly.Internal.Data.Stream.Channel.Consumer
 
+-------------------------------------------------------------------------------
+-- Channel allocation
+-------------------------------------------------------------------------------
+
 -- | Create a new concurrent stream evaluation channel. The monad
 -- state used to run the stream actions is captured from the call site of
 -- newChannel. 
diff --git a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs index 604ff992cf..58c671cf32 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Operations.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Operations.hs @@ -48,7 +48,7 @@ import qualified Streamly.Internal.Data.Stream as Stream import qualified Streamly.Internal.Data.Stream as D import qualified Streamly.Internal.Data.StreamK as K -import Streamly.Internal.Data.Channel.Types hiding (inspect) +import Streamly.Internal.Data.Channel.Types import Streamly.Internal.Data.Stream.Channel.Dispatcher import Streamly.Internal.Data.Stream.Channel.Type hiding (inspect) diff --git a/src/Streamly/Internal/Data/Stream/Channel/Type.hs b/src/Streamly/Internal/Data/Stream/Channel/Type.hs index 36abbf1b75..d10ec718ef 100644 --- a/src/Streamly/Internal/Data/Stream/Channel/Type.hs +++ b/src/Streamly/Internal/Data/Stream/Channel/Type.hs @@ -14,6 +14,9 @@ module Streamly.Internal.Data.Stream.Channel.Type -- ** Configuration , Config + -- *** Default config + , defaultConfig + -- *** Limits , maxThreads , maxBuffer @@ -21,6 +24,7 @@ module Streamly.Internal.Data.Stream.Channel.Type -- *** Rate Control , Rate(..) 
+ , newRateInfo , rate , avgRate , minRate @@ -35,10 +39,22 @@ module Streamly.Internal.Data.Stream.Channel.Type , eager , ordered , interleaved + , boundThreads -- *** Diagnostics , inspect + -- *** Get config + , getMaxBuffer + , getMaxThreads + , getYieldLimit + , getInspectMode + , getStreamRate + , getEagerDispatch + , getOrdered + , getStopWhen + , getInterleaved + -- ** Sending Worker Events , yieldWith , stopWith @@ -55,7 +71,8 @@ import Control.Concurrent.MVar (MVar) import Control.Exception (SomeException(..)) import Control.Monad (void) import Control.Monad.IO.Class (MonadIO(..)) -import Data.IORef (IORef) +import Data.Int (Int64) +import Data.IORef (IORef, newIORef) import Data.List (intersperse) import Data.Set (Set) import Streamly.Internal.Control.Concurrent (RunInIO) @@ -64,6 +81,8 @@ import Streamly.Internal.Data.Channel.Dispatcher (dumpSVarStats) import Streamly.Internal.Data.Channel.Worker (sendYield, sendStop, sendEvent, sendException) import Streamly.Internal.Data.StreamK (StreamK) +import Streamly.Internal.Data.Time.Clock (Clock(Monotonic), getTime) +import Streamly.Internal.Data.Time.Units (NanoSecond64(..)) import Streamly.Internal.Data.Channel.Types @@ -262,6 +281,352 @@ data Channel m a = Channel , svarCreator :: ThreadId } +------------------------------------------------------------------------------- +-- Channel Config +------------------------------------------------------------------------------- + +-- XXX we can put the resettable fields in a oneShotConfig field and others in +-- a persistentConfig field. That way reset would be fast and scalable +-- irrespective of the number of fields. +-- +-- XXX make all these Limited types and use phantom types to distinguish them + +-- | An abstract type for specifying the configuration parameters of a +-- 'Channel'. Use @Config -> Config@ modifier functions to modify the default +-- configuration. See the individual modifier documentation for default values. 
+-- +data Config = Config + { -- one shot configuration, automatically reset for each API call + -- streamVar :: Maybe (SVar t m a) + _yieldLimit :: Maybe Count + + -- persistent configuration, state that remains valid until changed by + -- an explicit setting via a combinator. + , _threadsHigh :: Limit + , _bufferHigh :: Limit + + -- XXX these two can be collapsed into a single type + , _streamLatency :: Maybe NanoSecond64 -- bootstrap latency + , _maxStreamRate :: Maybe Rate + , _inspect :: Bool + , _eagerDispatch :: Bool + , _stopWhen :: StopWhen + , _ordered :: Bool + , _interleaved :: Bool + , _bound :: Bool + } + +------------------------------------------------------------------------------- +-- State defaults and reset +------------------------------------------------------------------------------- + +defaultMaxThreads, defaultMaxBuffer :: Limit +defaultMaxThreads = Limited magicMaxBuffer +defaultMaxBuffer = Limited magicMaxBuffer + +-- | The fields prefixed by an _ are not to be accessed or updated directly but +-- via smart accessor APIs. Use get/set routines instead of directly accessing +-- the Config fields +defaultConfig :: Config +defaultConfig = Config + { -- streamVar = Nothing + _yieldLimit = Nothing + , _threadsHigh = defaultMaxThreads + , _bufferHigh = defaultMaxBuffer + , _maxStreamRate = Nothing + , _streamLatency = Nothing + , _inspect = False + -- XXX Set it to True when Rate is not set? + , _eagerDispatch = False + , _stopWhen = AllStop + , _ordered = False + , _interleaved = False + , _bound = False + } + +------------------------------------------------------------------------------- +-- Smart get/set routines for State +------------------------------------------------------------------------------- + +-- | The maximum number of yields that this channel would produce. The Channel +-- automatically stops after that. This could be used to limit the speculative +-- execution beyond the limit. +-- +-- 'Nothing' means there is no limit. 
+-- +-- Keep in mind that checking this limit all the time has a performance +-- overhead. +-- +-- Known Bugs: currently this works only when rate is specified. +-- Known Bugs: for ordered streams sometimes the actual count is less than +-- expected. +maxYields :: Maybe Int64 -> Config -> Config +maxYields lim st = + st { _yieldLimit = + case lim of + Nothing -> Nothing + Just n -> + if n <= 0 + then Just 0 + else Just (fromIntegral n) + } + +getYieldLimit :: Config -> Maybe Count +getYieldLimit = _yieldLimit + +-- | Specify the maximum number of threads that can be spawned by the channel. +-- A value of 0 resets the thread limit to default, a negative value means +-- there is no limit. The default value is 1500. +-- +-- When the actions in a stream are IO bound, having blocking IO calls, this +-- option can be used to control the maximum number of in-flight IO requests. +-- When the actions are CPU bound this option can be used to control the amount +-- of CPU used by the stream. +-- +maxThreads :: Int -> Config -> Config +maxThreads n st = + st { _threadsHigh = + if n < 0 + then Unlimited + else if n == 0 + then defaultMaxThreads + else Limited (fromIntegral n) + } + +getMaxThreads :: Config -> Limit +getMaxThreads = _threadsHigh + +-- | Specify the maximum size of the buffer for storing the results from +-- concurrent computations. If the buffer becomes full we stop spawning more +-- concurrent tasks until there is space in the buffer. +-- A value of 0 resets the buffer size to default, a negative value means +-- there is no limit. The default value is 1500. +-- +-- CAUTION! using an unbounded 'maxBuffer' value (i.e. a negative value) +-- coupled with an unbounded 'maxThreads' value is a recipe for disaster in +-- presence of infinite streams, or very large streams. 
Especially, it must +-- not be used when 'pure' is used in 'ZipAsyncM' streams as 'pure' in +-- applicative zip streams generates an infinite stream causing unbounded +-- concurrent generation with no limit on the buffer or threads. +-- +maxBuffer :: Int -> Config -> Config +maxBuffer n st = + st { _bufferHigh = + if n < 0 + then Unlimited + else if n == 0 + then defaultMaxBuffer + else Limited (fromIntegral n) + } + +getMaxBuffer :: Config -> Limit +getMaxBuffer = _bufferHigh + +-- | Specify the stream evaluation rate of a channel. +-- +-- A 'Nothing' value means there is no smart rate control, concurrent execution +-- blocks only if 'maxThreads' or 'maxBuffer' is reached, or there are no more +-- concurrent tasks to execute. This is the default. +-- +-- When rate (throughput) is specified, concurrent production may be ramped +-- up or down automatically to achieve the specified stream throughput. The +-- specific behavior for different styles of 'Rate' specifications is +-- documented under 'Rate'. The effective maximum production rate achieved by +-- a channel is governed by: +-- +-- * The 'maxThreads' limit +-- * The 'maxBuffer' limit +-- * The maximum rate that the stream producer can achieve +-- * The maximum rate that the stream consumer can achieve +-- +-- Maximum production rate is given by: +-- +-- \(rate = \frac{maxThreads}{latency}\) +-- +-- If we know the average latency of the tasks we can set 'maxThreads' +-- accordingly. +-- +rate :: Maybe Rate -> Config -> Config +rate r st = st { _maxStreamRate = r } + +getStreamRate :: Config -> Maybe Rate +getStreamRate = _maxStreamRate + +_setStreamLatency :: Int -> Config -> Config +_setStreamLatency n st = + st { _streamLatency = + if n <= 0 + then Nothing + else Just (fromIntegral n) + } + +getStreamLatency :: Config -> Maybe NanoSecond64 +getStreamLatency = _streamLatency + +-- XXX Rename to "inspect" + +-- | Print debug information about the 'Channel' when the stream ends. 
When the
+-- stream does not end normally, the channel debug information is printed when
+-- the channel is garbage collected. If you are expecting but not seeing the
+-- debug info try adding a 'performMajorGC' before the program ends.
+--
+inspect :: Bool -> Config -> Config
+inspect flag st = st { _inspect = flag }
+
+getInspectMode :: Config -> Bool
+getInspectMode = _inspect
+
+-- | By default, processing of output from the worker threads is given priority
+-- over dispatching new workers. More workers are dispatched only when there is
+-- no output to process. When 'eager' is set to 'True', workers are dispatched
+-- aggressively as long as there is more work to do irrespective of whether
+-- there is output pending to be processed by the stream consumer. However,
+-- dispatching may stop if 'maxThreads' or 'maxBuffer' is reached.
+--
+-- /Note:/ This option has no effect when rate has been specified.
+--
+-- /Note:/ Not supported with 'interleaved'.
+--
+eager :: Bool -> Config -> Config
+eager flag st = st { _eagerDispatch = flag }
+
+getEagerDispatch :: Config -> Bool
+getEagerDispatch = _eagerDispatch
+
+-- | Specify when the 'Channel' should stop.
+stopWhen :: StopWhen -> Config -> Config
+stopWhen cond st = st { _stopWhen = cond }
+
+getStopWhen :: Config -> StopWhen
+getStopWhen = _stopWhen
+
+-- | When enabled the streams may be evaluated concurrently but the results are
+-- produced in the same sequence as a serial evaluation would produce.
+--
+-- /Note:/ Not supported with 'interleaved'.
+--
+ordered :: Bool -> Config -> Config
+ordered flag st = st { _ordered = flag }
+
+getOrdered :: Config -> Bool
+getOrdered = _ordered
+
+-- | Interleave the streams fairly instead of prioritizing the left stream.
+-- This schedules all streams in a round robin fashion over limited number of
+-- threads.
+--
+-- /Note:/ Can only be used on finite number of streams.
+--
+-- /Note:/ Not supported with 'ordered'. 
+-- +interleaved :: Bool -> Config -> Config +interleaved flag st = st { _interleaved = flag } + +getInterleaved :: Config -> Bool +getInterleaved = _interleaved + +-- | Spawn bound threads (i.e., spawn threads using 'forkOS' instead of +-- 'forkIO'). The default value is 'False'. +-- +-- /Unimplemented/ +boundThreads :: Bool -> Config -> Config +boundThreads flag st = st { _bound = flag } + +_getBound :: Config -> Bool +_getBound = _bound + +------------------------------------------------------------------------------- +-- Initialization +------------------------------------------------------------------------------- + +newRateInfo :: Config -> IO (Maybe YieldRateInfo) +newRateInfo st = do + -- convert rate in Hertz to latency in Nanoseconds + let rateToLatency r = if r <= 0 then maxBound else round $ 1.0e9 / r + case getStreamRate st of + Just (Rate low goal high buf) -> + let l = rateToLatency goal + minl = rateToLatency high + maxl = rateToLatency low + in mkYieldRateInfo l (LatencyRange minl maxl) buf + Nothing -> return Nothing + + where + + mkYieldRateInfo latency latRange buf = do + measured <- newIORef 0 + wcur <- newIORef (0,0,0) + wcol <- newIORef (0,0,0) + now <- getTime Monotonic + wlong <- newIORef (0,now) + period <- newIORef 1 + gainLoss <- newIORef (Count 0) + + return $ Just YieldRateInfo + { svarLatencyTarget = latency + , svarLatencyRange = latRange + , svarRateBuffer = buf + , svarGainedLostYields = gainLoss + , workerBootstrapLatency = getStreamLatency st + , workerPollingInterval = period + , workerMeasuredLatency = measured + , workerPendingLatency = wcur + , workerCollectedLatency = wcol + , svarAllTimeLatency = wlong + } + +------------------------------------------------------------------------------- +-- Rate +------------------------------------------------------------------------------- + +-- | Same as @rate (Just $ Rate (r/2) r (2*r) maxBound)@ +-- +-- Specifies the average production rate of a stream in number of yields +-- per 
second (i.e. @Hertz@). Concurrent production is ramped up or down +-- automatically to achieve the specified average yield rate. The rate can +-- go down to half of the specified rate on the lower side and double of +-- the specified rate on the higher side. +-- +avgRate :: Double -> Config -> Config +avgRate r = rate (Just $ Rate (r/2) r (2*r) maxBound) + +-- | Same as @rate (Just $ Rate r r (2*r) maxBound)@ +-- +-- Specifies the minimum rate at which the stream should yield values. As +-- far as possible the yield rate would never be allowed to go below the +-- specified rate, even though it may possibly go above it at times, the +-- upper limit is double of the specified rate. +-- +minRate :: Double -> Config -> Config +minRate r = rate (Just $ Rate r r (2*r) maxBound) + +-- | Same as @rate (Just $ Rate (r/2) r r maxBound)@ +-- +-- Specifies the maximum rate at which the stream should yield values. As +-- far as possible the yield rate would never be allowed to go above the +-- specified rate, even though it may possibly go below it at times, the +-- lower limit is half of the specified rate. This can be useful in +-- applications where certain resource usage must not be allowed to go +-- beyond certain limits. +-- +maxRate :: Double -> Config -> Config +maxRate r = rate (Just $ Rate (r/2) r r maxBound) + +-- | Same as @rate (Just $ Rate r r r 0)@ +-- +-- Specifies a constant yield rate. If for some reason the actual rate +-- goes above or below the specified rate we do not try to recover it by +-- increasing or decreasing the rate in future. This can be useful in +-- applications like graphics frame refresh where we need to maintain a +-- constant refresh rate. 
+-- +constRate :: Double -> Config -> Config +constRate r = rate (Just $ Rate r r r 0) + +------------------------------------------------------------------------------- +-- Operations +------------------------------------------------------------------------------- + -- | Used by workers to send a value to the channel's output stream. -- -- When a worker is dispatched, a 'WorkerInfo' record is supplied to it by the diff --git a/src/Streamly/Internal/Data/Stream/Concurrent.hs b/src/Streamly/Internal/Data/Stream/Concurrent.hs index 6a9aa0a2d5..3212b4e330 100644 --- a/src/Streamly/Internal/Data/Stream/Concurrent.hs +++ b/src/Streamly/Internal/Data/Stream/Concurrent.hs @@ -91,10 +91,6 @@ import Streamly.Internal.Control.ForkLifted (forkManaged) import Streamly.Internal.Data.Channel.Dispatcher (modifyThread) import Streamly.Internal.Data.Channel.Worker (sendEvent) import Streamly.Internal.Data.Stream (Stream, Step(..)) -import Streamly.Internal.Data.Stream.Channel - ( Channel(..), newChannel, fromChannel, toChannelK, withChannelK - , withChannel, shutdown, chanConcatMapK - ) import qualified Streamly.Internal.Data.IORef as Unboxed import qualified Streamly.Internal.Data.Stream as Stream @@ -103,6 +99,7 @@ import qualified Streamly.Internal.Data.StreamK as K import Prelude hiding (mapM, sequence, concat, concatMap, zipWith) import Streamly.Internal.Data.Channel.Types +import Streamly.Internal.Data.Stream.Channel -- $setup -- diff --git a/src/Streamly/Internal/Data/Stream/Time.hs b/src/Streamly/Internal/Data/Stream/Time.hs index 849bca1e69..bf870adc70 100644 --- a/src/Streamly/Internal/Data/Stream/Time.hs +++ b/src/Streamly/Internal/Data/Stream/Time.hs @@ -70,8 +70,9 @@ import Streamly.Data.Array (Unbox) import Streamly.Internal.Data.Array (Array) import Streamly.Internal.Data.Fold (Fold (..)) import Streamly.Internal.Data.IsMap (IsMap(..)) -import Streamly.Internal.Data.Channel.Types (Rate, rate) +import Streamly.Internal.Data.Channel.Types (Rate) import 
Streamly.Internal.Data.Stream (Stream) +import Streamly.Internal.Data.Stream.Channel.Type (rate) import Streamly.Internal.Data.Time.Units ( AbsTime , MilliSecond64(..) diff --git a/test/Streamly/Test/Data/Scanl/Concurrent.hs b/test/Streamly/Test/Data/Scanl/Concurrent.hs index 8657b9444c..3cecbe4e6f 100644 --- a/test/Streamly/Test/Data/Scanl/Concurrent.hs +++ b/test/Streamly/Test/Data/Scanl/Concurrent.hs @@ -18,7 +18,6 @@ import Test.Hspec as H import qualified Streamly.Data.Fold as Fold import qualified Streamly.Data.Stream as Stream -import qualified Streamly.Data.Stream.Prelude as Stream import qualified Streamly.Internal.Data.Scanl as Scanl import qualified Streamly.Internal.Data.Scanl.Prelude as Scanl @@ -39,7 +38,7 @@ oddScan = Scanl.filtering odd & Scanl.lmapM (\x -> threadDelay 100 >> pure x) -parDistributeScan_ScanEnd :: (Stream.Config -> Stream.Config) -> IO () +parDistributeScan_ScanEnd :: (Scanl.Config -> Scanl.Config) -> IO () parDistributeScan_ScanEnd concOpts = do let streamLen = 10000 evenLen = 100 @@ -54,7 +53,7 @@ parDistributeScan_ScanEnd concOpts = do & Stream.fold Fold.toList sort res1 `shouldBe` [1..evenLen] ++ filter odd [(evenLen+1)..streamLen] -parDemuxScan_ScanEnd :: (Stream.Config -> Stream.Config) -> IO () +parDemuxScan_ScanEnd :: (Scanl.Config -> Scanl.Config) -> IO () parDemuxScan_ScanEnd concOpts = do let streamLen = 10000 evenLen = 100 @@ -74,7 +73,7 @@ parDemuxScan_ScanEnd concOpts = do map snd (filter fst res) `shouldBe` take evenLen [2, 4 ..] map snd (filter (not . 
fst) res) `shouldBe` filter odd [1..streamLen] -parDistributeScan_StreamEnd :: (Stream.Config -> Stream.Config) -> IO () +parDistributeScan_StreamEnd :: (Scanl.Config -> Scanl.Config) -> IO () parDistributeScan_StreamEnd concOpts = do let streamLen = 10000 ref <- newIORef [evenScan, oddScan] @@ -88,7 +87,7 @@ parDistributeScan_StreamEnd concOpts = do & Stream.fold Fold.toList sort res1 `shouldBe` inpList -parDemuxScan_StreamEnd :: (Stream.Config -> Stream.Config) -> IO () +parDemuxScan_StreamEnd :: (Scanl.Config -> Scanl.Config) -> IO () parDemuxScan_StreamEnd concOpts = do let streamLen = 10000 demuxer i = even (i :: Int) @@ -112,10 +111,10 @@ main = hspec #endif $ describe moduleName $ do it "parDistributeScan (stream end) (maxBuffer 1)" - $ parDistributeScan_StreamEnd (Stream.maxBuffer 1) + $ parDistributeScan_StreamEnd (Scanl.maxBuffer 1) it "parDistributeScan (scan end) (maxBuffer 1)" - $ parDistributeScan_ScanEnd (Stream.maxBuffer 1) + $ parDistributeScan_ScanEnd (Scanl.maxBuffer 1) it "parDemuxScan (stream end) (maxBuffer 1)" - $ parDemuxScan_StreamEnd (Stream.maxBuffer 1) + $ parDemuxScan_StreamEnd (Scanl.maxBuffer 1) it "parDemuxScan (scan end) (maxBuffer 1)" - $ parDemuxScan_ScanEnd (Stream.maxBuffer 1) + $ parDemuxScan_ScanEnd (Scanl.maxBuffer 1)