10 广播变量 & 累加器:共享变量是用来做什么的?
你好,我是吴磊。
今天是国庆第一天,首先祝你节日快乐。专栏上线以来,有不少同学留言说期待后续内容,所以国庆期间我们仍旧更新正文内容,让我们一起把基础知识模块收个尾。
学习过RDD常用算子之后,回顾这些算子,你会发现它们都是作用(Apply)在RDD之上的。RDD的计算以数据分区为粒度,依照算子的逻辑,Executors以相互独立的方式,完成不同数据分区的计算与转换。
不难发现,对于Executors来说,分区中的数据都是局部数据。换句话说,在同一时刻,隶属于某个Executor的数据分区,对于其他Executors来说是不可见的。
不过,在做应用开发的时候,总会有一些计算逻辑需要访问“全局变量”,比如说全局计数器,而这些全局变量在任意时刻对所有的Executors都是可见的、共享的。那么问题来了,像这样的全局变量,或者说共享变量,Spark又是如何支持的呢?
今天这一讲,我就来和你聊聊Spark共享变量。按照创建与使用方式的不同,Spark提供了两类共享变量,分别是广播变量(Broadcast variables)和累加器(Accumulators)。接下来,我们就正式进入今天的学习,去深入了解这两种共享变量的用法、以及它们各自的适用场景。
广播变量(Broadcast variables)
我们先来说说广播变量。广播变量的用法很简单,给定普通变量x,通过调用SparkContext下的broadcast API即可完成广播变量的创建,我们结合代码例子看一下。
在上面的代码示例中,我们先是定义了一个字符串列表list,它包含“Apache”和“Spark”这两个单词。然后,我们使用broadcast函数来创建广播变量bc,bc封装的内容就是list列表。
// 读取广播变量内容
bc.value
// List[String] = List(Apache, Spark)
// 直接读取列表内容
list
// List[String] = List(Apache, Spark)
使用broadcast API创建广播变量
广播变量创建好之后,通过调用它的value函数,我们就可以访问它所封装的数据内容。可以看到调用bc.value的效果,这与直接访问字符串列表list的效果是完全一致的。
看到这里,你可能会问:“明明通过访问list变量就可以直接获取字符串列表,为什么还要绕个大弯儿,先去封装广播变量,然后又通过它的value函数来获取同样的数据内容呢?”实际上,这是个非常好的问题,要回答这个问题,咱们需要做个推演,看看直接访问list变量会产生哪些弊端。
在前面的几讲中,我们换着花样地变更Word Count的计算逻辑。尽管Word Count都快被我们“玩坏了”,不过,一以贯之地沿用同一个实例,有助于我们通过对比迅速掌握新的知识点、技能点。因此,为了让你迅速掌握广播变量的“精髓”,咱们不妨“故技重施”,继续在Word Count这个实例上做文章。
普通变量的痛点
这一次,为了对比使用广播变量前后的差异,我们把Word Count变更为“定向计数”。
所谓定向计数,它指的是只对某些单词进行计数,例如,给定单词列表list,我们只对文件wikiOfSpark.txt当中的“Apache”和“Spark”这两个单词做计数,其他单词我们可以忽略。结合第1讲Word Count的完整代码,这样的计算逻辑很容易实现,如下表所示。
import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
// 创建单词列表list
val list: List[String] = List("Apache", "Spark")
// 使用list列表对RDD进行过滤
val cleanWordRDD: RDD[String] = wordRDD.filter(word => list.contains(word))
// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// 获取计算结果
wordCounts.collect
// Array[(String, Int)] = Array((Apache,34), (Spark,63))
将上述代码丢进spark-shell,我们很快就能算出,在wikiOfSpark.txt文件中,“Apache”这个单词出现了34次,而“Spark”则出现了63次。虽说得出计算结果挺容易的,不过知其然,还要知其所以然,接下来,咱们一起来分析一下,这段代码在运行时是如何工作的。
如上图所示,list变量本身是在Driver端创建的,它并不是分布式数据集(如lineRDD、wordRDD)的一部分。因此,在分布式计算的过程中,Spark需要把list变量分发给每一个分布式任务(Task),从而对不同数据分区的内容进行过滤。
在这种工作机制下,如果RDD并行度较高、或是变量的尺寸较大,那么重复的内容分发就会引入大量的网络开销与存储开销,而这些开销会大幅削弱作业的执行性能。为什么这么说呢?
要知道,Driver端变量的分发是以Task为粒度的,系统中有多少个Task,变量就需要在网络中分发多少次。更要命的是,每个Task接收到变量之后,都需要把它暂存到内存,以备后续过滤之用。换句话说,在同一个Executor内部,多个不同的Task多次重复地缓存了同样的内容拷贝,毫无疑问,这对宝贵的内存资源是一种巨大的浪费。
RDD并行度较高,意味着RDD的数据分区数量较多,而Task数量与分区数相一致,这就代表系统中有大量的分布式任务需要执行。如果变量本身尺寸较大,大量分布式任务引入的网络开销与内存开销会进一步升级。在工业级应用中,RDD的并行度往往在千、万这个量级,在这种情况下,诸如list这样的变量会在网络中分发成千上万次,作业整体的执行效率自然会很差 。
面对这样的窘境,我们有没有什么办法,能够避免同一个变量的重复分发与存储呢?答案当然是肯定的,这个时候,我们就可以祭出广播变量这个“杀手锏”。
广播变量的优势
想要知道广播变量到底有啥优势,我们可以先用广播变量重写一下前面的代码实现,然后再做个对比,很容易就能发现广播变量为什么能解决普通变量的痛点。
import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
// 创建单词列表list
val list: List[String] = List("Apache", "Spark")
// 创建广播变量bc
val bc = sc.broadcast(list)
// 使用bc.value对RDD进行过滤
val cleanWordRDD: RDD[String] = wordRDD.filter(word => bc.value.contains(word))
// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// 获取计算结果
wordCounts.collect
// Array[(String, Int)] = Array((Apache,34), (Spark,63))
可以看到,代码的修改非常简单,我们先是使用broadcast函数来封装list变量,然后在RDD过滤的时候调用bc.value来访问list变量内容。你可不要小看这个改写,尽管代码的改动微乎其微,几乎可以忽略不计,但在运行时,整个计算过程却发生了翻天覆地的变化。
在使用广播变量之前,list变量的分发是以Task为粒度的,而在使用广播变量之后,变量分发的粒度变成了以Executors为单位,同一个Executor内多个不同的Tasks只需访问同一份数据拷贝即可。换句话说,变量在网络中分发与存储的次数,从RDD的分区数量,锐减到了集群中Executors的个数。
要知道,在工业级系统中,Executors个数与RDD并行度相比,二者之间通常会相差至少两个数量级。在这样的量级下,广播变量节省的网络与内存开销会变得非常可观,省去了这些开销,对作业的执行性能自然大有裨益。
好啦,到现在为止,我们讲解了广播变量的用法、工作原理,以及它的优势所在。在日常的开发工作中,当你遇到需要多个Task共享同一个大型变量(如列表、数组、映射等数据结构)的时候,就可以考虑使用广播变量来优化你的Spark作业。接下来,我们继续来说说Spark支持的第二种共享变量:累加器。
累加器(Accumulators)
累加器,顾名思义,它的主要作用是全局计数(Global counter)。与单机系统不同,在分布式系统中,我们不能依赖简单的普通变量来完成全局计数,而是必须依赖像累加器这种特殊的数据结构才能达到目的。
与广播变量类似,累加器也是在Driver端定义的,但它的更新是通过在RDD算子中调用add函数完成的。在应用执行完毕之后,开发者在Driver端调用累加器的value函数,就能获取全局计数结果。按照惯例,咱们还是通过代码来熟悉累加器的用法。
聪明的你可能已经猜到了,我们又要对Word Count“动手脚”了。在第1讲的Word Count中,我们过滤掉了空字符串,然后对文件wikiOfSpark.txt中所有的单词做统计计数。
不过这一次,我们在过滤掉空字符的同时,还想知道文件中到底有多少个空字符串,这样我们对文件中的“脏数据”就能做到心中有数了。
注意,这里对于空字符串的计数,不是主代码逻辑,它的计算结果不会写入到Word Count最终的统计结果。所以,只是简单地去掉filter环节,是无法实现空字符计数的。
那么,你自然会问:“不把filter环节去掉,怎么对空字符串做统计呢?”别着急,这样的计算需求,正是累加器可以施展拳脚的地方。你可以先扫一眼下表的代码实现,然后我们再一起来熟悉累加器的用法。
import org.apache.spark.rdd.RDD
val rootPath: String = _
val file: String = s"${rootPath}/wikiOfSpark.txt"
// 读取文件内容
val lineRDD: RDD[String] = spark.sparkContext.textFile(file)
// 以行为单位做分词
val wordRDD: RDD[String] = lineRDD.flatMap(line => line.split(" "))
// 定义Long类型的累加器
val ac = sc.longAccumulator("Empty string")
// 定义filter算子的判定函数f,注意,f的返回类型必须是Boolean
def f(x: String): Boolean = {
if(x.equals("")) {
// 当遇到空字符串时,累加器加1
ac.add(1)
return false
} else {
return true
}
}
// 使用f对RDD进行过滤
val cleanWordRDD: RDD[String] = wordRDD.filter(f)
// 把RDD元素转换为(Key,Value)的形式
val kvRDD: RDD[(String, Int)] = cleanWordRDD.map(word => (word, 1))
// 按照单词做分组计数
val wordCounts: RDD[(String, Int)] = kvRDD.reduceByKey((x, y) => x + y)
// 收集计数结果
wordCounts.collect
// 作业执行完毕,通过调用value获取累加器结果
ac.value
// Long = 79
与第1讲的Word Count相比,这里的代码主要有4处改动:
- 使用SparkContext下的longAccumulator来定义Long类型的累加器;
- 定义filter算子的判定函数f,当遇到空字符串时,调用add函数为累加器计数;
- 以函数f为参数,调用filter算子对RDD进行过滤;
- 作业完成后,调用累加器的value函数,获取全局计数结果。
你不妨把上面的代码敲入到spark-shell里,直观体验下累加器的用法与效果,ac.value给出的结果是79,这说明以空格作为分隔符切割源文件wikiOfSpark.txt之后,就会留下79个空字符串。
另外,你还可以验证wordCounts这个RDD,它包含所有单词的计数结果,不过,你会发现它的元素并不包含空字符串,这与我们预期的计算逻辑是一致的。
除了上面代码中用到的longAccumulator,SparkContext还提供了doubleAccumulator和collectionAccumulator这两种不同类型的累加器,用于满足不同场景下的计算需要,感兴趣的话你不妨自己动手亲自尝试一下。
其中,doubleAccumulator用于对Double类型的数值做全局计数;而collectionAccumulator允许开发者定义集合类型的累加器,相比数值类型,集合类型可以为业务逻辑的实现,提供更多的灵活性和更大的自由度。
不过,就这3种累加器来说,尽管类型不同,但它们的用法是完全一致的。都是先定义累加器变量,然后在RDD算子中调用add函数,从而更新累加器状态,最后通过调用value函数来获取累加器的最终结果。
好啦,到这里,关于累加器的用法,我们就讲完了。在日常的开发中,当你遇到需要做全局计数的场景时,别忘了用上累加器这个实用工具。
重点回顾
今天的内容讲完了,我们一起来做个总结。今天这一讲,我们重点讲解了广播变量与累加器的用法与适用场景。
广播变量由Driver端定义并初始化,各个Executors以只读(Read only)的方式访问广播变量携带的数据内容。累加器也是由Driver定义的,但Driver并不会向累加器中写入任何数据内容,累加器的内容更新,完全是由各个Executors以只写(Write only)的方式来完成,而Driver仅以只读的方式对更新后的内容进行访问。
关于广播变量,你首先需要掌握它的基本用法。给定任意类型的普通变量,你都可以使用SparkContext下面的broadcast API来创建广播变量。接下来,在RDD的转换与计算过程中,你可以通过调用广播变量的value函数,来访问封装的数据内容,从而辅助RDD的数据处理。
需要额外注意的是,在Driver与Executors之间,普通变量的分发与存储,是以Task为粒度的,因此,它所引入的网络与内存开销,会成为作业执行性能的一大隐患。在使用广播变量的情况下,数据内容的分发粒度变为以Executors为单位。相比前者,广播变量的优势高下立判,它可以大幅度消除前者引入的网络与内存开销,进而在整体上提升作业的执行效率。
关于累加器,首先你要清楚它的适用场景,当你需要做全局计数的时候,累加器会是个很好的帮手。其次,你需要掌握累加器的具体用法,可以分为这样3步:
- 使用SparkContext下的[long | double | collection]Accumulator来定义累加器;
- 在RDD的转换过程中,调用add函数更新累加器状态;
- 在作业完成后,调用value函数,获取累加器的全局结果。
每课一练
- 在使用累加器对空字符串做全局计数的代码中,请你用普通变量去替换累加器,试一试,在不使用累加器的情况,能否得到预期的计算结果?
- 累加器提供了Long、Double和Collection三种类型的支持,那么广播变量在类型支持上有限制吗?除了普通类型、集合类型之外,广播变量还支持其他类型吗?比如,Spark支持在RDD之上创建广播变量吗?
欢迎你在留言区跟我交流互动,也推荐你把这一讲的内容分享给你身边的朋友,说不定就能帮他解决一个难题。
- Geek_2dfa9a 👍(25) 💬(1)
1.使用普通变量是无法得到预期结果的,因为lambda里的必须是final型的变量,那这里我用两种方法做个测试: 首先是原子类,因字数限制,只放关键代码: var normal = new AtomicInteger(0) val preRdd = wordRdd.filter(f => if(f.contains("scala")) { println(normal.incrementAndGet()) false } else { true } ) println(normal.get()) 可以看到Executor运行过程中normal是会正常累加的,但是最后println(normal.get())打印出来是0,这里简单分析下,spark会把闭包序列化后 传递给Executor,然后Executor再把闭包反序列化后作用在RDD上。因此Driver里的normal变量和Executor里的normal变量是多个进程里的多个变量。 然后使用自定义类对象 class IntHolder(var value: Int) {} 测试,报了一个Exception in thread "main" org.apache.spark.SparkException: Task not serializable 从侧面也证明,闭包里的对象是要实现序列化的。变量是多个进程里的多个变量改进下再试, class IntHolder(var value: Int) extends Serializable {} 发现每个Task都是从0开始计数,更说明每个Task里的对象是Driver丢过来的副本。这里我多想了下Task里的函数是串行执行还是并行执行的,如果我的IntHolder对象 不是线程安全的,那在Task里有无数据竞争?从我的例子看是串行运行的,但是一时找不到看哪些代码,请老师指正。 多讲两句,Spark闭包序列化和反序列化真的是很重要的知识,打开了我的视野,我感觉Spark是Faas一种非常巧妙的场景,函数式编程真的也很符合Spark 把函数作用在RDD上,存算分离的思路,UCB出品真的名不虚传。 2.Spark基本支持任何类型的广播变量,但是不支持RDD类型的广播变量,从代码中可以看到,有一个验证会判断变量是否为RDD类型,如果想要广播RDD类型的话,可以先 collect收集到driver,再作为普通集合广播。 def broadcast[T: ClassTag](value: T): Broadcast[T] = { assertNotStopped() require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass), "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.") val bc = env.broadcastManager.newBroadcast[T](value, isLocal) val callSite = getCallSite logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm) cleaner.foreach(_.registerBroadcastForCleanup(bc)) bc }
2021-10-03 - Geek_f39659 👍(12) 💬(1)
第一题: driver中声明的非广播变量,如果executor需要用的话,会给每一个分区拷贝一个副本,所以每个分区都会给自己的这份副本加一加一。最后这些副本随着executor 进程的结束就都丢失了。所以driver 中的这个变量仍然还是0.
2021-10-03 - 火炎焱燚 👍(3) 💬(1)
python 版的代码对累加器这儿有些不一样,貌似没有longAccumulator,double之分,我这儿是这样的: file='~~~~/chapter01/wikiOfSpark.txt' lineRDD=sc.textFile(file) lineRDD.first() # 会打印出lineRDD的第一行: u'Apache Spark',如果没有打印出来,则报错 wordRDD=lineRDD.flatMap(lambda line: line.split(" ")) # 定义累加器 ac = sc.accumulator(0) # 这儿的定义方式不一样 # 定义filter算子的判定函数f,注意,f的返回类型必须是Boolean def f(x): if x=='': ac.add(1) return False else: return True # 使用f对RDD进行过滤 cleanWordRDD = wordRDD.filter(f) kvRDD=cleanWordRDD.map(lambda word:(word,1)) wordCounts=kvRDD.reduceByKey(lambda x,y:x+y) # 获取计算结果 wordCounts.collect() # 结果:[('Spark', 63), ('Apache', 34)] # wordCounts.map(lambda (k,v):(v,k)).sortByKey(False).take(5) # 作业执行完毕,通过调用value获取累加器结果 ac.value # 79
2021-10-12 - welldo 👍(2) 💬(1)
以前我写累加器,都是手动继承AccumulatorV2,现在找到更简单的方法了😁
2021-10-29 - GAC·DU 👍(2) 💬(4)
写程序测试了一下老师的第一个思考题,在本机的idea中可以得到正确的结果79,但是在spark- shell或者提交到yarn集群上结果却是0,好纠结,请老师解惑,代码如下。 第二个思考题,我认为广播变量是只读数据,参加计算时不能随着RDD一起变换形态破坏数据的一致性。 import org.apache.spark.sql.SparkSession object BcAcOpt { var n: Long = _ // 全局变量 def main(args: Array[String]): Unit = { val spark = SparkSession.builder.master("local[*]").appName("board").getOrCreate() val lineRDD = spark.sparkContext.textFile("wikiOfSpark.txt") println(lineRDD.partitions.length) val wordRDD = lineRDD.flatMap(word => word.split(" ")) val filterRDD = wordRDD.filter(acf) val kvRDD = filterRDD.map(word => (word, 1)) val wordCount = kvRDD.reduceByKey((x, y) => x + y) wordCount.take(10) println(n) spark.stop() } def acf(word: String): Boolean = { if ("".equals(word)) { n += 1 false } else { true } } }
2021-10-02 - welldo 👍(1) 💬(1)
老师,广播变量有"只读" / "不可变"特性, 但广播引用数据类型时, 广播的实际上是地址值,那么地址值肯定不可变,而地址值指向的内容是可变的; 我今天在idea和shell里做了一个测试,证明了这个说法是对的, 代码如下: (scala-List是不可变的,所以代码里我用了可变的scala-ListBuffer) var buffer: ListBuffer[String] = ListBuffer() var bufferBroad = spark.sparkContext.broadcast(buffer) val cleanWordRDD2: RDD[String] = wordRDD.filter(word => { if (bc.value.contains(word)) { //bc是文章里的广播变量 bufferBroad.value.append("test") //关键的一行 true } else { false } }) println(cleanWordRDD2.count())//触发这条dag println(buffer) //97(34+63)个test println(bufferBroad.value) //97(34+63)个test 老师,这个说法和我的证明没有问题吧? 这个算不算“不可变”特性的漏洞 呢?
2021-10-31 - welldo 👍(1) 💬(1)
老师,关于第一题, 我的答案:driver和executor是不一样的进程,普通变量会拷贝副本到executor上, “原本”和“副本”没有任何关系, spark-shell打印出来的是“原本”,数值是初始化时的值. 我的问题:为何idea能计算正确呢?
2021-10-29 - Sun 👍(1) 💬(1)
老师您好。关于第一题的实验,我使用自定义类,如下: class MyCount implements Serializable{ int num; public MyCount(){ this.num = 0; } public void add(){ this.num++; } } 将它用于代替广播变量,是不行的,因为executor最后销毁了这些对象。 但是我将这个对象置为静态对象,放在Driver中再运行后,这个对象成功得到了最后的结果。 请问老师,为什么设置为静态对象就可以获取结果呢?静态对象在Driver和Executor是怎么一个工作机制?
2022-04-20 - 唐方刚 👍(0) 💬(0)
广播变量是以executor为粒度分发的,那么累加器是怎么分发的?最终的结果又是怎么算出来的?
2022-08-12 - 杨帅 👍(0) 💬(0)
老师,我有一个问题:一个worker上可以有多个executor吗?在读取文件阶段,一个executor(一个进程)里可以有多个task(多个线程)读取不同分区(RDD的分区)的数据吗?那RDD的分区的定义是什么呢,是不同机器上的数据,还是?
2022-06-18 - Spoon 👍(0) 💬(0)
共享变量Java版本 https://github.com/Spoon94/spark-practice/blob/master/src/main/java/com/spoon/spark/core/SharedVariableJob.java
2022-04-04