Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.Indexed.Tests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,68 @@ module Immutable =
|> TaskSeq.toArrayAsync
|> Task.map (Array.forall (fun (x, y) -> x + 1 = y))
|> Task.map (should be True)

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-indexed returns all 10 pairs with correct zero-based indices`` variant = task {
let! pairs =
Gen.getSeqImmutable variant
|> TaskSeq.indexed
|> TaskSeq.toArrayAsync

pairs |> should be (haveLength 10)

pairs
|> Array.iteri (fun pos (idx, _) -> idx |> should equal pos)
}

[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
let ``TaskSeq-indexed returns values 1 to 10 unchanged`` variant = task {
let! pairs =
Gen.getSeqImmutable variant
|> TaskSeq.indexed
|> TaskSeq.toArrayAsync

pairs |> Array.map snd |> should equal [| 1..10 |]
}

module SideEffects =
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-indexed on side-effect sequence returns correct pairs`` variant = task {
let ts = Gen.getSeqWithSideEffect variant
let! pairs = ts |> TaskSeq.indexed |> TaskSeq.toArrayAsync
pairs |> should be (haveLength 10)

pairs
|> Array.iteri (fun pos (idx, _) -> idx |> should equal pos)
}

[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
let ``TaskSeq-indexed on side-effect sequence is re-evaluated on second iteration`` variant = task {
let ts = Gen.getSeqWithSideEffect variant

let! firstPairs = ts |> TaskSeq.indexed |> TaskSeq.toArrayAsync
let! secondPairs = ts |> TaskSeq.indexed |> TaskSeq.toArrayAsync

// indices always start at 0
firstPairs |> Array.map fst |> should equal [| 0..9 |]
secondPairs |> Array.map fst |> should equal [| 0..9 |]

// values advance due to side effects
firstPairs |> Array.map snd |> should equal [| 1..10 |]
secondPairs |> Array.map snd |> should equal [| 11..20 |]
}

[<Fact>]
let ``TaskSeq-indexed prove index starts at zero even after side effects`` () = task {
let mutable counter = 0

let ts = taskSeq {
for _ in 1..5 do
counter <- counter + 1
yield counter
}

let! pairs = ts |> TaskSeq.indexed |> TaskSeq.toArrayAsync
pairs |> Array.map fst |> should equal [| 0..4 |]
pairs |> Array.map snd |> should equal [| 1..5 |]
}
55 changes: 55 additions & 0 deletions src/FSharp.Control.TaskSeq.Test/TaskSeq.Zip.Tests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,61 @@ module Performance =
combined |> Array.last |> should equal (length, length)
}

module UnequalLength =
[<Fact>]
let ``TaskSeq-zip stops at shorter first sequence`` () = task {
// documented: "when one sequence is exhausted any remaining elements in the other sequence are ignored"
let short = taskSeq { yield! [ 1..5 ] }
let long = taskSeq { yield! [ 1..10 ] }
let! combined = TaskSeq.zip short long |> TaskSeq.toArrayAsync
combined |> should be (haveLength 5)

combined
|> should equal (Array.init 5 (fun i -> i + 1, i + 1))
}

[<Fact>]
let ``TaskSeq-zip stops at shorter second sequence`` () = task {
// documented: "when one sequence is exhausted any remaining elements in the other sequence are ignored"
let long = taskSeq { yield! [ 1..10 ] }
let short = taskSeq { yield! [ 1..3 ] }
let! combined = TaskSeq.zip long short |> TaskSeq.toArrayAsync
combined |> should be (haveLength 3)

combined
|> should equal (Array.init 3 (fun i -> i + 1, i + 1))
}

[<Fact>]
let ``TaskSeq-zip with first sequence empty returns empty`` () =
// documented: remaining elements in the longer sequence are ignored
let empty = taskSeq { yield! ([]: int list) }
let nonEmpty = taskSeq { yield! [ 1..10 ] }
TaskSeq.zip empty nonEmpty |> verifyEmpty

[<Fact>]
let ``TaskSeq-zip with second sequence empty returns empty`` () =
// documented: remaining elements in the longer sequence are ignored
let nonEmpty = taskSeq { yield! [ 1..10 ] }
let empty = taskSeq { yield! ([]: int list) }
TaskSeq.zip nonEmpty empty |> verifyEmpty

[<Fact>]
let ``TaskSeq-zip with singleton first and longer second returns singleton`` () = task {
let one = taskSeq { yield 42 }
let many = taskSeq { yield! [ 1..10 ] }
let! combined = TaskSeq.zip one many |> TaskSeq.toArrayAsync
combined |> should equal [| (42, 1) |]
}

[<Fact>]
let ``TaskSeq-zip with longer first and singleton second returns singleton`` () = task {
let many = taskSeq { yield! [ 1..10 ] }
let one = taskSeq { yield 99 }
let! combined = TaskSeq.zip many one |> TaskSeq.toArrayAsync
combined |> should equal [| (1, 99) |]
}

module Other =
[<Fact>]
let ``TaskSeq-zip zips different types`` () = task {
Expand Down
74 changes: 16 additions & 58 deletions src/FSharp.Control.TaskSeq/TaskSeqInternal.fs
Original file line number Diff line number Diff line change
Expand Up @@ -975,58 +975,7 @@ module internal TaskSeqInternal =
raiseOutOfBounds (nameof index)
}

// Consider turning using an F# version of this instead?
// https://github.com/i3arnon/ConcurrentHashSet
type ConcurrentHashSet<'T when 'T: equality>(ct) =
let _rwLock = new ReaderWriterLockSlim()
let hashSet = HashSet<'T>(Array.empty, HashIdentity.Structural)

member _.Add item =
_rwLock.EnterWriteLock()

try
hashSet.Add item
finally
_rwLock.ExitWriteLock()

member _.AddMany items =
_rwLock.EnterWriteLock()

try
for item in items do
hashSet.Add item |> ignore

finally
_rwLock.ExitWriteLock()

member _.AddManyAsync(source: TaskSeq<'T>) = task {
use e = source.GetAsyncEnumerator(ct)
let mutable go = true
let! step = e.MoveNextAsync()
go <- step

while go do
// NOTE: r/w lock cannot cross thread boundaries. Should we use SemaphoreSlim instead?
// or alternatively, something like this: https://github.com/StephenCleary/AsyncEx/blob/8a73d0467d40ca41f9f9cf827c7a35702243abb8/src/Nito.AsyncEx.Coordination/AsyncReaderWriterLock.cs#L16
// not sure how they compare.

_rwLock.EnterWriteLock()

try
hashSet.Add e.Current |> ignore
finally
_rwLock.ExitWriteLock()

let! step = e.MoveNextAsync()
go <- step
}

interface IDisposable with
override _.Dispose() =
if not (isNull _rwLock) then
_rwLock.Dispose()

let except itemsToExclude (source: TaskSeq<_>) =
let except (itemsToExclude: TaskSeq<_>) (source: TaskSeq<_>) =
checkNonNull (nameof source) source
checkNonNull (nameof itemsToExclude) itemsToExclude

Expand All @@ -1037,9 +986,18 @@ module internal TaskSeqInternal =
go <- step

if step then
// only create hashset by the time we actually start iterating
use hashSet = new ConcurrentHashSet<_>(CancellationToken.None)
do! hashSet.AddManyAsync itemsToExclude
// only create hashset by the time we actually start iterating;
// taskSeq enumerates sequentially, so a plain HashSet suffices β€” no locking needed.
let hashSet = HashSet<_>(HashIdentity.Structural)

use excl = itemsToExclude.GetAsyncEnumerator CancellationToken.None
let! exclStep = excl.MoveNextAsync()
let mutable exclGo = exclStep

while exclGo do
hashSet.Add excl.Current |> ignore
let! exclStep = excl.MoveNextAsync()
exclGo <- exclStep

while go do
let current = e.Current
Expand All @@ -1065,9 +1023,9 @@ module internal TaskSeqInternal =
go <- step

if step then
// only create hashset by the time we actually start iterating
use hashSet = new ConcurrentHashSet<_>(CancellationToken.None)
do hashSet.AddMany itemsToExclude
// only create hashset by the time we actually start iterating;
// initialize directly from the seq β€” taskSeq is sequential so no locking needed.
let hashSet = HashSet<_>(itemsToExclude, HashIdentity.Structural)

while go do
let current = e.Current
Expand Down