Skip to content

Commit d162909

Browse files
feat: add TaskSeq.splitInto with 91 tests
Implements TaskSeq.splitInto, the missing analogue to Seq.splitInto. Splits the sequence into at most N chunks of roughly equal size, with remainder elements distributed across the first chunks. When count > length, returns one element per chunk (length chunks total). When the source is empty, returns an empty sequence. Validates count > 0 eagerly (before enumeration). Internally materializes the source via toResizeArrayAsync, then slices the resulting array – same approach as groupBy/partition, necessary because total element count is unknown until the sequence is consumed. - 91 new tests covering: empty, null, invalid args, split(1), split(N), split(> length), exact/uneven distribution, side-effect variants - README table updated (✅) - release-notes.txt updated Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
1 parent 5a03593 commit d162909

File tree

7 files changed

+191
-1
lines changed

7 files changed

+191
-1
lines changed

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -354,7 +354,7 @@ This is what has been implemented so far, is planned or skipped:
354354
| &#x2753; | `sortByAscending` | | | [note #1](#note1 "These functions require a form of pre-materializing through 'TaskSeq.cache', similar to the approach taken in the corresponding 'Seq' functions. It doesn't make much sense to have a cached async sequence. However, 'AsyncSeq' does implement these, so we'll probably do so eventually as well.") |
355355
| &#x2753; | `sortByDescending` | | | [note #1](#note1 "These functions require a form of pre-materializing through 'TaskSeq.cache', similar to the approach taken in the corresponding 'Seq' functions. It doesn't make much sense to have a cached async sequence. However, 'AsyncSeq' does implement these, so we'll probably do so eventually as well.") |
356356
| &#x2753; | `sortWith` | | | [note #1](#note1 "These functions require a form of pre-materializing through 'TaskSeq.cache', similar to the approach taken in the corresponding 'Seq' functions. It doesn't make much sense to have a cached async sequence. However, 'AsyncSeq' does implement these, so we'll probably do so eventually as well.") |
357-
| | `splitInto` | `splitInto` | | |
357+
| &#x2705; | `splitInto` | `splitInto` | | |
358358
| | `sum` | `sum` | | |
359359
| | `sumBy` | `sumBy` | `sumByAsync` | |
360360
| &#x2705; [#76][] | `tail` | `tail` | | |

release-notes.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
Release notes:
33

44
0.6.0
5+
- adds TaskSeq.splitInto
56
- adds TaskSeq.scan and TaskSeq.scanAsync, #289
67
- adds TaskSeq.pairwise, #289
78
- adds TaskSeq.groupBy and TaskSeq.groupByAsync, #289

