|
| 1 | +(** |
| 2 | +--- |
| 3 | +title: Advanced Task Sequence Operations |
| 4 | +category: Documentation |
| 5 | +categoryindex: 2 |
| 6 | +index: 6 |
| 7 | +description: Advanced F# task sequence operations including groupBy, mapFold, scan, distinct, partition, countBy, compareWith, withCancellation and positional editing. |
| 8 | +keywords: F#, task sequences, TaskSeq, IAsyncEnumerable, groupBy, mapFold, distinct, distinctUntilChanged, except, partition, countBy, compareWith, withCancellation, insertAt, removeAt, updateAt |
| 9 | +--- |
| 10 | +*) |
| 11 | +(*** condition: prepare ***) |
| 12 | +#nowarn "211" |
| 13 | +#I "../src/FSharp.Control.TaskSeq/bin/Release/netstandard2.1" |
| 14 | +#r "FSharp.Control.TaskSeq.dll" |
| 15 | +(*** condition: fsx ***) |
| 16 | +#if FSX |
| 17 | +#r "nuget: FSharp.Control.TaskSeq,{{fsdocs-package-version}}" |
| 18 | +#endif // FSX |
| 19 | +(*** condition: ipynb ***) |
| 20 | +#if IPYNB |
| 21 | +#r "nuget: FSharp.Control.TaskSeq,{{fsdocs-package-version}}" |
| 22 | +#endif // IPYNB |
| 23 | + |
| 24 | +(** |
| 25 | +
|
| 26 | +# Advanced Task Sequence Operations |
| 27 | +
|
| 28 | +This page covers advanced `TaskSeq<'T>` operations: grouping, stateful transformation with |
| 29 | +`mapFold` and `scan`, deduplication, set difference, partitioning, counting by key, |
| 30 | +lexicographic comparison, cancellation, and positional editing. |
| 31 | +
|
| 32 | +*) |
| 33 | + |
| 34 | +open System.Threading |
| 35 | +open System.Threading.Tasks |
| 36 | +open FSharp.Control |
| 37 | + |
| 38 | +(** |
| 39 | +
|
| 40 | +--- |
| 41 | +
|
| 42 | +## groupBy and groupByAsync |
| 43 | +
|
| 44 | +`TaskSeq.groupBy` partitions a sequence into groups by a key-projection function. The result |
| 45 | +is a task returning an array of `(key, elements[])` pairs, one per distinct key, in order of first occurrence. |
| 46 | +
|
| 47 | +> **Note:** `groupBy` consumes the entire source before returning. Do not use it on |
| 48 | +> potentially infinite sequences. |
| 49 | +
|
| 50 | +*) |
| 51 | + |
| 52 | +type Event = { EntityId: int; Payload: string } |
| 53 | + |
| 54 | +let events : TaskSeq<Event> = |
| 55 | + TaskSeq.ofList |
| 56 | + [ { EntityId = 1; Payload = "A" } |
| 57 | + { EntityId = 2; Payload = "B" } |
| 58 | + { EntityId = 1; Payload = "C" } |
| 59 | + { EntityId = 3; Payload = "D" } |
| 60 | + { EntityId = 2; Payload = "E" } ] |
| 61 | + |
| 62 | +// Resolves to [| (1, [| A; C |]); (2, [| B; E |]); (3, [| D |]) |], where each letter stands for the event with that Payload |
| 63 | +let grouped : Task<(int * Event[])[]> = |
| 64 | + events |> TaskSeq.groupBy (fun e -> e.EntityId) |
| 65 | + |
| 66 | +(** |
| 67 | +
|
| 68 | +`TaskSeq.groupByAsync` accepts an async key projection: |
| 69 | +
|
| 70 | +*) |
| 71 | + |
| 72 | +let groupedAsync : Task<(int * Event[])[]> = |
| 73 | + events |> TaskSeq.groupByAsync (fun e -> task { return e.EntityId }) |
| 74 | + |
| 75 | +(** |
| 76 | +
|
| 77 | +--- |
| 78 | +
|
| 79 | +## countBy and countByAsync |
| 80 | +
|
| 81 | +`TaskSeq.countBy` counts how many elements map to each key, returning a task of `(key, count)[]`: |
| 82 | +
|
| 83 | +*) |
| 84 | + |
| 85 | +let counts : Task<(int * int)[]> = |
| 86 | + events |> TaskSeq.countBy (fun e -> e.EntityId) |
| 87 | +// Resolves to [| (1, 2); (2, 2); (3, 1) |]: each EntityId paired with its number of events |
| 88 | + |
| 89 | +(** |
| 90 | +
|
| 91 | +--- |
| 92 | +
|
| 93 | +## mapFold and mapFoldAsync |
| 94 | +
|
| 95 | +`TaskSeq.mapFold` threads a state accumulator through a sequence while simultaneously mapping |
| 96 | +each element to a result value. The output is a task returning a pair of `(result[], finalState)`: |
| 97 | +
|
| 98 | +*) |
| 99 | + |
| 100 | +// Prefix each word with its zero-based position while counting the words seen so far |
| 101 | +let words : TaskSeq<string> = |
| 102 | + TaskSeq.ofList [ "hello"; "world"; "foo" ] |
| 103 | + |
| 104 | +let numbered : Task<string[] * int> = |
| 105 | + words |
| 106 | + |> TaskSeq.mapFold (fun count w -> $"{count}: {w}", count + 1) 0 |
| 107 | + |
| 108 | +// result: ([| "0: hello"; "1: world"; "2: foo" |], 3) |
| 109 | + |
| 110 | +(** |
| 111 | +
|
| 112 | +`TaskSeq.mapFoldAsync` is the same but the mapping function returns `Task<'Result * 'State>`. |
| 113 | +
|
| 114 | +--- |
| 115 | +
|
| 116 | +## scan and scanAsync |
| 117 | +
|
| 118 | +`TaskSeq.scan` is the streaming sibling of `fold`: it emits each intermediate state as a new |
| 119 | +element, starting with the initial state: |
| 120 | +
|
| 121 | +*) |
| 122 | + |
| 123 | +let numbers : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 }) |
| 124 | + |
| 125 | +let runningTotals : TaskSeq<int> = |
| 126 | + numbers |> TaskSeq.scan (fun acc n -> acc + n) 0 |
| 127 | + |
| 128 | +// yields: 0, 1, 3, 6, 10, 15 |
| 129 | + |
| 130 | +(** |
| 131 | +
|
| 132 | +--- |
| 133 | +
|
| 134 | +## distinct and distinctBy |
| 135 | +
|
| 136 | +`TaskSeq.distinct` removes duplicates (keeps first occurrence), using generic equality: |
| 137 | +
|
| 138 | +*) |
| 139 | + |
| 140 | +let withDups : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 2; 3; 1; 4 ] |
| 141 | + |
| 142 | +let deduped : TaskSeq<int> = withDups |> TaskSeq.distinct // 1, 2, 3, 4 |
| 143 | + |
| 144 | +(** |
| 145 | +
|
| 146 | +`TaskSeq.distinctBy` deduplicates by a key projection: |
| 147 | +
|
| 148 | +*) |
| 149 | + |
| 150 | +let strings : TaskSeq<string> = |
| 151 | + TaskSeq.ofList [ "hello"; "HELLO"; "world"; "WORLD" ] |
| 152 | + |
| 153 | +let caseInsensitiveDistinct : TaskSeq<string> = |
| 154 | + strings |> TaskSeq.distinctBy (fun s -> s.ToLowerInvariant()) |
| 155 | +// "hello", "world" |
| 156 | + |
| 157 | +(** |
| 158 | +
|
| 159 | +> **Note:** both `distinct` and `distinctBy` buffer all unique keys in a hash set. Do not use |
| 160 | +> them on potentially infinite sequences. |
| 161 | +
|
| 162 | +`TaskSeq.distinctByAsync` accepts an async key projection. |
| 163 | +
|
| 164 | +--- |
| 165 | +
|
| 166 | +## distinctUntilChanged |
| 167 | +
|
| 168 | +`TaskSeq.distinctUntilChanged` removes consecutive duplicates only — it does not buffer the |
| 169 | +whole sequence, so it is safe on infinite streams: |
| 170 | +
|
| 171 | +*) |
| 172 | + |
| 173 | +let run : TaskSeq<int> = TaskSeq.ofList [ 1; 1; 2; 2; 2; 3; 1; 1 ] |
| 174 | + |
| 175 | +let noConsecDups : TaskSeq<int> = run |> TaskSeq.distinctUntilChanged |
| 176 | +// 1, 2, 3, 1 |
| 177 | + |
| 178 | +(** |
| 179 | +
|
| 180 | +--- |
| 181 | +
|
| 182 | +## except and exceptOfSeq |
| 183 | +
|
| 184 | +`TaskSeq.except itemsToExclude source` returns elements of `source` that do not appear in |
| 185 | +`itemsToExclude`. The exclusion set is materialised eagerly before iteration: |
| 186 | +
|
| 187 | +*) |
| 188 | + |
| 189 | +let exclusions : TaskSeq<int> = TaskSeq.ofList [ 2; 4 ] |
| 190 | +let source : TaskSeq<int> = TaskSeq.ofSeq (seq { 1..5 }) |
| 191 | + |
| 192 | +let filtered : TaskSeq<int> = TaskSeq.except exclusions source // 1, 3, 5 |
| 193 | + |
| 194 | +(** |
| 195 | +
|
| 196 | +`TaskSeq.exceptOfSeq` accepts a plain `seq<'T>` as the exclusion set. |
| 197 | +
|
| 198 | +--- |
| 199 | +
|
| 200 | +## partition and partitionAsync |
| 201 | +
|
| 202 | +`TaskSeq.partition` splits the sequence into two arrays in a single pass. Elements for which |
| 203 | +the predicate returns `true` go into the first array; the rest into the second: |
| 204 | +
|
| 205 | +*) |
| 206 | + |
| 207 | +let partitioned : Task<int[] * int[]> = |
| 208 | + source |> TaskSeq.partition (fun n -> n % 2 = 0) |
| 209 | +// Resolves to ([| 2; 4 |], [| 1; 3; 5 |]): even numbers (predicate true) first, odd numbers second |
| 210 | + |
| 211 | +(** |
| 212 | +
|
| 213 | +`TaskSeq.partitionAsync` accepts an async predicate. |
| 214 | +
|
| 215 | +--- |
| 216 | +
|
| 217 | +## compareWith and compareWithAsync |
| 218 | +
|
| 219 | +`TaskSeq.compareWith` performs a lexicographic comparison of two sequences using a custom |
| 220 | +comparer. It returns the first non-zero comparison result, or `0` if the sequences are |
| 221 | +element-wise equal and have the same length: |
| 222 | +
|
| 223 | +*) |
| 224 | + |
| 225 | +let a : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 3 ] |
| 226 | +let b : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4 ] |
| 227 | + |
| 228 | +let cmp : Task<int> = |
| 229 | + TaskSeq.compareWith (fun x y -> compare x y) a b |
| 230 | +// Resolves to a negative number: the first differing pair is (3, 4) and `compare 3 4` is negative, so a < b |
| 231 | + |
| 232 | +(** |
| 233 | +
|
| 234 | +`TaskSeq.compareWithAsync` accepts an async comparer. |
| 235 | +
|
| 236 | +--- |
| 237 | +
|
| 238 | +## withCancellation |
| 239 | +
|
| 240 | +`TaskSeq.withCancellation token source` injects a `CancellationToken` into the underlying |
| 241 | +`IAsyncEnumerable<'T>`. This is equivalent to calling `.WithCancellation(token)` in C# and |
| 242 | +is useful when consuming sequences from libraries (e.g. Entity Framework Core) that require a |
| 243 | +token at the enumeration site: |
| 244 | +
|
| 245 | +*) |
| 246 | + |
| 247 | +let cts = new CancellationTokenSource() |
| 248 | + |
| 249 | +let cancellable : TaskSeq<int> = |
| 250 | + source |> TaskSeq.withCancellation cts.Token |
| 251 | + |
| 252 | +(** |
| 253 | +
|
| 254 | +--- |
| 255 | +
|
| 256 | +## Positional editing |
| 257 | +
|
| 258 | +`TaskSeq.insertAt`, `TaskSeq.insertManyAt`, `TaskSeq.removeAt`, `TaskSeq.removeManyAt`, and |
| 259 | +`TaskSeq.updateAt` produce new sequences with one or more elements inserted or removed, or a |
| 260 | +single element replaced, at a given zero-based index: |
| 261 | +
|
| 262 | +*) |
| 263 | + |
| 264 | +let original : TaskSeq<int> = TaskSeq.ofList [ 1; 2; 4; 5 ] |
| 265 | + |
| 266 | +let inserted : TaskSeq<int> = original |> TaskSeq.insertAt 2 3 // 1,2,3,4,5 |
| 267 | + |
| 268 | +let removed : TaskSeq<int> = original |> TaskSeq.removeAt 1 // 1,4,5 |
| 269 | + |
| 270 | +let updated : TaskSeq<int> = original |> TaskSeq.updateAt 0 99 // 99,2,4,5 |
| 271 | + |
| 272 | +let manyInserted : TaskSeq<int> = |
| 273 | + original |
| 274 | + |> TaskSeq.insertManyAt 2 (TaskSeq.ofList [ 10; 11 ]) |
| 275 | +// 1, 2, 10, 11, 4, 5 |
| 276 | + |
| 277 | +let manyRemoved : TaskSeq<int> = original |> TaskSeq.removeManyAt 1 2 // 1, 5 |
0 commit comments