Spark算子 Transformation | Action
本文参考:扎心了,老铁
一、Transformation算子
1、map, flatMap, mapParations, mapPartitionsWithIndex
A、map
(1)使用scala进行编写
1 | def map(): Unit ={ |
(2)运行结果
(3)总结
可以看出,对于map算子,源JavaRDD的每个元素都会进行计算,由于是依次进行传参,所以他是有序的,新RDD的元素顺序与源RDD是相同的。而由有序又引出接下来的flatMap。
B、flatMap
(1)使用scala进行编写
1 | def flatMap(): Unit ={ |
(2)运行结果
(3)总结
flatMap的特性决定了这个算子在对需要随时增加元素的时候十分好用,比如在对源RDD查漏补缺时。
map和flatMap都是依次进行参数传递的,但有时候需要RDD中的两个元素进行相应操作时(例如:算存款所得时,下一个月所得的利息是要原本金加上上一个月所得的本金的),这两个算子便无法达到目的了,这是便需要mapPartitions算子,他传参的方式是将整个RDD传入,然后将一个迭代器传出生成一个新的RDD,由于整个RDD都传入了,所以便能完成前面说的业务。
C、mapPartitions
(1)使用scala进行编写
1 | def mapParations(): Unit ={ |
(2)运行结果
D、mapPartitionsWithIndex
每次获取和处理的就是一个分区的数据,并且知道处理的分区号。
(1)使用scala进行编写
1 | def mapPartitionsWithIndex(): Unit ={ |
(2)运行结果
2、reduce, reduceByKey
A、reduce
reduce其实是讲RDD中的所有元素进行合并,当运行call方法时,会传入两个参数,在call方法中将两个参数合并后返回,而这个返回值回合一个新的RDD中的元素再次传入call方法中,继续合并,直到合并到只剩下一个元素时。
(1)使用scala进行编写
1 | def reduce(): Unit ={ |
(2)运行结果
B、reduceByKey
reduceByKey仅将RDD中所有K,V对中K值相同的V进行合并。
(1)使用scala进行编写
1 | def reduceByKey(): Unit ={ |
(2)运行结果
3、union, join, groupByKey
A、union
当要将两个RDD合并时,便要用到union和join,其中union只是简单的将两个RDD累加起来,可以看做List的addAll方法。就想List中一样,当使用union及join时,必须保证两个RDD的泛型是一致的。
(1)使用scala进行编写
1 | def union(): Unit ={ |
(2)运行结果
B、join
join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合
(1)使用scala进行编写
1 | def join(): Unit = { |
(2)运行结果
C、groupByKey
union只是将两个RDD简单的累加在一起,而join则不一样,join类似于hadoop中的combin操作,只是少了排序这一段,再说join之前说说groupByKey,因为join可以理解为union与groupByKey的结合:groupBy是将RDD中的元素进行分组,组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组。即:
(1)使用scala进行编写
1 | def groupByKey(): Unit ={ |
(2)运行结果
4、sample, cartesian
A、sample
(1)使用scala进行编写
1 | def sample(): Unit ={ |
(2)运行结果
B、cartesian
cartesian是用于求笛卡尔积的
(1)使用scala进行编写
1 | def cartesian(): Unit ={ |
(2)运行结果
5、filter, distinct, intersection
A、filter
(1)使用scala进行编写
1 | def filter(): Unit ={ |
(2)运行结果
B、distinct
(1)使用scala进行编写
1 | def distinct(): Unit ={ |
(2)运行结果
C、intersection
(1)使用scala进行编写
1 | def intersection(): Unit ={ |
(2)运行结果
6、coalesce, replication, repartition, SortWithinPartitions
A、coalesce
分区数由多 -> 变少
(1)使用scala进行编写
1 | def coalesce(): Unit = { |
(2)运行结果
B、replication
进行重分区,解决的问题:本来分区数少 -> 增加分区数
(1)使用scala进行编写
1 | def replication(): Unit ={ |
(2)运行结果
C、repartitionAndSortWithinPartitions
repartitionAndSortWithinPartitions函数是repartition函数的变种,与repartition函数不同的是,repartitionAndSortWithinPartitions在给定的partitioner内部进行排序,性能比repartition要高。
(1)使用scala进行编写
1 | def repartitionAndSortWithinPartitions(): Unit ={ |
(2)运行结果
7、cogroup, sortBykey, aggregateByKey
A、cogroup
对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并。
(1)使用scala进行编写
1 | def cogroup(): Unit ={ |
(2)运行结果
B、sortByKey
sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。
(1)使用scala进行编写
1 | def sortByKey(): Unit ={ |
(2)运行结果
C、aggregateByKey
aggregateByKey函数对PairRDD中相同Key的值进行聚合操作,在聚合过程中同样使用了一个中立的初始值。和aggregate函数类似,aggregateByKey返回值的类型不需要和RDD中value的类型一致。因为aggregateByKey是对相同Key中的值进行聚合操作,所以aggregateByKey函数最终返回的类型还是Pair RDD,对应的结果是Key和聚合好的值;而aggregate函数直接是返回非RDD的结果,这点需要注意。在实现过程中,定义了三个aggregateByKey函数原型,但最终调用的aggregateByKey函数都一致。
(1)使用scala进行编写
1 | def aggregateByKey(): Unit ={ |
(2)运行结果