Spark Shuffle之Hash Shuffle

xiaoxiao2021-02-27  300

概述

Hash Shuffle是Spark 1.2之前的默认Shuffle实现,并在Spark 2.0版本中被移除,因此,了解Hash Shuffle的意义更多的在于和Sort Shuffle对比,以及理解为什么Sort Shuffle能够完全取代Hash Shuffle。

实现

Shuffle 过程这篇文章介绍了Hash Shuffle的实现及优化过程,特别是下面这幅图很好的描述了整个流程 ShuffleMapTask调用ShuffleWriter(具体实现为HashShuffleWriter)完成Map端操作,代码如下 如上图注释处,分为两个部分,map端聚合、将计算结果写入文件。如果没有map端聚合,流程和图1完全一致。鉴于Shuffle 过程已经介绍了整体流程,不再累述,而是关注一些其他细节。

其他细节

bucket数量

如图1中,ShuffleMapTask将结果写入到bucket 1、bucket 2、bucket 3中,最终bucket一一对应写入到文件,那么这个bucket的数量由什么决定? 如上图,bucket的数量等于Partitioner的numPartitions,numPartitions分为手动设置和默认两种情况,手动设置情况较简单,设置多少就是多少,默认的情况如下图中注释 虽然上图是选取默认Partitioner的,但是numPartitions变量的逻辑也在其中,更多Partitioner的内容参考我的博客Spark RDD之Partitioner。

map端聚合(combine)

map端聚合,Mapreduce中成为combine(以下简称为combine),是指在map task中对相同key所对应的values做aggregate操作,如图2第一处注释。

combine底层使用AppendOnlyMap实现,AppendOnlyMap是Spark实现的一种HashMap结构,和BytesToBytesMap的设计相同,只是底层使用Array存储,AppendOnlyMap实现类较多,后续单独介绍。

实现combine的逻辑如下: 将task处理的数据依次put到HashMap(AppendOnlyMap),每次put都检测HashMap中对应的key是否存在,若存在,对即将插入的value和HashMap中的value做一次combine操作,例如reduceByKey(_ + _)中定义的+操作。此外,combine操作涉及到spill,留作下面单独介绍。

能够触发combine操作的transformation主要如下

reduceByKeyfoldByKeyaggregateByKeycombineByKeycombineByKeyWithClassTag

reduce端聚合

map端聚合效率更高,并且能减少网络IO,但并不能满足所有情况,因此,reduce端也有聚合,reduce端聚合功能在BlockStoreShuffleReader中,代码如下 如上图,当mapSideCombine = false时,只在reduce端进行聚合,其实现原理和map端聚合一样。 map端聚合和reduce端聚合区别如下

map端reduce端map端聚合聚合聚合reduce端聚合不聚合聚合

触发reduce端聚合的transformation如下:groupByKey

Spark调优中重要的一项就是,尽可能的使用map端聚合。

reduce端排序

如果使用了需要排序的transformation,会在reduce端进行排序,代码如下

实现排序的原理如下: ExternalSorter内部维护了两个集合PartitionedPairBuffer和PartitionedAppendOnlyMap,如果有aggregation操作使用后者,否则使用前者存储数据。两个集合底层均使用Array存储数据,每条记录占两个下标,如下 2i位置存储的是元组,内容为PartitionId + K,2i + 1存储V。如果根据K排序,处理2i位置元素,V同理,排序算法使用org.apache.spark.util.collection.TimSort,Sort Shuffle中也是使用的ExternalSorter。

能够触发sort的transformation如下

sortByKeysortByrepartitionAndSortWithinPartitions

溢写(spill)

map端聚合、reduce端聚合、sort都有可能触发溢写(spill),即处理的数据内存放不下,需要落地磁盘,默认阈值32KB,如下 这些落地的中间文件需要被再次读取,例如map端聚合需要读取溢写的文件再写入对应的bucket,reduce端排序读取溢写的文件同时还需要进行merge-sort保证顺序,使用优先队列PriorityQueue实现,如下

参考: Spark Architecture: Shuffle Shuffle 过程 Remove HashShuffleManager

转载请注明原文地址: https://www.6miu.com/read-3230.html

最新回复(0)