Spark ShuffleManager的运行原理

什么是Shuffle

有些运算需要将各节点上的同一类数据汇集到某一节点进行计算,把这些分布在不同节点的数据按照一定的规则汇集到一起的过程称为 Shuffle。

SortShuffleManager运行原理

先看下SortShuffleManager的官方说明

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
/**
* In sort-based shuffle, incoming records are sorted according to their target partition ids, then
* written to a single map output file. Reducers fetch contiguous regions of this file in order to
* read their portion of the map output. In cases where the map output data is too large to fit in
* memory, sorted subsets of the output can be spilled to disk and those on-disk files are merged
* to produce the final output file.
*
# 有两种不同的方式输出文件
* Sort-based shuffle has two different write paths for producing its map output files:
# 序列化排序,当一下三个条件都成立时使用
* - Serialized sorting: used when all three of the following conditions hold:
# shuffle 的依赖没有指定的聚合或输出排序
* 1. The shuffle dependency specifies no aggregation or output ordering.
# shuffle 的序列化支持序列化值的重定位
* 2. The shuffle serializer supports relocation of serialized values (this is currently supported by KryoSerializer and Spark SQL's custom serializers).
# shuffle 产生的输出分区少于16777216
* 3. The shuffle produces fewer than 16777216 output partitions.
* - Deserialized sorting: used to handle all other cases.
*
* -----------------------
* Serialized sorting mode
* -----------------------
*
# 优化手段
* In the serialized sorting mode, incoming records are serialized as soon as they are passed to the
* shuffle writer and are buffered in a serialized form during sorting. This write path implements
* several optimizations:
# 排序在二进制数据上而不是Java对象
* - Its sort operates on serialized binary data rather than Java objects, which reduces memory
* consumption and GC overheads. This optimization requires the record serializer to have certain
* properties to allow serialized records to be re-ordered without requiring deserialization.
* See SPARK-4550, where this optimization was first proposed and implemented, for more details.
*
# 使用一个专门的缓存效率高的分拣器([[ShuffleExternalSorter]]),对压缩记录指针和分区ID的数组进行分拣。通过在排序数组中每条记录只使用8个字节的空间,这可以将更多的数组放入缓存。
* - It uses a specialized cache-efficient sorter ([[ShuffleExternalSorter]]) that sorts
* arrays of compressed record pointers and partition ids. By using only 8 bytes of space per
* record in the sorting array, this fits more of the array into cache.
*
# 溢出合并程序对属于同一分区的序列化记录块进行操作,在合并过程中不需要反序列化记录。
* - The spill merging procedure operates on blocks of serialized records that belong to the same
* partition and does not need to deserialize records during the merge.
*
# 当spill压缩编解码器支持压缩数据的连接时,spill合并只是将序列化和压缩的spill分区连接起来,产生最终的输出分区。 这样就可以使用高效的数据复制方法,如NIO的 "transferTo",并避免了在合并过程中分配解压或复制缓冲区的需要。
* - When the spill compression codec supports concatenation of compressed data, the spill merge
* simply concatenates the serialized and compressed spill partitions to produce the final output
* partition. This allows efficient data copying methods, like NIO's `transferTo`, to be used
* and avoids the need to allocate decompression or copying buffers during the merge.
*
* For more details on these optimizations, see SPARK-7081.
*/

主要方法

  • registerShuffle
  • getReader
  • getWriter
  • unregisterShuffle

三种 writer 说明及选择时机

类型 描述
BypassMergeSortShuffleWriter 和Hash Shuffle实现基本相同,区别在于map task输出会汇总为一个文件
UnsafeShuffleWriter tungsten-sort,ShuffleExternalSorter使用Java Unsafe直接操作内存,避免Java对象多余的开销和GC 延迟,效率高
SortShuffleWriter Sort Shuffle,和Hash Shuffle的主要不同在于,map端支持Partition级别的sort,map task输出会汇总为一个文件

