Spark的两种外围Shuffle详解

  • 电脑网络维修
  • 2024-11-15

在 MapReduce 框架中, Shuffle 阶段是衔接 Map 与 Reduce 之间的桥梁, Map 阶段经过 Shuffle 环节将数据输入到Reduce 阶段中。由于 Shuffle 触及磁盘的读写和网络 I/O,因此 Shuffle 性能的高下间接影响整个程序的性能。Spark 也有 Map阶段和 Reduce 阶段,因此也会出现 Shuffle 。

Spark Shuffle

Spark Shuffle 分为两种:一种是基于 Hash 的 Shuffle;另一种是基于 Sort 的Shuffle。先引见下它们的开展历程,有助于咱们更好的了解 Shuffle:

在 Spark 1.1 之前, Spark 中只成功了一种 Shuffle 方式,即基于 Hash 的 Shuffle 。在 Spark 1.1版本中引入了基于 Sort 的 Shuffle 成功方式,并且 Spark 1.2 版本之后,自动的成功方式从基于 Hash 的 Shuffle 修正为基于Sort 的 Shuffle 成功方式,即使用的 ShuffleManager 从自动的 hash 修正为 sort。在 Spark 2.0 版本中, HashShuffle 方式己经不再经常使用。

Spark 之所以一开局就提供基于 Hash 的 Shuffle 成功机制,其重要目标之一就是为了防止不要求的排序,大家想下 Hadoop 中的MapReduce,是将 sort 作为固定步骤,有许多并不要求排序的义务,MapReduce 也会对其启动排序,形成了许多不用要的开支。

在基于 Hash 的 Shuffle 成功方式中,每个 Mapper 阶段的 Task 会为每个 Reduce 阶段的 Task生成一个文件,通常会发生少量的文件(即对应为 M*R 个两边文件,其中, M 示意 Mapper 阶段的 Task 个数, R 示意 Reduce 阶段的Task 个数) 随同少量的随机磁盘 I/O 操作与少量的内存开支。

为了缓解上述疑问,在 Spark 0.8.1 版本中为基于 Hash 的 Shuffle 成功引入了 Shuffle Consolidate机制(即文件兼并机制),将 Mapper 端生成的两边文件启动兼并的处置机制。经过性能属性spark.shuffie.consolidateFiles=true,缩小两边生成的文件数量。经过文件兼并,可以将两边文件的生成方式修正为每个口头单位为每个Reduce 阶段的 Task 生成一个文件。

口头单位对应为:每个 Mapper 端的 Cores 数/每个 Task 调配的 Cores 数(默以为 1) 。最终可以将文件个数从 M*R 修正为E*C/T*R,其中, E 示意 Executors 个数, C 示意可用 Cores 个数, T 示意 Task 调配的 Cores 数。

Spark1.1 版本引入了 Sort Shuffle:

基于 Hash 的 Shuffle 的成功方式中,生成的两边结果文件的个数都会依赖于 Reduce 阶段的 Task 个数,即 Reduce端的并行度,因此文件数依然无法控,无法真正处置疑问。为了更好地处置疑问,在 Spark1.1 版本引入了基于 Sort 的 Shuffle 成功方式,并且在Spark 1.2 版本之后,自动的成功方式也从基于 Hash 的 Shuffle,修正为基于 Sort 的 Shuffle 成功方式,即使用的ShuffleManager 从自动的 hash 修正为 sort。

在基于 Sort 的 Shuffle 中,每个 Mapper 阶段的 Task 不会为每 Reduce 阶段的 Task生成一个独自的文件,而是所有写到一个数据(Data)文件中,同时生成一个索引(Index)文件, Reduce 阶段的各个 Task可以经过该索引文件失掉关系的数据。防止发生少量文件的间接纳益就是降落随机磁盘 I/0 与内存的开支。最终生成的文件个数缩小到 2*M ,其中 M 示意Mapper 阶段的 Task 个数,每个 Mapper 阶段的 Task 区分生成两个文件(1 个数据文件、 1 个索引文件),最终的文件个数为 M个数据文件与 M 个索引文件。因此,最终文件个数是 2*M 个。

从 Spark 1.4 版本开局,在 Shuffle 环节中也引入了基于 Tungsten-Sort 的 Shuffie 成功方式,通 Tungsten名目所做的优化,可以极大提高 Spark 在数据处置上的性能。(Tungsten 翻译为中文是钨丝)

注:在一些特定的运行场景下,驳回基于 Hash 成功 Shuffle 机制的性能会超越基于 Sort 的 Shuffle 成功机制。

