Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1140,6 +1140,21 @@ public void mustBeAbleToCombineN() throws Exception {
assertEquals(6, result.toCompletableFuture().get(3, TimeUnit.SECONDS).intValue());
}

// Regression test for https://github.com/apache/pekko/issues/2723
// Verifies that Source.combine with a single source correctly applies
// type-transforming strategies (like MergeLatest), rather than bypassing
// them with an unsafe asInstanceOf cast.
@Test
public void mustBeAbleToCombineSingleSourceWithMergeLatest() throws Exception {
final List<Source<Integer, NotUsed>> sources = Collections.singletonList(Source.single(1));
final List<List<Integer>> result =
Source.<Integer, List<Integer>, NotUsed>combine(sources, MergeLatest::create)
.runWith(Sink.collect(Collectors.toList()), system)
.toCompletableFuture()
.get(3, TimeUnit.SECONDS);
assertEquals(Collections.singletonList(Collections.singletonList(1)), result);
}

@SuppressWarnings("unchecked")
@Test
public void mustBeAbleToZipN() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@ import scala.concurrent.duration._

import org.apache.pekko
import pekko.Done
import pekko.dispatch.ExecutionContexts
import pekko.stream._
import pekko.stream.ActorAttributes.supervisionStrategy
import pekko.stream.testkit._
import pekko.stream.testkit.scaladsl.{ TestSink, TestSource }
import pekko.testkit.DefaultTimeout

import scala.collection.immutable

import org.reactivestreams.Publisher

import org.scalatest.concurrent.ScalaFutures
Expand Down Expand Up @@ -192,6 +195,44 @@ class SinkSpec extends StreamSpec with DefaultTimeout with ScalaFutures {
}
}

// Regression tests for Sink.combine single-sink case — mirrors Source.combine fix (#2723).
// The single-sink case previously used an unsafe asInstanceOf cast.

"combine single sink with Broadcast should work (type-preserving bypass)" in {
// Broadcast has TypePreservingFanOut, so the single-sink case is safely bypassed.
implicit val ex: scala.concurrent.ExecutionContext = ExecutionContexts.parasitic
val result = Source(List(1, 2, 3))
.runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(Broadcast[Int](_)))
Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 3)))
}

"combine single sink with Balance should work (type-preserving bypass)" in {
// Balance has TypePreservingFanOut, so the single-sink case is safely bypassed.
implicit val ex: scala.concurrent.ExecutionContext = ExecutionContexts.parasitic
val result = Source(List(1, 2, 3))
.runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(Balance[Int](_)))
Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 3)))
}

"combine single sink with Partition should route through strategy (not type-preserving)" in {
// Partition intentionally does NOT have TypePreservingFanOut — its partitioner function
// provides user-specified routing semantics that would be lost if bypassed.
// Single-sink Partition goes through the fan-out graph, honoring partitioner semantics.
implicit val ex: scala.concurrent.ExecutionContext = ExecutionContexts.parasitic
val result = Source(List(1, 2, 3))
.runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(Partition[Int](_, _ => 0)))
Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 3)))
}

"combine single sink with wrapped Broadcast (.named) should still work" in {
// Even if the fan-out strategy loses the TypePreservingFanOut trait via wrapping
// (e.g., .named()), routing through the strategy is still correct.
implicit val ex: scala.concurrent.ExecutionContext = ExecutionContexts.parasitic
val result = Source(List(1, 2, 3))
.runWith(Sink.combine(immutable.Seq(Sink.seq[Int]))(n => Broadcast[Int](n).named("myBroadcast")))
Future.sequence(result).futureValue should ===(List(immutable.Seq(1, 2, 3)))
}