Spark根据运行时信息选择三种ShuffleWriter实现中的一种,对应的源码为SortShuffleManager中的registerShuffle方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* Obtains a [[ShuffleHandle]] to pass to tasks.
*/
override def registerShuffle[K, V, C](
shuffleId: Int,
numMaps: Int,
dependency: ShuffleDependency[K, V, C]): ShuffleHandle = {
if (SortShuffleWriter.shouldBypassMergeSort(conf, dependency)) {
// If there are fewer than spark.shuffle.sort.bypassMergeThreshold partitions and we don't
// need map-side aggregation, then write numPartitions files directly and just concatenate
// them at the end. This avoids doing serialization and deserialization twice to merge
// together the spilled files, which would happen with the normal code path. The downside is
// having multiple files open at a time and thus more memory allocated to buffers.
new BypassMergeSortShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else if (SortShuffleManager.canUseSerializedShuffle(dependency)) {
// Otherwise, try to buffer map outputs in a serialized form, since this is more efficient:
new SerializedShuffleHandle[K, V](
shuffleId, numMaps, dependency.asInstanceOf[ShuffleDependency[K, V, V]])
} else {
// Otherwise, buffer map outputs in a deserialized form:
new BaseShuffleHandle(shuffleId, numMaps, dependency)
}
}

选择逻辑如下:

map-side aggregation Partition数(RDD) Serializer支持relocation
BypassMergeSortShuffleWriter 小于200(默认) -
UnsafeShuffleWriter 小于16777216
SortShuffleWriter - - -
  • BypassMergeSortShuffleWriter

    1
    2
    3
    4
    5
    6
    7
    8
    9
    def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
    // We cannot bypass sorting if we need to do map-side aggregation.
    if (dep.mapSideCombine) {
    false
    } else {
    val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
    }
    }
    1. 判断是否开启combine,只能在未开启combine时使用;
    2. 分区数量小于 spark.shuffle.sort.bypassMergeThreshold 设置的阈值;
  • UnsafeSuffleWriter

    canUseSerializedShuffle 判断为 ture

    1. serializer 支持 relocation;
    2. 没有定义本地combine
    3. 分区数量小于16777216
  • SortShuffleWriter

    canUseSerializedShuffle 判断为false

    1. serializer 不支持 relocation;
    2. 定义了本地combine
    3. 分区数量大于 16777216
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
val shufId = dependency.shuffleId
val numPartitions = dependency.partitioner.numPartitions
// 判断 serializer 是否支持 relocation
if (!dependency.serializer.supportsRelocationOfSerializedObjects) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because the serializer, " +
s"${dependency.serializer.getClass.getName}, does not support object relocation")
false
// 是否是本地combine
} else if (dependency.mapSideCombine) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because we need to do " +
s"map-side aggregation")
false
// 分区数量是否 > 16777215 + 1
} else if (numPartitions > MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE) {
log.debug(s"Can't use serialized shuffle for shuffle $shufId because it has more than " +
s"$MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE partitions")
false
} else {
log.debug(s"Can use serialized shuffle for shuffle $shufId")
true
}
}

Serializer支持relocation

Serializer可以对已经序列化的对象进行排序,这种排序起到的效果和先对数据排序再序列化一致。Serializer的这个属性会在UnsafeShuffleWriter进行排序时用到。

支持relocation的Serializer是KryoSerializer,Spark默认使用JavaSerializer,通过参数spark.serializer设置。

BypassMergeSortShuffleWriter

