Skip to content

Commit 0bdf1d6

Browse files
authored
Merge pull request #299 from fsprojects/repo-assist/feat-reduce-2026-03-4df851dee5ec5f9d
[Repo Assist] feat: add TaskSeq.reduce and TaskSeq.reduceAsync (ref #289)
2 parents 76645cd + 5bd5cd1 commit 0bdf1d6

File tree

5 files changed

+222
-0
lines changed

5 files changed

+222
-0
lines changed

src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
<Compile Include="TaskSeq.FindIndex.Tests.fs" />
2727
<Compile Include="TaskSeq.Find.Tests.fs" />
2828
<Compile Include="TaskSeq.Fold.Tests.fs" />
29+
<Compile Include="TaskSeq.Reduce.Tests.fs" />
2930
<Compile Include="TaskSeq.Forall.Tests.fs" />
3031
<Compile Include="TaskSeq.Head.Tests.fs" />
3132
<Compile Include="TaskSeq.Indexed.Tests.fs" />
Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
module TaskSeq.Tests.Reduce
2+
3+
open Xunit
4+
open FsUnit.Xunit
5+
6+
open FSharp.Control
7+
8+
//
9+
// TaskSeq.reduce
10+
// TaskSeq.reduceAsync
11+
//
12+
13+
module EmptySeq =
14+
[<Fact>]
15+
let ``Null source is invalid`` () =
16+
assertNullArg
17+
<| fun () -> TaskSeq.reduce (fun a _ -> a) null
18+
19+
assertNullArg
20+
<| fun () -> TaskSeq.reduceAsync (fun a _ -> Task.fromResult a) null
21+
22+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
23+
let ``TaskSeq-reduce raises on empty`` variant =
24+
fun () ->
25+
Gen.getEmptyVariant variant
26+
|> TaskSeq.reduce (fun a b -> a + b)
27+
|> Task.ignore
28+
29+
|> should throwAsyncExact typeof<System.ArgumentException>
30+
31+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
32+
let ``TaskSeq-reduceAsync raises on empty`` variant =
33+
fun () ->
34+
Gen.getEmptyVariant variant
35+
|> TaskSeq.reduceAsync (fun a b -> task { return a + b })
36+
|> Task.ignore
37+
38+
|> should throwAsyncExact typeof<System.ArgumentException>
39+
40+
module Immutable =
41+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
42+
let ``TaskSeq-reduce folds from first element`` variant = task {
43+
// items are 1..10; sum = 55
44+
let! sum =
45+
Gen.getSeqImmutable variant
46+
|> TaskSeq.reduce (fun acc item -> acc + item)
47+
48+
sum |> should equal 55
49+
}
50+
51+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
52+
let ``TaskSeq-reduceAsync folds from first element`` variant = task {
53+
let! sum =
54+
Gen.getSeqImmutable variant
55+
|> TaskSeq.reduceAsync (fun acc item -> task { return acc + item })
56+
57+
sum |> should equal 55
58+
}
59+
60+
[<Fact>]
61+
let ``TaskSeq-reduce returns single element without calling folder`` () = task {
62+
let mutable called = false
63+
64+
let! result =
65+
TaskSeq.singleton 42
66+
|> TaskSeq.reduce (fun _ _ ->
67+
called <- true
68+
failwith "should not be called")
69+
70+
result |> should equal 42
71+
called |> should equal false
72+
}
73+
74+
[<Fact>]
75+
let ``TaskSeq-reduceAsync returns single element without calling folder`` () = task {
76+
let mutable called = false
77+
78+
let! result =
79+
TaskSeq.singleton 42
80+
|> TaskSeq.reduceAsync (fun _ _ -> task {
81+
called <- true
82+
return failwith "should not be called"
83+
})
84+
85+
result |> should equal 42
86+
called |> should equal false
87+
}
88+
89+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
90+
let ``TaskSeq-reduce uses first element as initial accumulator`` variant = task {
91+
// reduce must use element[0] as initial state; for 1..10 summing gives 55
92+
// if it used 0 as initial, sum would also be 55 — but we verify the folder is called n-1 times
93+
let mutable callCount = 0
94+
95+
let! sum =
96+
Gen.getSeqImmutable variant
97+
|> TaskSeq.reduce (fun acc item ->
98+
callCount <- callCount + 1
99+
acc + item)
100+
101+
sum |> should equal 55
102+
callCount |> should equal 9 // 10 elements => 9 reduce calls
103+
}
104+
105+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
106+
let ``TaskSeq-reduce can concatenate strings`` variant = task {
107+
// items 1..10 as chars: ABCDEFGHIJ
108+
let! letters =
109+
Gen.getSeqImmutable variant
110+
|> TaskSeq.map (fun i -> string (char (i + 64)))
111+
|> TaskSeq.reduce (fun acc item -> acc + item)
112+
113+
letters |> should equal "ABCDEFGHIJ"
114+
}
115+
116+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
117+
let ``TaskSeq-reduceAsync can concatenate strings`` variant = task {
118+
let! letters =
119+
Gen.getSeqImmutable variant
120+
|> TaskSeq.map (fun i -> string (char (i + 64)))
121+
|> TaskSeq.reduceAsync (fun acc item -> task { return acc + item })
122+
123+
letters |> should equal "ABCDEFGHIJ"
124+
}
125+
126+
module SideEffects =
127+
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
128+
let ``TaskSeq-reduce folds correctly with side-effecting sequences`` variant = task {
129+
let ts = Gen.getSeqWithSideEffect variant
130+
131+
let! sum = ts |> TaskSeq.reduce (fun acc item -> acc + item)
132+
133+
sum |> should equal 55
134+
135+
// second enumeration produces next 10 elements: 11..20, sum = 155
136+
let! sum2 = ts |> TaskSeq.reduce (fun acc item -> acc + item)
137+
138+
sum2 |> should equal 155
139+
}
140+
141+
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
142+
let ``TaskSeq-reduceAsync folds correctly with side-effecting sequences`` variant = task {
143+
let ts = Gen.getSeqWithSideEffect variant
144+
145+
let! sum =
146+
ts
147+
|> TaskSeq.reduceAsync (fun acc item -> task { return acc + item })
148+
149+
sum |> should equal 55
150+
151+
let! sum2 =
152+
ts
153+
|> TaskSeq.reduceAsync (fun acc item -> task { return acc + item })
154+
155+
sum2 |> should equal 155
156+
}

src/FSharp.Control.TaskSeq/TaskSeq.fs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -407,3 +407,5 @@ type TaskSeq private () =
407407
static member zip source1 source2 = Internal.zip source1 source2
408408
static member fold folder state source = Internal.fold (FolderAction folder) state source
409409
static member foldAsync folder state source = Internal.fold (AsyncFolderAction folder) state source
410+
static member reduce folder source = Internal.reduce (FolderAction folder) source
411+
static member reduceAsync folder source = Internal.reduce (AsyncFolderAction folder) source

src/FSharp.Control.TaskSeq/TaskSeq.fsi

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1361,6 +1361,38 @@ type TaskSeq =
13611361
static member foldAsync:
13621362
folder: ('State -> 'T -> #Task<'State>) -> state: 'State -> source: TaskSeq<'T> -> Task<'State>
13631363

1364+
/// <summary>
1365+
/// Applies the function <paramref name="folder" /> to each element of the task sequence, threading an accumulator
1366+
/// argument through the computation. The first element is used as the initial state. If the input function is
1367+
/// <paramref name="f" /> and the elements are <paramref name="i0...iN" />, then computes
1368+
/// <paramref name="f (... (f i0 i1)...) iN" />. Raises <see cref="T:System.ArgumentException" /> when the
1369+
/// sequence is empty.
1370+
/// If the accumulator function <paramref name="folder" /> is asynchronous, consider using <see cref="TaskSeq.reduceAsync" />.
1371+
/// </summary>
1372+
///
1373+
/// <param name="folder">A function that updates the state with each element from the sequence.</param>
1374+
/// <param name="source">The input sequence.</param>
1375+
/// <returns>The final state value after applying the reduction function to all elements.</returns>
1376+
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
1377+
/// <exception cref="T:ArgumentException">Thrown when the input task sequence is empty.</exception>
1378+
static member reduce: folder: ('T -> 'T -> 'T) -> source: TaskSeq<'T> -> Task<'T>
1379+
1380+
/// <summary>
1381+
/// Applies the asynchronous function <paramref name="folder" /> to each element of the task sequence, threading
1382+
/// an accumulator argument through the computation. The first element is used as the initial state. If the input
1383+
/// function is <paramref name="f" /> and the elements are <paramref name="i0...iN" />, then computes
1384+
/// <paramref name="f (... (f i0 i1)...) iN" />. Raises <see cref="T:System.ArgumentException" /> when the
1385+
/// sequence is empty.
1386+
/// If the accumulator function <paramref name="folder" /> is synchronous, consider using <see cref="TaskSeq.reduce" />.
1387+
/// </summary>
1388+
///
1389+
/// <param name="folder">A function that updates the state with each element from the sequence.</param>
1390+
/// <param name="source">The input sequence.</param>
1391+
/// <returns>The final state value after applying the reduction function to all elements.</returns>
1392+
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
1393+
/// <exception cref="T:ArgumentException">Thrown when the input task sequence is empty.</exception>
1394+
static member reduceAsync: folder: ('T -> 'T -> #Task<'T>) -> source: TaskSeq<'T> -> Task<'T>
1395+
13641396
/// <summary>
13651397
/// Return a new task sequence with a new item inserted before the given index.
13661398
/// </summary>

src/FSharp.Control.TaskSeq/TaskSeqInternal.fs

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,37 @@ module internal TaskSeqInternal =
371371
return result
372372
}
373373

374+
let reduce folder (source: TaskSeq<_>) =
375+
checkNonNull (nameof source) source
376+
377+
task {
378+
use e = source.GetAsyncEnumerator CancellationToken.None
379+
let! hasFirst = e.MoveNextAsync()
380+
381+
if not hasFirst then
382+
raiseEmptySeq ()
383+
384+
let mutable result = e.Current
385+
let! step = e.MoveNextAsync()
386+
let mutable go = step
387+
388+
match folder with
389+
| FolderAction folder ->
390+
while go do
391+
result <- folder result e.Current
392+
let! step = e.MoveNextAsync()
393+
go <- step
394+
395+
| AsyncFolderAction folder ->
396+
while go do
397+
let! tempResult = folder result e.Current
398+
result <- tempResult
399+
let! step = e.MoveNextAsync()
400+
go <- step
401+
402+
return result
403+
}
404+
374405
let toResizeArrayAsync source =
375406
checkNonNull (nameof source) source
376407

0 commit comments

Comments
 (0)