Skip to content

Commit fc443a8

Browse files
authored
Merge pull request #312 from fsprojects/repo-assist/improve-map-async-parallel-throttled-20260415-b652b2a1157a8dda
[Repo Assist] Add AsyncSeq.mapAsyncParallelThrottled — ordered, bounded-concurrency parallel map
2 parents 16aa1d7 + 7c3c629 commit fc443a8

File tree

6 files changed

+84
-2
lines changed

6 files changed

+84
-2
lines changed

RELEASE_NOTES.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,7 @@
1+
### 4.14.0
2+
3+
* Added `AsyncSeq.mapAsyncParallelThrottled` — ordered, bounded-concurrency parallel map. Like `mapAsyncParallel` but limits the number of in-flight operations to `parallelism`, preventing unbounded resource use on large or infinite sequences.
4+
15
### 4.13.0
26

37
* CI: Upgrade Fable from 4.25.0 to 5.0.0-rc.7 and .NET SDK from 8.0.19 to 10.0.100 to fix a CI hang where the Fable build step ran for 6+ hours with .NET 10. Fable 5 + .NET 10 compiles in ~20 seconds.

src/FSharp.Control.AsyncSeq/AsyncSeq.fs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,6 +1159,28 @@ module AsyncSeq =
11591159
| Choice1Of2 value -> return value
11601160
| Choice2Of2 ex -> return raise ex })
11611161
}
1162+
1163+
let mapAsyncParallelThrottled (parallelism:int) (f:'a -> Async<'b>) (s:AsyncSeq<'a>) : AsyncSeq<'b> = asyncSeq {
1164+
use mb = MailboxProcessor.Start (fun _ -> async.Return())
1165+
use sm = new SemaphoreSlim(parallelism)
1166+
let! err =
1167+
s
1168+
|> iterAsync (fun a -> async {
1169+
do! sm.WaitAsync () |> Async.awaitTaskUnitCancellationAsError
1170+
let! b = Async.StartChild (async {
1171+
try
1172+
let! result = f a
1173+
sm.Release() |> ignore
1174+
return result
1175+
with ex ->
1176+
sm.Release() |> ignore
1177+
return raise ex })
1178+
mb.Post (Some b) })
1179+
|> Async.map (fun _ -> mb.Post None)
1180+
|> Async.StartChildAsTask
1181+
yield!
1182+
replicateUntilNoneAsync (Task.chooseTask (err |> Task.taskFault) (async.Delay mb.Receive))
1183+
|> mapAsync id }
11621184
#endif
11631185

11641186
let chooseAsync f (source:AsyncSeq<'T>) =

src/FSharp.Control.AsyncSeq/AsyncSeq.fsi

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -887,6 +887,15 @@ module AsyncSeq =
887887
/// in the order they complete (unordered), without preserving the original order.
888888
val mapAsyncUnorderedParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
889889

890+
/// Builds a new asynchronous sequence whose elements are generated by
891+
/// applying the specified function to all elements of the input sequence,
892+
/// with at most <c>parallelism</c> mapping operations running concurrently.
893+
///
894+
/// The function is applied to elements in order and results are emitted in order,
895+
/// but in parallel, with at most <c>parallelism</c> operations running concurrently.
896+
/// This is the throttled counterpart to <c>mapAsyncParallel</c>.
897+
val mapAsyncParallelThrottled : parallelism:int -> mapping:('T -> Async<'U>) -> s:AsyncSeq<'T> -> AsyncSeq<'U>
898+
890899
/// Applies a key-generating function to each element and returns an async sequence containing unique keys
891900
/// and async sequences containing elements corresponding to the key.
892901
///

src/FSharp.Control.AsyncSeq/FSharp.Control.AsyncSeq.fsproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
</ItemGroup>
2525
<ItemGroup>
2626
<PackageReference Update="FSharp.Core" Version="4.7.2" />
27-
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.5" />
27+
<PackageReference Include="Microsoft.Bcl.AsyncInterfaces" Version="10.0.6" />
2828
<PackageReference Include="System.Threading.Channels" Version="*" />
2929
<Content Include="*.fsproj; **\*.fs; **\*.fsi;" PackagePath="fable\" />
3030
</ItemGroup>

tests/FSharp.Control.AsyncSeq.Tests/AsyncSeqTests.fs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1985,6 +1985,53 @@ let ``AsyncSeq.mapAsyncUnorderedParallelThrottled should throttle`` () =
19851985

19861986
Assert.AreEqual(50, result.Length)
19871987

1988+
[<Test>]
1989+
let ``AsyncSeq.mapAsyncParallelThrottled should maintain order`` () =
1990+
let ls = List.init 100 id
1991+
let result =
1992+
ls
1993+
|> AsyncSeq.ofList
1994+
|> AsyncSeq.mapAsyncParallelThrottled 5 (fun i -> async {
1995+
do! Async.Sleep (100 - i)
1996+
return i * 2 })
1997+
|> AsyncSeq.toListAsync
1998+
|> Async.RunSynchronously
1999+
Assert.AreEqual(ls |> List.map ((*) 2), result)
2000+
2001+
[<Test>]
2002+
let ``AsyncSeq.mapAsyncParallelThrottled should propagate exception`` () =
2003+
let result =
2004+
AsyncSeq.init 50L id
2005+
|> AsyncSeq.mapAsyncParallelThrottled 5 (fun i -> async {
2006+
if i = 25L then return failwith "test error"
2007+
return i })
2008+
|> AsyncSeq.toListAsync
2009+
|> Async.Catch
2010+
|> Async.RunSynchronously
2011+
match result with
2012+
| Choice2Of2 _ -> ()
2013+
| Choice1Of2 _ -> Assert.Fail("Expected exception")
2014+
2015+
[<Test>]
2016+
let ``AsyncSeq.mapAsyncParallelThrottled should throttle`` () =
2017+
let count = ref 0
2018+
let parallelism = 5
2019+
2020+
let result =
2021+
AsyncSeq.init 50L id
2022+
|> AsyncSeq.mapAsyncParallelThrottled parallelism (fun i -> async {
2023+
let c = Interlocked.Increment count
2024+
if c > parallelism then
2025+
return failwith (sprintf "concurrency exceeded: %d > %d" c parallelism)
2026+
do! Async.Sleep 5
2027+
Interlocked.Decrement count |> ignore
2028+
return i * 2L })
2029+
|> AsyncSeq.toListAsync
2030+
|> Async.RunSynchronously
2031+
2032+
Assert.AreEqual(50, result.Length)
2033+
Assert.AreEqual([ 0L..49L ] |> List.map ((*) 2L), result)
2034+
19882035
//[<Test>]
19892036
//let ``AsyncSeq.mapParallelAsyncBounded should maintain order`` () =
19902037
// let ls = List.init 500 id

version.props

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
<Project>
22
<PropertyGroup>
3-
<Version>4.12.0</Version>
3+
<Version>4.14.0</Version>
44
</PropertyGroup>
55
</Project>

0 commit comments

Comments
 (0)