Skip to content

Commit 4391d67

Browse files
committed
TaskSeq.chunkBySize
1 parent d2713a1 commit 4391d67

File tree

6 files changed

+232
-2
lines changed

6 files changed

+232
-2
lines changed

README.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -211,7 +211,8 @@ The `TaskSeq` project already has a wide array of functions and functionalities,
211211
- [ ] `average` / `averageBy`, `sum` and related
212212
- [x] `forall` / `forallAsync` (see [#240])
213213
- [x] `skip` / `drop` / `truncate` / `take` (see [#209])
214-
- [ ] `chunkBySize` / `windowed`
214+
- [x] `chunkBySize` (see [TODO])
215+
- [ ] `windowed`
215216
- [ ] `compareWith`
216217
- [ ] `distinct`
217218
- [ ] `exists2` / `map2` / `fold2` / `iter2` and related '2'-functions
@@ -263,7 +264,7 @@ This is what has been implemented so far, is planned or skipped:
263264
| ✅ [#67][] | | | `box` | |
264265
| ✅ [#67][] | | | `unbox` | |
265266
| ✅ [#23][] | `choose` | `choose` | `chooseAsync` | |
266-
| | `chunkBySize` | `chunkBySize` | | |
267+
| ✅ [TODO][]| `chunkBySize` | `chunkBySize` | | |
267268
| ✅ [#11][] | `collect` | `collect` | `collectAsync` | |
268269
| ✅ [#11][] | | `collectSeq` | `collectSeqAsync` | |
269270
| | `compareWith` | `compareWith` | `compareWithAsync` | |

src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
<Compile Include="TaskSeq.Do.Tests.fs" />
5858
<Compile Include="TaskSeq.Let.Tests.fs" />
5959
<Compile Include="TaskSeq.Using.Tests.fs" />
60+
<Compile Include="TaskSeq.ChunkBySize.Tests.fs" />
6061
</ItemGroup>
6162

6263
<ItemGroup>
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
module TaskSeq.Tests.ChunkBySize
2+
3+
open System
4+
5+
open FsUnitTyped
6+
open Xunit
7+
open FsUnit.Xunit
8+
9+
open FSharp.Control
10+
11+
//
12+
// TaskSeq.chunkBySize
13+
//
14+
15+
/// Marker exception raised from inside a task sequence body by the exception-propagation test
/// in this file; the string payload is a free-form description.
exception SideEffectPastEnd of string
16+
17+
module EmptySeq =
    // Argument validation and empty-input behavior of TaskSeq.chunkBySize.

    [<Theory; ClassData(typeof<TestEmptyVariants>)>]
    let ``TaskSeq-chunkBySize(0) on empty input should throw ArgumentException`` variant =
        // a chunk size of zero is rejected even when the source has no items to chunk
        fun () ->
            Gen.getEmptyVariant variant
            |> TaskSeq.chunkBySize 0
            |> consumeTaskSeq

        |> should throwAsyncExact typeof<ArgumentException>

    [<Theory; ClassData(typeof<TestEmptyVariants>)>]
    let ``TaskSeq-chunkBySize(1) has no effect on empty input`` variant =
        // no `task` block needed
        Gen.getEmptyVariant variant |> TaskSeq.chunkBySize 1 |> verifyEmpty

    [<Theory; ClassData(typeof<TestEmptyVariants>)>]
    let ``TaskSeq-chunkBySize(99) has no effect on empty input`` variant =
        // no `task` block needed
        Gen.getEmptyVariant variant |> TaskSeq.chunkBySize 99 |> verifyEmpty

    [<Fact>]
    let ``TaskSeq-chunkBySize(-1) should throw ArgumentException on any input`` () =
        // empty source
        fun () -> TaskSeq.empty<int> |> TaskSeq.chunkBySize -1 |> consumeTaskSeq
        |> should throwAsyncExact typeof<ArgumentException>

        // non-empty source
        fun () -> TaskSeq.init 10 id |> TaskSeq.chunkBySize -1 |> consumeTaskSeq
        |> should throwAsyncExact typeof<ArgumentException>

    [<Fact>]
    let ``TaskSeq-chunkBySize(-1) should throw ArgumentException before awaiting`` () =
        // The resulting sequence is never iterated here, so the exception must come from
        // the call to chunkBySize itself and not from enumerating the source.
        fun () ->
            taskSeq {
                do! longDelay ()

                if false then
                    yield 0 // type inference
            }
            |> TaskSeq.chunkBySize -1
            |> ignore // throws even without running the async. Bad coding, don't ignore a task!

        |> should throw typeof<ArgumentException>
58+
59+
module Immutable =
    // The assertions in this module expect every immutable variant to produce the items 1 through 10.

    [<Theory; ClassData(typeof<TestImmTaskSeq>)>]
    let ``TaskSeq-chunkBySize returns all items from source in order`` variant = task {
        // flattening the chunks again must give back the original sequence
        let flattened =
            Gen.getSeqImmutable variant
            |> TaskSeq.chunkBySize 3
            |> TaskSeq.collect TaskSeq.ofArray

        do! verify1To10 flattened
    }

    [<Theory; ClassData(typeof<TestImmTaskSeq>)>]
    let ``TaskSeq-chunkBySize returns chunks with items in order`` variant = task {
        let expected = [| [| 1; 2 |]; [| 3; 4 |]; [| 5; 6 |]; [| 7; 8 |]; [| 9; 10 |] |]

        let! chunks =
            Gen.getSeqImmutable variant
            |> TaskSeq.chunkBySize 2
            |> TaskSeq.toArrayAsync

        chunks |> shouldEqual expected
    }

    [<Theory; ClassData(typeof<TestImmTaskSeq>)>]
    let ``TaskSeq-chunkBySize returns exactly 'chunkSize' items per chunk`` variant = task {
        // only sizes that evenly divide the item count are used, so no shorter trailing chunk exists
        let everyChunkShouldHaveLength size =
            Gen.getSeqImmutable variant
            |> TaskSeq.chunkBySize size
            |> TaskSeq.iter (shouldHaveLength size)

        do! everyChunkShouldHaveLength 1
        do! everyChunkShouldHaveLength 2
        do! everyChunkShouldHaveLength 5
    }

    [<Theory; ClassData(typeof<TestImmTaskSeq>)>]
    let ``TaskSeq-chunkBySize returns remaining items in last chunk`` variant = task {
        // the last chunk holds whatever is left after all full chunks were emitted
        let lastChunkShouldHaveLength chunkSize lastChunkSize = task {
            let! chunks =
                Gen.getSeqImmutable variant
                |> TaskSeq.chunkBySize chunkSize
                |> TaskSeq.toArrayAsync

            Array.last chunks |> shouldHaveLength lastChunkSize
        }

        do! lastChunkShouldHaveLength 1 1
        do! lastChunkShouldHaveLength 3 1
        do! lastChunkShouldHaveLength 4 2
        do! lastChunkShouldHaveLength 6 4
        do! lastChunkShouldHaveLength 7 3
        do! lastChunkShouldHaveLength 8 2
        do! lastChunkShouldHaveLength 9 1
    }

    [<Theory; ClassData(typeof<TestImmTaskSeq>)>]
    let ``TaskSeq-chunkBySize returns all elements when 'chunkSize' > number of items`` variant =
        // a chunk size beyond the item count must give a single chunk with everything in it
        let chunks =
            Gen.getSeqImmutable variant
            |> TaskSeq.chunkBySize 11
            |> TaskSeq.toArrayAsync

        chunks
        |> Task.map (fun all -> all |> Array.exactlyOne |> shouldHaveLength 10)
119+
120+
module SideEffects =
    [<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
    let ``TaskSeq-chunkBySize gets all items`` variant =
        let expected = [| [| 1 .. 5 |]; [| 6 .. 10 |] |]

        Gen.getSeqWithSideEffect variant
        |> TaskSeq.chunkBySize 5
        |> TaskSeq.toArrayAsync
        |> Task.map (shouldEqual expected)

    [<Fact>]
    let ``TaskSeq-chunkBySize prove we execute empty-seq side-effects`` () = task {
        let mutable counter = 0

        // yields nothing, but bumps the counter three times on every enumeration
        let ts = taskSeq {
            counter <- counter + 1
            counter <- counter + 1
            counter <- counter + 1 // we should get here
        }

        let consumeChunked size = ts |> TaskSeq.chunkBySize size |> consumeTaskSeq

        do! consumeChunked 1
        do! consumeChunked 2
        do! consumeChunked 3

        // three enumerations, three increments each
        counter |> should equal 9
    }

    [<Fact>]
    let ``TaskSeq-chunkBySize prove we execute after-effects`` () = task {
        let mutable counter = 0

        // the last increment sits after the only yield, so it runs only if the source is drained
        let ts = taskSeq {
            counter <- counter + 1
            counter <- counter + 1
            yield 42
            counter <- counter + 1 // we should get here
        }

        let consumeChunked size = ts |> TaskSeq.chunkBySize size |> consumeTaskSeq

        do! consumeChunked 1
        do! consumeChunked 2
        do! consumeChunked 3

        // three enumerations, three increments each
        counter |> should equal 9
    }

    [<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
    let ``TaskSeq-chunkBySize should go over all items`` variant = task {
        let ts = Gen.getSeqWithSideEffect variant
        let consumeChunked size = ts |> TaskSeq.chunkBySize size |> consumeTaskSeq

        do! consumeChunked 1
        do! consumeChunked 2
        do! consumeChunked 3

        // incl. the iteration of 'last', we reach 40
        let! finalItem = TaskSeq.last ts
        finalItem |> should equal 40
    }

    [<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
    let ``TaskSeq-chunkBySize multiple iterations over same sequence`` variant = task {
        let ts = Gen.getSeqWithSideEffect variant
        let mutable sum = 0

        // chunk, flatten again, and add every item to the running total
        let addAllItems size =
            ts
            |> TaskSeq.chunkBySize size
            |> TaskSeq.collect TaskSeq.ofArray
            |> TaskSeq.iter (fun item -> sum <- sum + item)

        do! addAllItems 1
        do! addAllItems 2
        do! addAllItems 3
        do! addAllItems 4

        sum |> should equal 820 // side-effected tasks, so 'item' DOES CHANGE, each next iteration starts 10 higher
    }

    [<Fact>]
    let ``TaskSeq-chunkBySize prove that an exception from the taskSeq is thrown`` () =
        // the source raises after three items; chunking must let that exception surface unchanged
        let items = taskSeq {
            yield 42
            yield! [ 1; 2 ]
            raise (SideEffectPastEnd "at the end")
            yield 43
        }

        let consumeChunked () = items |> TaskSeq.chunkBySize 2 |> consumeTaskSeq

        consumeChunked |> should throwAsyncExact typeof<SideEffectPastEnd>

src/FSharp.Control.TaskSeq/TaskSeq.fs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -252,6 +252,8 @@ type TaskSeq private () =
252252
yield! source2
253253
}
254254

255+
// Public entry point: forwards both arguments unchanged to Internal.chunkBySize.
static member chunkBySize (chunkSize: int) (source: TaskSeq<'T>) = source |> Internal.chunkBySize chunkSize
256+
255257
//
256258
// iter/map/collect functions
257259
//

src/FSharp.Control.TaskSeq/TaskSeq.fsi

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -823,6 +823,15 @@ type TaskSeq =
823823
/// <exception cref="T:ArgumentNullException">Thrown when the input task sequence is null.</exception>
824824
static member chooseAsync: chooser: ('T -> #Task<'U option>) -> source: TaskSeq<'T> -> TaskSeq<'U>
825825

826+
/// <summary>Divides the input sequence into consecutive chunks of size at most <c>chunkSize</c>. Every chunk holds exactly <c>chunkSize</c> items, except possibly the last one, which holds the remaining items. An empty input yields no chunks.</summary>
827+
///
828+
/// <param name="chunkSize">The maximum size of each chunk. Must be at least 1.</param>
829+
/// <param name="source">The input task sequence.</param>
830+
/// <returns>The task sequence divided into chunks, each chunk being an array of items in source order.</returns>
831+
/// <exception cref="T:System.ArgumentNullException">Thrown when the input task sequence is null.</exception>
832+
/// <exception cref="T:System.ArgumentException">Thrown when <c>chunkSize</c> is not positive. This is checked when the function is called, before the resulting sequence is iterated.</exception>
833+
static member chunkBySize: chunkSize: int -> source: TaskSeq<'T> -> TaskSeq<'T[]>
834+
826835
/// <summary>
827836
/// Returns a new task sequence containing only the elements of the collection
828837
/// for which the given function <paramref name="predicate" /> returns <see cref="true" />.

src/FSharp.Control.TaskSeq/TaskSeqInternal.fs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1097,3 +1097,27 @@ module internal TaskSeqInternal =
10971097
go <- step
10981098

10991099
}
1100+
1101+
/// Splits `source` into consecutive arrays of `chunkSize` items each. The final array holds
/// whatever is left over and may be shorter; a source without items produces no arrays.
/// Calls `invalidArg` when `chunkSize` is below 1. That check and the null check on `source`
/// run when this function is called, before the returned sequence is iterated.
let chunkBySize chunkSize (source: TaskSeq<'T>): TaskSeq<'T[]> =
    if chunkSize < 1 then invalidArg (nameof chunkSize) $"The value must be positive, but was %i{chunkSize}."
    checkNonNull (nameof source) source

    taskSeq {
        use enumerator = source.GetAsyncEnumerator CancellationToken.None
        let! hasFirst = enumerator.MoveNextAsync()
        let mutable hasMore = hasFirst

        if hasFirst then
            // the scratch buffer is only allocated once we know there is at least one item
            let pending = ResizeArray<_>()

            while hasMore do
                pending.Add enumerator.Current

                // hand out a copy of the buffer, then reuse the buffer for the next chunk
                if pending.Count = chunkSize then
                    yield pending.ToArray()
                    pending.Clear()

                let! advanced = enumerator.MoveNextAsync()
                hasMore <- advanced

            // flush the trailing, partially filled chunk
            if pending.Count > 0 then
                yield pending.ToArray()
    }
}

0 commit comments

Comments
 (0)