Skip to content

Commit 956d478

Browse files
authored
test: fix InterpreterBenchmark so it produces trustworthy numbers (#2985)
* test: stop leaking ActorSystem in InterpreterBenchmark per invocation

  Motivation: The previous shape `new GraphInterpreterSpecKit { new TestSetup { ... } }` ran inside @Benchmark, so each invocation built (and never tore down) a fresh ActorSystem. Long iterations exhausted native threads and JMH reported empty results once the JVM ran out of resources.

  Modification: Make the benchmark class itself extend GraphInterpreterSpecKit so JMH's @State(Scope.Benchmark) lifecycle reuses one ActorSystem across all invocations. Add @TearDown(Level.Trial) to terminate it cleanly.

  Result: The benchmark now runs to completion and produces stable numbers, which is a prerequisite for measuring follow-up GraphInterpreter optimizations.

  Tests: sbt 'bench-jmh/compile'

* test: use per-instance IdentityStage in InterpreterBenchmark

  Motivation: GraphStages.identity is a singleton whose Inlet/Outlet shape is shared across every reference. Chaining N copies into the assembly (numberOfIds = 5/10) collapses to a single shape and mis-wires the connections, which surfaced as a runtime "Cannot pull port twice" error spam during the benchmark and produced nonsense throughput numbers (5/10 stages reported as faster than 1).

  Modification: Define a local IdentityStage class with its own Inlet/Outlet per instance and use Vector.fill(numberOfIds)(new IdentityStage[Int]).

  Result: The benchmark wires N distinct stages and produces stable, monotonic numbers (throughput decreases as numberOfIds grows, as expected).

  Tests: sbt 'bench-jmh/compile'
1 parent b95809d commit 956d478

1 file changed

Lines changed: 45 additions & 24 deletions

File tree

bench-jmh/src/main/scala/org/apache/pekko/stream/InterpreterBenchmark.scala

Lines changed: 45 additions & 24 deletions
Original file line number | Diff line number | Diff line change
@@ -15,19 +15,20 @@ package org.apache.pekko.stream
1515

1616
import java.util.concurrent.TimeUnit
1717

18+
import scala.concurrent.Await
19+
import scala.concurrent.duration._
20+
1821
import org.openjdk.jmh.annotations._
1922

2023
import org.apache.pekko
21-
import pekko.event._
2224
import pekko.stream.impl.fusing.GraphInterpreter.{ DownstreamBoundaryStageLogic, UpstreamBoundaryStageLogic }
2325
import pekko.stream.impl.fusing.GraphInterpreterSpecKit
24-
import pekko.stream.impl.fusing.GraphStages
2526
import pekko.stream.stage._
2627

2728
@State(Scope.Benchmark)
2829
@OutputTimeUnit(TimeUnit.MILLISECONDS)
2930
@BenchmarkMode(Array(Mode.Throughput))
30-
class InterpreterBenchmark {
31+
class InterpreterBenchmark extends GraphInterpreterSpecKit {
3132
import InterpreterBenchmark._
3233

3334
// manual, and not via @Param, because we want @OperationsPerInvocation on our tests
@@ -36,32 +37,59 @@ class InterpreterBenchmark {
3637
@Param(Array("1", "5", "10"))
3738
var numberOfIds: Int = 0
3839

40+
// Earlier this benchmark instantiated `new GraphInterpreterSpecKit` inside @Benchmark, which
41+
// created (and leaked) a fresh ActorSystem on every invocation and would exhaust native threads
42+
// on long runs. Extending the SpecKit means JMH's @State(Scope.Benchmark) lifecycle reuses a
43+
// single ActorSystem across all invocations.
44+
45+
@TearDown(Level.Trial)
46+
def shutdown(): Unit = {
47+
Await.result(system.terminate(), 10.seconds)
48+
}
49+
3950
@Benchmark
4051
@OperationsPerInvocation(100000)
4152
def graph_interpreter_100k_elements(): Unit = {
42-
new GraphInterpreterSpecKit {
43-
new TestSetup {
44-
val identities = Vector.fill(numberOfIds)(GraphStages.identity[Int])
45-
val source = new GraphDataSource("source", data100k)
46-
val sink = new GraphDataSink[Int]("sink", data100k.size)
47-
48-
val b = builder(identities: _*).connect(source, identities.head.in).connect(identities.last.out, sink)
53+
new TestSetup {
54+
val identities = Vector.fill(numberOfIds)(new IdentityStage[Int])
55+
val source = new GraphDataSource("source", data100k)
56+
val sink = new GraphDataSink[Int]("sink", data100k.size)
4957

50-
// FIXME: This should not be here, this is pure setup overhead
51-
for (i <- 0 until identities.size - 1) {
52-
b.connect(identities(i).out, identities(i + 1).in)
53-
}
58+
val b = builder(identities: _*).connect(source, identities.head.in).connect(identities.last.out, sink)
5459

55-
b.init()
56-
sink.requestOne()
57-
interpreter.execute(Int.MaxValue)
60+
// FIXME: This should not be here, this is pure setup overhead
61+
for (i <- 0 until identities.size - 1) {
62+
b.connect(identities(i).out, identities(i + 1).in)
5863
}
64+
65+
b.init()
66+
sink.requestOne()
67+
interpreter.execute(Int.MaxValue)
5968
}
6069
}
6170
}
6271

6372
object InterpreterBenchmark {
6473

74+
/**
75+
* Per-instance identity stage. Cannot reuse [[GraphStages.identity]] because it is a singleton
76+
* whose Inlet/Outlet shape is shared across all references — chaining N copies of the singleton
77+
* collapses to a single shape and mis-wires the assembly (manifests as `Cannot pull port twice`).
78+
*/
79+
final class IdentityStage[T] extends GraphStage[FlowShape[T, T]] {
80+
val in = Inlet[T]("Identity.in")
81+
val out = Outlet[T]("Identity.out")
82+
override val shape: FlowShape[T, T] = FlowShape(in, out)
83+
84+
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
85+
new GraphStageLogic(shape) with InHandler with OutHandler {
86+
override def onPush(): Unit = push(out, grab(in))
87+
override def onPull(): Unit = pull(in)
88+
setHandler(in, this)
89+
setHandler(out, this)
90+
}
91+
}
92+
6593
case class GraphDataSource[T](override val toString: String, data: Vector[T]) extends UpstreamBoundaryStageLogic[T] {
6694
var idx: Int = 0
6795
override val out: pekko.stream.Outlet[T] = Outlet[T]("out")
@@ -98,11 +126,4 @@ object InterpreterBenchmark {
98126

99127
def requestOne(): Unit = pull(in)
100128
}
101-
102-
val NoopBus = new LoggingBus {
103-
override def subscribe(subscriber: Subscriber, to: Classifier): Boolean = true
104-
override def publish(event: Event): Unit = ()
105-
override def unsubscribe(subscriber: Subscriber, from: Classifier): Boolean = true
106-
override def unsubscribe(subscriber: Subscriber): Unit = ()
107-
}
108129
}

0 commit comments

Comments
 (0)