"suitably override attribute handling methods" in {
import Attributes._
val s: Sink[Int, Future[Int]] = Sink.head[Int].async.addAttributes(none).named("name")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,107 @@ class SourceSpec extends StreamSpec with DefaultTimeout {
sub.request(5).expectNextN(0 to 4).expectComplete()
}

// Regression tests for https://github.com/apache/pekko/issues/2723
// Source.combine with a single source must apply type-transforming fan-in strategies
// (like MergeLatest) correctly, rather than bypassing them with an unsafe cast.
// The TypePreservingFanIn trait marks strategies where T == U, enabling safe bypass.
// Strategies without this trait (MergeLatest, ZipWithN) are always routed through
// the fan-in graph even for a single source.

"combine single source with MergeLatest should emit wrapped elements" in {
Source
.combine(immutable.Seq(Source.single(1)))(MergeLatest(_))
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(List(1)))
}

"combine single source with MergeLatest should emit all wrapped elements" in {
Source
.combine(immutable.Seq(Source(List(1, 2, 3))))(MergeLatest(_))
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(List(1), List(2), List(3)))
}

"combine single source with ZipWithN should apply zipper function" in {
Source
.combine(immutable.Seq(Source(List(1, 2, 3))))(n => ZipWithN[Int, Int](_.sum)(n))
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(1, 2, 3))
}

"combine single source with Merge should still work (type-preserving)" in {
Source
.combine(immutable.Seq(Source.single(1)))(Merge(_))
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(1))
}

"combine single source with Concat should still work (type-preserving)" in {
Source
.combine(immutable.Seq(Source(List(1, 2, 3))))(Concat(_))
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(1, 2, 3))
}

"combine single source with Interleave should still work (type-preserving)" in {
Source
.combine(immutable.Seq(Source(List(1, 2, 3))))(Interleave(_, 1))
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(1, 2, 3))
}

"combine single source with wrapped Merge (.named) should still work" in {
// When Merge is wrapped via .named(), the TypePreservingFanIn trait is lost
// (GenericGraphWithChangedAttributes does not extend it). The code correctly
// routes through the fan-in graph instead of bypassing — functionally correct,
// just slightly less optimal.
Source
.combine(immutable.Seq(Source(List(1, 2, 3))))(n => Merge[Int](n).named("my-merge"))
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(1, 2, 3))
}

"combine single source with wrapped MergeLatest (.named) should emit wrapped elements" in {
Source
.combine(immutable.Seq(Source(List(1, 2, 3))))(n => MergeLatest[Int](n).named("my-merge-latest"))
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(List(1), List(2), List(3)))
}

"combine single source with MergeSequence should still work (type-preserving)" in {
// MergeSequence.apply wraps via withDetachedInputs, which loses the TypePreservingFanIn
// trait. This means single-source MergeSequence goes through the fan-in strategy (safe
// default). The test uses 0-based sequences to satisfy MergeSequence's ordering validation.
Source
.combine(immutable.Seq(Source(List(0L, 1L, 2L))))(n => MergeSequence[Long](n)(identity))
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(0L, 1L, 2L))
}

"combine single source with MergePrioritized should still work (type-preserving)" in {
Source
.combine(immutable.Seq(Source(List(1, 2, 3))))(n => MergePrioritized(Seq.fill(n)(1)))
.runWith(Sink.seq)
.futureValue should ===(immutable.Seq(1, 2, 3))
}

"combine single source materialized value should be a singleton list" in {
val (mat, result) = Source
.combine(immutable.Seq(Source.single(1).mapMaterializedValue(_ => "mat-value")))(MergeLatest(_))
.toMat(Sink.seq)(Keep.both)
.run()
mat should ===(immutable.Seq("mat-value"))
result.futureValue should ===(immutable.Seq(List(1)))
}

"combine empty sources list should produce empty source" in {
val result = Source
.combine(immutable.Seq.empty[Source[Int, NotUsed]])(MergeLatest(_))
.runWith(Sink.seq)
.futureValue
result should ===(immutable.Seq.empty)
}

