Skip to content

Source.combine single source with type-transforming fan-in strategies#2746

Merged
pjfanning merged 3 commits into
apache:1.5.xfrom
pjfanning:stream-1.5
Mar 16, 2026
Merged

Source.combine single source with type-transforming fan-in strategies#2746
pjfanning merged 3 commits into
apache:1.5.xfrom
pjfanning:stream-1.5

Conversation

@pjfanning
Copy link
Copy Markdown
Member

cherry pick 566a347 #2723 #2726

He-Pin and others added 2 commits March 16, 2026 19:48
…in strategies (apache#2723) (apache#2726)

Motivation:
Source.combine with a single source bypassed the fan-in strategy using an unsafe
asInstanceOf cast. This worked for type-preserving strategies like Merge (T → T),
but silently produced incorrect results for type-transforming strategies like
MergeLatest (T → List[T]). For example:
  Source.combine(Seq(Source.single(1)))(MergeLatest(_)) emitted 1 instead of List(1)

Modification:
- Introduce TypePreservingFanIn marker trait for fan-in stages where T == U AND
  single-input behavior is a no-op pass-through (Merge, Concat, Interleave,
  MergePrioritized, OrElse)
- MergeSequence intentionally NOT marked: despite being T → T, it validates
  sequence ordering (not a pure pass-through)
- Source.combine single-source case: check TypePreservingFanIn trait before
  bypassing. Strategies without the trait are routed through the fan-in graph.
- Relax Concat, Interleave, MergeSequence to accept inputPorts >= 1 (was > 1).
  This eliminates the need for a try-catch fallback in Source.combine and allows
  these stages to be used directly with a single input.
- Use Source.fromGraph for non-Source Graph inputs safety
- Add 14 regression tests (12 Scala + 1 Java + MergeSequence validation)

Result:
- MergeLatest/ZipWithN correctly apply their transformation even for single source
- Merge/Concat/Interleave correctly bypass (type-preserving optimization)
- MergeSequence correctly validates sequences even for single source
- Unknown/third-party strategies default to routing through the fan-in graph
  (safe default for strategies that may transform types)
- Binary compatibility maintained (verified via MiMa)

References:
- apache#2723
- apache#2726
@pjfanning pjfanning added this to the 1.5.0 milestone Mar 16, 2026
@pjfanning pjfanning merged commit 20c2b18 into apache:1.5.x Mar 16, 2026
9 checks passed
@pjfanning pjfanning deleted the stream-1.5 branch March 16, 2026 23:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants