|
| 1 | +(** |
| 2 | +
|
| 3 | +*) |
| 4 | +#r "nuget: FSharp.Control.TaskSeq,1.0.0" |
| 5 | +(** |
| 6 | +# Advanced Task Sequence Operations |
| 7 | +
|
| 8 | +This page covers advanced `TaskSeq<'T>` operations: grouping, stateful transformation with |
| 9 | +`mapFold` and `threadState`, deduplication, set-difference, partitioning, counting by key, lexicographic |
| 10 | +comparison, cancellation, and positional editing. |
| 11 | +
|
| 12 | +*) |
| 13 | +open System.Threading |
| 14 | +open System.Threading.Tasks |
| 15 | +open FSharp.Control |
| 16 | +(** |
| 17 | +----------------------- |
| 18 | +
|
| 19 | +## groupBy and groupByAsync |
| 20 | +
|
| 21 | +`TaskSeq.groupBy` partitions a sequence into groups by a key-projection function. The result |
| 22 | +is a task that yields an array of `(key, elements[])` pairs, one per distinct key, in order of first occurrence. |
| 23 | +
|
| 24 | +> **Note:** `groupBy` consumes the entire source before returning. Do not use it on |
| 25 | +potentially infinite sequences. |
| 26 | +> |
| 27 | +
|
| 28 | +*) |
| 29 | +type Event = { EntityId: int; Payload: string } |
| 30 | +// Sample events: entity ids 1 and 2 each occur twice, id 3 occurs once. |
| 31 | +let events : TaskSeq<Event> = |
| 32 | +    TaskSeq.ofList |
| 33 | +        [ { EntityId = 1; Payload = "A" } |
| 34 | +          { EntityId = 2; Payload = "B" } |
| 35 | +          { EntityId = 1; Payload = "C" } |
| 36 | +          { EntityId = 3; Payload = "D" } |
| 37 | +          { EntityId = 2; Payload = "E" } ] |
| 38 | + |
| 39 | +// Grouped by entity id: (1, [A;C]), (2, [B;E]), (3, [D]) |
| 40 | +let grouped : Task<(int * Event[])[]> = |
| 41 | +    events |> TaskSeq.groupBy (fun e -> e.EntityId) |
| 42 | +(** |
| 43 | +`TaskSeq.groupByAsync` accepts an async key projection: |
| 44 | +
|
| 45 | +*) |
| 46 | +let groupedAsync : Task<(int * Event[])[]> = |
| 47 | +    events |> TaskSeq.groupByAsync (fun (evt: Event) -> task { return evt.EntityId }) |
| 48 | +(** |
| 49 | +----------------------- |
| 50 | +
|
| 51 | +## countBy and countByAsync |
| 52 | +
|
| 53 | +`TaskSeq.countBy` counts how many elements map to each key, returning a task of `(key, count)[]`; `TaskSeq.countByAsync` does the same with an async key projection: |
| 54 | +
|
| 55 | +*) |
| 56 | +let counts : Task<(int * int)[]> = |
| 57 | +    events |> TaskSeq.countBy (fun (evt: Event) -> evt.EntityId) |
| 58 | +// (key, count) pairs: (1,2), (2,2), (3,1) |
| 59 | +(** |
| 60 | +----------------------- |
| 61 | +
|
| 62 | +## mapFold and mapFoldAsync |
| 63 | +
|
| 64 | +`TaskSeq.mapFold` threads a state accumulator through a sequence while simultaneously mapping |
| 65 | +each element to a result value. The output is a task returning a pair of `(result[], finalState)`: |
| 66 | +
|
| 67 | +*) |
| 68 | +// Prefix each word with its zero-based position; the threaded state is the running element count |
| 69 | +let words : TaskSeq<string> = |
| 70 | +    TaskSeq.ofList [ "hello"; "world"; "foo" ] |
| 71 | + |
| 72 | +let numbered : Task<string[] * int> = |
| 73 | +    words |
| 74 | +    |> TaskSeq.mapFold (fun count w -> $"{count}: {w}", count + 1) 0 |
| 75 | + |
| 76 | +// result: ([| "0: hello"; "1: world"; "2: foo" |], 3)  (mapped array, final count) |
| 77 | +(** |
| 78 | +`TaskSeq.mapFoldAsync` is the same but the mapping function returns `Task<'Result * 'State>`. |
| 79 | +
|
| 80 | +----------------------- |
| 81 | +
|
| 82 | +## threadState and threadStateAsync |
| 83 | +
|
| 84 | +`TaskSeq.threadState` is the lazy, streaming counterpart to `mapFold`. It threads a state |
| 85 | +accumulator through the sequence while yielding each mapped result — but unlike `mapFold` it |
| 86 | +never materialises the results into an array, and it discards the final state. This makes it |
| 87 | +suitable for infinite sequences and pipelines where intermediate results should be streamed rather |
| 88 | +than buffered: |
| 89 | +
|
| 90 | +*) |
| 91 | +let numbers : TaskSeq<int> = seq { 1..5 } |> TaskSeq.ofSeq |
| 92 | + |
| 93 | +// Running total, emitted element by element rather than collected up front |
| 94 | +let runningSum : TaskSeq<int> = |
| 95 | +    numbers |
| 96 | +    |> TaskSeq.threadState (fun total n -> (total + n, total + n)) 0 |
| 97 | + |
| 98 | +// yields lazily: 1, 3, 6, 10, 15 |
| 99 | +(** |
| 100 | +Compare with `scan`, which also emits a running result but prepends the initial state: |
| 101 | +
|
| 102 | +```fsharp |
| 103 | +let viaScan = numbers |> TaskSeq.scan (fun acc x -> acc + x) 0 |
| 104 | +// yields: 0, 1, 3, 6, 10, 15 (one extra initial element) |
| 105 | +
|
| 106 | +let viaThreadState = numbers |> TaskSeq.threadState (fun acc x -> acc + x, acc + x) 0 |
| 107 | +// yields: 1, 3, 6, 10, 15 (no initial element; result == new state here) |
| 108 | +``` |
| 109 | +
|
| 110 | +`TaskSeq.threadStateAsync` accepts an asynchronous folder: |
| 111 | +
|
| 112 | +*) |
| 113 | +let asyncRunningSum : TaskSeq<int> = |
| 114 | +    numbers |
| 115 | +    |> TaskSeq.threadStateAsync (fun total n -> Task.fromResult (total + n, total + n)) 0 |
| 116 | +(** |
| 117 | +`TaskSeq.scan` is the streaming sibling of `fold`: it emits each intermediate state as a new |
| 118 | +element, starting with the initial state: |
| 119 | +
|
| 120 | +*) |
| 121 | +let runningTotals : TaskSeq<int> = |
| 122 | +    numbers |> TaskSeq.scan (fun total next -> total + next) 0 |
| 123 | + |
| 124 | +// yields the seed first, then each partial sum: 0, 1, 3, 6, 10, 15 |
| 125 | +(** |
| 126 | +----------------------- |
| 127 | +
|
| 128 | +## distinct and distinctBy |
| 129 | +
|
| 130 | +`TaskSeq.distinct` removes duplicates (keeps first occurrence), using generic equality: |
| 131 | +
|
| 132 | +*) |
| 133 | +let withDups : TaskSeq<int> = [ 1; 2; 2; 3; 1; 4 ] |> TaskSeq.ofList |
| 134 | + |
| 135 | +let deduped : TaskSeq<int> = TaskSeq.distinct withDups // 1, 2, 3, 4 |
| 136 | +(** |
| 137 | +`TaskSeq.distinctBy` deduplicates by a key projection: |
| 138 | +
|
| 139 | +*) |
| 140 | +let strings : TaskSeq<string> = |
| 141 | +    [ "hello"; "HELLO"; "world"; "WORLD" ] |> TaskSeq.ofList |
| 142 | + |
| 143 | +let caseInsensitiveDistinct : TaskSeq<string> = |
| 144 | +    strings |> TaskSeq.distinctBy (fun (text: string) -> text.ToLowerInvariant()) |
| 145 | +// keeps the first spelling seen per lower-cased key: "hello", "world" |
| 146 | +(** |
| 147 | +> **Note:** both `distinct` and `distinctBy` buffer all unique keys in a hash set. Do not use |
| 148 | +them on potentially infinite sequences. |
| 149 | +> |
| 150 | +
|
| 151 | +`TaskSeq.distinctByAsync` accepts an async key projection. |
| 152 | +
|
| 153 | +----------------------- |
| 154 | +
|
| 155 | +## distinctUntilChanged |
| 156 | +
|
| 157 | +`TaskSeq.distinctUntilChanged` removes consecutive duplicates only — it does not buffer the |
| 158 | +whole sequence, so it is safe on infinite streams: |
| 159 | +
|
| 160 | +*) |
| 161 | +let run : TaskSeq<int> = [ 1; 1; 2; 2; 2; 3; 1; 1 ] |> TaskSeq.ofList |
| 162 | + |
| 163 | +let noConsecDups : TaskSeq<int> = TaskSeq.distinctUntilChanged run |
| 164 | +// 1, 2, 3, 1 |
| 165 | +(** |
| 166 | +----------------------- |
| 167 | +
|
| 168 | +## except and exceptOfSeq |
| 169 | +
|
| 170 | +`TaskSeq.except itemsToExclude source` returns elements of `source` that do not appear in |
| 171 | +`itemsToExclude`. The exclusion set is materialised eagerly before iteration: |
| 172 | +
|
| 173 | +*) |
| 174 | +let exclusions : TaskSeq<int> = [ 2; 4 ] |> TaskSeq.ofList |
| 175 | +let source : TaskSeq<int> = seq { 1..5 } |> TaskSeq.ofSeq |
| 176 | + |
| 177 | +let filtered : TaskSeq<int> = source |> TaskSeq.except exclusions // 1, 3, 5 |
| 178 | +(** |
| 179 | +`TaskSeq.exceptOfSeq` accepts a plain `seq<'T>` as the exclusion set. |
| 180 | +
|
| 181 | +----------------------- |
| 182 | +
|
| 183 | +## partition and partitionAsync |
| 184 | +
|
| 185 | +`TaskSeq.partition` splits the sequence into two arrays in a single pass. Elements for which |
| 186 | +the predicate returns `true` go into the first array; the rest into the second: |
| 187 | +
|
| 188 | +*) |
| 189 | +let partitioned : Task<int[] * int[]> = |
| 190 | +    source |> TaskSeq.partition (fun value -> value % 2 = 0) |
| 191 | +// predicate true: [|2;4|]   predicate false: [|1;3;5|] |
| 192 | +(** |
| 193 | +`TaskSeq.partitionAsync` accepts an async predicate. |
| 194 | +
|
| 195 | +----------------------- |
| 196 | +
|
| 197 | +## compareWith and compareWithAsync |
| 198 | +
|
| 199 | +`TaskSeq.compareWith` performs a lexicographic comparison of two sequences using a custom |
| 200 | +comparer. It returns the first non-zero comparison result, or `0` if the sequences are |
| 201 | +element-wise equal and have the same length: |
| 202 | +
|
| 203 | +*) |
| 204 | +let a : TaskSeq<int> = [ 1; 2; 3 ] |> TaskSeq.ofList |
| 205 | +let b : TaskSeq<int> = [ 1; 2; 4 ] |> TaskSeq.ofList |
| 206 | + |
| 207 | +let cmp : Task<int> = |
| 208 | +    TaskSeq.compareWith (fun (left: int) (right: int) -> compare left right) a b |
| 209 | +// negative, because the sequences first differ at 3 vs 4 (a < b) |
| 210 | +(** |
| 211 | +`TaskSeq.compareWithAsync` accepts an async comparer. |
| 212 | +
|
| 213 | +----------------------- |
| 214 | +
|
| 215 | +## withCancellation |
| 216 | +
|
| 217 | +`TaskSeq.withCancellation token source` injects a `CancellationToken` into the underlying |
| 218 | +`IAsyncEnumerable<'T>`. This is equivalent to calling `.WithCancellation(token)` in C# and |
| 219 | +is useful when consuming sequences from libraries (e.g. Entity Framework Core) that require a |
| 220 | +token at the enumeration site: |
| 221 | +
|
| 222 | +*) |
| 223 | +let cts = new CancellationTokenSource() |
| 224 | + |
| 225 | +let cancellable : TaskSeq<int> = |
| 226 | +    TaskSeq.withCancellation cts.Token source |
| 227 | +(** |
| 228 | +----------------------- |
| 229 | +
|
| 230 | +## Positional editing |
| 231 | +
|
| 232 | +`TaskSeq.insertAt`, `TaskSeq.insertManyAt`, `TaskSeq.removeAt`, `TaskSeq.removeManyAt`, and |
| 233 | +`TaskSeq.updateAt` produce new sequences with one or more elements inserted, removed, or replaced at a |
| 234 | +given zero-based index: |
| 235 | +
|
| 236 | +*) |
| 237 | +let original : TaskSeq<int> = [ 1; 2; 4; 5 ] |> TaskSeq.ofList |
| 238 | + |
| 239 | +let inserted : TaskSeq<int> = TaskSeq.insertAt 2 3 original // 1,2,3,4,5 |
| 240 | + |
| 241 | +let removed : TaskSeq<int> = TaskSeq.removeAt 1 original // 1,4,5 |
| 242 | + |
| 243 | +let updated : TaskSeq<int> = TaskSeq.updateAt 0 99 original // 99,2,4,5 |
| 244 | + |
| 245 | +let manyInserted : TaskSeq<int> = |
| 246 | +    let extras = TaskSeq.ofList [ 10; 11 ] |
| 247 | +    original |> TaskSeq.insertManyAt 2 extras |
| 248 | +// 1, 2, 10, 11, 4, 5 |
| 249 | + |
| 250 | +let manyRemoved : TaskSeq<int> = TaskSeq.removeManyAt 1 2 original // 1, 5 |
| 251 | + |
0 commit comments