How Spark's ShuffleManager Works
What is Shuffle?
Some computations need to gather data of the same kind from every node onto a single node before they can proceed. The process of collecting data that is spread across different nodes and bringing it together according to some rule (typically a partitioning function applied to each record's key) is called a shuffle.
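For example, reduceByKey must bring all values of a given key, which may live on different nodes, onto the same node, so it introduces a shuffle between stages. A minimal snippet, assuming an existing SparkContext sc and an illustrative input path:

```scala
// Counting words forces records with the same key onto the same reducer: a shuffle boundary.
val counts = sc.textFile("hdfs:///input/words.txt")   // path is illustrative
  .flatMap(_.split("\\s+"))
  .map(word => (word, 1))
  .reduceByKey(_ + _)   // values for each key are gathered across nodes here
```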
How SortShuffleManager works
First, it is worth reading the class-level scaladoc of SortShuffleManager in the Spark source. In short: in sort-based shuffle, incoming records are sorted by their target partition id and written to a single map output file, from which reducers fetch contiguous regions; the sort can run either over serialized data (the tungsten "serialized sorting" path) or over deserialized objects.
Main methods
- registerShuffle
- getReader
- getWriter
- unregisterShuffle
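These are the operations every shuffle manager must provide. The sketch below reproduces the shape of the ShuffleManager trait from the Spark 2.x source, with the referenced Spark types stubbed out so the snippet stands alone; treat the exact signatures as approximate:

```scala
// Approximate shape of org.apache.spark.shuffle.ShuffleManager (Spark 2.x).
// The types below are local stubs standing in for the real Spark classes.
trait ShuffleHandle { def shuffleId: Int }
trait ShuffleWriter[K, V]
trait ShuffleReader[K, C]
trait ShuffleDependency[K, V, C]
trait TaskContext

trait ShuffleManager {
  // Driver side: register a shuffle and get back a handle that is shipped to tasks.
  def registerShuffle[K, V, C](shuffleId: Int, numMaps: Int,
                               dependency: ShuffleDependency[K, V, C]): ShuffleHandle

  // Map side: obtain a writer for one map task's output.
  def getWriter[K, V](handle: ShuffleHandle, mapId: Int, context: TaskContext): ShuffleWriter[K, V]

  // Reduce side: obtain a reader for a range of reduce partitions.
  def getReader[K, C](handle: ShuffleHandle, startPartition: Int, endPartition: Int,
                      context: TaskContext): ShuffleReader[K, C]

  // Remove a shuffle's metadata; returns true if it was removed successfully.
  def unregisterShuffle(shuffleId: Int): Boolean
}
```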
The three writers and when each is chosen
| Writer | Description |
|---|---|
| BypassMergeSortShuffleWriter | Essentially the same idea as hash shuffle, except that each map task's output is consolidated into a single file |
| UnsafeShuffleWriter | The tungsten-sort path: ShuffleExternalSorter manipulates memory directly through Java Unsafe, avoiding per-object Java overhead and GC pauses, which makes it very efficient |
| SortShuffleWriter | The sort-shuffle path: unlike hash shuffle, the map side sorts records by partition, and each map task's output is consolidated into a single file |
Spark chooses one of the three ShuffleWriter implementations based on runtime information about the shuffle dependency. The decision is made in SortShuffleManager's registerShuffle method, which returns a different ShuffleHandle for each writer; a paraphrased sketch of that branching follows.
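In Spark 2.x, registerShuffle returns a BypassMergeSortShuffleHandle, a SerializedShuffleHandle, or a BaseShuffleHandle, which getWriter later maps to the three writers. The helper below folds the two predicates (shouldBypassMergeSort and canUseSerializedShuffle, both shown later) into plain parameters, so it is an approximation of the decision logic, not the real code:

```scala
// Approximate decision logic of SortShuffleManager.registerShuffle, expressed over the
// inputs it actually inspects. The "no map-side combine" check is simplified; the exact
// predicates live in shouldBypassMergeSort and canUseSerializedShuffle.
def chooseWriter(mapSideCombine: Boolean,
                 numPartitions: Int,
                 serializerSupportsRelocation: Boolean,
                 bypassMergeThreshold: Int = 200): String = {
  if (!mapSideCombine && numPartitions <= bypassMergeThreshold) {
    "BypassMergeSortShuffleWriter"   // registerShuffle returns a BypassMergeSortShuffleHandle
  } else if (!mapSideCombine && serializerSupportsRelocation && numPartitions <= (1 << 24)) {
    "UnsafeShuffleWriter"            // SerializedShuffleHandle (tungsten-sort path)
  } else {
    "SortShuffleWriter"              // BaseShuffleHandle (deserialized sort path)
  }
}
```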
The selection logic is summarized below:
| Writer | Map-side aggregation | Number of partitions | Serializer supports relocation |
|---|---|---|---|
| BypassMergeSortShuffleWriter | No | ≤ spark.shuffle.sort.bypassMergeThreshold (default 200) | Not required |
| UnsafeShuffleWriter | No | ≤ 16777216 (2^24) | Yes |
| SortShuffleWriter | Allowed | Any | Not required |
BypassMergeSortShuffleWriter
```scala
def shouldBypassMergeSort(conf: SparkConf, dep: ShuffleDependency[_, _, _]): Boolean = {
  // We cannot bypass sorting if we need to do map-side aggregation.
  if (dep.mapSideCombine) {
    false
  } else {
    val bypassMergeThreshold: Int = conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
    dep.partitioner.numPartitions <= bypassMergeThreshold
  }
}
```

- Map-side combine must not be required; the bypass path is only usable when there is no map-side aggregation;
- The number of partitions must be at most the threshold configured by spark.shuffle.sort.bypassMergeThreshold (200 by default).
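For jobs without map-side aggregation that have slightly more reduce partitions than the default threshold, the bypass path can still be selected by raising the threshold. A minimal example via SparkConf (the same key can also be set in spark-defaults.conf or with --conf; the app name is illustrative):

```scala
import org.apache.spark.SparkConf

// Raise the bypass threshold from the default 200 to 400 reduce partitions.
val conf = new SparkConf()
  .setAppName("shuffle-demo")
  .set("spark.shuffle.sort.bypassMergeThreshold", "400")
```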
UnsafeShuffleWriter
Chosen when canUseSerializedShuffle returns true, i.e. all of the following hold:
- the serializer supports relocation of serialized objects;
- no map-side combine is defined;
- the number of partitions is at most 16777216 (2^24).
SortShuffleWriter
Chosen when canUseSerializedShuffle returns false, i.e. at least one of the following holds:
- the serializer does not support relocation;
- a map-side combine is defined;
- the number of partitions exceeds 16777216.
```scala
def canUseSerializedShuffle(dependency: ShuffleDependency[_, _, _]): Boolean = {
  ...
}
```
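Since the body is elided above, here is a paraphrase of the three checks canUseSerializedShuffle performs (Spark 2.x); the real method also logs why a shuffle is rejected, and the constant 16777216 appears in the source as MAX_SHUFFLE_OUTPUT_PARTITIONS_FOR_SERIALIZED_MODE:

```scala
// Paraphrased: the serialized (tungsten-sort) path is usable only when all three hold.
def canUseSerializedShuffleSketch(serializerSupportsRelocation: Boolean,
                                  hasMapSideCombine: Boolean,
                                  numPartitions: Int): Boolean = {
  serializerSupportsRelocation &&
    !hasMapSideCombine &&
    numPartitions <= 16777216   // 2^24, imposed by the packed record pointer (see below)
}
```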
Serializer support for relocation
A serializer that supports relocation can reorder objects that have already been serialized, and the result is identical to sorting the records first and serializing them afterwards. UnsafeShuffleWriter relies on this property when it sorts serialized data.
Among the built-in serializers, KryoSerializer supports relocation; Spark's default is JavaSerializer, and the serializer is selected with the spark.serializer parameter.
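In practice, then, a job must switch its serializer to Kryo before its shuffles can qualify for UnsafeShuffleWriter; a minimal example (the app name is illustrative):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("kryo-demo")
  // KryoSerializer supports relocation of serialized objects; the default JavaSerializer does not.
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
```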
BypassMergeSortShuffleWriter
The map-side output is written into the dataFile bucket by bucket (i.e. in partition order). Compared with hash shuffle, this dramatically reduces the number of files a shuffle produces. An indexFile is generated alongside, recording where each bucket starts inside the dataFile, so that reducers can later seek directly to the region they need.
```java
public void write(Iterator<Product2<K, V>> records) throws IOException {
  ...
}
```
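The overall strategy of this write method is: open one output stream per reduce partition, append each record to its partition's temporary file, then concatenate the per-partition files into a single data file while recording each partition's length for the index. Below is a minimal, self-contained Scala sketch of that idea using plain java.io; writeBypassStyle and its record format are illustrative, not Spark APIs:

```scala
import java.io._
import java.nio.file.Files

// Illustrative only: mimics the bypass writer's "one temp file per partition,
// then concatenate + index" strategy outside of Spark.
def writeBypassStyle(records: Iterator[(Int, String)], numPartitions: Int,
                     dataFile: File, indexFile: File): Unit = {
  // 1. One temporary file (and stream) per reduce partition.
  val partFiles = Array.fill(numPartitions)(File.createTempFile("bypass-part", ".tmp"))
  val streams = partFiles.map(f => new BufferedOutputStream(new FileOutputStream(f)))
  for ((partitionId, value) <- records) {
    streams(partitionId).write((value + "\n").getBytes("UTF-8"))
  }
  streams.foreach(_.close())

  // 2. Concatenate the per-partition files into one data file, tracking each length.
  val lengths = new Array[Long](numPartitions)
  val out = new BufferedOutputStream(new FileOutputStream(dataFile))
  for (i <- 0 until numPartitions) {
    val bytes = Files.readAllBytes(partFiles(i).toPath)
    out.write(bytes)
    lengths(i) = bytes.length.toLong
    partFiles(i).delete()
  }
  out.close()

  // 3. Index file: cumulative offsets, so a reducer can seek straight to its range.
  val index = new DataOutputStream(new FileOutputStream(indexFile))
  var offset = 0L
  index.writeLong(offset)
  for (len <- lengths) { offset += len; index.writeLong(offset) }
  index.close()
}
```

Spark's real implementation writes records through per-partition DiskBlockObjectWriters with a serializer and commits the index via its shuffle block resolver, but the resulting data-plus-index file layout is the same as in this sketch.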
UnsafeShuffleWriter
Internally, UnsafeShuffleWriter processes the map-side output with a data structure essentially the same as BytesToBytesMap, refined into two parts, ShuffleExternalSorter and ShuffleInMemorySorter, with the following roles:
- ShuffleExternalSorter: stores the data in MemoryBlocks; each record consists of its length followed by the K-V pair
- ShuffleInMemorySorter: keeps a long array in which each record's location (page number + offset) and its PartitionId are packed into 8 bytes (see the packing sketch below)
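The 16777216-partition limit mentioned earlier falls out of this packed representation: Spark's PackedRecordPointer keeps the partition id in the upper 24 bits of the long and the page number and in-page offset in the remaining bits. The sketch below illustrates that kind of packing; the field widths are modeled on the Spark layout but should be treated as approximate:

```scala
// Rough sketch of the 8-byte record pointer used by ShuffleInMemorySorter:
// [24-bit partition id][13-bit page number][27-bit offset within the page]
object PackedPointerSketch {
  private val OffsetBits = 27
  private val PageBits = 13

  def pack(partitionId: Int, pageNumber: Int, offsetInPage: Long): Long = {
    require(partitionId < (1 << 24), "partition id must fit in 24 bits (hence the 16777216 limit)")
    (partitionId.toLong << (PageBits + OffsetBits)) |
      (pageNumber.toLong << OffsetBits) |
      (offsetInPage & ((1L << OffsetBits) - 1))
  }

  // Sorting by partition only needs the high bits, which is why the sorter can sort raw longs.
  def partitionId(packed: Long): Int = (packed >>> (PageBits + OffsetBits)).toInt
}
```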
SortShuffleWriter
SortShuffleWriter first checks whether a map-side combine is needed. If it is, records are aggregated into a PartitionedAppendOnlyMap; if not, they are appended to a PartitionedPairBuffer held in memory. In either case the writer keeps checking whether memory is sufficient; if it is not and no more can be acquired, the in-memory collection is spilled to temporary files on local disk. Finally, the in-memory data and any spill files are merged, using a merge sort when spills occurred (if nothing was spilled, no merge is needed because everything is still in memory), and the merged result is written out as the final data file plus an index file.
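As a concrete illustration of which path is taken (based on the default behavior of these operators and assuming an existing SparkContext sc): reduceByKey performs a map-side combine, so its ExternalSorter aggregates into the PartitionedAppendOnlyMap, whereas groupByKey disables map-side combine and therefore appends records to the PartitionedPairBuffer:

```scala
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), numSlices = 4)

// mapSideCombine = true: ExternalSorter is given an aggregator and uses PartitionedAppendOnlyMap.
val reduced = pairs.reduceByKey(_ + _)

// mapSideCombine = false: no map-side aggregation, records go into a PartitionedPairBuffer.
val grouped = pairs.groupByKey()
```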
The write method
```scala
/** Write a bunch of records to this task's output */
override def write(records: Iterator[Product2[K, V]]): Unit = {
  ...
}
```
Create the ExternalSorter; whether the aggregator and keyOrdering arguments are passed in depends only on whether a map-side combine is needed.
Call insertAll on the ExternalSorter instance to insert the records; whenever the in-memory collection holding the records grows past the spill threshold, its contents are written out to a spill file in sorted order.
```scala
/**
* Spill the current in-memory collection to disk if needed.
*
* @param usingMap whether we're using a map or buffer as our current in-memory collection
*/
private def maybeSpillCollection(usingMap: Boolean): Unit = {
var estimatedSize = 0L
if (usingMap) {
estimatedSize = map.estimateSize()
if (maybeSpill(map, estimatedSize)) {
map = new PartitionedAppendOnlyMap[K, C]
}
} else {
estimatedSize = buffer.estimateSize()
if (maybeSpill(buffer, estimatedSize)) {
buffer = new PartitionedPairBuffer[K, C]
}
}
if (estimatedSize > _peakMemoryUsedBytes) {
_peakMemoryUsedBytes = estimatedSize
}
}
```
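The spill decision itself lives in maybeSpill, which ExternalSorter inherits from Spillable. Roughly (a paraphrase of the Spark 2.x logic, not the verbatim source; acquireExecutionMemory stands in for the real memory-manager call):

```scala
// Paraphrased spill policy of Spillable.maybeSpill. Returns (shouldSpill, newThreshold).
def maybeSpillSketch(elementsRead: Long,
                     currentMemory: Long,
                     myMemoryThreshold: Long,
                     numElementsForceSpillThreshold: Long,
                     acquireExecutionMemory: Long => Long): (Boolean, Long) = {
  var threshold = myMemoryThreshold
  var shouldSpill = false
  // Size estimation is not free, so the threshold is only checked every 32 records.
  if (elementsRead % 32 == 0 && currentMemory >= threshold) {
    // Try to roughly double the memory budget before giving up.
    val granted = acquireExecutionMemory(2 * currentMemory - threshold)
    threshold += granted
    // Spill if the collection is still over the (possibly raised) threshold.
    shouldSpill = currentMemory >= threshold
  }
  // Also force a spill when too many records are buffered, regardless of memory.
  shouldSpill = shouldSpill || elementsRead > numElementsForceSpillThreshold
  (shouldSpill, threshold)
}
```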
Construct the final output file; its name (the reduceId is always 0 here) is "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId:

```scala
val output = shuffleBlockResolver.getDataFile(dep.shuffleId, mapId)
...
case class ShuffleDataBlockId(shuffleId: Int, mapId: Int, reduceId: Int) extends BlockId {
  override def name: String = "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
}
```
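For example, with shuffleId = 0 and mapId = 5, the data block above resolves to the name shown below; the matching index file follows the same pattern with an .index suffix (ShuffleIndexBlockId):

```scala
ShuffleDataBlockId(shuffleId = 0, mapId = 5, reduceId = 0).name
// "shuffle_0_5_0.data"
```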
A UUID is appended to the output file name to mark the file as still being written; once writing finishes, the temporary file is renamed to the final name:

```scala
val tmp = Utils.tempFileWith(output)
...
def tempFileWith(path: File): File = {
  new File(path.getAbsolutePath + "." + UUID.randomUUID())
}
```

Call the ExternalSorter instance's writePartitionedFile method, which sorts the records inserted into the sorter and writes them to the output file; those records may sit in the in-memory collection or in spill files.
```scala
48/**
* Write all the data added into this ExternalSorter into a file in the disk store. This is
* called by the SortShuffleWriter.
*
* @param blockId block ID to write to. The index file will be blockId.name + ".index".
* @return array of lengths, in bytes, of each partition of the file (used by map output tracker)
*/
def writePartitionedFile(
blockId: BlockId,
outputFile: File): Array[Long] = {
// Track location of each range in the output file
val lengths = new Array[Long](numPartitions)
val writer = blockManager.getDiskWriter(blockId, outputFile, serInstance, fileBufferSize,
context.taskMetrics().shuffleWriteMetrics)
if (spills.isEmpty) {
// Case where we only have in-memory data
val collection = if (aggregator.isDefined) map else buffer
val it = collection.destructiveSortedWritablePartitionedIterator(comparator)
while (it.hasNext) {
val partitionId = it.nextPartition()
while (it.hasNext && it.nextPartition() == partitionId) {
it.writeNext(writer)
}
val segment = writer.commitAndGet()
lengths(partitionId) = segment.length
}
} else {
// We must perform merge-sort; get an iterator by partition and write everything directly.
for ((id, elements) <- this.partitionedIterator) {
if (elements.hasNext) {
for (elem <- elements) {
writer.write(elem._1, elem._2)
}
val segment = writer.commitAndGet()
lengths(id) = segment.length
}
}
}
writer.close()
context.taskMetrics().incMemoryBytesSpilled(memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(diskBytesSpilled)
context.taskMetrics().incPeakExecutionMemory(peakMemoryUsedBytes)
lengths
}
```

Write each partition's offset into the index file so that the reduce side can locate and fetch its portion of the data;
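The index file is just a sequence of cumulative offsets (numPartitions + 1 longs), so the reduce side can work out any partition's byte range with two reads. A simplified, self-contained sketch of what IndexShuffleBlockResolver does when resolving a block (partitionRange is an illustrative helper, not a Spark API):

```scala
import java.io.{DataInputStream, File, FileInputStream}

// Given a shuffle .index file and a reduce partition id, return (offset, length) of that
// partition's bytes inside the corresponding .data file.
def partitionRange(indexFile: File, reduceId: Int): (Long, Long) = {
  val in = new DataInputStream(new FileInputStream(indexFile))
  try {
    in.skipBytes(reduceId * 8)   // each offset is one 8-byte long
    val start = in.readLong()
    val end = in.readLong()
    (start, end - start)
  } finally {
    in.close()
  }
}
```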
Finally, package the relevant information (the output location and each partition's size) into a MapStatus and return it.