一张图了解下 Spark Shuffle 的迭代历史:

Spark Shuffle 迭代历史

为什么 Spark 最终还是丢弃了 HashShuffle ,经常使用了 Sorted-Based Shuffle?

咱们可以从 Spark 最基本要优化和迫切要处置的疑问中找到答案,经常使用 HashShuffle 的 Spark 在 Shuffle时发生少量的文件。当数据量越来越多时,发生的文件量是无法控的,这重大制约了 Spark 的性能及裁减才干,所以 Spark 必定要处置这个疑问,缩小Mapper 端 ShuffleWriter 发生的文件数量,这样便可以让 Spark 从几百台集群的规模瞬间变成可以支持几千台,甚至几万台集群的规模。

但经常使用 Sorted-Based Shuffle 就完美了吗,答案能否认的,Sorted-Based Shuffle也有缺陷,其缺陷反而是它排序的特性,它强迫要求数据在 Mapper 端必定先启动排序,所以造成它排序的速度有点慢。好在出现了 Tungsten-SortShuffle ,它对排序算法启动了改良,优化了排序的速度。Tungsten-Sort Shuffle 曾经并入了 Sorted-BasedShuffle,Spark 的引擎会智能识别程序要求的是 Sorted-Based Shuffle,还是 Tungsten-Sort Shuffle。

上方详细剖析每个 Shuffle 的底层口头原理:

一、Hash Shuffle 解析

以下的探讨都假定每个 Executor 有 1 个 cpu core。

1. HashShuffleManager

shuffle write 阶段,重要就是在一个 stage 完结计算之后,为了下一个 stage 可以口头 shuffle 类的算子(比如reduceByKey),而将每个 task 处置的数据按 key 启动“划分”。所谓“划分”,就是对相反的 key 口头 hash 算法,从而将相反 key都写入同一个磁盘文件中,而每一个磁盘文件都只属于下游 stage 的一个task。在将数据写入磁盘之前,会先将数据写入内存缓冲中,当内存缓冲填满之后,才会溢写到磁盘文件中去。

下一个 stage 的 task 有多少个, stage 的每个 task 就要创立多少份磁盘文件。比如下一个 stage 总共有 100 个task,那么 stage 的每个 task 都要创立 100 份磁盘文件。假设 stage 有 50 个 task,总共有 10 个Executor,每个 Executor 口头 5 个 task,那么每个 Executor 上总共就要创立 500 个磁盘文件,一切 Executor上会创立 5000 个磁盘文件。由此可见,未经优化的 shuffle write 操作所发生的磁盘文件的数量是极端惊人的。

shuffle read 阶段,通常就是一个 stage 刚开局时要做的事件。此时该 stage 的每一个 task 就要求将上一个 stage的计算结果中的一切相反 key,从各个节点上经过网络都拉取到自己所在的节点上,而后启动 key 的聚合或衔接等操作。由于 shuffle write的环节中,map task 给下游 stage 的每个 reduce task 都创立了一个磁盘文件,因此 shuffle read 的环节中,每个reduce task 只需从抢先 stage 的一切 map task 所在节点上,拉取属于自己的那一个磁盘文件即可。

shuffle read 的拉取环节是一边拉取一边启动聚合的。每个 shuffle read task 都会有一个自己的 buffer缓冲,每次都只能拉取与 buffer 缓冲相反大小的数据,而后经过内存中的一个 Map 启动聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到 buffer缓冲中启动聚合操作。以此类推,直到最后将一切数据到拉取完,并失掉最终的结果。

HashShuffleManager 上班原理如下图所示:

未优化的HashShuffleManager上班原理

2. 优化的 HashShuffleManager

为了优化 HashShuffleManager 咱们可以设置一个参数:spark.shuffle.consolidateFiles,该参数自动值为false,将其设置为 true 即可开启优化机制,通常来说,假设咱们经常使用 HashShuffleManager,那么都倡导开启这个选项。

开启 consolidate 机制之后,在 shuffle write 环节中,task 就不是为下游 stage 的每个 task创立一个磁盘文件了,此时会出现shuffleFileGroup的概念,每个 shuffleFileGroup 会对应一批磁盘文件,磁盘文件的数量与下游stage 的 task 数量是相反的。一个 Executor 上有多少个 cpu core,就可以并行口头多少个 task。而第一批并行口头的每个 task都会创立一个 shuffleFileGroup,并将数据写入对应的磁盘文件内。

当 Executor 的 cpu core 口头完一批 task,接着口头下一批 task 时,下一批 task 就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件,也就是说,此时 task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate 机制准许不同的 task 复用同一批磁盘文件,这样就可以有效将多个task 的磁盘文件启动必定水平上的兼并,从而大幅度缩小磁盘文件的数量,进而优化 shuffle write 的性能。

假定第二个 stage 有 100 个 task,第一个 stage 有 50 个 task,总共还是有 10 个 Executor(ExecutorCPU 个数为 1),每个 Executor 口头 5 个 task。那么原本经常使用未经优化的 HashShuffleManager 时,每个 Executor会发生 500 个磁盘文件,一切 Executor 会发生 5000 个磁盘文件的。然而此时经过优化之后,每个 Executor创立的磁盘文件的数量的计算公式为:cpu core的数量 * 下一个stage的task数量,也就是说,每个 Executor 此时只会创立 100个磁盘文件,一切 Executor 只会创立 1000 个磁盘文件。

这特性能优势显著,但为什么 Spark 不时没有在基于 Hash Shuffle的成功中将性能设置为自动选项呢,官网给出的说法是这特性能还欠稳固。

优化后的 HashShuffleManager 上班原理如下图所示:

优化后的HashShuffleManager上班原理

基于 Hash 的 Shuffle 机制的优缺陷

优势:

缺陷:

二、SortShuffle 解析

SortShuffleManager 的运转机制重要分红三种:

1. 普通运转机制

在该形式下,数据会先写入一个内存数据结构中,此时依据不同的 shuffle 算子,或者决定不同的数据结构。假设是 reduceByKey 这种聚合类的shuffle 算子,那么会决定 Map 数据结构,一边经过 Map 启动聚合,一边写入内存;假设是 join 这种普通的 shuffle 算子,那么会决定Array数据结构,间接写入内存。接着,每写一条数据进入内存数据结构之后,就会判别一下,能否到达了某个临界阈值。假设到达临界阈值的话,那么就会尝试将内存数据结构中的数据溢写到磁盘,而后清空内存数据结构。

在溢写到磁盘文件之前,会先依据 key 对内存数据结构中已有的数据启动排序。排序事先,会分批将数据写入磁盘文件。自动的 batch 数量是 10000条,也就是说,排序好的数据,会以每批 1 万条数据的方式分批写入磁盘文件。写入磁盘文件是经过 Java 的 BufferedOutputStream成功的。BufferedOutputStream 是 Java 的缓冲输入流,首先会将数据缓冲在内存中,当内存缓冲满溢之后再一次性写入磁盘文件中,这样可以缩小磁盘IO 次数,优化性能。

一个 task将一切数据写入内存数据结构的环节中,会出现屡次磁盘溢写操作,也就会发生多个暂时文件。最后会将之前一切的暂时磁盘文件都启动兼并,这就是merge环节,此时会将之前一切暂时磁盘文件中的数据读取进去,而后依次写入最终的磁盘文件之中。此外,由于一个 task 就只对应一个磁盘文件,也就象征着该 task为下游 stage 的 task 预备的数据都在这一个文件中,因此还会独自写一份索引文件,其中标识了下游各个 task 的数据在文件中的 startoffset 与 end offset。

SortShuffleManager 由于有一个磁盘文件 merge 的环节,因此大大缩小了文件数量。比如第一个 stage 有 50 个task,总共有 10 个 Executor,每个 Executor 口头 5 个 task,而第二个 stage 有 100 个 task。由于每个 task最终只要一个磁盘文件,因此此时每个 Executor 上只要 5 个磁盘文件,一切 Executor 只要 50 个磁盘文件。

普通运转机制的 SortShuffleManager 上班原理如下图所示:

普通运转机制的SortShuffleManager上班原理

2. bypass 运转机制

Reducer 端义务数比拟少的状况下,基于 Hash Shuffle 成功机制显著比基于 Sort Shuffle 成功机制要快,因此基于 SortShuffle 成功机制提供了一个带 Hash 格调的回退方案,就是 bypass 运转机制。关于 Reducer端义务数少于性能属性spark.shuffle.sort.bypassMergeThreshold设置的个数时,经常使用带 Hash 格调的回退方案。

bypass 运转机制的触发条件如下:

此时,每个 task 会为每个下游 task 都创立一个暂时磁盘文件,并将数据按 key 启动 hash 而后依据 key 的 hash 值,将 key写入对应的磁盘文件之中。当然,写入磁盘文件时也是先写入内存缓冲,缓冲写满之后再溢写到磁盘文件的。最后,雷同会将一切暂时磁盘文件都兼并成一个磁盘文件,并创立一个独自的索引文件。