src/FSharp.Control.TaskSeq.Test/FSharp.Control.TaskSeq.Test.fsproj

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@
5959
<Compile Include="TaskSeq.UpdateAt.Tests.fs" />
6060
<Compile Include="TaskSeq.Zip.Tests.fs" />
6161
<Compile Include="TaskSeq.ChunkBySize.Tests.fs" />
62+
<Compile Include="TaskSeq.SplitInto.Tests.fs" />
6263
<Compile Include="TaskSeq.Windowed.Tests.fs" />
6364
<Compile Include="TaskSeq.Tests.CE.fs" />
6465
<Compile Include="TaskSeq.StateTransitionBug.Tests.CE.fs" />
Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
module TaskSeq.Tests.SplitInto
2+
3+
open Xunit
4+
open FsUnit.Xunit
5+
6+
open FSharp.Control
7+
8+
//
9+
// TaskSeq.splitInto
10+
//
11+
12+
module EmptySeq =
13+
[<Fact>]
14+
let ``TaskSeq-splitInto with null source raises`` () = assertNullArg <| fun () -> TaskSeq.splitInto 1 null
15+
16+
[<Fact>]
17+
let ``TaskSeq-splitInto with zero raises ArgumentException before awaiting`` () =
18+
fun () -> TaskSeq.empty<int> |> TaskSeq.splitInto 0 |> ignore // throws eagerly, before enumeration
19+
|> should throw typeof<System.ArgumentException>
20+
21+
[<Fact>]
22+
let ``TaskSeq-splitInto with negative raises ArgumentException before awaiting`` () =
23+
fun () -> TaskSeq.empty<int> |> TaskSeq.splitInto -1 |> ignore
24+
|> should throw typeof<System.ArgumentException>
25+
26+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
27+
let ``TaskSeq-splitInto on empty sequence yields empty`` variant =
28+
Gen.getEmptyVariant variant
29+
|> TaskSeq.splitInto 1
30+
|> verifyEmpty
31+
32+
[<Theory; ClassData(typeof<TestEmptyVariants>)>]
33+
let ``TaskSeq-splitInto(99) on empty sequence yields empty`` variant =
34+
Gen.getEmptyVariant variant
35+
|> TaskSeq.splitInto 99
36+
|> verifyEmpty
37+
38+
module Immutable =
39+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
40+
let ``TaskSeq-splitInto preserves all elements in order`` variant = task {
41+
do!
42+
Gen.getSeqImmutable variant
43+
|> TaskSeq.splitInto 3
44+
|> TaskSeq.collect TaskSeq.ofArray
45+
|> verify1To10
46+
}
47+
48+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
49+
let ``TaskSeq-splitInto(2) splits 10-element sequence into 2 chunks of 5`` variant = task {
50+
let! chunks =
51+
Gen.getSeqImmutable variant
52+
|> TaskSeq.splitInto 2
53+
|> TaskSeq.toArrayAsync
54+
55+
chunks |> should equal [| [| 1..5 |]; [| 6..10 |] |]
56+
}
57+
58+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
59+
let ``TaskSeq-splitInto(5) splits 10-element sequence into 5 chunks of 2`` variant = task {
60+
let! chunks =
61+
Gen.getSeqImmutable variant
62+
|> TaskSeq.splitInto 5
63+
|> TaskSeq.toArrayAsync
64+
65+
chunks
66+
|> should equal [| [| 1; 2 |]; [| 3; 4 |]; [| 5; 6 |]; [| 7; 8 |]; [| 9; 10 |] |]
67+
}
68+
69+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
70+
let ``TaskSeq-splitInto(10) splits 10-element sequence into 10 singleton chunks`` variant = task {
71+
let! chunks =
72+
Gen.getSeqImmutable variant
73+
|> TaskSeq.splitInto 10
74+
|> TaskSeq.toArrayAsync
75+
76+
chunks |> Array.length |> should equal 10
77+
78+
chunks
79+
|> Array.iteri (fun i chunk -> chunk |> should equal [| i + 1 |])
80+
}
81+
82+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
83+
let ``TaskSeq-splitInto(1) returns the whole sequence as one chunk`` variant = task {
84+
let! chunks =
85+
Gen.getSeqImmutable variant
86+
|> TaskSeq.splitInto 1
87+
|> TaskSeq.toArrayAsync
88+
89+
chunks |> should equal [| [| 1..10 |] |]
90+
}
91+
92+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
93+
let ``TaskSeq-splitInto(3) distributes remainder across first chunks`` variant = task {
94+
// 10 elements into 3 chunks: 10 / 3 = 3 remainder 1 → [4; 3; 3]
95+
let! chunks =
96+
Gen.getSeqImmutable variant
97+
|> TaskSeq.splitInto 3
98+
|> TaskSeq.toArrayAsync
99+
100+
chunks |> Array.length |> should equal 3
101+
chunks.[0] |> should equal [| 1; 2; 3; 4 |]
102+
chunks.[1] |> should equal [| 5; 6; 7 |]
103+
chunks.[2] |> should equal [| 8; 9; 10 |]
104+
}
105+
106+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
107+
let ``TaskSeq-splitInto(4) distributes remainder across first chunks`` variant = task {
108+
// 10 elements into 4 chunks: 10 / 4 = 2 remainder 2 → [3; 3; 2; 2]
109+
let! chunks =
110+
Gen.getSeqImmutable variant
111+
|> TaskSeq.splitInto 4
112+
|> TaskSeq.toArrayAsync
113+
114+
chunks |> Array.length |> should equal 4
115+
chunks.[0] |> should equal [| 1; 2; 3 |]
116+
chunks.[1] |> should equal [| 4; 5; 6 |]
117+
chunks.[2] |> should equal [| 7; 8 |]
118+
chunks.[3] |> should equal [| 9; 10 |]
119+
}
120+
121+
[<Theory; ClassData(typeof<TestImmTaskSeq>)>]
122+
let ``TaskSeq-splitInto count greater than length returns one element per chunk`` variant = task {
123+
// 10 elements into 20 chunks → 10 singleton chunks
124+
let! chunks =
125+
Gen.getSeqImmutable variant
126+
|> TaskSeq.splitInto 20
127+
|> TaskSeq.toArrayAsync
128+
129+
chunks |> Array.length |> should equal 10
130+
131+
chunks
132+
|> Array.iteri (fun i chunk -> chunk |> should equal [| i + 1 |])
133+
}
134+
135+
module SideEffects =
136+
[<Theory; ClassData(typeof<TestSideEffectTaskSeq>)>]
137+
let ``TaskSeq-splitInto preserves all side-effectful elements in order`` variant = task {
138+
do!
139+
Gen.getSeqWithSideEffect variant
140+
|> TaskSeq.splitInto 3
141+
|> TaskSeq.collect TaskSeq.ofArray
142+
|> verify1To10
143+
}

