-
Notifications
You must be signed in to change notification settings - Fork 11
Expand file tree
/
Copy pathTaskSeq.fs
More file actions
535 lines (409 loc) · 21.4 KB
/
TaskSeq.fs
File metadata and controls
535 lines (409 loc) · 21.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
namespace FSharp.Control
open System.Collections.Generic
open System.Threading
open System.Threading.Tasks
// Just for convenience
module Internal = TaskSeqInternal
[<AutoOpen>]
module TaskSeqExtensions =
// these need to be in a module, not a type for proper auto-initialization of generic values
module TaskSeq =
let empty<'T> = Internal.empty<'T>
let inline sum (source: TaskSeq< ^T >) : Task< ^T > =
if obj.ReferenceEquals(source, null) then
nullArg (nameof source)
task {
use e = source.GetAsyncEnumerator(System.Threading.CancellationToken.None)
let mutable acc = Unchecked.defaultof< ^T>
while! e.MoveNextAsync() do
acc <- acc + e.Current
return acc
}
let inline sumBy (projection: 'T -> ^U) (source: TaskSeq<'T>) : Task< ^U > =
if obj.ReferenceEquals(source, null) then
nullArg (nameof source)
task {
use e = source.GetAsyncEnumerator(System.Threading.CancellationToken.None)
let mutable acc = Unchecked.defaultof< ^U>
while! e.MoveNextAsync() do
acc <- acc + projection e.Current
return acc
}
let inline sumByAsync (projection: 'T -> Task< ^U >) (source: TaskSeq<'T>) : Task< ^U > =
if obj.ReferenceEquals(source, null) then
nullArg (nameof source)
task {
use e = source.GetAsyncEnumerator(System.Threading.CancellationToken.None)
let mutable acc = Unchecked.defaultof< ^U>
while! e.MoveNextAsync() do
let! value = projection e.Current
acc <- acc + value
return acc
}
let inline average (source: TaskSeq< ^T >) : Task< ^T > =
if obj.ReferenceEquals(source, null) then
nullArg (nameof source)
task {
use e = source.GetAsyncEnumerator(System.Threading.CancellationToken.None)
let mutable acc = Unchecked.defaultof< ^T>
let mutable count = 0
while! e.MoveNextAsync() do
acc <- acc + e.Current
count <- count + 1
if count = 0 then
invalidArg (nameof source) "The input task sequence was empty."
return LanguagePrimitives.DivideByInt acc count
}
let inline averageBy (projection: 'T -> ^U) (source: TaskSeq<'T>) : Task< ^U > =
if obj.ReferenceEquals(source, null) then
nullArg (nameof source)
task {
use e = source.GetAsyncEnumerator(System.Threading.CancellationToken.None)
let mutable acc = Unchecked.defaultof< ^U>
let mutable count = 0
while! e.MoveNextAsync() do
acc <- acc + projection e.Current
count <- count + 1
if count = 0 then
invalidArg (nameof source) "The input task sequence was empty."
return LanguagePrimitives.DivideByInt acc count
}
let inline averageByAsync (projection: 'T -> Task< ^U >) (source: TaskSeq<'T>) : Task< ^U > =
if obj.ReferenceEquals(source, null) then
nullArg (nameof source)
task {
use e = source.GetAsyncEnumerator(System.Threading.CancellationToken.None)
let mutable acc = Unchecked.defaultof< ^U>
let mutable count = 0
while! e.MoveNextAsync() do
let! value = projection e.Current
acc <- acc + value
count <- count + 1
if count = 0 then
invalidArg (nameof source) "The input task sequence was empty."
return LanguagePrimitives.DivideByInt acc count
}
[<Sealed; AbstractClass>]
type TaskSeq private () =
// Rules for static classes, see bug report: https://github.com/dotnet/fsharp/issues/8093
// F# does not need this internally, but C# does
// 'Abstract & Sealed': makes it a static class in C#
// the 'private ()' ensure that a constructor is emitted, which is required by IL
static member singleton(value: 'T) = Internal.singleton value
static member replicate count value = Internal.replicate count value
static member isEmpty source = Internal.isEmpty source
//
// Convert 'ToXXX' functions
//
static member toList(source: TaskSeq<'T>) = [
Internal.checkNonNull (nameof source) source
let e = source.GetAsyncEnumerator CancellationToken.None
try
while (let vt = e.MoveNextAsync() in if vt.IsCompleted then vt.Result else vt.AsTask().Result) do
yield e.Current
finally
e.DisposeAsync().AsTask().Wait()
]
static member toArray(source: TaskSeq<'T>) = [|
Internal.checkNonNull (nameof source) source
let e = source.GetAsyncEnumerator CancellationToken.None
try
while (let vt = e.MoveNextAsync() in if vt.IsCompleted then vt.Result else vt.AsTask().Result) do
yield e.Current
finally
e.DisposeAsync().AsTask().Wait()
|]
static member toSeq(source: TaskSeq<'T>) =
Internal.checkNonNull (nameof source) source
seq {
let e = source.GetAsyncEnumerator CancellationToken.None
try
while (let vt = e.MoveNextAsync() in if vt.IsCompleted then vt.Result else vt.AsTask().Result) do
yield e.Current
finally
e.DisposeAsync().AsTask().Wait()
}
static member toArrayAsync source =
Internal.toResizeArrayAsync source
|> Task.map (fun a -> a.ToArray())
static member toListAsync source = Internal.toResizeArrayAndMapAsync List.ofSeq source
static member toResizeArrayAsync source = Internal.toResizeArrayAsync source
static member toIListAsync source = Internal.toResizeArrayAndMapAsync (fun x -> x :> IList<_>) source
//
// Convert 'OfXXX' functions
//
static member ofArray(source: 'T[]) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
yield c
}
static member ofList(source: 'T list) = taskSeq {
for c in source do
yield c
}
static member ofSeq(source: 'T seq) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
yield c
}
static member ofResizeArray(source: 'T ResizeArray) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
yield c
}
static member ofTaskSeq(source: #Task<'T> seq) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
let! c = c
yield c
}
static member ofTaskList(source: #Task<'T> list) = taskSeq {
for c in source do
let! c = c
yield c
}
static member ofTaskArray(source: #Task<'T> array) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
let! c = c
yield c
}
static member ofAsyncSeq(source: Async<'T> seq) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
let! c = task { return! c }
yield c
}
static member ofAsyncList(source: Async<'T> list) = taskSeq {
for c in source do
let! c = Task.ofAsync c
yield c
}
static member ofAsyncArray(source: Async<'T> array) =
Internal.checkNonNull (nameof source) source
taskSeq {
for c in source do
let! c = Async.toTask c
yield c
}
//
// Utility functions
//
static member max source = Internal.maxMin max source
static member min source = Internal.maxMin min source
static member maxBy projection source = Internal.maxMinBy (<) projection source // looks like 'less than', is 'greater than'
static member minBy projection source = Internal.maxMinBy (>) projection source
static member maxByAsync projection source = Internal.maxMinByAsync (<) projection source // looks like 'less than', is 'greater than'
static member minByAsync projection source = Internal.maxMinByAsync (>) projection source
static member length source = Internal.lengthBy None source
static member lengthOrMax max source = Internal.lengthBeforeMax max source
static member lengthBy predicate source = Internal.lengthBy (Some(Predicate predicate)) source
static member lengthByAsync predicate source = Internal.lengthBy (Some(PredicateAsync predicate)) source
static member init count initializer = Internal.init (Some count) (InitAction initializer)
static member initInfinite initializer = Internal.init None (InitAction initializer)
static member initAsync count initializer = Internal.init (Some count) (InitActionAsync initializer)
static member initInfiniteAsync initializer = Internal.init None (InitActionAsync initializer)
static member unfold generator state = Internal.unfold generator state
static member unfoldAsync generator state = Internal.unfoldAsync generator state
static member delay(generator: unit -> TaskSeq<'T>) =
{ new IAsyncEnumerable<'T> with
member _.GetAsyncEnumerator(ct) = generator().GetAsyncEnumerator(ct)
}
static member concat(sources: TaskSeq<#TaskSeq<'T>>) =
Internal.checkNonNull (nameof sources) sources
taskSeq {
for ts in sources do
// no null-check of inner taskseqs, similar to seq
yield! (ts :> TaskSeq<'T>)
}
static member concat(sources: TaskSeq<'T seq>) = // NOTE: we cannot use flex types on two overloads
Internal.checkNonNull (nameof sources) sources
taskSeq {
for ts in sources do
// no null-check of inner seqs, similar to seq
yield! ts
}
static member concat(sources: TaskSeq<'T[]>) =
Internal.checkNonNull (nameof sources) sources
taskSeq {
for ts in sources do
// no null-check of inner arrays, similar to seq
yield! ts
}
static member concat(sources: TaskSeq<'T list>) =
Internal.checkNonNull (nameof sources) sources
taskSeq {
for ts in sources do
// no null-check of inner lists, similar to seq
yield! ts
}
static member concat(sources: TaskSeq<ResizeArray<'T>>) =
Internal.checkNonNull (nameof sources) sources
taskSeq {
for ts in sources do
// no null-check of inner resize arrays, similar to seq
yield! ts
}
static member append (source1: TaskSeq<'T>) (source2: TaskSeq<'T>) =
Internal.checkNonNull (nameof source1) source1
Internal.checkNonNull (nameof source2) source2
taskSeq {
yield! source1
yield! source2
}
static member appendSeq (source1: TaskSeq<'T>) (source2: seq<'T>) =
Internal.checkNonNull (nameof source1) source1
Internal.checkNonNull (nameof source2) source2
taskSeq {
yield! source1
yield! source2
}
static member prependSeq (source1: seq<'T>) (source2: TaskSeq<'T>) =
Internal.checkNonNull (nameof source1) source1
Internal.checkNonNull (nameof source2) source2
taskSeq {
yield! source1
yield! source2
}
//
// iter/map/collect functions
//
static member cast source : TaskSeq<'T> = Internal.map (SimpleAction(fun (x: obj) -> x :?> 'T)) source
static member box source = Internal.map (SimpleAction box) source
static member unbox<'U when 'U: struct>(source: TaskSeq<obj>) : TaskSeq<'U> = Internal.map (SimpleAction unbox) source
static member iter action source = Internal.iter (SimpleAction action) source
static member iteri action source = Internal.iter (CountableAction action) source
static member iterAsync action source = Internal.iter (AsyncSimpleAction action) source
static member iteriAsync action source = Internal.iter (AsyncCountableAction action) source
static member map (mapper: 'T -> 'U) source = Internal.map (SimpleAction mapper) source
static member mapi (mapper: int -> 'T -> 'U) source = Internal.map (CountableAction mapper) source
static member mapAsync mapper source = Internal.map (AsyncSimpleAction mapper) source
static member mapiAsync mapper source = Internal.map (AsyncCountableAction mapper) source
static member collect (binder: 'T -> #TaskSeq<'U>) source = Internal.collect binder source
static member collectSeq (binder: 'T -> #seq<'U>) source = Internal.collectSeq binder source
static member collectAsync (binder: 'T -> #Task<#TaskSeq<'U>>) source : TaskSeq<'U> = Internal.collectAsync binder source
static member collectSeqAsync (binder: 'T -> #Task<#seq<'U>>) source : TaskSeq<'U> = Internal.collectSeqAsync binder source
//
// choosers, pickers and the like
//
static member tryHead source = Internal.tryHead source
static member head source =
Internal.tryHead source
|> Task.map (Option.defaultWith Internal.raiseEmptySeq)
static member tryLast source = Internal.tryLast source
static member last source =
Internal.tryLast source
|> Task.map (Option.defaultWith Internal.raiseEmptySeq)
static member tryTail source = Internal.tryTail source
static member tail source =
Internal.tryTail source
|> Task.map (Option.defaultWith Internal.raiseEmptySeq)
static member tryItem index source = Internal.tryItem index source
static member item index source =
if index < 0 then
invalidArg (nameof index) "The input must be non-negative."
Internal.tryItem index source
|> Task.map (Option.defaultWith Internal.raiseInsufficient)
static member tryExactlyOne source = Internal.tryExactlyOne source
static member exactlyOne source =
Internal.tryExactlyOne source
|> Task.map (Option.defaultWith (fun () -> invalidArg (nameof source) "The input sequence contains more than one element."))
static member indexed(source: TaskSeq<'T>) =
Internal.checkNonNull (nameof source) source
taskSeq {
let mutable i = 0
for x in source do
yield i, x
i <- i + 1
}
static member choose chooser source = Internal.choose (TryPick chooser) source
static member chooseAsync chooser source = Internal.choose (TryPickAsync chooser) source
static member filter predicate source = Internal.filter (Predicate predicate) source
static member filterAsync predicate source = Internal.filter (PredicateAsync predicate) source
static member where predicate source = Internal.filter (Predicate predicate) source
static member whereAsync predicate source = Internal.filter (PredicateAsync predicate) source
static member skip count source = Internal.skipOrTake Skip count source
static member drop count source = Internal.skipOrTake Drop count source
static member take count source = Internal.skipOrTake Take count source
static member truncate count source = Internal.skipOrTake Truncate count source
static member takeWhile predicate source = Internal.takeWhile false (Predicate predicate) source
static member takeWhileAsync predicate source = Internal.takeWhile false (PredicateAsync predicate) source
static member takeWhileInclusive predicate source = Internal.takeWhile true (Predicate predicate) source
static member takeWhileInclusiveAsync predicate source = Internal.takeWhile true (PredicateAsync predicate) source
static member skipWhile predicate source = Internal.skipWhile false (Predicate predicate) source
static member skipWhileAsync predicate source = Internal.skipWhile false (PredicateAsync predicate) source
static member skipWhileInclusive predicate source = Internal.skipWhile true (Predicate predicate) source
static member skipWhileInclusiveAsync predicate source = Internal.skipWhile true (PredicateAsync predicate) source
static member tryPick chooser source = Internal.tryPick (TryPick chooser) source
static member tryPickAsync chooser source = Internal.tryPick (TryPickAsync chooser) source
static member tryFind predicate source = Internal.tryFind (Predicate predicate) source
static member tryFindAsync predicate source = Internal.tryFind (PredicateAsync predicate) source
static member tryFindIndex predicate source = Internal.tryFindIndex (Predicate predicate) source
static member tryFindIndexAsync predicate source = Internal.tryFindIndex (PredicateAsync predicate) source
static member insertAt index value source = Internal.insertAt index (One value) source
static member insertManyAt index values source = Internal.insertAt index (Many values) source
static member removeAt index source = Internal.removeAt index source
static member removeManyAt index count source = Internal.removeManyAt index count source
static member updateAt index value source = Internal.updateAt index value source
static member except itemsToExclude source = Internal.except itemsToExclude source
static member exceptOfSeq itemsToExclude source = Internal.exceptOfSeq itemsToExclude source
static member distinct source = Internal.distinct source
static member distinctBy projection source = Internal.distinctBy projection source
static member distinctByAsync projection source = Internal.distinctByAsync projection source
static member distinctUntilChanged source = Internal.distinctUntilChanged source
static member pairwise source = Internal.pairwise source
static member chunkBySize chunkSize source = Internal.chunkBySize chunkSize source
static member windowed windowSize source = Internal.windowed windowSize source
static member forall predicate source = Internal.forall (Predicate predicate) source
static member forallAsync predicate source = Internal.forall (PredicateAsync predicate) source
static member exists predicate source = Internal.exists (Predicate predicate) source
static member existsAsync predicate source = Internal.exists (PredicateAsync predicate) source
static member contains value source = Internal.contains value source
static member pick chooser source =
Internal.tryPick (TryPick chooser) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
static member pickAsync chooser source =
Internal.tryPick (TryPickAsync chooser) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
static member find predicate source =
Internal.tryFind (Predicate predicate) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
static member findAsync predicate source =
Internal.tryFind (PredicateAsync predicate) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
static member findIndex predicate source =
Internal.tryFindIndex (Predicate predicate) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
static member findIndexAsync predicate source =
Internal.tryFindIndex (PredicateAsync predicate) source
|> Task.map (Option.defaultWith Internal.raiseNotFound)
//
// zip/unzip/fold etc functions
//
static member zip source1 source2 = Internal.zip source1 source2
static member zip3 source1 source2 source3 = Internal.zip3 source1 source2 source3
static member compareWith comparer source1 source2 = Internal.compareWith comparer source1 source2
static member compareWithAsync comparer source1 source2 = Internal.compareWithAsync comparer source1 source2
static member fold folder state source = Internal.fold (FolderAction folder) state source
static member foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source
static member scan folder state source = Internal.scan (FolderAction folder) state source
static member scanAsync folder state source = Internal.scan (AsyncFolderAction folder) state source
static member reduce folder source = Internal.reduce (FolderAction folder) source
static member reduceAsync folder source = Internal.reduce (AsyncFolderAction folder) source
//
// groupBy/countBy/partition
//
static member groupBy projection source = Internal.groupBy (ProjectorAction projection) source
static member groupByAsync projection source = Internal.groupBy (AsyncProjectorAction projection) source
static member countBy projection source = Internal.countBy (ProjectorAction projection) source
static member countByAsync projection source = Internal.countBy (AsyncProjectorAction projection) source
static member partition predicate source = Internal.partition (Predicate predicate) source
static member partitionAsync predicate source = Internal.partition (PredicateAsync predicate) source
static member mapFold mapping state source = Internal.mapFold (MapFolderAction mapping) state source
static member mapFoldAsync mapping state source = Internal.mapFold (AsyncMapFolderAction mapping) state source