该环节的磁盘写机制其实跟未经优化的 HashShuffleManager是如出一辙的,由于都要创立数量惊人的磁盘文件,只是在最后会做一个磁盘文件的兼并而已。因此大批的最终磁盘文件,也让该机制相对未经优化的HashShuffleManager 来说,shuffle read 的性能会更好。

而该机制与普通 SortShuffleManager运转机制的不同在于:第一,磁盘写机制不同;第二,不会启动排序。也就是说,启用该机制的最大好处在于,shuffle write环节中,不要求启动数据的排序操作,也就节俭掉了这局部的性能开支。

bypass 运转机制的 SortShuffleManager 上班原理如下图所示:

bypass运转机制的SortShuffleManager上班原理

3. Tungsten Sort Shuffle 运转机制

Tungsten Sort 是对普通 Sort 的一种优化,Tungsten Sort会启动排序,但排序的不是内容自身,而是内容序列化后字节数组的指针(元数据),把数据的排序转变为了指针数组的排序,成功了间接对序列化后的二进制数据启动排序。由于间接基于二进制数据启动操作,所以在这外面没有序列化和反序列化的环节。内存的消耗大大降落,相应的,会极大的缩小的GC 的开支。

Spark 提供了性能属性,用于决定详细的 Shuffle 成功机制,但要求说明的是,只管自动状况下 Spark 自动开启的是基于 SortShuffle成功机制,但实践上,参考 Shuffle 的框架内核局部可知基于 SortShuffle 的成功机制与基于 Tungsten Sort Shuffle成功机制都是经常使用 SortShuffleManager,而外部经常使用的详细的成功机制,是经过提供的两个方法启动判别的:

对应非基于 Tungsten Sort 时,经过 SortShuffleWriter.shouldBypassMergeSort 方法判别能否要求回退到Hash 格调的 Shuffle 成功机制,当该方法前往的条件不满足时,则经过SortShuffleManager.canUseSerializedShuffle 方法判别能否要求驳回基于 Tungsten Sort Shuffle成功机制,而当这两个方法前往都为 false,即都不满足对应的条件时,会智能驳回普通运转机制。

因此,当设置了 spark.shuffle.manager=tungsten-sort时,也不能保障就必定驳回基于 Tungsten Sort 的Shuffle 成功机制。

要成功 Tungsten Sort Shuffle 机制要求满足以下条件:

实践上,经常使用环节中还有其余一些限度,如引入 Page 方式的内存治理模型后,外部单条记载的长度不能超越 128 MB (详细内存模型可以参考PackedRecordPointer 类)。另外,分区个数的限度也是该内存模型造成的。

所以,目前经常使用基于 Tungsten Sort Shuffle 成功机制条件还是比拟厚道的。

参考资料:

《Spark大数据商业实战三部曲》

  • 关注微信

本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载联系作者并注明出处:https://duobeib.com/diannaowangluoweixiu/8329.html

猜你喜欢

热门标签

洗手盆如何疏浚梗塞 洗手盆为何梗塞 iPhone提价霸占4G市场等于原价8折 明码箱怎样设置明码锁 苏泊尔电饭锅保修多久 长城画龙G8253YN彩电输入指令画面变暗疑问检修 彩星彩电解除童锁方法大全 三星笔记本培修点上海 液晶显示器花屏培修视频 燃气热水器不热水要素 热水器不上班经常出现3种处置方法 无氟空调跟有氟空调有什么区别 norltz燃气热水器售后电话 大连站和大连北站哪个离周水子机场近 热水器显示屏亮显示温度不加热 铁猫牌保险箱高效开锁技巧 科技助力安保无忧 创维8R80 汽修 a1265和c3182是什么管 为什么电热水器不能即热 标致空调为什么不冷 神舟培修笔记本培修 dell1420内存更新 青岛自来水公司培修热线电话 包头美的洗衣机全国各市售后服务预定热线号码2024年修缮点降级 创维42k08rd更新 空调为什么运转异响 热水器为何会漏水 该如何处置 什么是可以自己处置的 重庆华帝售后电话 波轮洗衣机荡涤价格 鼎新热水器 留意了!不是水平疑问! 马桶产生了这5个现象 方便 极速 邢台空调移机电话上门服务 扬子空调缺点代码e4是什么疑问 宏基4736zG可以装置W11吗 奥克斯空调培修官方 为什么突然空调滴水很多 乐视s40air刷机包 未联络视的提高方向 官网培修 格力空调售后电话 皇明太阳能电话 看尚X55液晶电视进入工厂形式和软件更新方法 燃气热水器缺点代码

热门资讯

关注我们

微信公众号