Skip to content

Commit 546c114

Browse files
feat: add TaskSeq.map3 and TaskSeq.map3Async (81 tests)
Implements TaskSeq.map3 and TaskSeq.map3Async, which apply a mapping function to corresponding elements of three task sequences in parallel, stopping when the shortest sequence is exhausted. - TaskSeqInternal.fs: add map3/map3Async implementations modelled on zip3 - TaskSeq.fs: add public static member wrappers - TaskSeq.fsi: add XML-documented signatures - TaskSeq.Map3.Tests.fs: 81 new tests (null checks, empty, immutable, side effects) - README.md: mark map3/map3Async as implemented - release-notes.txt: updated Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 5a03593 commit 546c114

File tree

7 files changed

+334
-1
lines changed

7 files changed

+334
-1
lines changed

README.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,7 +313,7 @@ This is what has been implemented so far, is planned or skipped:
313313
| &#x2705; [#53][] | | `lengthBy` | `lengthByAsync` | |
314314
| &#x2705; [#2][] | `map` | `map` | `mapAsync` | |
315315
| | `map2` | `map2` | `map2Async` | |
316-
| | `map3` | `map3` | `map3Async` | |
316+
| &#x2705; | `map3` | `map3` | `map3Async` | |
317317
| | `mapFold` | `mapFold` | `mapFoldAsync` | |
318318
| &#x1f6ab; | `mapFoldBack` | | | [note #2](#note2 "Because of the async nature of TaskSeq sequences, iterating from the back would be bad practice. Instead, materialize the sequence to a list or array and then apply the 'Back' iterators.") |
319319
| &#x2705; [#2][] | `mapi` | `mapi` | `mapiAsync` | |
@@ -599,6 +599,8 @@ module TaskSeq =
599599
val unbox<'U when 'U: struct> : source: TaskSeq<obj> -> TaskSeq<'U>
600600
val updateAt: position:int -> value:'T -> source: TaskSeq<'T> -> TaskSeq<'T>
601601
val zip: source1: TaskSeq<'T> -> source2: TaskSeq<'U> -> TaskSeq<'T * 'U>
602+
val map3: mapping: ('T1 -> 'T2 -> 'T3 -> 'U) -> source1: TaskSeq<'T1> -> source2: TaskSeq<'T2> -> source3: TaskSeq<'T3> -> TaskSeq<'U>
603+
val map3Async: mapping: ('T1 -> 'T2 -> 'T3 -> #Task<'U>) -> source1: TaskSeq<'T1> -> source2: TaskSeq<'T2> -> source3: TaskSeq<'T3> -> TaskSeq<'U>
602604
```
603605

604606
[buildstatus]: https://github.com/fsprojects/FSharp.Control.TaskSeq/actions/workflows/main.yaml

release-notes.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ Release notes:
1717
- adds TaskSeq.unfold and TaskSeq.unfoldAsync, #289
1818
- adds TaskSeq.chunkBySize (closes #258) and TaskSeq.windowed, #289
1919
- fixes: CancellationToken passed to GetAsyncEnumerator is now honored in MoveNextAsync, #179
20+
- adds TaskSeq.map3 and TaskSeq.map3Async
2021

2122
0.5.0
2223
- update engineering to .NET 9/10

src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
<Compile Include="TaskSeq.ToXXX.Tests.fs" />
5959
<Compile Include="TaskSeq.UpdateAt.Tests.fs" />
6060
<Compile Include="TaskSeq.Zip.Tests.fs" />
61+
<Compile Include="TaskSeq.Map3.Tests.fs" />
6162
<Compile Include="TaskSeq.ChunkBySize.Tests.fs" />
6263
<Compile Include="TaskSeq.Windowed.Tests.fs" />
6364
<Compile Include="TaskSeq.Tests.CE.fs" />
Lines changed: 233 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,233 @@
1+
module TaskSeq.Tests.Map3
2+
3+
open Xunit
4+
open FsUnit.Xunit
5+
6+
open FSharp.Control
7+
8+
//
9+
// TaskSeq.map3
10+
// TaskSeq.map3Async
11+
//
12+
13+
module EmptySeq =
14+
[<Fact>]
15+
let ``Null source is invalid for map3`` () =
16+
assertNullArg
17+
<| fun () -> TaskSeq.map3 (fun a b c -> a + b + c) null TaskSeq.empty TaskSeq.empty
18+
19+
assertNullArg
20+
<| fun () -> TaskSeq.map3 (fun a b c -> a + b + c) TaskSeq.empty null TaskSeq.empty
21+
22+
assertNullArg
23+
<| fun () -> TaskSeq.map3 (fun a b c -> a + b + c) TaskSeq.empty TaskSeq.empty null
24+
25+
assertNullArg
26+
<| fun () -> TaskSeq.map3 (fun a b c -> a + b + c) null null null
27+
28+
[<Fact>]
29+
let ``Null source is invalid for map3Async`` () =
30+
assertNullArg
31+
<| fun () -> TaskSeq.map3Async (fun a b c -> task { return a + b + c }) null TaskSeq.empty TaskSeq.empty
32+
33+
assertNullArg
34+
<| fun () -> TaskSeq.map3Async (fun a b c -> task { return a + b + c }) TaskSeq.empty null TaskSeq.empty
35+
36+
assertNullArg
37+
<| fun () -> TaskSeq.map3Async (fun a b c -> task { return a + b + c }) TaskSeq.empty TaskSeq.empty null
38+
39+
assertNullArg
40+
<| fun () -> TaskSeq.map3Async (fun a b c -> task { return a + b + c }) null null null
41+
42+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
43+
let ``TaskSeq-map3 with all empty sequences returns empty`` variant =
44+
TaskSeq.map3 (fun a b c -> a + b + c) (Gen.getEmptyVariant variant) (Gen.getEmptyVariant variant) (Gen.getEmptyVariant variant)
45+
|> verifyEmpty
46+
47+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
48+
let ``TaskSeq-map3Async with all empty sequences returns empty`` variant =
49+
TaskSeq.map3Async
50+
(fun a b c -> task { return a + b + c })
51+
(Gen.getEmptyVariant variant)
52+
(Gen.getEmptyVariant variant)
53+
(Gen.getEmptyVariant variant)
54+
|> verifyEmpty
55+
56+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
57+
let ``TaskSeq-map3 stops when first sequence is empty`` variant =
58+
TaskSeq.map3 (fun a b c -> a + b + c) (Gen.getEmptyVariant variant) (taskSeq { yield 1 }) (taskSeq { yield 2 })
59+
|> verifyEmpty
60+
61+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
62+
let ``TaskSeq-map3 stops when second sequence is empty`` variant =
63+
TaskSeq.map3 (fun a b c -> a + b + c) (taskSeq { yield 1 }) (Gen.getEmptyVariant variant) (taskSeq { yield 2 })
64+
|> verifyEmpty
65+
66+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
67+
let ``TaskSeq-map3 stops when third sequence is empty`` variant =
68+
TaskSeq.map3 (fun a b c -> a + b + c) (taskSeq { yield 1 }) (taskSeq { yield 2 }) (Gen.getEmptyVariant variant)
69+
|> verifyEmpty
70+
71+
72+
module Immutable =
73+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
74+
let ``TaskSeq-map3 maps in correct order`` variant = task {
75+
let one = Gen.getSeqImmutable variant
76+
let two = Gen.getSeqImmutable variant
77+
let three = Gen.getSeqImmutable variant
78+
79+
let! result =
80+
TaskSeq.map3 (fun a b c -> a + b + c) one two three
81+
|> TaskSeq.toArrayAsync
82+
83+
result |> should haveLength 10
84+
85+
result
86+
|> should equal (Array.init 10 (fun i -> (i + 1) * 3))
87+
}
88+
89+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
90+
let ``TaskSeq-map3Async maps in correct order`` variant = task {
91+
let one = Gen.getSeqImmutable variant
92+
let two = Gen.getSeqImmutable variant
93+
let three = Gen.getSeqImmutable variant
94+
95+
let! result =
96+
TaskSeq.map3Async (fun a b c -> task { return a + b + c }) one two three
97+
|> TaskSeq.toArrayAsync
98+
99+
result |> should haveLength 10
100+
101+
result
102+
|> should equal (Array.init 10 (fun i -> (i + 1) * 3))
103+
}
104+
105+
[<Fact>]
106+
let ``TaskSeq-map3 applies mapping to corresponding elements`` () = task {
107+
let one = taskSeq { yield! [ 1..5 ] }
108+
let two = taskSeq { yield! [ 10..10..50 ] }
109+
let three = taskSeq { yield! [ 100..100..500 ] }
110+
111+
let! result =
112+
TaskSeq.map3 (fun a b c -> a + b + c) one two three
113+
|> TaskSeq.toArrayAsync
114+
115+
result |> should equal [| 111; 222; 333; 444; 555 |]
116+
}
117+
118+
[<Fact>]
119+
let ``TaskSeq-map3 works with mixed types`` () = task {
120+
let one = taskSeq {
121+
yield "hello"
122+
yield "world"
123+
}
124+
125+
let two = taskSeq {
126+
yield 1
127+
yield 2
128+
}
129+
130+
let three = taskSeq {
131+
yield true
132+
yield false
133+
}
134+
135+
let! result =
136+
TaskSeq.map3 (fun (s: string) (n: int) (b: bool) -> sprintf "%s-%d-%b" s n b) one two three
137+
|> TaskSeq.toArrayAsync
138+
139+
result |> should equal [| "hello-1-true"; "world-2-false" |]
140+
}
141+
142+
[<Fact>]
143+
let ``TaskSeq-map3 truncates to shortest sequence`` () = task {
144+
let one = taskSeq { yield! [ 1..10 ] }
145+
let two = taskSeq { yield! [ 1..5 ] }
146+
let three = taskSeq { yield! [ 1..3 ] }
147+
148+
let! result =
149+
TaskSeq.map3 (fun a b c -> a + b + c) one two three
150+
|> TaskSeq.toArrayAsync
151+
152+
result |> should haveLength 3
153+
result |> should equal [| 3; 6; 9 |]
154+
}
155+
156+
[<Fact>]
157+
let ``TaskSeq-map3Async truncates to shortest sequence`` () = task {
158+
let one = taskSeq { yield! [ 1..10 ] }
159+
let two = taskSeq { yield! [ 1..5 ] }
160+
let three = taskSeq { yield! [ 1..3 ] }
161+
162+
let! result =
163+
TaskSeq.map3Async (fun a b c -> task { return a + b + c }) one two three
164+
|> TaskSeq.toArrayAsync
165+
166+
result |> should haveLength 3
167+
result |> should equal [| 3; 6; 9 |]
168+
}
169+
170+
[<Fact>]
171+
let ``TaskSeq-map3 works with singleton sequences`` () = task {
172+
let! result =
173+
TaskSeq.map3 (fun a b c -> a + b + c) (TaskSeq.singleton 1) (TaskSeq.singleton 2) (TaskSeq.singleton 3)
174+
|> TaskSeq.toArrayAsync
175+
176+
result |> should equal [| 6 |]
177+
}
178+
179+
[<Fact>]
180+
let ``TaskSeq-map3Async works with singleton sequences`` () = task {
181+
let! result =
182+
TaskSeq.map3Async (fun a b c -> task { return a + b + c }) (TaskSeq.singleton 1) (TaskSeq.singleton 2) (TaskSeq.singleton 3)
183+
|> TaskSeq.toArrayAsync
184+
185+
result |> should equal [| 6 |]
186+
}
187+
188+
[<Fact>]
189+
let ``TaskSeq-map3 mapping function receives correct argument positions`` () = task {
190+
let! result =
191+
TaskSeq.map3
192+
(fun (a: string) (b: int) (c: bool) -> (a, b, c))
193+
(taskSeq { yield "x" })
194+
(taskSeq { yield 42 })
195+
(taskSeq { yield true })
196+
|> TaskSeq.toArrayAsync
197+
198+
result |> should equal [| ("x", 42, true) |]
199+
}
200+
201+
202+
module SideEffects =
203+
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
204+
let ``TaskSeq-map3 can deal with side effects in sequences`` variant = task {
205+
let one = Gen.getSeqWithSideEffect variant
206+
let two = Gen.getSeqWithSideEffect variant
207+
let three = Gen.getSeqWithSideEffect variant
208+
209+
let! result =
210+
TaskSeq.map3 (fun a b c -> a + b + c) one two three
211+
|> TaskSeq.toArrayAsync
212+
213+
result |> should haveLength 10
214+
215+
result
216+
|> should equal (Array.init 10 (fun i -> (i + 1) * 3))
217+
}
218+
219+
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
220+
let ``TaskSeq-map3Async can deal with side effects in sequences`` variant = task {
221+
let one = Gen.getSeqWithSideEffect variant
222+
let two = Gen.getSeqWithSideEffect variant
223+
let three = Gen.getSeqWithSideEffect variant
224+
225+
let! result =
226+
TaskSeq.map3Async (fun a b c -> task { return a + b + c }) one two three
227+
|> TaskSeq.toArrayAsync
228+
229+
result |> should haveLength 10
230+
231+
result
232+
|> should equal (Array.init 10 (fun i -> (i + 1) * 3))
233+
}

src/FSharp.Control.TaskSeq/TaskSeq.fs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -512,6 +512,11 @@ type TaskSeq private () =
512512

513513
static member zip source1 source2 = Internal.zip source1 source2
514514
static member zip3 source1 source2 source3 = Internal.zip3 source1 source2 source3
515+
static member map3 (mapping: 'T1 -> 'T2 -> 'T3 -> 'U) source1 source2 source3 = Internal.map3 mapping source1 source2 source3
516+
517+
static member map3Async (mapping: 'T1 -> 'T2 -> 'T3 -> #Task<'U>) source1 source2 source3 =
518+
Internal.map3Async mapping source1 source2 source3
519+
515520
static member fold folder state source = Internal.fold (FolderAction folder) state source
516521
static member foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source
517522
static member scan folder state source = Internal.scan (FolderAction folder) state source

src/FSharp.Control.TaskSeq/TaskSeq.fsi

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -749,6 +749,50 @@ type TaskSeq =
749749
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
750750
static member mapiAsync: mapper: (int -> 'T -> #Task<'U>) -> source: TaskSeq<'T> -> TaskSeq<'U>
751751

752+
/// <summary>
753+
/// Builds a new task sequence whose elements are the results of applying the <paramref name="mapping" />
754+
/// function to the corresponding elements of the three input task sequences pairwise.
755+
/// The three sequences need not have equal lengths: when one sequence is exhausted any remaining elements
756+
/// in the other sequences are ignored. Uses <c>CancellationToken.None</c>.
757+
///
758+
/// If <paramref name="mapping" /> is asynchronous, consider using <see cref="TaskSeq.map3Async" />.
759+
/// </summary>
760+
///
761+
/// <param name="mapping">A function to transform triples of items from the input task sequences.</param>
762+
/// <param name="source1">The first input task sequence.</param>
763+
/// <param name="source2">The second input task sequence.</param>
764+
/// <param name="source3">The third input task sequence.</param>
765+
/// <returns>The resulting task sequence.</returns>
766+
/// <exception cref="T:ArgumentNullException">Thrown when any of the input task sequences is null.</exception>
767+
static member map3:
768+
mapping: ('T1 -> 'T2 -> 'T3 -> 'U) ->
769+
source1: TaskSeq<'T1> ->
770+
source2: TaskSeq<'T2> ->
771+
source3: TaskSeq<'T3> ->
772+
TaskSeq<'U>
773+
774+
/// <summary>
775+
/// Builds a new task sequence whose elements are the results of applying the asynchronous <paramref name="mapping" />
776+
/// function to the corresponding elements of the three input task sequences pairwise.
777+
/// The three sequences need not have equal lengths: when one sequence is exhausted any remaining elements
778+
/// in the other sequences are ignored. Uses <c>CancellationToken.None</c>.
779+
///
780+
/// If <paramref name="mapping" /> is synchronous, consider using <see cref="TaskSeq.map3" />.
781+
/// </summary>
782+
///
783+
/// <param name="mapping">An asynchronous function to transform triples of items from the input task sequences.</param>
784+
/// <param name="source1">The first input task sequence.</param>
785+
/// <param name="source2">The second input task sequence.</param>
786+
/// <param name="source3">The third input task sequence.</param>
787+
/// <returns>The resulting task sequence.</returns>
788+
/// <exception cref="T:ArgumentNullException">Thrown when any of the input task sequences is null.</exception>
789+
static member map3Async:
790+
mapping: ('T1 -> 'T2 -> 'T3 -> #Task<'U>) ->
791+
source1: TaskSeq<'T1> ->
792+
source2: TaskSeq<'T2> ->
793+
source3: TaskSeq<'T3> ->
794+
TaskSeq<'U>
795+
752796
/// <summary>
753797
/// Builds a new task sequence whose elements are the results of applying the <paramref name="binder" />
754798
/// function to each of the elements of the input task sequence in <paramref name="source" />, and concatenating the

src/FSharp.Control.TaskSeq/TaskSeqInternal.fs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -585,6 +585,53 @@ module internal TaskSeqInternal =
585585
go <- step1 && step2 && step3
586586
}
587587

588+
let map3 (mapping: 'T1 -> 'T2 -> 'T3 -> 'U) (source1: TaskSeq<'T1>) (source2: TaskSeq<'T2>) (source3: TaskSeq<'T3>) =
589+
checkNonNull (nameof source1) source1
590+
checkNonNull (nameof source2) source2
591+
checkNonNull (nameof source3) source3
592+
593+
taskSeq {
594+
use e1 = source1.GetAsyncEnumerator CancellationToken.None
595+
use e2 = source2.GetAsyncEnumerator CancellationToken.None
596+
use e3 = source3.GetAsyncEnumerator CancellationToken.None
597+
let mutable go = true
598+
let! step1 = e1.MoveNextAsync()
599+
let! step2 = e2.MoveNextAsync()
600+
let! step3 = e3.MoveNextAsync()
601+
go <- step1 && step2 && step3
602+
603+
while go do
604+
yield mapping e1.Current e2.Current e3.Current
605+
let! step1 = e1.MoveNextAsync()
606+
let! step2 = e2.MoveNextAsync()
607+
let! step3 = e3.MoveNextAsync()
608+
go <- step1 && step2 && step3
609+
}
610+
611+
let map3Async (mapping: 'T1 -> 'T2 -> 'T3 -> #Task<'U>) (source1: TaskSeq<'T1>) (source2: TaskSeq<'T2>) (source3: TaskSeq<'T3>) =
612+
checkNonNull (nameof source1) source1
613+
checkNonNull (nameof source2) source2
614+
checkNonNull (nameof source3) source3
615+
616+
taskSeq {
617+
use e1 = source1.GetAsyncEnumerator CancellationToken.None
618+
use e2 = source2.GetAsyncEnumerator CancellationToken.None
619+
use e3 = source3.GetAsyncEnumerator CancellationToken.None
620+
let mutable go = true
621+
let! step1 = e1.MoveNextAsync()
622+
let! step2 = e2.MoveNextAsync()
623+
let! step3 = e3.MoveNextAsync()
624+
go <- step1 && step2 && step3
625+
626+
while go do
627+
let! result = mapping e1.Current e2.Current e3.Current
628+
yield result
629+
let! step1 = e1.MoveNextAsync()
630+
let! step2 = e2.MoveNextAsync()
631+
let! step3 = e3.MoveNextAsync()
632+
go <- step1 && step2 && step3
633+
}
634+
588635
let collect (binder: _ -> #IAsyncEnumerable<_>) (source: TaskSeq<_>) =
589636
checkNonNull (nameof source) source
590637

0 commit comments

Comments
 (0)