|
16 | 16 | -- >>> :set -XTemplateHaskell |
17 | 17 | -- >>> :set -XTypeFamilies |
18 | 18 | -- >>> :set -XUndecidableInstances |
| 19 | +-- >>> :set -XFlexibleContexts |
| 20 | +-- >>> :set -XFlexibleInstances |
| 21 | +-- >>> :set -XGeneralizedNewtypeDeriving |
| 22 | +-- >>> :set -XTypeOperators |
19 | 23 | -- |
20 | 24 | -- Import this module unqualified to bring everything needed in scope without |
21 | 25 | -- having to import several other modules. Also, "Streamly.Data.Stream" or |
22 | 26 | -- "Streamly.Data.Stream.Prelude" must be imported @as Stream@. |
23 | 27 | -- |
24 | 28 | -- >>> import Streamly.Data.Stream.MkType |
25 | 29 | -- >>> import qualified Streamly.Data.Stream.Prelude as Stream |
| 30 | +-- >>> import Streamly.Data.Stream.Prelude (Stream, MonadAsync) |
26 | 31 | -- |
27 | 32 | -- We are describing below many useful types that can be created using macros |
28 | 33 | -- in this module and the behavior of those types. These could be useful if you |
29 | 34 | -- like to program using the monad \"do notation\" instead of using concatMap |
30 | | --- like operations. |
| 35 | +-- like operations explicitly. |
31 | 36 | -- |
32 | 37 | -- == Parallel |
33 | 38 | -- |
34 | | --- A newtype wrapper over the 'Stream' type; the Applicative and Monad |
35 | | --- instances generate a cross product of the two streams in a concurrent |
36 | | --- manner. The order in which the stream elements are produced is not |
37 | | --- deterministic, this is supposed to be used if order does not matter. |
38 | | - |
39 | | --- Loops over the outer stream, generating multiple elements concurrently; for |
40 | | --- each outer stream element, loop over the inner stream concurrently. More |
41 | | --- outer iterations are started only if the existing inner iterations are not |
42 | | --- saturating the resources. |
43 | | --- |
44 | | --- Use 'mkParallel' to construct from 'Stream' type and 'unParallel' to |
45 | | --- deconstruct back to 'Stream'. |
46 | | --- |
47 | 39 | -- >>> :{ |
48 | 40 | -- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b |
49 | 41 | -- bind = flip (Stream.parConcatMap id) |
50 | 42 | -- $(mkCrossType "Parallel" "bind" True) |
51 | 43 | -- :} |
52 | 44 | -- |
53 | | --- This is a bounded concurrent, unordered list-transformer (ListT) monad. |
| 45 | +-- A newtype wrapper over 'Stream' that provides a |
| 46 | +-- __bounded concurrent ListT-like monad__, running computations |
| 47 | +-- __concurrently__. Prefer this type when you want high throughput and do not |
| 48 | +-- require ordering guarantees. |
| 49 | +-- |
| 50 | +-- Combining streams with 'Applicative' or 'Monad' does not process them |
| 51 | +-- element-by-element in sequence. Instead, multiple elements are produced |
| 52 | +-- and consumed __at the same time__. |
| 53 | +-- |
| 54 | +-- For example, in 'Applicative', both sides are evaluated concurrently: |
| 55 | +-- |
| 56 | +-- > pure (,) <*> a <*> b |
| 57 | +-- |
| 58 | +-- elements from @a@ and @b@ may be generated in parallel. |
| 59 | +-- |
| 60 | +-- In 'Monad', each element of the outer stream can start its own |
| 61 | +-- concurrent computation: |
| 62 | +-- |
| 63 | +-- > do |
| 64 | +-- > x <- a |
| 65 | +-- > y <- f x |
| 66 | +-- |
| 67 | +-- multiple @x@ values may be generated at once, and for each @x@, |
| 68 | +-- multiple @y@ values may be produced concurrently. |
| 69 | +-- |
| 70 | +-- A useful way to think about this is nested loops: |
| 71 | +-- |
| 72 | +-- * Many iterations of the outer loop can run at the same time |
| 73 | +-- * For each outer iteration, many inner iterations can also run at the same |
| 74 | +-- time |
| 75 | +-- |
| 76 | +-- Conceptually, this behaves like a concurrent list transformer. |
| 77 | +-- |
| 78 | +-- Results are emitted in completion order (first-come-first-served), |
| 79 | +-- not in the original stream order: |
| 80 | +-- |
| 81 | +-- * Values from the outer stream (@x@) may be produced out of order |
| 82 | +-- * Values from the inner stream (@y@) may also be produced out of order |
| 83 | +-- |
| 84 | +-- For deterministic ordering, see 'OrderedParallel'. |
| 85 | +-- |
| 86 | +-- Concurrency is __bounded__: new outer iterations are scheduled only when |
| 87 | +-- existing inner computations are not saturating available resources. |
| 88 | +-- |
| 89 | +-- === Construction |
| 90 | +-- |
| 91 | +-- * Use 'mkParallel' to wrap a 'Stream' as 'Parallel' |
| 92 | +-- * Use 'unParallel' to convert back to 'Stream' |
| 93 | +-- |
| 94 | +-- === Laws and Caveats |
| 95 | +-- |
| 96 | +-- /Non-associative Monad/ |
| 97 | +-- |
| 98 | +-- Due to concurrent, completion-order scheduling: |
| 99 | +-- |
| 100 | +-- * '>>=' is __not associative__ |
| 101 | +-- * The order of effects is __not deterministic__ |
| 102 | +-- * The order of results is __not deterministic__ |
| 103 | +-- |
| 104 | +-- This means: |
| 105 | +-- |
| 106 | +-- > (m >>= f) >>= g ≠ m >>= (\x -> f x >>= g) |
54 | 107 | -- |
55 | | --- WARNING! By design, monad bind of this type is not associative, because of |
56 | | --- concurrency, order of effects as well as results is non-deterministic. |
| 108 | +-- Programs relying on ordering or sequencing of effects may behave |
| 109 | +-- differently under reassociation. |
57 | 110 | -- |
58 | | --- Serves the same purpose as the 'Streamly.Prelude.AsyncT' type in older |
| 111 | +-- This type serves a similar role as 'Streamly.Prelude.AsyncT' in older |
59 | 112 | -- releases. |
60 | 113 | -- |
61 | 114 | -- == FairParallel |
62 | 115 | -- |
63 | | --- Like Parallel but strikes a balance between going deeper into existing |
64 | | --- iterations of the loop and starting new outer loop iterations. |
65 | | --- |
66 | | --- Use 'mkFairParallel' to construct from 'Stream' type and 'unFairParallel' to |
67 | | --- deconstruct back to 'Stream'. |
| 116 | +-- A __bounded concurrent, fair LogicT-like (logic programming) monad__. |
68 | 117 | -- |
69 | 118 | -- >>> :{ |
70 | 119 | -- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b |
71 | 120 | -- bind = flip (Stream.parConcatMap (Stream.interleaved True)) |
72 | 121 | -- $(mkCrossType "FairParallel" "bind" True) |
73 | 122 | -- :} |
74 | 123 | -- |
75 | | --- This is a bounded concurrent, fair logic programming (LogicT) monad. |
| 124 | +-- Like 'Parallel', but uses __fair scheduling__: instead of prioritizing |
| 125 | +-- existing outer iterations by running their inner computations further, |
| 126 | +-- it interleaves them with starting new outer iterations. This avoids |
| 127 | +-- starvation and ensures more outer branches make progress, even if some |
| 128 | +-- inner computations are infinite. |
76 | 129 | -- |
77 | | --- WARNING! By design, monad bind of this type is not associative, because of |
78 | | --- concurrency, order of effects as well as results may be unpredictable. |
| 130 | +-- Results are still emitted in completion order (first-come-first-served), and |
| 131 | +-- may be out of order. |
79 | 132 | -- |
80 | | --- Serves the same purpose as the 'Streamly.Prelude.WAsyncT' type in older |
81 | | --- releases. |
| 133 | +-- Use 'mkFairParallel' to construct from 'Stream' and 'unFairParallel' to |
| 134 | +-- convert back to 'Stream'. |
82 | 135 | -- |
83 | | --- == EagerParallel |
| 136 | +-- WARNING! '>>=' is __not associative__ due to concurrent, |
| 137 | +-- completion-order scheduling; effects and results may be observed in |
| 138 | +-- different orders. |
84 | 139 | -- |
85 | | --- Like Parallel, but executes as many actions concurrently as possible. This |
86 | | --- is useful if you want all actions to be scheduled at the same time so that |
87 | | --- something does not get starved due to others. |
| 140 | +-- Serves a similar role as 'Streamly.Prelude.WAsyncT' in older releases. |
88 | 141 | -- |
89 | | --- Use 'mkEagerParallel' to construct from 'Stream' type and 'unEagerParallel' |
90 | | --- to deconstruct back to 'Stream'. |
| 142 | +-- == EagerParallel |
| 143 | +-- |
| 144 | +-- An __unbounded concurrent, unordered ListT-like (list transformer) monad__. |
91 | 145 | -- |
92 | 146 | -- >>> :{ |
93 | 147 | -- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b |
94 | | --- parBind = flip (Stream.parConcatMap (Stream.eager True)) |
95 | | --- $(mkCrossType "EagerParallel" "parBind" True) |
| 148 | +-- bind = flip (Stream.parConcatMap (Stream.eager True)) |
| 149 | +-- $(mkCrossType "EagerParallel" "bind" True) |
96 | 150 | -- :} |
97 | 151 | -- |
98 | | --- This is an unbounded concurrent, unordered list transformer (ListT) monad. |
| 152 | +-- Like 'Parallel', but uses __eager scheduling__: it starts all possible |
| 153 | +-- concurrent actions immediately, without waiting for existing ones to make |
| 154 | +-- progress. |
99 | 155 | -- |
100 | | --- WARNING! By design, monad bind of this type is not associative, because of |
101 | | --- concurrency order of effects as well as results may be unpredictable. |
| 156 | +-- This ensures that all computations begin execution (e.g. timers or other |
| 157 | +-- effects start promptly), and avoids starvation due to delayed scheduling. |
| 158 | +-- It is primarily about semantics rather than performance, and may reduce |
| 159 | +-- throughput due to excessive concurrency. |
102 | 160 | -- |
103 | | --- Serves the same purpose as the 'Streamly.Prelude.ParallelT' type in older |
104 | | --- releases. |
| 161 | +-- Results are emitted in completion order (first-come-first-served), and may |
| 162 | +-- be out of order. |
105 | 163 | -- |
106 | | --- == OrderedParallel |
| 164 | +-- Use 'mkEagerParallel' to construct from 'Stream' and 'unEagerParallel' to |
| 165 | +-- convert back to 'Stream'. |
| 166 | +-- |
| 167 | +-- WARNING! '>>=' is __not associative__ due to concurrent, |
| 168 | +-- completion-order scheduling; effects and results may be observed in |
| 169 | +-- different orders. |
| 170 | +-- |
| 171 | +-- Serves a similar role as 'Streamly.Prelude.ParallelT' in older releases. |
107 | 172 | -- |
108 | | --- Like Parallel, runs many iterations concurrently, but stages the results |
109 | | --- such that the results of iterations are presented in the same order as |
110 | | --- specified in the code. This is closest to the serial Nested type in behavior |
111 | | --- among all the concurrent types. |
| 173 | +-- == OrderedParallel |
112 | 174 | -- |
113 | | --- Use 'mkOrderedParallel' to construct from 'Stream' type and |
114 | | --- 'unOrderedParallel' to deconstruct back to 'Stream'. |
| 175 | +-- A __bounded concurrent, ordered ListT-like monad__. |
115 | 176 | -- |
116 | 177 | -- >>> :{ |
117 | 178 | -- bind :: MonadAsync m => Stream m a -> (a -> Stream m b) -> Stream m b |
118 | 179 | -- bind = flip (Stream.parConcatMap (Stream.ordered True)) |
119 | 180 | -- $(mkCrossType "OrderedParallel" "bind" True) |
120 | 181 | -- :} |
121 | 182 | -- |
122 | | --- This is a bounded concurrent, ordered list transformer (ListT) monad. |
| 183 | +-- Like 'Parallel', runs many iterations concurrently, but |
| 184 | +-- __preserves the original stream order__ by staging results. Results are |
| 185 | +-- yielded in the same order as specified in the code. |
123 | 186 | -- |
124 | | --- WARNING! Monad bind of this type is associative for values, but because of |
125 | | --- concurrency, order of effects may be unpredictable. |
| 187 | +-- This is closest in behavior to the serial list transformer monad (ListT) |
| 188 | +-- type among the concurrent types. |
126 | 189 | -- |
127 | | --- Serves the same purpose as the 'Streamly.Prelude.AheadT' type in older |
128 | | --- releases. |
| 190 | +-- Use 'mkOrderedParallel' to construct from 'Stream' and |
| 191 | +-- 'unOrderedParallel' to convert back to 'Stream'. |
| 192 | +-- |
| 193 | +-- WARNING! '>>=' is associative for values, but due to concurrency, |
| 194 | +-- the order of effects may be unpredictable. |
| 195 | +-- |
| 196 | +-- Serves a similar role as 'Streamly.Prelude.AheadT' in older releases. |
129 | 197 | -- |
130 | 198 | -- == Zip |
131 | 199 | -- |
132 | 200 | -- A newtype wrapper over the 'Stream' type, the applicative instance zips two |
133 | 201 | -- streams. |
134 | 202 | -- |
135 | | --- Use 'mkZip' to construct from 'Stream' type and 'unZip' to deconstruct back |
136 | | --- to 'Stream'. |
137 | | --- |
138 | 203 | -- >>> :{ |
139 | 204 | -- zipApply :: Monad m => Stream m (a -> b) -> Stream m a -> Stream m b |
140 | 205 | -- zipApply = Stream.zipWith ($) |
141 | 206 | -- $(mkZipType "Zip" "zipApply" False) |
142 | 207 | -- :} |
143 | 208 | -- |
144 | | --- Same as the deprcated 'Streamly.Prelude.ZipSerialM' type. |
| 209 | +-- Use 'mkZip' to construct from 'Stream' type and 'unZip' to deconstruct back |
| 210 | +-- to 'Stream'. |
| 211 | +-- |
| 212 | +-- Same as the deprecated 'Streamly.Prelude.ZipSerialM'. |
145 | 213 | -- |
146 | 214 | -- == ZipParallel |
147 | 215 | -- |
148 | 216 | -- Like Zip but evaluates the streams being zipped concurrently. |
149 | 217 | -- |
150 | | --- Use 'mkZipParallel' to construct from 'Stream' type and 'unZipParallel' to |
151 | | --- deconstruct back to 'Stream'. |
152 | | --- |
153 | 218 | -- >>> :{ |
154 | 219 | -- parZipApply :: MonadAsync m => Stream m (a -> b) -> Stream m a -> Stream m b |
155 | 220 | -- parZipApply = Stream.parZipWith id id |
156 | 221 | -- $(mkZipType "ZipParallel" "parZipApply" True) |
157 | 222 | -- :} |
158 | 223 | -- |
| 224 | +-- Use 'mkZipParallel' to construct from 'Stream' type and 'unZipParallel' to |
| 225 | +-- deconstruct back to 'Stream'. |
| 226 | +-- |
159 | 227 | -- Same as the deprecated 'Streamly.Prelude.ZipAsync' type. |
160 | 228 | -- |
161 | 229 | -- == Avoiding Template Haskell |
|
0 commit comments