map端结果按照bucket顺序依次写入dataFile文件中,这么处理后,Shuffle生成的文件数显著减少了,同时还会生成indexFile文件,记录各个bucket在dataFile中的位置,用于后续reducer随机读取文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public void write(Iterator<Product2<K, V>> records) throws IOException {
assert (partitionWriters == null);
if (!records.hasNext()) {
partitionLengths = new long[numPartitions];
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, null);
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
return;
}
final SerializerInstance serInstance = serializer.newInstance();
final long openStartTime = System.nanoTime();
partitionWriters = new DiskBlockObjectWriter[numPartitions];
partitionWriterSegments = new FileSegment[numPartitions];
for (int i = 0; i < numPartitions; i++) {
final Tuple2<TempShuffleBlockId, File> tempShuffleBlockIdPlusFile =
blockManager.diskBlockManager().createTempShuffleBlock();
final File file = tempShuffleBlockIdPlusFile._2();
final BlockId blockId = tempShuffleBlockIdPlusFile._1();
partitionWriters[i] =
blockManager.getDiskWriter(blockId, file, serInstance, fileBufferSize, writeMetrics);
}
// Creating the file to write to and creating a disk writer both involve interacting with
// the disk, and can take a long time in aggregate when we open many files, so should be
// included in the shuffle write time.
writeMetrics.incWriteTime(System.nanoTime() - openStartTime);

while (records.hasNext()) {
final Product2<K, V> record = records.next();
final K key = record._1();
partitionWriters[partitioner.getPartition(key)].write(key, record._2());
}

for (int i = 0; i < numPartitions; i++) {
final DiskBlockObjectWriter writer = partitionWriters[i];
partitionWriterSegments[i] = writer.commitAndGet();
writer.close();
}

File output = shuffleBlockResolver.getDataFile(shuffleId, mapId);
File tmp = Utils.tempFileWith(output);
try {
partitionLengths = writePartitionedFile(tmp);
shuffleBlockResolver.writeIndexFileAndCommit(shuffleId, mapId, partitionLengths, tmp);
} finally {
if (tmp.exists() && !tmp.delete()) {
logger.error("Error while deleting temp file {}", tmp.getAbsolutePath());
}
}
mapStatus = MapStatus$.MODULE$.apply(blockManager.shuffleServerId(), partitionLengths);
}

UnsafeSuffleWriter

UnsafeShuffleWriter内部使用了和BytesToBytesMap基本相同的数据结构处理map端的输出,不过将其细化为ShuffleExternalSorterShuffleInMemorySorter两部分,功能如下

  • ShuffleExternalSorter:使用MemoryBlock存储数据,每条记录包括长度信息和K-V Pair
  • ShuffleInMemorySorter:使用long数组存储每条记录对应的位置信息(page number + offset),以及其对应的PartitionId,共8 bytes

SortShuffleWriter

SortShuffleWriter它主要是判断在Map端是否需要本地进行combine操作。如果需要聚合,则使用PartitionedAppendOnlyMap;如果不进行combine操作,则使用PartitionedPairBuffer添加数据存放于内存中。然后无论哪一种情况都需要判断内存是否足够,如果内存不够而且又申请不到内存,则需要进行本地磁盘溢写操作,把相关的数据写入溢写到临时文件。最后把内存里的数据和磁盘溢写的临时文件的数据进行合并,如果需要则进行一次归并排序,如果没有发生溢写则是不需要归并排序,因为都在内存里。最后生成合并后的data文件和index文件。

writer 方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
// 1. 判断是否有map端combine
sorter = if (dep.mapSideCombine) {
new ExternalSorter[K, V, C](
context, dep.aggregator, Some(dep.partitioner), dep.keyOrdering, dep.serializer)
} else {
// In this case we pass neither an aggregator nor an ordering to the sorter, because we don't
// care whether the keys get sorted in each partition; that will be done on the reduce side
// if the operation being run is sortByKey.
new ExternalSorter[K, V, V](
context, aggregator = None, Some(dep.partitioner), ordering = None, dep.serializer)
}
sorter.insertAll(records)

