Skip to content

Commit 2610cda

Browse files
authored
perf: batch async boundary elements (#3288)
Motivation: Async stream islands send each element across the internal actor boundary as a separate boundary event, which adds per-element allocation and mailbox traffic. Modification: Batch elements produced during an interpreter run for internal stream boundaries and flush them when the interpreter parks. Let the input boundary implement the internal BoundarySubscriber directly, cap output batch size at 1024 elements, flush pending batch elements before completion or upstream failure, and keep cancellation and already-terminated paths clearing pending elements. Store initialized boundary lists in arrays, add a fused baseline to the async-boundary benchmark, and add regression coverage for boundary ordering, terminal ordering, demand, cancellation, supervision, and failure draining. Result: Internal async boundary crossings allocate fewer boundary events and actor messages while preserving element order, demand bounds, terminal ordering, and supervision behavior. Tests: - rtk scalafmt --mode diff-ref=origin/main -- passed - rtk scalafmt --list --mode diff-ref=origin/main -- passed - rtk git diff --check -- passed - rtk sbt "stream-tests / Test / testOnly org.apache.pekko.stream.FusingSpec -- -z \"drain asynchronous boundary batches before failing\"" -- passed, 1 test - rtk sbt "stream-tests / Test / testOnly org.apache.pekko.stream.FusingSpec org.apache.pekko.stream.scaladsl.PublisherSinkSpec org.apache.pekko.stream.io.FileSinkSpec org.apache.pekko.stream.scaladsl.FlowMapWithResourceSpec" "stream / mimaReportBinaryIssues" -- passed, 53 tests and MiMa - rtk bash -lc 'set -o pipefail; sbt "bench-jmh/Jmh/run -wi 1 -i 3 -w 5s -r 5s -f 1 -prof gc .*AsyncBoundaryThroughputBenchmark.*" 2>&1 | tee /tmp/pekko-async-boundary-final-squashed.log' -- passed - rtk qodercli --help -- passed, confirmed -p/--print, --output-format, --cwd, and --attachment - qodercli stdout review of /tmp/project-review.diff -- passed, no must-fix findings - Independent subAgent review of /tmp/project-review.diff -- passed, no must-fix findings References: None - internal stream boundary throughput optimization
1 parent 535a5fa commit 2610cda

4 files changed

Lines changed: 398 additions & 41 deletions

File tree

bench-jmh/src/main/scala/org/apache/pekko/stream/AsyncBoundaryThroughputBenchmark.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ class AsyncBoundaryThroughputBenchmark {
5252

5353
implicit val system: ActorSystem = ActorSystem("AsyncBoundaryThroughputBenchmark", config)
5454

55-
@Param(Array("1", "3", "10"))
55+
@Param(Array("0", "1", "3", "10"))
5656
var asyncBoundaries = 0
5757

5858
var source: Source[Int, NotUsed] = _

stream-tests/src/test/scala/org/apache/pekko/stream/FusingSpec.scala

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,15 +19,20 @@ import duration._
1919

2020
import org.apache.pekko
2121
import pekko.Done
22+
import pekko.stream.QueueOfferResult
2223
import pekko.stream.impl.UnfoldResourceSource
2324
import pekko.stream.impl.fusing.GraphInterpreter
2425
import pekko.stream.scaladsl._
2526
import pekko.stream.stage.GraphStage
27+
import pekko.stream.testkit.TestPublisher
2628
import pekko.stream.testkit.StreamSpec
2729
import pekko.stream.testkit.Utils.TE
30+
import pekko.stream.testkit.scaladsl.TestSink
2831

2932
class FusingSpec extends StreamSpec {
3033

34+
val asyncBoundaryInputBuffer = Attributes.inputBuffer(16, 16)
35+
3136
def actorRunningStage = {
3237
GraphInterpreter.currentInterpreter.context
3338
}
@@ -47,6 +52,187 @@ class FusingSpec extends StreamSpec {
4752
.sorted should ===(0 to 198 by 2)
4853
}
4954

55+
"preserve elements across repeated asynchronous boundary batches" in {
56+
val elements = 1 to 5000
57+
58+
Source(elements)
59+
.map(identity)
60+
.async
61+
.addAttributes(asyncBoundaryInputBuffer)
62+
.runWith(Sink.seq)
63+
.futureValue should ===(elements)
64+
}
65+
66+
"preserve elements across chained asynchronous boundary batches" in {
67+
val elements = 1 to 5000
68+
69+
Source(elements)
70+
.map(identity)
71+
.async
72+
.addAttributes(asyncBoundaryInputBuffer)
73+
.map(identity)
74+
.async
75+
.addAttributes(asyncBoundaryInputBuffer)
76+
.runWith(Sink.seq)
77+
.futureValue should ===(elements)
78+
}
79+
80+
"flush asynchronous boundary batches when async input suspends the interpreter" in {
81+
val elements = 1 to 64
82+
val (queue, downstream) = Source
83+
.fromGraph(Source.queue[Int](elements.size))
84+
.async
85+
.addAttributes(ActorAttributes.syncProcessingLimit(1) and asyncBoundaryInputBuffer)
86+
.toMat(TestSink[Int]())(Keep.both)
87+
.run()
88+
89+
downstream.request(elements.size)
90+
elements.foreach { elem =>
91+
queue.offer(elem) should ===(QueueOfferResult.Enqueued)
92+
}
93+
downstream.expectNextN(elements)
94+
95+
queue.complete()
96+
downstream.expectComplete()
97+
}
98+
99+
"drain asynchronous boundary batches before completing" in {
100+
val elements = 1 to 64
101+
102+
Source(elements)
103+
.async
104+
.addAttributes(asyncBoundaryInputBuffer)
105+
.runWith(TestSink[Int]())
106+
.request(elements.size)
107+
.expectNextN(elements)
108+
.expectComplete()
109+
}
110+
111+
"drain asynchronous boundary batches before failing" in {
112+
val elements = 1 to 64
113+
val ex = TE("boom")
114+
val upstream = TestPublisher.probe[Int]()
115+
val downstream = Source
116+
.fromPublisher(upstream)
117+
.async
118+
.addAttributes(asyncBoundaryInputBuffer)
119+
.runWith(TestSink[Int]())
120+
121+
downstream.request(elements.size + 1)
122+
123+
var sent = 0
124+
while (sent < elements.size) {
125+
val request = upstream.expectRequest()
126+
val send = math.min(request, elements.size - sent).toInt
127+
elements.slice(sent, sent + send).foreach(upstream.sendNext)
128+
sent += send
129+
}
130+
131+
upstream.sendError(ex)
132+
downstream.expectNextN(elements)
133+
downstream.expectError(ex)
134+
}
135+
136+
"not emit elements after an asynchronous boundary failure" in {
137+
val ex = TE("boom")
138+
val probe = Source(1 to 64)
139+
.concat(Source.failed(ex))
140+
.async
141+
.addAttributes(asyncBoundaryInputBuffer)
142+
.runWith(TestSink[Int]())
143+
144+
probe.request(65)
145+
146+
var expected = 1
147+
var errorSignalled = false
148+
while (!errorSignalled && expected <= 64) {
149+
probe.expectNextOrError(expected, ex) match {
150+
case Right(_) =>
151+
expected += 1
152+
case Left(_) =>
153+
errorSignalled = true
154+
}
155+
}
156+
if (!errorSignalled) probe.expectError(ex)
157+
}
158+
159+
"propagate cancellation with asynchronous boundary elements in flight" in {
160+
val upstream = TestPublisher.probe[Int]()
161+
val downstream = Source
162+
.fromPublisher(upstream)
163+
.async
164+
.addAttributes(asyncBoundaryInputBuffer)
165+
.runWith(TestSink[Int]())
166+
167+
downstream.request(16)
168+
upstream.expectRequest() should be >= 16L
169+
(1 to 16).foreach(upstream.sendNext)
170+
171+
downstream.expectNext(1)
172+
downstream.cancel()
173+
174+
upstream.expectCancellation()
175+
}
176+
177+
"not exceed downstream demand across asynchronous boundary batches" in {
178+
val downstream = Source(1 to 1000)
179+
.async
180+
.addAttributes(asyncBoundaryInputBuffer)
181+
.runWith(TestSink[Int]())
182+
183+
downstream.request(1)
184+
downstream.expectNext(1)
185+
downstream.expectNoMessage(100.millis)
186+
187+
downstream.request(2)
188+
downstream.expectNext(2, 3)
189+
downstream.expectNoMessage(100.millis)
190+
191+
downstream.cancel()
192+
}
193+
194+
"preserve resuming supervision across asynchronous boundary batches" in {
195+
Source(List(1, 2, -1, 3, 4))
196+
.async
197+
.addAttributes(asyncBoundaryInputBuffer)
198+
.map { elem =>
199+
require(elem > 0)
200+
elem
201+
}
202+
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))
203+
.runWith(Sink.seq)
204+
.futureValue should ===(Seq(1, 2, 3, 4))
205+
}
206+
207+
"preserve restarting supervision across asynchronous boundary batches" in {
208+
Source(List(1, 3, -1, 5, 7))
209+
.async
210+
.addAttributes(asyncBoundaryInputBuffer)
211+
.scan(0) { (old, current) =>
212+
require(current > 0)
213+
old + current
214+
}
215+
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider))
216+
.runWith(Sink.seq)
217+
.futureValue should ===(Seq(0, 1, 4, 0, 5, 12))
218+
}
219+
220+
"preserve stopping supervision across asynchronous boundary batches" in {
221+
val ex = TE("boom")
222+
Source(List(1, 2, -1, 3, 4))
223+
.async
224+
.addAttributes(asyncBoundaryInputBuffer)
225+
.map {
226+
case -1 => throw ex
227+
case elem => elem
228+
}
229+
.withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider))
230+
.runWith(TestSink[Int]())
231+
.request(5)
232+
.expectNext(1, 2)
233+
.expectError(ex)
234+
}
235+
50236
"use multiple actors when there are asynchronous boundaries in the subflows (manual)" in {
51237
val async = Flow[Int].map(x => { testActor ! actorRunningStage; x }).async
52238
Source(0 to 9)

stream/src/main/mima-filters/2.0.x.backwards.excludes/pr-2916-boundary-event-allocation.excludes

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,9 @@ ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.ActorGraphI
2828
ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.GraphInterpreterShell*Abort*")
2929
ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.GraphInterpreterShell*AsyncInput*")
3030
ProblemFilters.exclude[Problem]("org.apache.pekko.stream.impl.fusing.GraphInterpreterShell*ResumeShell*")
31+
32+
# Optimize internal async boundary batching
33+
ProblemFilters.exclude[FinalMethodProblem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter#BatchingActorInputBoundary.onNext")
34+
ProblemFilters.exclude[FinalMethodProblem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter#BatchingActorInputBoundary.onError")
35+
ProblemFilters.exclude[FinalMethodProblem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter#BatchingActorInputBoundary.onComplete")
36+
ProblemFilters.exclude[FinalMethodProblem]("org.apache.pekko.stream.impl.fusing.ActorGraphInterpreter#BatchingActorInputBoundary.onSubscribe")

0 commit comments

Comments
 (0)