18 数据关联优化:都有哪些Join策略,开发者该如何取舍?
你好,我是吴磊。
在上一讲,我们分别从关联形式与实现机制这两个方面,对数据分析进行了讲解和介绍。对于不同关联形式的用法和实现机制的原理,想必你已经了然于胸。不过,在大数据的应用场景中,数据的处理往往是在分布式的环境下进行的,在这种情况下,数据关联的计算还要考虑网络分发这个环节。
我们知道,在分布式环境中,Spark支持两类数据分发模式。一类是我们在第7讲学过的Shuffle,Shuffle通过中间文件来完成Map阶段与Reduce阶段的数据交换,因此它会引入大量的磁盘与网络开销。另一类是我们在第10讲介绍的广播变量(Broadcast Variables),广播变量在Driver端创建,并由Driver分发到各个Executors。
因此,从数据分发模式的角度出发,数据关联又可以分为Shuffle Join和Broadcast Join这两大类。将两种分发模式与Join本身的3种实现机制相结合,就会衍生出分布式环境下的6种Join策略。
那么,对于这6种Join策略,Spark SQL是如何支持的呢?它们的优劣势与适用场景都有哪些?开发者能否针对这些策略有的放矢地进行取舍?今天这一讲,咱们就来聊聊这些话题。
Join实现机制的优势对比
首先,我们先来说一说不同Join实现机制本身的一些特性与适用场景,从而为后续的讨论打好基础。需要说明的是,咱们这里说的Join实现机制,指的是算法层面的工作原理,不同的算法有着不同的适用场景与复杂度,我们需要对它们有足够认识并有所区分。
我们知道,Join支持3种实现机制,它们分别是Hash Join、Sort Merge Join和Nested Loop Join。三者之中,Hash Join的执行效率最高,这主要得益于哈希表O(1)的查找效率。不过,在Probe阶段享受哈希表的“性能红利”之前,Build阶段得先在内存中构建出哈希表才行。因此,Hash Join这种算法对于内存的要求比较高,适用于内存能够容纳基表数据的计算场景。
相比之下,Sort Merge Join就没有内存方面的限制。不论是排序、还是合并,SMJ都可以利用磁盘来完成计算。所以,在稳定性这方面,SMJ更胜一筹。
而且与Hash Join相比,SMJ的执行效率也没有差太多,前者是O(M),后者是O(M + N),可以说是不分伯仲。当然,O(M + N)的复杂度得益于SMJ的排序阶段。因此,如果准备参与Join的两张表是有序表,那么这个时候采用SMJ算法来实现关联简直是再好不过了。
与前两者相比,Nested Loop Join看上去有些多余,嵌套的双层for循环带来的计算复杂度最高:O(M * N)。不过,尺有所短寸有所长,执行高效的HJ和SMJ只能用于等值关联,也就是说关联条件必须是等式,像salaries(“id”) < employees(“id”)这样的关联条件,HJ和SMJ是无能为力的。相反,NLJ既可以处理等值关联(Equi Join),也可以应付不等值关联(Non Equi Join),可以说是数据关联在实现机制上的最后一道防线。
Shuffle Join与Broadcast Join
分析完不同Join机制的优缺点之后,接下来,我们再来说说分布式环境下的Join策略。与单机环境不同,在分布式环境中,两张表的数据各自散落在不同的计算节点与Executors进程。因此,要想完成数据关联,Spark SQL就必须先要把Join Keys相同的数据,分发到同一个Executors中去才行。
我们还是用上一讲的员工信息和薪资表来举例,如果我们打算对salaries和employees两张表按照id列做关联,那么,对于id字段值相同的薪资数据与员工数据,我们必须要保证它们坐落在同样的Executors进程里,Spark SQL才能利用刚刚说的HJ、SMJ、以及NLJ,以Executors(进程)为粒度并行地完成数据关联。
换句话说,以Join Keys为基准,两张表的数据分布保持一致,是Spark SQL执行分布式数据关联的前提。而能满足这个前提的途径只有两个:Shuffle与广播。这里我额外提醒一下,Shuffle和广播变量我们在前面的课程有过详细的介绍,如果你记不太清了,不妨翻回去看一看。
回到正题,开篇咱们说到,如果按照分发模式来划分,数据关联可以分为Shuffle Join和Broadcast Join两大类。通常来说,在执行性能方面,相比Shuffle Join,Broadcast Join往往会更胜一筹。为什么这么说呢?
接下来,我们就一起来分析分析,这两大类Join在分布式环境下的执行过程,~然后再来回答这个问题。~理解了执行过程,你自然就能解答这个问题了。
Shuffle Join
在没有开发者干预的情况下,Spark SQL默认采用Shuffle Join来完成分布式环境下的数据关联。对于参与Join的两张数据表,Spark SQL先是按照如下规则,来决定不同数据记录应当分发到哪个Executors中去:
- 根据Join Keys计算哈希值
- 将哈希值对并行度(Parallelism)取模
由于左表与右表在并行度(分区数)上是一致的,因此,按照同样的规则分发数据之后,一定能够保证id字段值相同的薪资数据与员工数据坐落在同样的Executors中。
如上图所示,颜色相同的矩形代表Join Keys相同的数据记录,可以看到,在Map阶段,数据分散在不同的Executors当中。经过Shuffle过后,Join Keys相同的记录被分发到了同样的Executors中去。接下来,在Reduce阶段,Reduce Task就可以使用HJ、SMJ、或是NLJ算法在Executors内部完成数据关联的计算。
Spark SQL之所以在默认情况下一律采用Shuffle Join,原因在于Shuffle Join的“万金油”属性。也就是说,在任何情况下,不论数据的体量是大是小、不管内存是否足够,Shuffle Join在功能上都能够“不辱使命”,成功地完成数据关联的计算。然而,有得必有失,功能上的完备性,往往伴随着的是性能上的损耗。
学习过 Shuffle的原理(第6讲)之后,不用我多说,Shuffle的弊端想必你早已烂熟于心。我们知道,从CPU到内存,从磁盘到网络,Shuffle的计算几乎需要消耗所有类型的硬件资源。尤其是磁盘和网络开销,这两座大山往往是应用执行的性能瓶颈。
那么问题来了,除了Shuffle Join这种“万金油”式的Join策略,开发者还有没有其他效率更高的选择呢?答案当然是肯定的,Broadcast Join就是用来克制Shuffle的“杀手锏”。
Broadcast Join
在广播变量那一讲(第10讲),我们讲过把用户数据结构封装为广播变量的过程。实际上,Spark不仅可以在普通变量上创建广播变量,在分布式数据集(如RDD、DataFrame)之上也可以创建广播变量。这样一来,对于参与Join的两张表,我们可以把其中较小的一个封装为广播变量,然后再让它们进行关联。
光说思路你可能体会不深,我们还是结合例子理解。以薪资表和员工表为例,只要对代码稍加改动,我们就能充分利用广播变量的优势。
更改后的代码如下所示。
import org.apache.spark.sql.functions.broadcast
// 创建员工表的广播变量
val bcEmployees = broadcast(employees)
// 内关联,PS:将原来的employees替换为bcEmployees
val jointDF: DataFrame = salaries.join(bcEmployees, salaries("id") === employees("id"), "inner")
在Broadcast Join的执行过程中,Spark SQL首先从各个Executors收集employees表所有的数据分片,然后在Driver端构建广播变量bcEmployees,构建的过程如下图实线部分所示。
可以看到,散落在不同Executors内花花绿绿的矩形,代表的正是employees表的数据分片。这些数据分片聚集到一起,就构成了广播变量。接下来,如图中虚线部分所示,携带着employees表全量数据的广播变量bcEmployees,被分发到了全网所有的Executors当中去。
在这种情况下,体量较大的薪资表数据只要“待在原地、保持不动”,就可以轻松关联到跟它保持之一致的员工表数据了。通过这种方式,Spark SQL成功地避开了Shuffle这种“劳师动众”的数据分发过程,转而用广播变量的分发取而代之。
尽管广播变量的创建与分发同样需要消耗网络带宽,但相比Shuffle Join中两张表的全网分发,因为仅仅通过分发体量较小的数据表来完成数据关联,Spark SQL的执行性能显然要高效得多。这种小投入、大产出,用极小的成本去博取高额的性能收益,可以说是“四两拨千斤”!
Spark SQL支持的Join策略
不论是Shuffle Join,还是Broadcast Join,一旦数据分发完毕,理论上可以采用HJ、SMJ和NLJ这3种实现机制中的任意一种,完成Executors内部的数据关联。因此,两种分发模式,与三种实现机制,它们组合起来,总共有6种分布式Join策略,如下图所示。
虽然组合起来选择多样,但你也不必死记硬背,抓住里面的规律才是关键,我们一起来分析看看。
在这6种Join策略中,Spark SQL支持其中的5种来应对不用的关联场景,也即图中蓝色的5个矩形。对于等值关联(Equi Join),Spark SQL优先考虑采用Broadcast HJ策略,其次是Shuffle SMJ,最次是Shuffle HJ。对于不等值关联(Non Equi Join),Spark SQL优先考虑Broadcast NLJ,其次是Shuffle NLJ。
不难发现,不论是等值关联、还是不等值关联,只要Broadcast Join的前提条件成立,Spark SQL一定会优先选择Broadcast Join相关的策略。那么问题来了,Broadcast Join的前提条件是什么呢?
回顾Broadcast Join的工作原理图,我们不难发现,Broadcast Join得以实施的基础,是被广播数据表(图中的表2)的全量数据能够完全放入Driver的内存、以及各个Executors的内存,如下图所示。
另外,为了避免因广播表尺寸过大而引入新的性能隐患,Spark SQL要求被广播表的内存大小不能超过8GB。
好,这里我们简单总结一下。只要被广播表满足上述两个条件,我们就可以利用SQL Functions中的broadcast函数来创建广播变量,进而利用Broadcast Join策略来提升执行性能。
当然,在Broadcast Join前提条件不成立的情况下,Spark SQL就会退化到Shuffle Join的策略。在不等值的数据关联中,Spark SQL只有Shuffle NLJ这一种选择,因此咱们无需赘述。
但在等值关联的场景中,Spark SQL有Shuffle SMJ和Shuffle HJ这两种选择。尽管如此,Shuffle SMJ与Shuffle HJ的关系,就像是关羽和周仓的关系。周仓虽说武艺也不错,但他向来只是站在关公后面提刀。大战在即,刘备仰仗的自然是站在前面的关羽,而很少启用后面的周仓。在Shuffle SMJ与Shuffle HJ的取舍上,Spark SQL也是如此。
学习过Shuffle之后,我们知道,Shuffle在Map阶段往往会对数据做排序,而这恰恰正中SMJ机制的下怀。对于已经排好序的两张表,SMJ的复杂度是O(M + N),这样的执行效率与HJ的O(M)可以说是不相上下。再者,SMJ在执行稳定性方面,远胜于HJ,在内存受限的情况下,SMJ可以充分利用磁盘来顺利地完成关联计算。因此,考虑到Shuffle SMJ的诸多优势,Shuffle HJ就像是关公后面的周仓,Spark SQL向来对之视而不见,所以对于HJ你大概知道它的作用就行。
重点回顾
好啦,到此为止,今天的课程就全部讲完了,我们一起来做个总结。首先,我们一起分析、对比了单机环境中不同Join机制的优劣势,我把它们整理到了下面的表格中,供你随时查看。
在分布式环境中,要想利用上述机制完成数据关联,Spark SQL首先需要把两张表中Join Keys一致的数据分发到相同的Executors中。
因此,数据分发是分布式数据关联的基础和前提。Spark SQL支持Shuffle和广播两种数据分发模式,相应地,Join也被分为Shuffle Join和Broadcast Join,其中Shuffle Join是默认的关联策略。关于两种策略的优劣势对比,我也整理到了如下的表格中,供你参考。
结合三种实现机制和两种数据分发模式,Spark SQL支持5种分布式Join策略。对于这些不同的Join策略,Spark SQL有着自己的选择偏好,我把它整理到了如下的表格中,供你随时查看。
其中,Broadcast Join的生效前提,是基表能够放进内存,且存储尺寸小于8GB。只要前提条件成立,Spark SQL就会优先选择Broadcast Join。
每课一练
在6种分布式Join策略中,Spark SQL唯独没有支持Broadcast SMJ,你能想一想,为什么Spark SQL没有选择支持这种Join策略吗?提示一下,你可以从SMJ与HJ的执行效率入手做分析。
欢迎你在留言区跟我交流互动,也推荐你把这一讲分享给更多同事、朋友。
- Abigail 👍(15) 💬(2)
Broadcast Join 中相比 SMJ,HJ 并不要求参与 Join 的两张表有序,也不需要维护两个游标来判断当前的记录位置,只要在 Build 阶段构建的哈希表可以放进内存就行。这个时候,相比 NLJ、SMJ,HJ 的执行效率是最高的。因此,当Broadcast Join 的前提条件存在,在可以采用 HJ 的情况下,Spark 自然就没有必要再去用 SMJ 这种前置开销(排序)比较大的方式去完成数据关联。
2021-10-20 - 小新 👍(8) 💬(1)
SMJ 在执行稳定性方面,远胜于 HJ,这句话怎么理解? 还有在做等值关联时,优先级是:Broadcast HJ Shuffle SMJ Shuffle HJ那什么情况下Shuffle HJ会启用呢?
2021-12-02 - Unknown element 👍(2) 💬(4)
“学习过 Shuffle 之后,我们知道,Shuffle 在 Map 阶段往往会对数据做排序,而这恰恰正中 SMJ 机制的下怀。” 老师问下这里 join 之前应该还得再排一次序吧?因为 map 阶段的排序只能保证 reduce task 从每个 map task 拉取过来的数据片段是有序的,但是多个数据片段之间还是无序的吧
2021-10-21 - 王璀璨 👍(1) 💬(1)
老师最近在用spark重构pandas的时候遇到一个问题,在udf中使用filter查询的时候报错 temp_data = hospital_data_sheet1.groupby(['hcp_id', 'hcp_name']).count().select('hcp_id', 'hcp_name') def get_data(data1): search_data = hospital_data_sheet1.select((hospital_data_sheet1['hcp_id'] == data1)) total = data1 return total get_number = F.udf(get_data, StringType()) result_data = temp_data.withColumn('total', get_number(temp_data['hcp_id'])) result_data.show() 最后报错 _pickle.PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object 不知道为什么会这样,请老师看一下
2022-01-11 - LJK 👍(1) 💬(1)
老师好,工作中碰到过ERROR BroadcastExchangeExec: Could not execute broadcast in 300 secs.的报错,请问这种报错的排查思路以及有哪些可能的原因导致的呢?
2021-11-05 - 慢慢卢 👍(0) 💬(1)
"学习过 Shuffle 之后,我们知道,Shuffle 在 Map 阶段往往会对数据做排序,而这恰恰正中 SMJ 机制的下怀。对于已经排好序的两张表,SMJ 的复杂度是 O(M + N),这样的执行效率与 HJ 的 O(M) 可以说是不相上下。再者,SMJ 在执行稳定性方面,远胜于 HJ,在内存受限的情况下,SMJ 可以充分利用磁盘来顺利地完成关联计算。因此,考虑到 Shuffle SMJ 的诸多优势,Shuffle HJ 就像是关公后面的周仓,Spark SQL 向来对之视而不见,所以对于 HJ 你大概知道它的作用就行。" 这段里面提到SMJ可以利用磁盘完成计算,结合前面提到内存管理,能使用磁盘的除了cache、shuffle write外,也就是说内存计算其他过程也会使用到磁盘(比如SMJ),但我理解内存计算应该完全在内存中,不然就不会有OOM了。 所以这点我没有搞懂,辛苦老师指导解释下。
2021-12-09 - Geek_038655 👍(0) 💬(1)
请问:大表join大表怎么优化?
2021-10-26 - 思绪纷繁 👍(1) 💬(0)
【由于左表与右表在并行度(分区数)上是一致的】 想问下,只要做shuffle join操作,左表和右表的的并行度一定是一样的吗?
2023-06-06 - InfoQ_11351e216def 👍(0) 💬(0)
老师您好,想问一下,这里 shuffle 的时候会基于 hash 对两个表的数据分区,这只能保证等值关联的情况节点上有对应的数据,那如果不是等值关联呢?比如 a.id < b.aid这种类似的情况,那岂不是驱动表的每条数据都需要扫描被驱动表的每条数据吗?spark 如何做到呢?
2024-12-09 - 嬴梦川 👍(0) 💬(0)
shuffle write 过程中对结构中的数据记录按(目标分区 ID,Key)排序, 排列后的结果应该不光对“目标分区 ID”有序,也应该对“Key”有序。这样在reduce阶段拉取数据时再做一次时间复杂度为O(N)归并排序就行了。不会改变复杂度O(M+N)的最终结果
2023-09-24 - 嬴梦川 👍(0) 💬(0)
第六章中, shuffle write 过程中对结构中的数据记录按(目标分区 ID,Key)排序, 排列后的结果应该不光对“目标分区 ID”有序,也应该对“Key”是有序的吧
2023-09-24 - 18736416569 👍(0) 💬(0)
老师好,前几天跑任务时遇到了关于join的一个问题,很是不解:逻辑是这样的:a表id去left join b表的id_1或者id_2,我用的是a.id = b.id_1 OR a.id = b.id_2,发现spark采用的是BroadCastNestedLoopJoin,难道这不算是等值连接吗?
2023-02-24 - 小麦 👍(0) 💬(1)
【由于左表与右表在并行度(分区数)上是一致的】 想问下,如果是 Hadoop RDD,左表数据量很大,以 128M 划分成10个分区,而右表只有2个分区。如何进行后续的计算呢?
2022-11-19 - Spoon 👍(0) 💬(0)
使用Boradcast SMJ需要前置排序,纯内存的排序最好也需要O(nlgn)的复杂度,更何况是Sorted Merge可能还会利用磁盘排序,这就得不偿失了
2022-04-10