Skip to content

Commit ceedbcd

Browse files
committed
[SPARK-51638][CORE][3.5] Fix fetching the remote disk stored RDD blocks via the external shuffle service
### What changes were proposed in this pull request? Fix remote fetching of disk stored RDD blocks via the external shuffle service when `spark.shuffle.service.fetch.rdd.enabled` is set. ### Why are the changes needed? After https://issues.apache.org/jira/browse/SPARK-43221 remote fetching was handled in `BlockManagerMasterEndpoint#getLocationsAndStatus` at one place where all the location was used along with the `blockManagerInfo` map but this map only includes information about the active executors which are not already killed (after for example downscaling in dynamic allocation or just killed because of a failures). This PR extend the search to all the remote external shuffle services where the `blockStatusByShuffleService` map is used. That map contains block infos even for the killed executors. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? An existing unit test was extended. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #50596 from attilapiros/ATTILA_PR_50439_BRANCH-3.5. Authored-by: attilapiros <piros.attila.zsolt@gmail.com> Signed-off-by: attilapiros <piros.attila.zsolt@gmail.com>
1 parent c306e2d commit ceedbcd

2 files changed

Lines changed: 91 additions & 35 deletions

File tree

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -849,41 +849,41 @@ class BlockManagerMasterEndpoint(
849849
blockId: BlockId,
850850
requesterHost: String): Option[BlockLocationsAndStatus] = {
851851
val allLocations = Option(blockLocations.get(blockId)).map(_.toSeq).getOrElse(Seq.empty)
852-
val hostLocalLocations = allLocations.filter(bmId => bmId.host == requesterHost)
853-
854852
val blockStatusWithBlockManagerId: Option[(BlockStatus, BlockManagerId)] =
855-
(if (externalShuffleServiceRddFetchEnabled) {
856-
// if fetching RDD is enabled from the external shuffle service then first try to find
857-
// the block in the external shuffle service of the same host
858-
val location = hostLocalLocations.find(_.port == externalShuffleServicePort)
859-
location
860-
.flatMap(blockStatusByShuffleService.get(_).flatMap(_.get(blockId)))
861-
.zip(location)
862-
.headOption
863-
} else {
864-
None
865-
})
866-
.orElse {
867-
// if the block is not found via the external shuffle service trying to find it in the
868-
// executors running on the same host and persisted on the disk
869-
// using flatMap on iterators makes the transformation lazy
870-
hostLocalLocations.iterator
871-
.flatMap { bmId =>
872-
blockManagerInfo.get(bmId).flatMap { blockInfo =>
873-
blockInfo.getStatus(blockId).map((_, bmId))
874-
}
875-
}
876-
.find(_._1.storageLevel.useDisk)
877-
}
878-
.orElse {
879-
// if the block cannot be found in the same host search it in all the executors
880-
val location = allLocations.headOption
881-
location
882-
.flatMap(blockManagerInfo.get(_))
883-
.flatMap(_.getStatus(blockId))
884-
.zip(location)
885-
.headOption
853+
(if (externalShuffleServiceRddFetchEnabled && blockId.isRDD) {
854+
// If fetching disk persisted RDD from the external shuffle service is enabled then first
855+
// try to find the block in the external shuffle service preferring the one running on
856+
// the same host. This search includes blocks stored on already killed executors as well.
857+
val hostLocalLocations = allLocations.find { bmId =>
858+
bmId.host == requesterHost && bmId.port == externalShuffleServicePort
886859
}
860+
val location = hostLocalLocations
861+
.orElse(allLocations.find(_.port == externalShuffleServicePort))
862+
location
863+
.flatMap(blockStatusByShuffleService.get(_).flatMap(_.get(blockId)))
864+
.zip(location)
865+
.headOption
866+
} else {
867+
// trying to find it in the executors running on the same host and persisted on the disk
868+
// Implementation detail: using flatMap on iterators makes the transformation lazy.
869+
allLocations.filter(_.host == requesterHost).iterator
870+
.flatMap { bmId =>
871+
blockManagerInfo.get(bmId).flatMap { blockInfo =>
872+
blockInfo.getStatus(blockId).map((_, bmId))
873+
}
874+
}
875+
.find(_._1.storageLevel.useDisk)
876+
})
877+
.orElse {
878+
// if the block cannot be found in the same host as a disk stored block then extend the
879+
// search to all active (not killed) executors and to all storage levels
880+
val location = allLocations.headOption
881+
location
882+
.flatMap(blockManagerInfo.get(_))
883+
.flatMap(_.getStatus(blockId))
884+
.zip(location)
885+
.headOption
886+
}
887887
logDebug(s"Identified block: $blockStatusWithBlockManagerId")
888888
blockStatusWithBlockManagerId
889889
.map { case (blockStatus: BlockStatus, bmId: BlockManagerId) =>

core/src/test/scala/org/apache/spark/ExternalShuffleServiceSuite.scala

Lines changed: 58 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.spark
1919

2020
import java.io.File
21+
import java.nio.ByteBuffer
2122
import java.nio.file.Files
2223
import java.nio.file.attribute.PosixFilePermission
2324

@@ -35,8 +36,9 @@ import org.apache.spark.network.TransportContext
3536
import org.apache.spark.network.netty.SparkTransportConf
3637
import org.apache.spark.network.server.TransportServer
3738
import org.apache.spark.network.shuffle.{ExecutorDiskUtils, ExternalBlockHandler, ExternalBlockStoreClient}
38-
import org.apache.spark.storage.{RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel}
39+
import org.apache.spark.storage.{BroadcastBlockId, RDDBlockId, ShuffleBlockId, ShuffleDataBlockId, ShuffleIndexBlockId, StorageLevel}
3940
import org.apache.spark.util.{ThreadUtils, Utils}
41+
import org.apache.spark.util.io.ChunkedByteBuffer
4042

4143
/**
4244
* This suite creates an external shuffle server and routes all shuffle fetches through it.
@@ -113,12 +115,15 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
113115
.set(config.SHUFFLE_HOST_LOCAL_DISK_READING_ENABLED, true)
114116
.set(config.SHUFFLE_SERVICE_FETCH_RDD_ENABLED, true)
115117
.set(config.EXECUTOR_REMOVE_DELAY.key, "0s")
118+
.set(config.DRIVER_BIND_ADDRESS.key, Utils.localHostName())
116119
sc = new SparkContext("local-cluster[1,1,1024]", "test", confWithRddFetchEnabled)
117120
sc.env.blockManager.externalShuffleServiceEnabled should equal(true)
118121
sc.env.blockManager.blockStoreClient.getClass should equal(classOf[ExternalBlockStoreClient])
119122
try {
123+
val list = List[Int](1, 2, 3, 4)
124+
val broadcast = sc.broadcast(list)
120125
val rdd = sc.parallelize(0 until 100, 2)
121-
.map { i => (i, 1) }
126+
.map { i => (i, broadcast.value.size) }
122127
.persist(StorageLevel.DISK_ONLY)
123128

124129
rdd.count()
@@ -169,8 +174,59 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll wi
169174
"external shuffle service port should be contained")
170175
}
171176

177+
eventually(timeout(2.seconds), interval(100.milliseconds)) {
178+
val locationStatusForLocalHost =
179+
sc.env.blockManager.master.getLocationsAndStatus(blockId, Utils.localHostName())
180+
assert(locationStatusForLocalHost.isDefined)
181+
assert(locationStatusForLocalHost.get.localDirs.isDefined)
182+
assert(locationStatusForLocalHost.get.locations.head.executorId == "0")
183+
assert(locationStatusForLocalHost.get.locations.head.host == Utils.localHostName())
184+
}
185+
186+
eventually(timeout(2.seconds), interval(100.milliseconds)) {
187+
val locationStatusForRemoteHost =
188+
sc.env.blockManager.master.getLocationsAndStatus(blockId, "<invalid-host>")
189+
assert(locationStatusForRemoteHost.isDefined)
190+
assert(locationStatusForRemoteHost.get.localDirs.isEmpty)
191+
assert(locationStatusForRemoteHost.get.locations.head.executorId == "0")
192+
assert(locationStatusForRemoteHost.get.locations.head.host == Utils.localHostName())
193+
}
194+
172195
assert(sc.env.blockManager.getRemoteValues(blockId).isDefined)
173196

197+
eventually(timeout(2.seconds), interval(100.milliseconds)) {
198+
val broadcastBlockId = BroadcastBlockId(broadcast.id, "piece0")
199+
val locStatusForMemBroadcast =
200+
sc.env.blockManager.master.getLocationsAndStatus(broadcastBlockId, Utils.localHostName())
201+
assert(locStatusForMemBroadcast.isDefined)
202+
assert(locStatusForMemBroadcast.get.localDirs.isEmpty)
203+
assert(locStatusForMemBroadcast.get.locations.head.executorId == "driver")
204+
assert(locStatusForMemBroadcast.get.locations.head.host == Utils.localHostName())
205+
}
206+
207+
val byteBuffer = ByteBuffer.wrap(Array[Byte](7))
208+
val bytes = new ChunkedByteBuffer(Array(byteBuffer))
209+
val diskBroadcastId = BroadcastBlockId(Long.MaxValue, "piece0")
210+
sc.env.blockManager.putBytes(diskBroadcastId, bytes, StorageLevel.DISK_ONLY,
211+
tellMaster = true)
212+
eventually(timeout(2.seconds), interval(100.milliseconds)) {
213+
val locStatusForDiskBroadcast =
214+
sc.env.blockManager.master.getLocationsAndStatus(diskBroadcastId, Utils.localHostName())
215+
assert(locStatusForDiskBroadcast.isDefined)
216+
assert(locStatusForDiskBroadcast.get.localDirs.isDefined)
217+
assert(locStatusForDiskBroadcast.get.locations.head.executorId == "driver")
218+
assert(locStatusForDiskBroadcast.get.locations.head.host == Utils.localHostName())
219+
}
220+
221+
eventually(timeout(2.seconds), interval(100.milliseconds)) {
222+
val locStatusForDiskBroadcastForFetch =
223+
sc.env.blockManager.master.getLocationsAndStatus(diskBroadcastId, "<invalid-host>")
224+
assert(locStatusForDiskBroadcastForFetch.isDefined)
225+
assert(locStatusForDiskBroadcastForFetch.get.localDirs.isEmpty)
226+
assert(locStatusForDiskBroadcastForFetch.get.locations.head.executorId == "driver")
227+
assert(locStatusForDiskBroadcastForFetch.get.locations.head.host == Utils.localHostName())
228+
}
229+
174230
// test unpersist: as executors are killed the blocks will be removed via the shuffle service
175231
rdd.unpersist(true)
176232
assert(sc.env.blockManager.getRemoteValues(blockId).isEmpty)

0 commit comments

Comments
 (0)