diff --git a/core/src/DocTestControlException.hs b/core/src/doctest/DocTestControlException.hs similarity index 100% rename from core/src/DocTestControlException.hs rename to core/src/doctest/DocTestControlException.hs diff --git a/core/src/DocTestDataArray.hs b/core/src/doctest/DocTestDataArray.hs similarity index 100% rename from core/src/DocTestDataArray.hs rename to core/src/doctest/DocTestDataArray.hs diff --git a/core/src/DocTestDataFold.hs b/core/src/doctest/DocTestDataFold.hs similarity index 100% rename from core/src/DocTestDataFold.hs rename to core/src/doctest/DocTestDataFold.hs diff --git a/core/src/DocTestDataMutArray.hs b/core/src/doctest/DocTestDataMutArray.hs similarity index 100% rename from core/src/DocTestDataMutArray.hs rename to core/src/doctest/DocTestDataMutArray.hs diff --git a/core/src/DocTestDataMutArrayGeneric.hs b/core/src/doctest/DocTestDataMutArrayGeneric.hs similarity index 100% rename from core/src/DocTestDataMutArrayGeneric.hs rename to core/src/doctest/DocTestDataMutArrayGeneric.hs diff --git a/core/src/DocTestDataParser.hs b/core/src/doctest/DocTestDataParser.hs similarity index 100% rename from core/src/DocTestDataParser.hs rename to core/src/doctest/DocTestDataParser.hs diff --git a/core/src/DocTestDataParserK.hs b/core/src/doctest/DocTestDataParserK.hs similarity index 100% rename from core/src/DocTestDataParserK.hs rename to core/src/doctest/DocTestDataParserK.hs diff --git a/core/src/DocTestDataScanl.hs b/core/src/doctest/DocTestDataScanl.hs similarity index 100% rename from core/src/DocTestDataScanl.hs rename to core/src/doctest/DocTestDataScanl.hs diff --git a/core/src/DocTestDataStream.hs b/core/src/doctest/DocTestDataStream.hs similarity index 100% rename from core/src/DocTestDataStream.hs rename to core/src/doctest/DocTestDataStream.hs diff --git a/core/src/DocTestDataStreamK.hs b/core/src/doctest/DocTestDataStreamK.hs similarity index 100% rename from core/src/DocTestDataStreamK.hs rename to 
core/src/doctest/DocTestDataStreamK.hs diff --git a/core/src/DocTestDataUnfold.hs b/core/src/doctest/DocTestDataUnfold.hs similarity index 100% rename from core/src/DocTestDataUnfold.hs rename to core/src/doctest/DocTestDataUnfold.hs diff --git a/core/src/DocTestFileSystemHandle.hs b/core/src/doctest/DocTestFileSystemHandle.hs similarity index 100% rename from core/src/DocTestFileSystemHandle.hs rename to core/src/doctest/DocTestFileSystemHandle.hs diff --git a/core/src/DocTestFileSystemPath.hs b/core/src/doctest/DocTestFileSystemPath.hs similarity index 100% rename from core/src/DocTestFileSystemPath.hs rename to core/src/doctest/DocTestFileSystemPath.hs diff --git a/core/src/DocTestFileSystemPosixPath.hs b/core/src/doctest/DocTestFileSystemPosixPath.hs similarity index 100% rename from core/src/DocTestFileSystemPosixPath.hs rename to core/src/doctest/DocTestFileSystemPosixPath.hs diff --git a/core/src/DocTestFileSystemWindowsPath.hs b/core/src/doctest/DocTestFileSystemWindowsPath.hs similarity index 100% rename from core/src/DocTestFileSystemWindowsPath.hs rename to core/src/doctest/DocTestFileSystemWindowsPath.hs diff --git a/core/src/DocTestUnicodeParser.hs b/core/src/doctest/DocTestUnicodeParser.hs similarity index 100% rename from core/src/DocTestUnicodeParser.hs rename to core/src/doctest/DocTestUnicodeParser.hs diff --git a/core/src/DocTestUnicodeStream.hs b/core/src/doctest/DocTestUnicodeStream.hs similarity index 100% rename from core/src/DocTestUnicodeStream.hs rename to core/src/doctest/DocTestUnicodeStream.hs diff --git a/core/src/DocTestUnicodeString.hs b/core/src/doctest/DocTestUnicodeString.hs similarity index 100% rename from core/src/DocTestUnicodeString.hs rename to core/src/doctest/DocTestUnicodeString.hs diff --git a/core/streamly-core.cabal b/core/streamly-core.cabal index e4c63691bd..1e3bb67fae 100644 --- a/core/streamly-core.cabal +++ b/core/streamly-core.cabal @@ -74,24 +74,7 @@ extra-source-files: configure.ac -- doctest include files - 
src/DocTestControlException.hs - src/DocTestDataArray.hs - src/DocTestDataFold.hs - src/DocTestDataMutArray.hs - src/DocTestDataMutArrayGeneric.hs - src/DocTestDataParser.hs - src/DocTestDataParserK.hs - src/DocTestDataScanl.hs - src/DocTestDataStream.hs - src/DocTestDataStreamK.hs - src/DocTestDataUnfold.hs - src/DocTestFileSystemHandle.hs - src/DocTestFileSystemPath.hs - src/DocTestFileSystemPosixPath.hs - src/DocTestFileSystemWindowsPath.hs - src/DocTestUnicodeParser.hs - src/DocTestUnicodeStream.hs - src/DocTestUnicodeString.hs + src/doctest/*.hs -- This is duplicated src/Streamly/Internal/Data/Array/ArrayMacros.h @@ -327,6 +310,7 @@ library include-dirs: src + , src/doctest , src/Streamly/Internal/Data , src/Streamly/Internal/Data/Array , src/Streamly/Internal/Data/Stream diff --git a/src/Streamly/Internal/Data/Scanl/Concurrent.hs b/src/Streamly/Internal/Data/Scanl/Concurrent.hs index 4675630fdb..fcf9b925d8 100644 --- a/src/Streamly/Internal/Data/Scanl/Concurrent.hs +++ b/src/Streamly/Internal/Data/Scanl/Concurrent.hs @@ -1,3 +1,4 @@ +{-# LANGUAGE CPP #-} -- | -- Module : Streamly.Internal.Data.Scanl.Concurrent -- Copyright : (c) 2024 Composewell Technologies @@ -37,12 +38,7 @@ import qualified Streamly.Internal.Data.Stream as Stream import Streamly.Internal.Data.Fold.Channel.Type import Streamly.Internal.Data.Channel.Types --- $setup --- >>> :set -fno-warn-deprecations --- >>> import Control.Concurrent (threadDelay) --- >>> import qualified Streamly.Internal.Data.Stream as Stream --- >>> import qualified Streamly.Internal.Data.Scanl as Scanl --- >>> import qualified Streamly.Internal.Data.Scanl.Concurrent as Scanl +#include "DocTestDataScanl.hs" ------------------------------------------------------------------------------- -- Concurrent scans @@ -149,18 +145,28 @@ data ScanState s q db f = -- XXX We can use a one way mailbox type abstraction instead of using an IORef -- for adding new folds dynamically. 
--- | Evaluate a stream and scan its outputs using zero or more parallel scans, --- which can be generated dynamically. It takes an action for producing new --- scans which is run before processing each input. The list of scans produced --- is added to the currently running scans. If you do not want the same scan --- added every time then the action should generate it only once (see the --- example below). If there are no scans available, the input is discarded. The --- outputs of all the scans are merged in the output stream. +-- | Evaluate a stream by distributing its inputs across zero or more +-- concurrently running scans. New scans can be generated dynamically. Use +-- 'parDistributeScan' for an easier to use interface, if you do not need the +-- power of 'parDistributeScanM'. -- --- If the input buffer (see maxBuffer) is limited then a scan may block until --- space becomes available in the input buffer. If a scan blocks then input is --- not provided to any of the scans, input is distributed to scans only when --- all scans have input buffer available. +-- Before processing each input element, the supplied action is executed to +-- produce additional scans. These scans are appended to the set of currently +-- active scans. If you do not want the same scan to be added repeatedly, +-- ensure that the action only generates it once (see the example below). +-- +-- If there are no scans currently active, the input element is discarded. +-- The results from all active scans are collected and flattened into +-- the output stream. +-- +-- Concurrency and buffering: +-- +-- If the input buffer (see 'maxBuffer') is bounded, a scan may block until +-- space becomes available. If any scan is blocked on buffer, all scans are +-- blocked. Processing continues only when all scans have buffer space +-- available. 
+-- +-- Example: -- -- >>> import Data.IORef -- >>> ref <- newIORef [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int] @@ -246,7 +252,13 @@ parDistributeScanM cfg getFolds (Stream sstep state) = else return $ Yield outputs (ScanDrain q db running) step _ ScanStop = return Stop --- | Like 'parDistributeScanM' but takes a list of static scans. +-- | A pure variant of 'parDistributeScanM' that uses a fixed list of scans. +-- +-- The provided scans are started once and run concurrently for the duration +-- of the stream. Each input element is distributed to all active scans, and +-- their outputs are collected and emitted together. +-- +-- Example: -- -- >>> xs = [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int] -- >>> Stream.toList $ Scanl.parDistributeScan id xs (Stream.enumerateFromTo 1 10) @@ -275,17 +287,29 @@ data DemuxState s q db f = -- never delete a key. Whatever we do we should keep the non-concurrent fold as -- well consistent with that. --- | Evaluate a stream and send its outputs to the selected scan. The scan is --- dynamically selected using a key at the time of the first input seen for --- that key. Any new scan is added to the list of scans which are currently --- running. If there are no scans available for a given key, the input is --- discarded. If a constituent scan completes its output is emitted in the --- output of the composed scan. +-- | Evaluate a stream by routing each input to a scan determined by a key. +-- +-- For each distinct key, the first input encountered triggers the creation +-- of a new scan (via the supplied key-to-scan function). This scan is then +-- added to the set of currently active scans. Subsequent inputs with the +-- same key are directed to the same scan. +-- +-- If no scan can be created for a key, the input element is discarded. +-- +-- When a constituent scan completes, its final output is emitted as part of +-- the composed output stream. 
The output of 'parDemuxScanM' is a stream of +-- key–value pairs, where each value is the output produced by the scan +-- corresponding to that key. +-- +-- For a simpler interface, use 'parDemuxScan' if you do not need the full +-- flexibility of 'parDemuxScanM'. +-- +-- Example: -- -- >>> import qualified Data.Map.Strict as Map -- >>> import Data.Maybe (fromJust) -- >>> f1 = ("even", Scanl.take 5 Scanl.sum) --- >>> f2 = ("odd", Scanl.take 5 Scanl.sum) +-- >>> f2 = ("odd", Scanl.take 5 Scanl.sum) -- >>> kv = Map.fromList [f1, f2] -- >>> getScan k = return (fromJust $ Map.lookup k kv) -- >>> getKey x = if even x then "even" else "odd" @@ -386,8 +410,11 @@ parDemuxScanM cfg getKey getFold (Stream sstep state) = else return $ Yield outputs (DemuxDrain q db keyToChan1) step _ DemuxStop = return Stop --- | Like 'parDemuxScanM' but the key to scan mapping is static/pure instead of --- monadic. +-- | A pure variant of 'parDemuxScanM' where the key-to-scan mapping is +-- static and does not require monadic effects. +-- +-- Each distinct key is deterministically mapped to a scan using the provided +-- function. The behavior is otherwise the same as 'parDemuxScanM'. {-# INLINE parDemuxScan #-} parDemuxScan :: (MonadAsync m, Ord k) => (Config -> Config) diff --git a/src/doctest/DocTestDataScanl.hs b/src/doctest/DocTestDataScanl.hs new file mode 100644 index 0000000000..53b80868b6 --- /dev/null +++ b/src/doctest/DocTestDataScanl.hs @@ -0,0 +1,7 @@ +{- $setup +>>> :m +>>> :set -XFlexibleContexts +>>> import Control.Concurrent (threadDelay) +>>> import qualified Streamly.Data.Stream as Stream +>>> import qualified Streamly.Data.Scanl.Prelude as Scanl +-} diff --git a/streamly.cabal b/streamly.cabal index f8d746448d..acd7353f0f 100644 --- a/streamly.cabal +++ b/streamly.cabal @@ -427,6 +427,7 @@ library , Streamly.Data.Stream.MkType , Streamly.Data.Stream.Prelude , Streamly.Data.Fold.Prelude + , Streamly.Data.Scanl.Prelude -- Network/IO , Streamly.Network.Socket