Skip to content

Commit 5bff1b4

Browse files
authored
fix: align BroadcastPartition routing in LinkConfig with broadcast semantics (#5032)
1 parent 1497806 commit 5bff1b4

2 files changed

Lines changed: 24 additions & 14 deletions

File tree

  • amber/src
    • main/scala/org/apache/texera/amber/engine/architecture/scheduling/config
    • test/scala/org/apache/texera/amber/engine/architecture/scheduling/config

amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfig.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,9 @@ case object LinkConfig {
7272
case BroadcastPartition() =>
7373
BroadcastPartitioning(
7474
dataTransferBatchSize,
75-
fromWorkerIds.zip(toWorkerIds).map {
76-
case (fromWorkerId, toWorkerId) =>
77-
ChannelIdentity(fromWorkerId, toWorkerId, isControl = false)
78-
}
75+
fromWorkerIds.flatMap(fromId =>
76+
toWorkerIds.map(toId => ChannelIdentity(fromId, toId, isControl = false))
77+
)
7978
)
8079

8180
case UnknownPartition() =>

amber/src/test/scala/org/apache/texera/amber/engine/architecture/scheduling/config/LinkConfigSpec.scala

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,7 @@ class LinkConfigSpec extends AnyFlatSpec with Matchers {
144144

145145
// ----- BroadcastPartition -----
146146

147-
"BroadcastPartition" should "produce a BroadcastPartitioning whose channels follow zip pairing today (current behavior)" in {
148-
// Pin: BroadcastPartition currently uses `fromWorkerIds.zip(toWorkerIds)`
149-
// — the SAME 1:1 pairing as OneToOnePartition. ChannelConfig in the same
150-
// package emits a full cross product for the BroadcastPartition arm,
151-
// which matches broadcast semantics ("each sender targets every
152-
// receiver"). The two helpers diverge today; pinning this so a fix that
153-
// realigns the contract surfaces here. Filed as a Bug.
147+
"BroadcastPartition" should "produce a BroadcastPartitioning with the full sender x receiver cross product" in {
154148
val out = LinkConfig.toPartitioning(
155149
List(w1, w2, w3),
156150
List(u1, u2, u3),
@@ -160,18 +154,35 @@ class LinkConfigSpec extends AnyFlatSpec with Matchers {
160154
out shouldBe a[BroadcastPartitioning]
161155
val bp = out.asInstanceOf[BroadcastPartitioning]
162156
bp.batchSize shouldBe batch
163-
endpoints(bp.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2"), ("w3", "u3"))
157+
endpoints(bp.channels) shouldBe Seq(
158+
("w1", "u1"),
159+
("w1", "u2"),
160+
("w1", "u3"),
161+
("w2", "u1"),
162+
("w2", "u2"),
163+
("w2", "u3"),
164+
("w3", "u1"),
165+
("w3", "u2"),
166+
("w3", "u3")
167+
)
164168
}
165169

166-
it should "silently truncate broadcast pairings when sides differ in length (current behavior)" in {
170+
it should "emit the full cross product even when sender and receiver counts differ" in {
167171
val out = LinkConfig.toPartitioning(
168172
List(w1, w2, w3),
169173
List(u1, u2),
170174
BroadcastPartition(),
171175
batch
172176
)
173177
val bp = out.asInstanceOf[BroadcastPartitioning]
174-
endpoints(bp.channels) shouldBe Seq(("w1", "u1"), ("w2", "u2"))
178+
endpoints(bp.channels) shouldBe Seq(
179+
("w1", "u1"),
180+
("w1", "u2"),
181+
("w2", "u1"),
182+
("w2", "u2"),
183+
("w3", "u1"),
184+
("w3", "u2")
185+
)
175186
}
176187

177188
// ----- UnknownPartition -----

0 commit comments

Comments
 (0)