"combine from two inputs with simplified API" in {
val probes = immutable.Seq.fill(2)(TestPublisher.manualProbe[Int]())
val source = Source.fromPublisher(probes(0)) :: Source.fromPublisher(probes(1)) :: Nil
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream

/**
* Marker trait for fan-in graph stages whose output element type is the same as
* their input element type (i.e., `T => T`).
*
* Built-in stages with this trait: [[scaladsl.Merge]], [[scaladsl.Concat]],
* [[scaladsl.Interleave]], [[scaladsl.MergePrioritized]], [[scaladsl.OrElse]],
* and [[scaladsl.MergeSequence]].
*
* Note: some of these stages (Concat, Interleave, MergeSequence) have factory methods
* that wrap the stage via `withDetachedInputs`, which loses this trait. In those cases,
* `Source.combine` routes through the fan-in graph instead of bypassing—functionally
* correct, just slightly less optimal. The bypass optimization fires for stages whose
* factory methods return the raw class (e.g., `Merge`, `MergePrioritized`).
*
* This trait is used by [[scaladsl.Source.combine]] (and its Java API counterpart)
* to safely optimize the single-source case. When only one source is provided,
* the fan-in strategy can be bypassed with a direct pass-through if and only if the
* strategy is type-preserving (output type equals input type). Without this marker,
* a bypass via `asInstanceOf` would be unsafe for type-transforming strategies
* like `MergeLatest` (where `T => List[T]`) or `ZipWithN` (where `A => O`).
*
* This design uses a "safe default": strategies '''without''' this trait will always
* be routed through the fan-in graph, even for a single source. This ensures
* correct behavior for unknown or third-party fan-in strategies that may transform
* the element type.
*
* @since 1.5.0
*/
trait TypePreservingFanIn
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pekko.stream

/**
* Marker trait for fan-out graph stages whose output element type is the same as
* their input element type (i.e., `T => T`).
*
* Examples include [[scaladsl.Broadcast]] and [[scaladsl.Balance]].
*
* Note: [[scaladsl.Partition]] is intentionally NOT marked with this trait despite having
* `T => T` types, because its `partitioner` function provides user-specified routing
* semantics that would be lost if the stage were bypassed.
*
* This trait is used by [[scaladsl.Sink.combine]] (and its Java API counterpart)
* to safely optimize the single-sink case. When only one sink is provided,
* the fan-out strategy can be bypassed with a direct pass-through if and only if the
* strategy is type-preserving (output type equals input type). Without this marker,
* a bypass via `asInstanceOf` would be unsafe for type-transforming strategies
* where `T` differs from `U`.
*
* This design uses a "safe default": strategies '''without''' this trait will always
* be routed through the fan-out graph, even for a single sink. This ensures
* correct behavior for unknown or third-party fan-out strategies that may transform
* the element type.
*
* @since 1.5.0
*/
trait TypePreservingFanOut
43 changes: 32 additions & 11 deletions stream/src/main/scala/org/apache/pekko/stream/scaladsl/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,9 @@ object Merge {
*
* '''Cancels when''' downstream cancels
*/
final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean)
extends GraphStage[UniformFanInShape[T, T]]
with pekko.stream.TypePreservingFanIn {
// one input might seem counter intuitive but saves us from special handling in other places
require(inputPorts >= 1, "A Merge must have one or more input ports")

Expand Down Expand Up @@ -338,7 +340,8 @@ object MergePrioritized {
* '''Cancels when''' downstream cancels
*/
final class MergePrioritized[T] private (val priorities: Seq[Int], val eagerComplete: Boolean)
extends GraphStage[UniformFanInShape[T, T]] {
extends GraphStage[UniformFanInShape[T, T]]
with pekko.stream.TypePreservingFanIn {
require(priorities.nonEmpty, "A Merge must have one or more input ports")
require(priorities.forall(_ > 0), "Priorities should be positive integers")

Expand Down Expand Up @@ -463,8 +466,12 @@ object Interleave {
* '''Cancels when''' downstream cancels
*/
final class Interleave[T](val inputPorts: Int, val segmentSize: Int, val eagerClose: Boolean)
extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "input ports must be > 1")
extends GraphStage[UniformFanInShape[T, T]]
with pekko.stream.TypePreservingFanIn {
// Relaxed from > 1 to >= 1: single-input Interleave is semantically valid (pass-through).
// This enables Source.combine to route single-source cases through the stage without
// needing a try-catch fallback. See #2723.
require(inputPorts >= 1, "input ports must be >= 1")
require(segmentSize > 0, "segmentSize must be > 0")

val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("Interleave.in" + i))
Expand Down Expand Up @@ -617,7 +624,9 @@ object Broadcast {
* '''Cancels when'''
* If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel
*/
final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean) extends GraphStage[UniformFanOutShape[T, T]] {
final class Broadcast[T](val outputPorts: Int, val eagerCancel: Boolean)
extends GraphStage[UniformFanOutShape[T, T]]
with pekko.stream.TypePreservingFanOut {
// one output might seem counter intuitive but saves us from special handling in other places
require(outputPorts >= 1, "A Broadcast must have one or more output ports")
val in: Inlet[T] = Inlet[T]("Broadcast.in")
Expand Down Expand Up @@ -943,7 +952,8 @@ object Balance {
* '''Cancels when''' If eagerCancel is enabled: when any downstream cancels; otherwise: when all downstreams cancel
*/
final class Balance[T](val outputPorts: Int, val waitForAllDownstreams: Boolean, val eagerCancel: Boolean)
extends GraphStage[UniformFanOutShape[T, T]] {
extends GraphStage[UniformFanOutShape[T, T]]
with pekko.stream.TypePreservingFanOut {
// one output might seem counter intuitive but saves us from special handling in other places
require(outputPorts >= 1, "A Balance must have one or more output ports")

Expand Down Expand Up @@ -1325,8 +1335,13 @@ object Concat {
*
* '''Cancels when''' downstream cancels
*/
final class Concat[T](val inputPorts: Int) extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "A Concat must have more than 1 input ports")
final class Concat[T](val inputPorts: Int)
extends GraphStage[UniformFanInShape[T, T]]
with pekko.stream.TypePreservingFanIn {
// Relaxed from > 1 to >= 1: single-input Concat is semantically valid (pass-through).
// This enables Source.combine to route single-source cases through the stage without
// needing a try-catch fallback. See #2723.
require(inputPorts >= 1, "A Concat must have at least 1 input port")
val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("Concat.in" + i))
val out: Outlet[T] = Outlet[T]("Concat.out")
override def initialAttributes = DefaultAttributes.concat
Expand Down Expand Up @@ -1398,7 +1413,9 @@ object OrElse {
* '''Cancels when''' downstream cancels
*/
@InternalApi
private[stream] final class OrElse[T] extends GraphStage[UniformFanInShape[T, T]] {
private[stream] final class OrElse[T]
extends GraphStage[UniformFanInShape[T, T]]
with pekko.stream.TypePreservingFanIn {
val primary = Inlet[T]("OrElse.primary")
val secondary = Inlet[T]("OrElse.secondary")
val out = Outlet[T]("OrElse.out")
Expand Down Expand Up @@ -1497,8 +1514,12 @@ object MergeSequence {
* '''Cancels when''' downstream cancels
*/
final class MergeSequence[T](val inputPorts: Int)(extractSequence: T => Long)
extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "A MergeSequence must have more than 1 input ports")
extends GraphStage[UniformFanInShape[T, T]]
with pekko.stream.TypePreservingFanIn {
// Relaxed from > 1 to >= 1: single-input MergeSequence is semantically valid.
// This enables Source.combine to route single-source cases through the stage without
// needing a try-catch fallback. See #2723.
require(inputPorts >= 1, "A MergeSequence must have at least 1 input port")
private val in: IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i => Inlet[T]("MergeSequence.in" + i))
private val out: Outlet[T] = Outlet("MergeSequence.out")
override val shape: UniformFanInShape[T, T] = UniformFanInShape(out, in: _*)
Expand Down
Loading
Loading