14 弹性分布式数据集:Spark大厦的地基(下)
你好,我是蔡元楠。
上一讲我们介绍了弹性分布式数据集(RDD)的定义、特性以及结构,并且深入讨论了依赖关系(Dependencies)。
今天让我们一起来继续学习RDD的其他特性。
RDD的结构
首先,我来介绍一下RDD结构中其他的几个知识点:检查点(Checkpoint)、存储级别( Storage Level)和迭代函数(Iterator)。
通过上一讲,你应该已经知道了,基于RDD的依赖关系,如果任意一个RDD在相应的节点丢失,你只需要从上一步的RDD出发再次计算,便可恢复该RDD。
但是,如果一个RDD的依赖链比较长,而且中间又有多个RDD出现故障的话,进行恢复可能会非常耗费时间和计算资源。
而检查点(Checkpoint)的引入,就是为了优化这些情况下的数据恢复。
很多数据库系统都有检查点机制,在连续的transaction列表中记录某几个transaction后数据的内容,从而加快错误恢复。
RDD中的检查点的思想与之类似。
在计算过程中,对于一些计算过程比较耗时的RDD,我们可以将它缓存至硬盘或HDFS中,标记这个RDD有被检查点处理过,并且清空它的所有依赖关系。同时,给它新建一个依赖于CheckpointRDD的依赖关系,CheckpointRDD可以用来从硬盘中读取RDD和生成新的分区信息。
这样,当某个子RDD需要错误恢复时,回溯至该RDD,发现它被检查点记录过,就可以直接去硬盘中读取这个RDD,而无需再向前回溯计算。
存储级别(Storage Level)是一个枚举类型,用来记录RDD持久化时的存储级别,常用的有以下几个:
- MEMORY_ONLY:只缓存在内存中,如果内存空间不够则不缓存多出来的部分。这是RDD存储级别的默认值。
- MEMORY_AND_DISK:缓存在内存中,如果空间不够则缓存在硬盘中。
- DISK_ONLY:只缓存在硬盘中。
- MEMORY_ONLY_2和MEMORY_AND_DISK_2等:与上面的级别功能相同,只不过每个分区在集群中两个节点上建立副本。
这就是我们在前文提到过的,Spark相比于Hadoop在性能上的提升。我们可以随时把计算好的RDD缓存在内存中,以便下次计算时使用,这大幅度减小了硬盘读写的开销。
迭代函数(Iterator)和计算函数(Compute)是用来表示RDD怎样通过父RDD计算得到的。
迭代函数会首先判断缓存中是否有想要计算的RDD,如果有就直接读取,如果没有,就查找想要计算的RDD是否被检查点处理过。如果有,就直接读取,如果没有,就调用计算函数向上递归,查找父RDD进行计算。
到现在,相信你已经对弹性分布式数据集的基本结构有了初步了解。但是光理解RDD的结构是远远不够的,我们的终极目标是使用RDD进行数据处理。
要使用RDD进行数据处理,你需要先了解一些RDD的数据操作。
在第12讲中,我曾经提过,相比起MapReduce只支持两种数据操作,Spark支持大量的基本操作,从而减轻了程序员的负担。
接下来,让我们进一步了解基于RDD的各种数据操作。
RDD的转换操作
RDD的数据操作分为两种:转换(Transformation)和动作(Action)。
顾名思义,转换是用来把一个RDD转换成另一个RDD,而动作则是通过计算返回一个结果。
不难想到,之前举例的map、filter、groupByKey等都属于转换操作。
Map
map是最基本的转换操作。
与MapReduce中的map一样,它把一个RDD中的所有数据通过一个函数,映射成一个新的RDD,任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。
在这一讲中提到的所有的操作,我都会使用代码举例,帮助你更好地理解。
rdd = sc.parallelize(["b", "a", "c"])
rdd2 = rdd.map(lambda x: (x, 1)) // [('b', 1), ('a', 1), ('c', 1)]
Filter
filter这个操作,是选择原RDD里所有数据中满足某个特定条件的数据,去返回一个新的RDD。如下例所示,通过filter,只选出了所有的偶数。
mapPartitions
mapPartitions是map的变种。不同于map的输入函数是应用于RDD中每个元素,mapPartitions的输入函数是应用于RDD的每个分区,也就是把每个分区中的内容作为整体来处理的,所以输入函数的类型是Iterator[T] => Iterator[U]。
rdd = sc.parallelize([1, 2, 3, 4], 2)
def f(iterator): yield sum(iterator)
rdd2 = rdd.mapPartitions(f) // [3, 7]
在mapPartitions的例子中,我们首先创建了一个有两个分区的RDD。mapPartitions的输入函数是对每个分区内的元素求和,所以返回的RDD包含两个元素:1+2=3 和3+4=7。
groupByKey
groupByKey和SQL中的groupBy类似,是把对象的集合按某个Key来归类,返回的RDD中每个Key对应一个序列。
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 2)])
rdd.groupByKey().collect()
//"a" [1, 2]
//"b" [1]
在此,我们只列举这几个常用的、有代表性的操作,对其他转换操作感兴趣的同学可以去自行查阅官方的API文档。
RDD的动作操作
让我们再来看几个常用的动作操作。
Collect
RDD中的动作操作collect与函数式编程中的collect类似,它会以数组的形式,返回RDD的所有元素。需要注意的是,collect操作只有在输出数组所含的数据数量较小时使用,因为所有的数据都会载入到程序的内存中,如果数据量较大,会占用大量JVM内存,导致内存溢出。
rdd = sc.parallelize(["b", "a", "c"])
rdd.map(lambda x: (x, 1)).collect() // [('b', 1), ('a', 1), ('c', 1)]
实际上,上述转换操作中所有的例子,最后都需要将RDD的元素collect成数组才能得到标记好的输出。
Reduce
与MapReduce中的reduce类似,它会把RDD中的元素根据一个输入函数聚合起来。
Count
Count会返回RDD中元素的个数。
sc.parallelize([2, 3, 4]).count() // 3
CountByKey
仅适用于Key-Value pair类型的 RDD,返回具有每个 key 的计数的<Key, Count>的map。
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.countByKey().items()) // [('a', 2), ('b', 1)]
讲到这,你可能会问了,为什么要区分转换和动作呢?虽然转换是生成新的RDD,动作是把RDD进行计算生成一个结果,它们本质上不都是计算吗?
这是因为,所有转换操作都很懒,它只是生成新的RDD,并且记录依赖关系。
但是Spark并不会立刻计算出新RDD中各个分区的数值。直到遇到一个动作时,数据才会被计算,并且输出结果给Driver。
比如,在之前的例子中,你先对RDD进行map转换,再进行collect动作,这时map后生成的RDD不会立即被计算。只有当执行到collect操作时,map才会被计算。而且,map之后得到的较大的数据量并不会传给Driver,只有collect动作的结果才会传递给Driver。
这种惰性求值的设计优势是什么呢?让我们来看这样一个例子。
假设,你要从一个很大的文本文件中筛选出包含某个词语的行,然后返回第一个这样的文本行。你需要先读取文件textFile()生成rdd1,然后使用filter()方法生成rdd2,最后是行动操作first(),返回第一个元素。
读取文件的时候会把所有的行都存储起来,但我们马上就要筛选出只具有特定词组的行了,等筛选出来之后又要求只输出第一个。这样是不是太浪费存储空间了呢?确实。
所以实际上,Spark是在行动操作first()的时候开始真正的运算:只扫描第一个匹配的行,不需要读取整个文件。所以,惰性求值的设计可以让Spark的运算更加高效和快速。
让我们总结一下Spark执行操作的流程吧。
Spark在每次转换操作的时候,使用了新产生的 RDD 来记录计算逻辑,这样就把作用在 RDD 上的所有计算逻辑串起来,形成了一个链条。当对 RDD 进行动作时,Spark 会从计算链的最后一个RDD开始,依次从上一个RDD获取数据并执行计算逻辑,最后输出结果。
RDD的持久化(缓存)
每当我们对RDD调用一个新的action操作时,整个RDD都会从头开始运算。因此,如果某个RDD会被反复重用的话,每次都从头计算非常低效,我们应该对多次使用的RDD进行一个持久化操作。
Spark的persist()和cache()方法支持将RDD的数据缓存至内存或硬盘中,这样当下次对同一RDD进行Action操作时,可以直接读取RDD的结果,大幅提高了Spark的计算效率。
rdd = sc.parallelize([1, 2, 3, 4, 5])
rdd1 = rdd.map(lambda x: x+5)
rdd2 = rdd1.filter(lambda x: x % 2 == 0)
rdd2.persist()
count = rdd2.count() // 3
first = rdd2.first() // 6
rdd2.unpersist()
在文中的代码例子中你可以看到,我们对RDD2进行了多个不同的action操作。由于在第四行我把RDD2的结果缓存在内存中,所以Spark无需从一开始的rdd开始算起了(持久化处理过的RDD只有第一次有action操作时才会从源头计算,之后就把结果存储下来,所以在这个例子中,count需要从源头开始计算,而first不需要)。
在缓存RDD的时候,它所有的依赖关系也会被一并存下来。所以持久化的RDD有自动的容错机制。如果RDD的任一分区丢失了,通过使用原先创建它的转换操作,它将会被自动重算。
持久化可以选择不同的存储级别。正如我们讲RDD的结构时提到的一样,有MEMORY_ONLY,MEMORY_AND_DISK,DISK_ONLY等。cache()方法会默认取MEMORY_ONLY这一级别。
小结
Spark在每次转换操作的时候使用了新产生的 RDD 来记录计算逻辑,这样就把作用在 RDD 上的所有计算逻辑串起来形成了一个链条,但是并不会真的去计算结果。当对 RDD 进行动作Action时,Spark 会从计算链的最后一个RDD开始,利用迭代函数(Iterator)和计算函数(Compute),依次从上一个RDD获取数据并执行计算逻辑,最后输出结果。
此外,我们可以通过将一些需要复杂计算和经常调用的RDD进行持久化处理,从而提升计算效率。
思考题
对RDD进行持久化操作和记录Checkpoint,有什么区别呢?
欢迎你把对弹性分布式数据集的疑问写在留言区,与我和其他同学一起讨论。如果你觉得有所收获,也欢迎把文章分享给你的朋友。
- 锦 👍(63) 💬(4)
区别在于Checkpoint会清空该RDD的依赖关系,并新建一个CheckpointRDD依赖关系,让该RDD依赖,并保存在磁盘或HDFS文件系统中,当数据恢复时,可通过CheckpointRDD读取RDD进行数据计算;持久化RDD会保存依赖关系和计算结果至内存中,可用于后续计算。
2019-05-17 - RocWay 👍(32) 💬(1)
主要区别应该是对依赖链的处理: checkpoint在action之后执行,相当于事务完成后备份结果。既然结果有了,之前的计算过程,也就是RDD的依赖链,也就不需要了,所以不必保存。 但是cache和persist只是保存当前RDD,并不要求是在action之后调用。相当于事务的计算过程,还没有结果。既然没有结果,当需要恢复、重新计算时就要重放计算过程,自然之前的依赖链不能放弃,也需要保存下来。需要恢复时就要从最初的或最近的checkpoint开始重新计算。
2019-05-17 - hua168 👍(7) 💬(2)
老师,我想问下,如果是linux 命令分析单机300G log日志,内存只有16G,怎搞? 如果用spark思想,,从io读很卡,直接内存爆了。 如果先分割日志为100份,再用shell,一下10个并发执行,最后结果合并。感觉还是有点慢。
2019-05-17 - JohnT3e 👍(5) 💬(2)
两者区别在于依赖关系是否保留吧。checkpoint的话,检查点之前的关系应该丢失了,但其数据已经持久化了;而persist或者cache保留了这个依赖关系,如果缓存结果有丢失,可以通过这个关系进行rebuild。
2019-05-17 - 挖矿的小戈 👍(4) 💬(1)
1. 前者:persist或者cache除了除了持久化该RDD外,还会保留该RDD前面的依赖关系 2. 后者:将该RDD保存到磁盘上,并清除前面的依赖关系 感觉后者的开销会大很多
2019-05-17 - miwucc 👍(3) 💬(1)
手动调用缓存函数和checkpoint本质上是一样的吧。就是一个手动控制落盘时间,一个自动控制。
2019-05-17 - 廖师虎 👍(3) 💬(1)
记不太清除了,checkpoint清除血缘关系,一般保存在类hdfs文件系统,目的是容错,缓存是保留血缘关系,并保存在本机,的目的是提高效率,High performance Spark书讲得很详细。 第一次遇到把driver翻译成驱动程序的,个人感觉还是保留Driver,Action为佳。
2019-05-17 - cricket1981 👍(3) 💬(1)
终于明白spark惰性求值的原理了。我理解对 RDD 进行持久化操作和记录 Checkpoint的区别是:前者是开发人员为了避免重复计算、减少长链路计算时间而主动去缓存中间结果,而后者是spark框架为了容错而提供的保存中间结果机制,它对开发人员是透明的,无感知的。
2019-05-17 - Steven 👍(2) 💬(3)
缓存了之后,第一个action还是需要从头计算的吧? "所以无论是 count 还是 first,Spark 都无需从头计算", 这句话是不是有误?
2019-05-17 - jon 👍(2) 💬(1)
checkpoint不会存储该rdd前面的依赖关系,它后面的rdd都依赖于它。 persist、 cache操作会存储依赖关系,当一个分区丢失后可以根据依赖重新计算。
2019-05-17 - Peter 👍(1) 💬(1)
在缓存 RDD 的时候,它所有的依赖关系也会被一并存下来。所以持久化的 RDD 有自动的容错机制。如果 RDD 的任一分区丢失了,通过使用原先创建它的转换操作,它将会被自动重算
2019-05-18 - 渡码 👍(0) 💬(1)
设计上,应用场景不同,checkpoint用来做故障恢复,cache为了避免重复计算。实现技术上我没做深入了解不清楚,猜测存储技术类似,可能checkpoint会额外增加用于故障恢复的信息
2019-05-24 - 涵 👍(14) 💬(0)
从目的上来说,checkpoint用于数据恢复,RDD持久化用于RDD的多次计算操作的性能优化,避免重复计算。从存储位置上看checkpoint储存在外存中,RDD可以根据存储级别存储在内存或/和外存中。
2019-05-17 - 珅剑 👍(7) 💬(0)
1.设置checkpoint时需要指定checkpoint的存储目录,而持久化不管是直接调用cache还是通过persist指定缓存级别都不需要指定存储目录,由系统自己指定 2.checkpoint是将RDD去除依赖关系后将数据直接存储到磁盘,且一般是HDFS,带有备份,因此不容易丢失,恢复时直接获取checkpoint的数据;而持久化一般是直接cache到内存。数据容易丢失,即便是通过设置MEMORY_AND_DISK_2等缓存级别达到内存和磁盘都有备份,也会在每个备份中都缓存RDD的依赖关系,造成不必要的冗余
2019-06-11 - Peter 👍(2) 💬(0)
在计算过程中,对于一些计算过程比较耗时的 RDD,我们可以将它缓存至硬盘或 HDFS 中,标记这个 RDD 有被检查点处理过,并且清空它的所有依赖关系。同时,给它新建一个依赖于 CheckpointRDD 的依赖关系,CheckpointRDD 可以用来从硬盘中读取 RDD 和生成新的分区信息。
2019-05-18