@@ -48,11 +48,12 @@ stream folding operations can also be used, see the docs for more details.
4848Use the following imports to run the snippets shown below:
4949
5050``` haskell
51- import Streamly
52- import Streamly.Prelude ((|:) )
53- import qualified Streamly.Prelude as S
51+ import qualified Streamly.Data.Stream.Prelude as S
52+ import qualified Streamly.Data.Fold as F
5453import qualified Data.Text as Text
5554import Control.Concurrent (threadDelay )
55+ import Control.Exception (Exception , SomeException )
56+ import Control.Monad.Catch (throwM , try )
5657```
5758
5859Let us simulate a URL fetch with a delay of `n` seconds using the following
@@ -71,7 +72,7 @@ You can run any number of actions concurrently. For example, to fetch two URLs
7172concurrently:
7273
7374``` haskell
74- urls <- S. toList $ fromParallel $ getURL 2 |: getURL 1 |: S. nil
75+ getUrlsConcurrently = S. parSequence id $ S. fromList [ getURL 2 , getURL 1 ]
7576```
7677
7778This would return the results in their arrival order i.e. first 1 and then 2.
@@ -81,26 +82,25 @@ concurrently, and even though URL 1 arrives before URL 2 the results will
8182return 2 first and then 1.
8283
8384``` haskell
84- urls <- S. toList $ fromAhead $ getURL 2 |: getURL 1 |: S. nil
85+ getUrlsOrdered = S. parSequence ( S. ordered True ) $ S. fromList [ getURL 2 , getURL 1 ]
8586```
8687
8788### concurrently_
8889
8990Use ` drain ` instead of ` toList ` to run the actions but ignore the results:
9091
9192``` haskell
92- S. drain $ fromParallel $ getURL 1 |: getURL 2 |: S. nil
93+ drainUrlsConcurrently = S. fold F. drain $ S. parMapM id getURL $ S. fromList [ 1 , 2 ]
9394```
9495
95- ### Concurrent Applicative
96+ ### Concurrent Zipping
9697
9798If the actions that you are executing result in different output types you can
98- use applicative zip to collect the results or to directly apply them to a
99- function:
99+ use zip to collect the results or to directly apply them to a function:
100100
101101``` haskell
102- tuples <- S. toList $ fromZipAsync $
103- (,) <$> S. fromEffect (getURLString 1 ) <*> S. fromEffect (getURLText 2 )
102+ concurrentZipping =
103+ S. parZipWith id (,) ( S. fromEffect (getURLString 1 )) ( S. fromEffect (getURLText 2 ) )
104104```
105105
106106### race
@@ -114,14 +114,14 @@ We can run multiple actions concurrently and take the first result that
114114arrives:
115115
116116``` haskell
117- urls <- S. toList $ S. take 1 $ fromParallel $ getURL 1 |: getURL 2 |: S. nil
117+ fastest = S. toList $ S. take 1 $ S. parSequence id $ S. fromList [ getURL 1 , getURL 2 ]
118118```
119119
120120After the first result arrives, the rest of the actions are canceled
121121automatically. In general, we can take first ` n ` results as they arrive:
122122
123123``` haskell
124- urls <- S. toList $ S. take 2 $ fromParallel $ getURL 1 |: getURL 2 |: S. nil
124+ fastestN n = S. toList $ S. take n $ S. parSequence id $ S. fromList [ getURL 1 , getURL 2 ]
125125```
126126
127127#### ` race ` Using Exceptions
@@ -133,54 +133,38 @@ exception to communicate the result. As soon as the first result arrives all
133133other actions will be canceled, for example:
134134
135135``` haskell
136- data Result = Result String deriving Show
137- instance Exception Result
138-
139- main = do
140- url <- try $ S. drain $ fromParallel $
141- (getURL 2 >>= throwM . Result )
142- |: (getURL 1 >>= throwM . Result )
143- |: S. nil
144- case url of
145- Left (e :: SomeException ) -> print e
146- Right _ -> undefined
136+ data Result = Result String deriving Show
137+ instance Exception Result
138+
139+ raceUsingExceptions = do
140+ url <- try $ S. fold F. drain $ S. parSequence id $ S. fromList
141+ [ (getURL 2 >>= throwM . Result )
142+ , (getURL 1 >>= throwM . Result )
143+ ]
144+ case url of
145+ Left (e :: SomeException ) -> print e
146+ Right _ -> undefined
147147```
148148
149149### mapConcurrently
150150
151- There are many ways to map concurrently on a container and collect the results:
152-
153- You can create a concurrent stream from a ` Foldable ` container of monadic
154- actions:
155-
156- ``` haskell
157- urls <- S. toList $ fromAhead $ S. fromFoldableM $ fmap getURL [1 .. 3 ]
158- ```
159-
160- You can first convert a ` Foldable ` into a stream and then map an action on the
161- stream concurrently:
151+ There are many ways to map concurrently on a container and collect the results.
152+ The recommended way is to first convert itinto a stream and then map an action
153+ on the stream concurrently:
162154
163155``` haskell
164- urls <- S. toList $ fromAhead $ S. mapM getURL $ foldMap return [1 .. 3 ]
165- ```
166-
167- You can map a monadic action to a ` Foldable ` container to convert it into a
168- stream and at the same time fold it:
169-
170- ``` haskell
171- urls <- S. toList $ fromAhead $ foldMap (S. fromEffect . getURL) [1 .. 3 ]
156+ mapConcurrently = S. toList $ S. parMapM id getURL $ S. fromList [1 .. 3 ]
172157```
173158
174159### replicateConcurrently
175160
176161Streamly has not just the equivalent of ` replicateConcurrently ` which is
177- ` replicateM ` but many more ways to generate concurrent streams, for example,
178- ` |: ` , ` unfoldrM ` , ` repeatM ` , ` iterateM ` , ` fromFoldableM ` etc. See the
179- [ Streamly.Prelude] ( https://hackage.haskell.org/package/streamly/docs/Streamly-Prelude.html )
180- module documentation for more details.
162+ ` parReplicateM ` but many more ways to generate concurrent streams, for example,
163+ ` parRepeatM ` , ` parBuffered ` , etc. See the ` Streamly.Data.Stream.Prelude ` module
164+ documentation for more details.
181165
182166``` haskell
183- xs <- S. toList $ fromParallel $ S. replicateM 2 $ getURL 1
167+ replicateConcurrently = S. toList $ S. parReplicateM id 2 $ getURL 1
184168```
185169
186170### Functor
@@ -191,14 +175,15 @@ concurrently.
191175To map serially just use ` fmap ` :
192176
193177``` haskell
194- xs <- S. toList $ fromParallel $ fmap (+ 1 ) $ return 1 |: return 2 |: S. nil
178+ serialMap = S. toList $ fmap (+ 1 ) (S. fromList [1 , 2 ] :: S. Stream IO Int )
179+
195180```
196181
197- To map a monadic action concurrently on all elements of the stream use ` mapM ` :
182+ To map a monadic action concurrently on all elements of the stream use ` parMapM ` :
198183
199184``` haskell
200- xs <- S. toList $ fromParallel $ S. mapM ( \ x -> return (x + 1 ))
201- $ return 1 |: return 2 |: S. nil
185+ concurrentMap =
186+ S. toList $ S. parMapM id ( \ x -> pure (x + 1 ) :: IO Int ) $ S. fromList [ 1 , 2 ]
202187```
203188
204189### Semigroup
0 commit comments