SortShuffleManager is the one and only ShuffleManager in Spark, with the short names `sort` and `tungsten-sort`.
| Note | You can use the spark.shuffle.manager Spark property to activate your own implementation of the ShuffleManager contract. |
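For example, a custom ShuffleManager could be activated in `spark-defaults.conf`; the class name below is a hypothetical placeholder, not a class that ships with Spark:

```properties
# spark-defaults.conf
# com.example.MyShuffleManager is a hypothetical custom implementation
spark.shuffle.manager  com.example.MyShuffleManager

# the built-in short names both resolve to SortShuffleManager:
# spark.shuffle.manager  sort
# spark.shuffle.manager  tungsten-sort
```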
| Caution | FIXME The internal registries |
| Name | Description |
|---|---|
| shuffleBlockResolver | IndexShuffleBlockResolver that is created when SortShuffleManager is created and used throughout the lifetime of the owning SortShuffleManager. |
| Tip | Enable DEBUG logging level for `org.apache.spark.shuffle.sort.SortShuffleManager` logger to see what happens inside. Add the following line to `conf/log4j.properties`: `log4j.logger.org.apache.spark.shuffle.sort.SortShuffleManager=DEBUG`. Refer to Logging. |
SortShuffleManager takes a SparkConf.
SortShuffleManager makes sure that the spark.shuffle.spill Spark property is enabled. If not, you should see the following WARN message in the logs:

```
WARN SortShuffleManager: spark.shuffle.spill was set to false, but this configuration is ignored as of Spark 1.6+. Shuffle will continue to spill to disk when necessary.
```

SortShuffleManager then initializes the internal registries and counters.
| Note | SortShuffleManager is created when SparkEnv is created (on the driver and executors), which is at the very beginning of a Spark application's lifecycle. |
```scala
registerShuffle[K, V, C](
  shuffleId: Int,
  numMaps: Int,
  dependency: ShuffleDependency[K, V, C]): ShuffleHandle
```

| Note | registerShuffle is part of the ShuffleManager contract. |
| Caution | FIXME Copy the conditions |
registerShuffle returns a new ShuffleHandle that can be one of the following:

- BypassMergeSortShuffleHandle (with `ShuffleDependency[K, V, V]`) when the `shouldBypassMergeSort` condition holds.
- SerializedShuffleHandle (with `ShuffleDependency[K, V, V]`) when the `canUseSerializedShuffle` condition holds.
- BaseShuffleHandle (with `ShuffleDependency[K, V, C]`) otherwise.
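The selection logic can be sketched as follows; the `Handle` hierarchy and `selectHandle` below are simplified stand-ins for Spark's actual classes, not Spark's API:

```scala
// Simplified sketch of registerShuffle's handle selection.
// The Handle types below are hypothetical stand-ins for Spark's classes.
sealed trait Handle
case object BypassMergeSortShuffleHandle extends Handle
case object SerializedShuffleHandle extends Handle
case object BaseShuffleHandle extends Handle

def selectHandle(shouldBypassMergeSort: Boolean,
                 canUseSerializedShuffle: Boolean): Handle =
  if (shouldBypassMergeSort) BypassMergeSortShuffleHandle    // checked first
  else if (canUseSerializedShuffle) SerializedShuffleHandle  // tungsten-sort path
  else BaseShuffleHandle                                     // general sort-based path
```

Note that the bypass condition is checked before the serialized (tungsten-sort) condition, so a shuffle that satisfies both gets the bypass handle.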
```scala
getWriter[K, V](
  handle: ShuffleHandle,
  mapId: Int,
  context: TaskContext): ShuffleWriter[K, V]
```

| Note | getWriter is part of the ShuffleManager contract. |
Internally, getWriter makes sure that the input ShuffleHandle is associated with its numMaps in the numMapsForShuffle internal registry.
| Caution | FIXME Associated?! What’s that? |
| Note | getWriter expects that the input handle is of type BaseShuffleHandle (despite the signature that says it can work with any ShuffleHandle). Moreover, getWriter further expects that in 2 (out of 3) cases the input handle is a more specialized SerializedShuffleHandle or BypassMergeSortShuffleHandle. |
getWriter then returns a new ShuffleWriter for the input ShuffleHandle:

- UnsafeShuffleWriter for a SerializedShuffleHandle
- BypassMergeSortShuffleWriter for a BypassMergeSortShuffleHandle
- SortShuffleWriter for a BaseShuffleHandle
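The handle-to-writer dispatch can be sketched as a pattern match; the types below are hypothetical stand-ins, not Spark's actual classes:

```scala
// Stand-in sketch of getWriter's handle-to-writer dispatch.
sealed trait HandleKind
case object Serialized extends HandleKind       // SerializedShuffleHandle
case object BypassMergeSort extends HandleKind  // BypassMergeSortShuffleHandle
case object Base extends HandleKind             // BaseShuffleHandle

def writerFor(handle: HandleKind): String = handle match {
  case Serialized      => "UnsafeShuffleWriter"
  case BypassMergeSort => "BypassMergeSortShuffleWriter"
  case Base            => "SortShuffleWriter"
}
```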
```scala
getReader[K, C](
  handle: ShuffleHandle,
  startPartition: Int,
  endPartition: Int,
  context: TaskContext): ShuffleReader[K, C]
```

| Note | getReader is part of the ShuffleManager contract. |
getReader returns a new BlockStoreShuffleReader passing all the input parameters on to it.
| Note | getReader assumes that the input ShuffleHandle is of type BaseShuffleHandle. |
```scala
stop(): Unit
```

| Note | stop is part of the ShuffleManager contract. |
stop stops IndexShuffleBlockResolver (available as shuffleBlockResolver internal reference).
```scala
shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean
```

shouldBypassMergeSort holds (i.e. is positive) when both of the following hold:

- The `mapSideCombine` flag of the input ShuffleDependency is disabled (i.e. `false`). When `mapSideCombine` is enabled, an `aggregator` has to be defined and shouldBypassMergeSort does not hold.
- The number of partitions (of the `Partitioner` of the input ShuffleDependency) is at most the spark.shuffle.sort.bypassMergeThreshold Spark property (which defaults to `200`).

Otherwise, shouldBypassMergeSort does not hold (i.e. `false`).
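The check can be sketched as follows, with a hypothetical stand-in `Dep` type instead of Spark's ShuffleDependency and SparkConf:

```scala
// Simplified sketch of shouldBypassMergeSort with a stand-in Dep type.
case class Dep(mapSideCombine: Boolean, numPartitions: Int)

def shouldBypassMergeSort(bypassMergeThreshold: Int, dep: Dep): Boolean =
  if (dep.mapSideCombine) false  // map-side aggregation requires sorting
  else dep.numPartitions <= bypassMergeThreshold
```

With the default threshold of 200, a shuffle with, say, 150 partitions and no map-side combine would take the bypass path.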
| Note | shouldBypassMergeSort is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency). |
```scala
canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean
```

canUseSerializedShuffle holds (i.e. is positive) when all of the following hold (checked in that order):

- The `Serializer` of the input ShuffleDependency supports relocation of serialized objects.
- The `Aggregator` of the input ShuffleDependency is not defined.
- The number of shuffle output partitions of the input ShuffleDependency is at most the supported maximum number (which is `(1 << 24) - 1`, i.e. `16777215`).
You should see the following DEBUG message in the logs when canUseSerializedShuffle holds:

```
DEBUG Can use serialized shuffle for shuffle [id]
```

Otherwise, canUseSerializedShuffle does not hold and you should see one of the following DEBUG messages:

```
DEBUG Can't use serialized shuffle for shuffle [id] because the serializer, [name], does not support object relocation
DEBUG SortShuffleManager: Can't use serialized shuffle for shuffle [id] because an aggregator is defined
DEBUG Can't use serialized shuffle for shuffle [id] because it has more than [number] partitions
```
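The three conditions can be sketched as follows, with a hypothetical stand-in `Dep` type instead of Spark's ShuffleDependency:

```scala
// Simplified sketch of canUseSerializedShuffle with a stand-in Dep type.
val MaxShuffleOutputPartitions: Int = (1 << 24) - 1  // 16777215

case class Dep(
    serializerSupportsRelocation: Boolean,
    aggregatorDefined: Boolean,
    numPartitions: Int)

def canUseSerializedShuffle(dep: Dep): Boolean =
  dep.serializerSupportsRelocation &&  // checked first
    !dep.aggregatorDefined &&          // no map-side aggregation
    dep.numPartitions <= MaxShuffleOutputPartitions
```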
Note
|
canUseSerializedShuffle is exclusively used when SortShuffleManager selects a ShuffleHandle (for a ShuffleDependency).
|
| Spark Property | Default Value | Description |
|---|---|---|
| spark.shuffle.sort.bypassMergeThreshold | 200 | The maximum number of reduce partitions below which SortShuffleManager avoids merge-sorting data (as long as there is no map-side aggregation). |
| spark.shuffle.spill | true | No longer in use. When `false`, you should see the WARN message in the logs (the setting is ignored as of Spark 1.6+ and shuffle continues to spill to disk when necessary). |