src/FSharp.Control.TaskSeq/TaskSeq.fs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -471,6 +471,7 @@ type TaskSeq private () =
471471
static member distinctUntilChanged source = Internal.distinctUntilChanged source
472472
static member pairwise source = Internal.pairwise source
473473
static member chunkBySize chunkSize source = Internal.chunkBySize chunkSize source
474+
static member splitInto count source = Internal.splitInto count source
474475
static member windowed windowSize source = Internal.windowed windowSize source
475476

476477
static member forall predicate source = Internal.forall (Predicate predicate) source

src/FSharp.Control.TaskSeq/TaskSeq.fsi

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1512,6 +1512,27 @@ type TaskSeq =
15121512
/// <exception cref="T:System.ArgumentException">Thrown when <paramref name="chunkSize" /> is not positive.</exception>
15131513
static member chunkBySize: chunkSize: int -> source: TaskSeq<'T> -> TaskSeq<'T[]>
15141514

1515+
/// <summary>
1516+
/// Splits the input task sequence into at most <paramref name="count" /> chunks of roughly equal size.
1517+
/// The last chunk may be smaller if the total number of elements does not divide evenly.
1518+
/// When the source has fewer elements than <paramref name="count" />, the number of chunks equals
1519+
/// the number of elements (each chunk has one element). Returns an empty task sequence when the
1520+
/// source is empty.
1521+
///
1522+
/// Unlike <see cref="TaskSeq.chunkBySize" />, which fixes the chunk size, this function fixes
1523+
/// the number of chunks. The whole source sequence must be evaluated before any chunk is yielded.
1524+
///
1525+
/// If <paramref name="count" /> is not positive, an <see cref="T:System.ArgumentException" /> is raised immediately
1526+
/// (before the sequence is evaluated).
1527+
/// </summary>
1528+
///
1529+
/// <param name="count">The maximum number of chunks. Must be positive.</param>
1530+
/// <param name="source">The input task sequence.</param>
1531+
/// <returns>A task sequence of non-overlapping array chunks.</returns>
1532+
/// <exception cref="T:System.ArgumentNullException">Thrown when the input task sequence is null.</exception>
1533+
/// <exception cref="T:System.ArgumentException">Thrown when <paramref name="count" /> is not positive.</exception>
1534+
static member splitInto: count: int -> source: TaskSeq<'T> -> TaskSeq<'T[]>
1535+
15151536
/// <summary>
15161537
/// Returns a task sequence of sliding windows of a given size over the source sequence.
15171538
/// Each window is a fresh array of exactly <paramref name="windowSize" /> consecutive elements.

src/FSharp.Control.TaskSeq/TaskSeqInternal.fs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1460,6 +1460,29 @@ module internal TaskSeqInternal =
14601460
yield buffer.[0 .. count - 1]
14611461
}
14621462

1463+
let splitInto count (source: TaskSeq<'T>) : TaskSeq<'T[]> =
1464+
if count < 1 then
1465+
invalidArg (nameof count) $"The value must be positive, but was %i{count}."
1466+
1467+
checkNonNull (nameof source) source
1468+
1469+
taskSeq {
1470+
let! ra = toResizeArrayAsync source
1471+
let arr = ra.ToArray()
1472+
let n = arr.Length
1473+
1474+
if n > 0 then
1475+
// Split into at most `count` chunks (fewer chunks if n < count).
1476+
let actual = min count n
1477+
let k = n / actual
1478+
let m = n % actual // first m chunks get one extra element
1479+
1480+
for i in 0 .. actual - 1 do
1481+
let start = i * k + min i m
1482+
let len = k + (if i < m then 1 else 0)
1483+
yield arr.[start .. start + len - 1]
1484+
}
1485+
14631486
let windowed windowSize (source: TaskSeq<_>) =
14641487
if windowSize <= 0 then
14651488
invalidArg (nameof windowSize) $"The value must be positive, but was %i{windowSize}."

0 commit comments

Comments
 (0)