Skip to content

Commit 4ed6953

Browse files
authored
Merge pull request #281 from fsprojects/repo-assist/design-parity-277-batch2-f661b8cc88e91242
[Repo Assist] Design parity with TaskSeq, batch 2 (#277)
2 parents bc5c1f0 + 8457a34 commit 4ed6953

File tree

5 files changed

+411
-3
lines changed

5 files changed

+411
-3
lines changed

RELEASE_NOTES.md

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
### 4.11.0
2+
3+
* Design parity with FSharp.Control.TaskSeq (#277, batch 2):
4+
* Added `AsyncSeq.tryTail` — returns `None` if the sequence is empty; otherwise returns `Some` of the tail. Safe counterpart to `tail`. Mirrors `TaskSeq.tryTail`.
5+
* Added `AsyncSeq.where` / `AsyncSeq.whereAsync` — aliases for `filter` / `filterAsync`, mirroring the naming convention in `TaskSeq` and F# 8 collection expressions.
6+
* Added `AsyncSeq.lengthBy` / `AsyncSeq.lengthByAsync` — counts elements satisfying a predicate. Mirrors `TaskSeq.lengthBy` / `TaskSeq.lengthByAsync`.
7+
* Added `AsyncSeq.compareWith` / `AsyncSeq.compareWithAsync` — lexicographically compares two async sequences using a comparison function. Mirrors `TaskSeq.compareWith` / `TaskSeq.compareWithAsync`.
8+
* Added `AsyncSeq.takeWhileInclusiveAsync` — async variant of the existing `takeWhileInclusive`. Mirrors `TaskSeq.takeWhileInclusiveAsync`.
9+
* Added `AsyncSeq.skipWhileInclusive` / `AsyncSeq.skipWhileInclusiveAsync` — skips elements while predicate holds and also skips the first non-matching boundary element. Mirrors `TaskSeq.skipWhileInclusive` / `TaskSeq.skipWhileInclusiveAsync`.
10+
* Added `AsyncSeq.appendSeq` — appends a synchronous `seq<'T>` after an async sequence. Mirrors `TaskSeq.appendSeq`.
11+
* Added `AsyncSeq.prependSeq` — prepends a synchronous `seq<'T>` before an async sequence. Mirrors `TaskSeq.prependSeq`.
12+
* Added `AsyncSeq.delay` — defers sequence creation to enumeration time by calling a factory function each time `GetAsyncEnumerator` is called. Mirrors `TaskSeq.delay`.
13+
* Added `AsyncSeq.collectAsync` — like `collect` but the mapping function is asynchronous (`'T -> Async<AsyncSeq<'U>>`). Mirrors `TaskSeq.collectAsync`.
14+
* Added `AsyncSeq.partition` / `AsyncSeq.partitionAsync` — splits a sequence into two arrays using a (optionally async) predicate; the first array contains matching elements, the second non-matching. Mirrors `TaskSeq.partition` / `TaskSeq.partitionAsync`.
15+
116
### 4.10.0
217

318
* Added `AsyncSeq.withCancellation` — returns a new `AsyncSeq` that passes the given `CancellationToken` to `GetAsyncEnumerator`, overriding whatever token would otherwise be supplied. Mirrors `TaskSeq.withCancellation` and is useful when consuming sequences from libraries (e.g. Entity Framework) that accept a cancellation token through `GetAsyncEnumerator`. Part of ongoing design-parity work with FSharp.Control.TaskSeq (see #277).

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -492,8 +492,8 @@ module AsyncSeq =
492492
| Some enum -> dispose enum
493493
| None -> () }) :> AsyncSeq<'T>
494494

495-
let inline delay (f: unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
496-
AsyncGenerator.delay f
495+
let delay (f: unit -> AsyncSeq<'T>) : AsyncSeq<'T> =
496+
AsyncSeqImpl(fun () -> (f()).GetEnumerator()) :> AsyncSeq<'T>
497497

498498
let bindAsync (f:'T -> AsyncSeq<'U>) (inp:Async<'T>) : AsyncSeq<'U> =
499499
AsyncSeqImpl(fun () ->
@@ -703,6 +703,9 @@ module AsyncSeq =
703703
let collect (f: 'T -> AsyncSeq<'U>) (inp: AsyncSeq<'T>) : AsyncSeq<'U> =
704704
AsyncSeqImpl(fun () -> new OptimizedCollectEnumerator<'T, 'U>(f, inp) :> IAsyncSeqEnumerator<'U>) :> AsyncSeq<'U>
705705

706+
let collectAsync (mapping: 'T -> Async<AsyncSeq<'U>>) (source: AsyncSeq<'T>) : AsyncSeq<'U> =
707+
collect (fun x -> bindAsync id (mapping x)) source
708+
706709
// let collect (f: 'T -> AsyncSeq<'U>) (inp: AsyncSeq<'T>) : AsyncSeq<'U> =
707710
// AsyncGenerator.collect f inp
708711

@@ -787,6 +790,12 @@ module AsyncSeq =
787790
dispose e
788791
| _ -> () }) :> AsyncSeq<'T>
789792

793+
let appendSeq (seq2: seq<'T>) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
794+
append source (ofSeq seq2)
795+
796+
let prependSeq (seq1: seq<'T>) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
797+
append (ofSeq seq1) source
798+
790799
// Optimized iterAsync implementation to reduce allocations
791800
type internal OptimizedIterAsyncEnumerator<'T>(enumerator: IAsyncSeqEnumerator<'T>, f: 'T -> Async<unit>) =
792801
let mutable disposed = false
@@ -1311,6 +1320,33 @@ module AsyncSeq =
13111320
let forallAsync f (source : AsyncSeq<'T>) =
13121321
source |> existsAsync (fun v -> async { let! b = f v in return not b }) |> Async.map not
13131322

1323+
let compareWithAsync (comparer: 'T -> 'T -> Async<int>) (source1: AsyncSeq<'T>) (source2: AsyncSeq<'T>) : Async<int> = async {
1324+
use ie1 = source1.GetEnumerator()
1325+
use ie2 = source2.GetEnumerator()
1326+
let! m1 = ie1.MoveNext()
1327+
let! m2 = ie2.MoveNext()
1328+
let b1 = ref m1
1329+
let b2 = ref m2
1330+
let result = ref 0
1331+
let isDone = ref false
1332+
while not isDone.Value do
1333+
match b1.Value, b2.Value with
1334+
| None, None -> isDone := true
1335+
| None, Some _ -> result := -1; isDone := true
1336+
| Some _, None -> result := 1; isDone := true
1337+
| Some v1, Some v2 ->
1338+
let! c = comparer v1 v2
1339+
if c <> 0 then result := c; isDone := true
1340+
else
1341+
let! n1 = ie1.MoveNext()
1342+
let! n2 = ie2.MoveNext()
1343+
b1 := n1
1344+
b2 := n2
1345+
return result.Value }
1346+
1347+
let compareWith (comparer: 'T -> 'T -> int) (source1: AsyncSeq<'T>) (source2: AsyncSeq<'T>) : Async<int> =
1348+
compareWithAsync (fun a b -> comparer a b |> async.Return) source1 source2
1349+
13141350
let foldAsync f (state:'State) (source : AsyncSeq<'T>) =
13151351
match source with
13161352
| :? AsyncSeqOp<'T> as source -> source.FoldAsync f state
@@ -1368,6 +1404,12 @@ module AsyncSeq =
13681404
let length (source : AsyncSeq<'T>) =
13691405
fold (fun st _ -> st + 1L) 0L source
13701406

1407+
let lengthByAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : Async<int64> =
1408+
foldAsync (fun acc x -> async { let! ok = predicate x in return if ok then acc + 1L else acc }) 0L source
1409+
1410+
let lengthBy (predicate: 'T -> bool) (source: AsyncSeq<'T>) : Async<int64> =
1411+
lengthByAsync (predicate >> async.Return) source
1412+
13711413
let inline sum (source : AsyncSeq<'T>) : Async<'T> =
13721414
(LanguagePrimitives.GenericZero, source) ||> fold (+)
13731415

@@ -1498,6 +1540,12 @@ module AsyncSeq =
14981540
let filter f (source : AsyncSeq<'T>) =
14991541
filterAsync (f >> async.Return) source
15001542

1543+
let where (predicate: 'T -> bool) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
1544+
filter predicate source
1545+
1546+
let whereAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
1547+
filterAsync predicate source
1548+
15011549
let except (excluded : seq<'T>) (source : AsyncSeq<'T>) : AsyncSeq<'T> =
15021550
let s = System.Collections.Generic.HashSet(excluded)
15031551
source |> filter (fun x -> not (s.Contains(x)))
@@ -1848,6 +1896,24 @@ module AsyncSeq =
18481896
interface System.IDisposable with
18491897
member _.Dispose() = en.Dispose() }) :> AsyncSeq<'a>
18501898

1899+
let takeWhileInclusiveAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
1900+
AsyncSeqImpl(fun () ->
1901+
let en = source.GetEnumerator()
1902+
let fin = ref false
1903+
{ new IAsyncSeqEnumerator<'T> with
1904+
member _.MoveNext() = async {
1905+
if !fin then return None
1906+
else
1907+
let! next = en.MoveNext()
1908+
match next with
1909+
| None -> return None
1910+
| Some a ->
1911+
let! ok = predicate a
1912+
if ok then return Some a
1913+
else fin := true; return Some a }
1914+
interface System.IDisposable with
1915+
member _.Dispose() = en.Dispose() }) :> AsyncSeq<'T>
1916+
18511917
let skipWhileAsync p (source : AsyncSeq<'T>) : AsyncSeq<_> = asyncSeq {
18521918
use ie = source.GetEnumerator()
18531919
let! move = ie.MoveNext()
@@ -1865,6 +1931,27 @@ module AsyncSeq =
18651931
let! moven = ie.MoveNext()
18661932
b := moven }
18671933

1934+
let skipWhileInclusiveAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
1935+
use ie = source.GetEnumerator()
1936+
let! move = ie.MoveNext()
1937+
let b = ref move
1938+
let doneSkipping = ref false
1939+
while b.Value.IsSome do
1940+
let v = b.Value.Value
1941+
if doneSkipping.Value then
1942+
yield v
1943+
let! moven = ie.MoveNext()
1944+
b := moven
1945+
else
1946+
let! test = predicate v
1947+
if not test then
1948+
doneSkipping := true // skip this boundary element; do not yield it
1949+
let! moven = ie.MoveNext()
1950+
b := moven }
1951+
1952+
let skipWhileInclusive (predicate: 'T -> bool) (source: AsyncSeq<'T>) : AsyncSeq<'T> =
1953+
skipWhileInclusiveAsync (predicate >> async.Return) source
1954+
18681955
#if !FABLE_COMPILER
18691956
let skipUntilSignal (signal:Async<unit>) (source:AsyncSeq<'T>) : AsyncSeq<'T> = asyncSeq {
18701957
use ie = source.GetEnumerator()
@@ -1930,6 +2017,25 @@ module AsyncSeq =
19302017

19312018
let tail (source : AsyncSeq<'T>) : AsyncSeq<'T> = skip 1 source
19322019

2020+
let tryTail (source: AsyncSeq<'T>) : Async<AsyncSeq<'T> option> = async {
2021+
let ie = source.GetEnumerator()
2022+
let! first = ie.MoveNext()
2023+
match first with
2024+
| None ->
2025+
ie.Dispose()
2026+
return None
2027+
| Some _ ->
2028+
return Some (asyncSeq {
2029+
try
2030+
let! next = ie.MoveNext()
2031+
let b = ref next
2032+
while b.Value.IsSome do
2033+
yield b.Value.Value
2034+
let! moven = ie.MoveNext()
2035+
b := moven
2036+
finally
2037+
ie.Dispose() }) }
2038+
19332039
/// Splits an async sequence at the given index, returning the first `count` elements as an array
19342040
/// and the remaining elements as a new AsyncSeq. The source is enumerated once.
19352041
let splitAt (count: int) (source: AsyncSeq<'T>) : Async<'T array * AsyncSeq<'T>> = async {
@@ -1976,6 +2082,17 @@ module AsyncSeq =
19762082
let toArraySynchronously (source:AsyncSeq<'T>) = toArrayAsync source |> Async.RunSynchronously
19772083
#endif
19782084

2085+
let partitionAsync (predicate: 'T -> Async<bool>) (source: AsyncSeq<'T>) : Async<'T[] * 'T[]> = async {
2086+
let trues = ResizeArray<'T>()
2087+
let falses = ResizeArray<'T>()
2088+
do! source |> iterAsync (fun x -> async {
2089+
let! ok = predicate x
2090+
(if ok then trues else falses).Add(x) })
2091+
return trues.ToArray(), falses.ToArray() }
2092+
2093+
let partition (predicate: 'T -> bool) (source: AsyncSeq<'T>) : Async<'T[] * 'T[]> =
2094+
partitionAsync (predicate >> async.Return) source
2095+
19792096
let concatSeq (source:AsyncSeq<#seq<'T>>) : AsyncSeq<'T> = asyncSeq {
19802097
use ie = source.GetEnumerator()
19812098
let! move = ie.MoveNext()

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,18 @@ module AsyncSeq =
6767
/// all elements of the second asynchronous sequence.
6868
val append : seq1:AsyncSeq<'T> -> seq2:AsyncSeq<'T> -> AsyncSeq<'T>
6969

70+
/// Yields all elements of the source asynchronous sequence and then all elements of the
71+
/// synchronous sequence appended at the end.
72+
val appendSeq : seq2:seq<'T> -> source:AsyncSeq<'T> -> AsyncSeq<'T>
73+
74+
/// Yields all elements of the synchronous sequence first and then all elements of the
75+
/// source asynchronous sequence.
76+
val prependSeq : seq1:seq<'T> -> source:AsyncSeq<'T> -> AsyncSeq<'T>
77+
78+
/// Returns a new async sequence whose enumeration calls the factory function each time
79+
/// it is enumerated. Useful for deferring the creation of a sequence until enumeration begins.
80+
val delay : f:(unit -> AsyncSeq<'T>) -> AsyncSeq<'T>
81+
7082
/// Computation builder that allows creating of asynchronous
7183
/// sequences using the 'asyncSeq { ... }' syntax
7284
type AsyncSeqBuilder =
@@ -125,6 +137,10 @@ module AsyncSeq =
125137
/// the 'for' keyword in asyncSeq computation).
126138
val collect : mapping:('T -> AsyncSeq<'TResult>) -> source:AsyncSeq<'T> -> AsyncSeq<'TResult>
127139

140+
/// Like AsyncSeq.collect but the mapping function is asynchronous. For every input element
141+
/// it calls the specified async function and iterates over all elements of the returned sequence.
142+
val collectAsync : mapping:('T -> Async<AsyncSeq<'TResult>>) -> source:AsyncSeq<'T> -> AsyncSeq<'TResult>
143+
128144
/// Builds a new asynchronous sequence whose elements are generated by
129145
/// applying the specified function to all elements of the input sequence.
130146
///
@@ -389,12 +405,26 @@ module AsyncSeq =
389405
/// Asynchronously determine if the async predicate returns true for all values in the sequence
390406
val forallAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<bool>
391407

408+
/// Compares two async sequences lexicographically using the given synchronous comparison function.
409+
/// Returns a negative integer if source1 < source2, 0 if equal, and a positive integer if source1 > source2.
410+
val compareWith : comparer:('T -> 'T -> int) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'T> -> Async<int>
411+
412+
/// Compares two async sequences lexicographically using the given asynchronous comparison function.
413+
/// Returns a negative integer if source1 < source2, 0 if equal, and a positive integer if source1 > source2.
414+
val compareWithAsync : comparer:('T -> 'T -> Async<int>) -> source1:AsyncSeq<'T> -> source2:AsyncSeq<'T> -> Async<int>
415+
392416
/// Return an asynchronous sequence which, when iterated, includes an integer indicating the index of each element in the sequence.
393417
val indexed : source:AsyncSeq<'T> -> AsyncSeq<int64 * 'T>
394418

395419
/// Asynchronously determine the number of elements in the sequence
396420
val length : source:AsyncSeq<'T> -> Async<int64>
397421

422+
/// Asynchronously returns the number of elements in the sequence for which the predicate returns true.
423+
val lengthBy : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<int64>
424+
425+
/// Asynchronously returns the number of elements in the sequence for which the async predicate returns true.
426+
val lengthByAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<int64>
427+
398428
/// Same as AsyncSeq.scanAsync, but the specified function is synchronous.
399429
val scan : folder:('State -> 'T -> 'State) -> state:'State -> source:AsyncSeq<'T> -> AsyncSeq<'State>
400430

@@ -414,6 +444,13 @@ module AsyncSeq =
414444
/// and processes the input element immediately.
415445
val filter : predicate:('T -> bool) -> source:AsyncSeq<'T> -> AsyncSeq<'T>
416446

447+
/// Alias for AsyncSeq.filter. Returns elements for which the predicate returns true.
448+
/// Mirrors the naming convention in TaskSeq and FSharp.Core.
449+
val where : predicate:('T -> bool) -> source:AsyncSeq<'T> -> AsyncSeq<'T>
450+
451+
/// Alias for AsyncSeq.filterAsync. Returns elements for which the async predicate returns true.
452+
val whereAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> AsyncSeq<'T>
453+
417454
/// Returns a new asynchronous sequence containing only elements that are not present
418455
/// in the given excluded collection. Uses a HashSet for O(1) lookup. Mirrors Seq.except.
419456
val except : excluded:seq<'T> -> source:AsyncSeq<'T> -> AsyncSeq<'T> when 'T : equality
@@ -593,11 +630,23 @@ module AsyncSeq =
593630
/// Does return the first element that predicate fails
594631
val takeWhileInclusive : predicate:('T -> bool) -> source:AsyncSeq<'T> -> AsyncSeq<'T>
595632

633+
/// Returns elements from an asynchronous sequence while the specified async predicate holds,
634+
/// and also returns the first element for which the predicate returns false (inclusive).
635+
val takeWhileInclusiveAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> AsyncSeq<'T>
636+
596637
/// Skips elements from an asynchronous sequence while the specified
597638
/// predicate holds and then returns the rest of the sequence. The
598639
/// predicate is evaluated asynchronously.
599640
val skipWhile : predicate:('T -> bool) -> source:AsyncSeq<'T> -> AsyncSeq<'T>
600641

642+
/// Skips elements from an asynchronous sequence while the predicate holds AND also skips the
643+
/// first element for which the predicate returns false (the boundary element), then returns the rest.
644+
val skipWhileInclusive : predicate:('T -> bool) -> source:AsyncSeq<'T> -> AsyncSeq<'T>
645+
646+
/// Skips elements from an asynchronous sequence while the async predicate holds AND also skips the
647+
/// first element for which the predicate returns false (the boundary element), then returns the rest.
648+
val skipWhileInclusiveAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> AsyncSeq<'T>
649+
601650
/// Returns the first N elements of an asynchronous sequence
602651
/// does not cast an exception if count is larger than the sequence length.
603652
val take : count:int -> source:AsyncSeq<'T> -> AsyncSeq<'T>
@@ -613,6 +662,10 @@ module AsyncSeq =
613662
/// Returns an empty sequence if the source is empty.
614663
val tail : source:AsyncSeq<'T> -> AsyncSeq<'T>
615664

665+
/// Returns None if the source sequence is empty; otherwise returns Some of an async sequence
666+
/// containing all elements except the first. The source is enumerated once.
667+
val tryTail : source:AsyncSeq<'T> -> Async<AsyncSeq<'T> option>
668+
616669
/// Splits an async sequence at the given index. Returns an async computation that yields
617670
/// the first `count` elements as an array and the remaining elements as a new AsyncSeq.
618671
/// The source is enumerated once; the returned AsyncSeq lazily produces the remainder.
@@ -632,6 +685,14 @@ module AsyncSeq =
632685
val toArraySynchronously : source:AsyncSeq<'T> -> 'T []
633686
#endif
634687

688+
/// Splits the sequence into two arrays: the first contains elements for which the predicate
689+
/// returns true, the second contains elements for which it returns false. Mirrors Seq.partition.
690+
val partition : predicate:('T -> bool) -> source:AsyncSeq<'T> -> Async<'T [] * 'T []>
691+
692+
/// Splits the sequence into two arrays using an async predicate: the first contains elements
693+
/// for which the predicate returns true, the second contains elements for which it returns false.
694+
val partitionAsync : predicate:('T -> Async<bool>) -> source:AsyncSeq<'T> -> Async<'T [] * 'T []>
695+
635696
/// Flattens an AsyncSeq of synchronous sequences.
636697
val concatSeq : source:AsyncSeq<#seq<'T>> -> AsyncSeq<'T>
637698

0 commit comments

Comments
 (0)