// Don't bother including the time to open the merged output file in the shuffle write time,
// because it just opens a single file, so is typically too fast to measure accurately
// (see SPARK-3570).
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
val tmp = Utils.tempFileWith(output)
try {
val blockId = ShuffleBlockId(dep.shuffleId, mapId, IndexShuffleBlockResolver.NOOP_REDUCE_ID)
val partitionLengths = sorter.writePartitionedFile(blockId, tmp)
shuffleBlockResolver.writeIndexFileAndCommit(dep.shuffleId, mapId, partitionLengths, tmp)
mapStatus = MapStatus(blockManager.shuffleServerId, partitionLengths)
} finally {
if (tmp.exists() && !tmp.delete()) {
logError(s"Error while deleting temp file ${tmp.getAbsolutePath}")
}
}
}
  1. 创建外部排序器ExternalSorter, 只是根据是否需要本地combine与否从而决定是否传入aggregator和keyOrdering参数;

  2. 调用ExternalSorter实例的insertAll方法,插入record;如果ExternalSorter实例中用以保存record的in-memory collection的大小达到阈值,会将record按顺序溢写到磁盘文件。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    /**
    * Spill the current in-memory collection to disk if needed.
    *
    * @param usingMap whether we're using a map or buffer as our current in-memory collection
    */
    private def maybeSpillCollection(usingMap: Boolean): Unit = {
    var estimatedSize = 0L
    if (usingMap) {
    estimatedSize = map.estimateSize()
    if (maybeSpill(map, estimatedSize)) {
    map = new PartitionedAppendOnlyMap[K, C]
    }
    } else {
    estimatedSize = buffer.estimateSize()
    if (maybeSpill(buffer, estimatedSize)) {
    buffer = new PartitionedPairBuffer[K, C]
    }
    }

    if (estimatedSize > _peakMemoryUsedBytes) {
    _peakMemoryUsedBytes = estimatedSize
    }
    }
  1. 构造最终的输出文件实例,其中文件名为(reduceId为0): “shuffle_” + shuffleId + ““ + mapId + ““ + reduceId;

    1
    2
    3
    4
    5
    6
    7
    val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)

    ...

    case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
    override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
    }
  2. 在输出文件名后加上uuid用于标识文件正在写入,结束后重命名

    1
    2
    3
    4
    5
    6
    7
    val tmp = Utils.tempFileWith(output)

    ...

    def tempFileWith(path: File): File = {
    new File(path.getAbsolutePath + "." + UUID.randomUUID())
    }
  3. 调用 ExternalSorter 实例的 writePartitionedFile 方法,将插入到该 sorter 的 record 进行排序并写入输出文件;插入到 sorter 的 record 可以是在 in-memory collection 或者在溢写文件。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    /**
    * Write all the data added into this ExternalSorter into a file in the disk store. This is
    * called by the SortShuffleWriter.
    *
    * @param blockId block ID to write to. The index file will be blockId.name + ".index".
    * @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
    */
    def writePartitionedFile(
    blockId: BlockId,
    outputFile: File): Array[Long] = {

    // Track location of each range in the output file
    val lengths = new Array[Long](numPartitions)
    val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
    context.taskMetrics().shuffleWriteMetrics)

    if (spills.isEmpty) {
    // Case where we only have in-memory data
    val collection = if (aggregator.isDefined) map else buffer
    val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
    while (it.hasNext) {
    val partitionId = it.nextPartition()
    while (it.hasNext && it.nextPartition() == partitionId) {
    it.writeNext(writer)
    }
    val segment = writer.commitAndGet()
    lengths(partitionId) = segment.length
    }
    } else {
    // We must perform merge-sort; get an iterator by partition and write everything directly.
    for ((id, elements) <- this.partitionedIterator) {
    if (elements.hasNext) {
    for (elem <- elements) {
    writer.write(elem._1, elem._2)
    }
    val segment = writer.commitAndGet()
    lengths(id) = segment.length
    }
    }
    }

    writer.close()
    context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
    context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
    context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)

    lengths
    }
  4. 将每个 partition 的 offset 写入 index 文件方便 reduce 端 fetch 数据;

  5. 把部分信息封装到MapStatus返回;