Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
20 changes: 2 additions & 18 deletions core/streamly-core.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -74,24 +74,7 @@ extra-source-files:
configure.ac

-- doctest include files
src/DocTestControlException.hs
src/DocTestDataArray.hs
src/DocTestDataFold.hs
src/DocTestDataMutArray.hs
src/DocTestDataMutArrayGeneric.hs
src/DocTestDataParser.hs
src/DocTestDataParserK.hs
src/DocTestDataScanl.hs
src/DocTestDataStream.hs
src/DocTestDataStreamK.hs
src/DocTestDataUnfold.hs
src/DocTestFileSystemHandle.hs
src/DocTestFileSystemPath.hs
src/DocTestFileSystemPosixPath.hs
src/DocTestFileSystemWindowsPath.hs
src/DocTestUnicodeParser.hs
src/DocTestUnicodeStream.hs
src/DocTestUnicodeString.hs
src/doctest/*.hs

-- This is duplicated
src/Streamly/Internal/Data/Array/ArrayMacros.h
Expand Down Expand Up @@ -327,6 +310,7 @@ library

include-dirs:
src
, src/doctest
, src/Streamly/Internal/Data
, src/Streamly/Internal/Data/Array
, src/Streamly/Internal/Data/Stream
Expand Down
81 changes: 54 additions & 27 deletions src/Streamly/Internal/Data/Scanl/Concurrent.hs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
{-# LANGUAGE CPP #-}
-- |
-- Module : Streamly.Internal.Data.Scanl.Concurrent
-- Copyright : (c) 2024 Composewell Technologies
Expand Down Expand Up @@ -37,12 +38,7 @@ import qualified Streamly.Internal.Data.Stream as Stream
import Streamly.Internal.Data.Fold.Channel.Type
import Streamly.Internal.Data.Channel.Types

-- $setup
-- >>> :set -fno-warn-deprecations
-- >>> import Control.Concurrent (threadDelay)
-- >>> import qualified Streamly.Internal.Data.Stream as Stream
-- >>> import qualified Streamly.Internal.Data.Scanl as Scanl
-- >>> import qualified Streamly.Internal.Data.Scanl.Concurrent as Scanl
#include "DocTestDataScanl.hs"

-------------------------------------------------------------------------------
-- Concurrent scans
Expand Down Expand Up @@ -149,18 +145,28 @@ data ScanState s q db f =
-- XXX We can use a one way mailbox type abstraction instead of using an IORef
-- for adding new folds dynamically.

-- | Evaluate a stream and scan its outputs using zero or more parallel scans,
-- which can be generated dynamically. It takes an action for producing new
-- scans which is run before processing each input. The list of scans produced
-- is added to the currently running scans. If you do not want the same scan
-- added every time then the action should generate it only once (see the
-- example below). If there are no scans available, the input is discarded. The
-- outputs of all the scans are merged in the output stream.
-- | Evaluate a stream by distributing its inputs across zero or more
-- concurrently running scans. New scans can be generated dynamically. Use
-- 'parDistributeScan' for an eaiser to use interface, if you do not need the
-- power of 'parDistributeScanM'.
--
-- If the input buffer (see maxBuffer) is limited then a scan may block until
-- space becomes available in the input buffer. If a scan blocks then input is
-- not provided to any of the scans, input is distributed to scans only when
-- all scans have input buffer available.
-- Before processing each input element, the supplied action is executed to
-- produce additional scans. These scans are appended to the set of currently
-- active scans. If you do not want the same scan to be added repeatedly,
-- ensure that the action only generates it once (see the example below).
--
-- If there are no scans currently active, the input element is discarded.
-- The results from all active scans are collected and lattened into the
-- the output stream.
--
-- Concurrency and buffering:
--
-- If the input buffer (see 'maxBuffer') is bounded, a scan may block until
-- space becomes available. If any scan is blocked on buffer, all scans are
-- blocked. Processing continues only when all scans have buffer space
-- available.
--
-- Example:
--
-- >>> import Data.IORef
-- >>> ref <- newIORef [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int]
Expand Down Expand Up @@ -246,7 +252,13 @@ parDistributeScanM cfg getFolds (Stream sstep state) =
else return $ Yield outputs (ScanDrain q db running)
step _ ScanStop = return Stop

-- | Like 'parDistributeScanM' but takes a list of static scans.
-- | A pure variant of 'parDistributeScanM' that uses a fixed list of scans.
--
-- The provided scans are started once and run concurrently for the duration
-- of the stream. Each input element is distributed to all active scans, and
-- their outputs are collected and emitted together.
--
-- Example:
--
-- >>> xs = [Scanl.take 5 Scanl.sum, Scanl.take 5 Scanl.length :: Scanl.Scanl IO Int Int]
-- >>> Stream.toList $ Scanl.parDistributeScan id xs (Stream.enumerateFromTo 1 10)
Expand Down Expand Up @@ -275,17 +287,29 @@ data DemuxState s q db f =
-- never delete a key. Whatever we do we should keep the non-concurrent fold as
-- well consistent with that.

-- | Evaluate a stream and send its outputs to the selected scan. The scan is
-- dynamically selected using a key at the time of the first input seen for
-- that key. Any new scan is added to the list of scans which are currently
-- running. If there are no scans available for a given key, the input is
-- discarded. If a constituent scan completes its output is emitted in the
-- output of the composed scan.
-- | Evaluate a stream by routing each input to a scan determined by a key.
--
-- For each distinct key, the first input encountered triggers the creation
-- of a new scan (via the supplied key-to-scan function). This scan is then
-- added to the set of currently active scans. Subsequent inputs with the
-- same key are directed to the same scan.
--
-- If no scan can be created for a key, the input element is discarded.
--
-- When a constituent scan completes, its final output is emitted as part of
-- the composed output stream. The output of 'parDemuxScanM' is a stream of
-- key–value pairs, where each value is the output produced by the scan
-- corresponding to that key.
--
-- For a simpler interface, use 'parDemuxScan' if you do not need the full
-- flexibility of 'parDemuxScanM'.
--
-- Example:
--
-- >>> import qualified Data.Map.Strict as Map
-- >>> import Data.Maybe (fromJust)
-- >>> f1 = ("even", Scanl.take 5 Scanl.sum)
-- >>> f2 = ("odd", Scanl.take 5 Scanl.sum)
-- >>> f2 = ("odd", Scanl.take 5 Scanl.sum)
-- >>> kv = Map.fromList [f1, f2]
-- >>> getScan k = return (fromJust $ Map.lookup k kv)
-- >>> getKey x = if even x then "even" else "odd"
Expand Down Expand Up @@ -386,8 +410,11 @@ parDemuxScanM cfg getKey getFold (Stream sstep state) =
else return $ Yield outputs (DemuxDrain q db keyToChan1)
step _ DemuxStop = return Stop

-- | Like 'parDemuxScanM' but the key to scan mapping is static/pure instead of
-- monadic.
-- | A pure variant of 'parDemuxScanM' where the key-to-scan mapping is
-- static and does not require monadic effects.
--
-- Each distinct key is deterministically mapped to a scan using the provided
-- function. The behavior is otherwise the same as 'parDemuxScanM'.
{-# INLINE parDemuxScan #-}
parDemuxScan :: (MonadAsync m, Ord k) =>
(Config -> Config)
Expand Down
7 changes: 7 additions & 0 deletions src/doctest/DocTestDataScanl.hs
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{- $setup
>>> :m
>>> :set -XFlexibleContexts
>>> import Control.Concurrent (threadDelay)
>>> import qualified Streamly.Data.Stream as Stream
>>> import qualified Streamly.Data.Scanl.Prelude as Scanl
-}
1 change: 1 addition & 0 deletions streamly.cabal
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,7 @@ library
, Streamly.Data.Stream.MkType
, Streamly.Data.Stream.Prelude
, Streamly.Data.Fold.Prelude
, Streamly.Data.Scanl.Prelude

-- Network/IO
, Streamly.Network.Socket
Expand Down
Loading