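“One who assists the ruler with the Tao does not use armed force to dominate the world.
Such deeds tend to rebound.
Where armies camp, thorns and brambles grow; in the wake of a great army, lean years are sure to follow.
The skilled achieve their aim and stop there; they dare not use it to take by force.
Achieve the aim but do not be vain; achieve the aim but do not boast; achieve the aim but do not be proud.
Achieve the aim only because there is no other choice; achieve the aim but do not use force.
When things reach their prime they begin to decline; this is called going against the Tao, and what goes against the Tao soon comes to an end.”[1]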
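Spark ships with a large number of useful built-in operators (methods), and the functionality a business needs is built by combining them. Spark programming ultimately comes down to using these operators, so it is well worth summarizing the built-in ones in detail.
Broadly, Spark operators fall into two categories: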
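
| Name | Description |
|---|---|
| Transformation | Transformation operators do not submit a job; they only define intermediate processing steps. Transformations are evaluated lazily: converting one RDD into another is not executed immediately, and the computation actually runs only when an action is invoked. Transformations operate on data of either value type or key-value type. |
| Action | Action operators make SparkContext submit a job, and they output data from the Spark system (back to the driver or to external storage). |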
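A minimal pure-Python 3 sketch of lazy evaluation, using only the standard library. `LazyDataset` is a hypothetical class, not PySpark's RDD. Its transformation methods only record a step, and the recorded steps run only when the action `collect()` is called. Real RDDs also track partitions and lineage, and they are executed as stages of a job.

```python
class LazyDataset:
    """Hypothetical RDD stand-in: transformations are recorded, actions execute them."""

    def __init__(self, data, steps=()):
        self._data = data
        self._steps = tuple(steps)  # recorded transformations, loosely like RDD lineage

    def map(self, f):
        # Transformation: returns a new dataset with one more step; computes nothing.
        return LazyDataset(self._data, self._steps + (("map", f),))

    def filter(self, pred):
        return LazyDataset(self._data, self._steps + (("filter", pred),))

    def collect(self):
        # Action: only now are the recorded steps applied, element by element.
        result = []
        for x in self._data:
            for kind, f in self._steps:
                if kind == "map":
                    x = f(x)
                elif not f(x):
                    break  # filtered out
            else:
                result.append(x)
        return result


calls = []


def cube(x):
    calls.append(x)  # side effect that shows when the function actually runs
    return x ** 3


ds = LazyDataset([1, 2, 3, 4]).map(cube).filter(lambda v: v > 5)
print(calls)         # []  (defining transformations ran nothing)
print(ds.collect())  # [8, 27, 64]
print(calls)         # [1, 2, 3, 4]
```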
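The table above notes that transformations operate on two kinds of data, value type and key-value type. They are summarized below:

#### Value type

| Subtype | Operators |
|---|---|
| One-to-one between input and output partitions | map, flatMap, mapPartitions, glom |
| Many-to-one between input and output partitions | union, cartesian |
| Many-to-many between input and output partitions | groupBy |
| Output partitions are a subset of input partitions | filter, distinct, subtract, sample, takeSample |
| Cache | cache, persist |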

#### Key-value type

| Subtype | Operators |
|---|---|
| One-to-one between input and output partitions | mapValues |
| Aggregation over one or two RDDs | Single RDD: combineByKey, reduceByKey, partitionBy; two RDDs: cogroup |
| Join | join, leftOuterJoin, rightOuterJoin |
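
#### Action operators

| Subtype | Operators |
|---|---|
| No output | foreach |
| Output to HDFS | saveAsTextFile, saveAsObjectFile |
| Scala collections and data types | collect, collectAsMap, reduceByKeyLocally, lookup, count, top, reduce, fold, aggregate |

Next, using pyspark as the environment, we go through the operators above (and a few others) in detail. In the pyspark shell, `sc` is the SparkContext created at startup.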
```
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera2
      /_/

Using Python version 2.7.5 (default, Aug 4 2017 00:39:18)
SparkSession available as 'spark'.
```
### map
`map(f, preservesPartitioning=False)`
map applies the given function to every element of the RDD to produce a new RDD; elements of the two RDDs are in one-to-one correspondence.
```python
x = sc.parallelize([1, 2, 3, 4])
y = x.map(lambda x: x ** 3)
y.collect()
# [1, 8, 27, 64]
```
### filter
`filter(f)`
Filters the elements of the RDD and returns a new dataset consisting of the original elements for which f returns true.
```python
y = x.filter(lambda x: x > 2)
y.collect()
# [3, 4]
```
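### flatMap
`flatMap(f, preservesPartitioning=False)`
Similar to map, but each input element can be mapped to zero or more output elements; elements of the two RDDs are in a one-to-many relationship.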
```python
y = x.flatMap(lambda x: (x, x * 100, x ** 2))
y.collect()
# [1, 100, 1, 2, 200, 4, 3, 300, 9, 4, 400, 16]
```
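The map/flatMap difference can be reproduced on a local list with plain Python 3. This only illustrates the semantics; Spark applies f per partition on the executors. map keeps one output per input, while flatMap concatenates the iterables that f returns, so returning an empty iterable produces zero outputs for that element.

```python
from itertools import chain

data = [1, 2, 3, 4]


def f(x):
    return (x, x * 100, x ** 2)


mapped = [f(x) for x in data]                          # like rdd.map(f): one tuple per element
flat_mapped = list(chain.from_iterable(map(f, data)))  # like rdd.flatMap(f): results flattened

print(mapped)       # [(1, 100, 1), (2, 200, 4), (3, 300, 9), (4, 400, 16)]
print(flat_mapped)  # [1, 100, 1, 2, 200, 4, 3, 300, 9, 4, 400, 16]

# Returning an empty iterable drops the element, so flatMap can also filter.
evens_doubled = list(chain.from_iterable([x * 2] if x % 2 == 0 else [] for x in data))
print(evens_doubled)  # [4, 8]
```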
### glom
`glom()`
Returns an RDD created by coalescing all elements within each partition into a list.
```python
# The second argument (2) is the number of slices the dataset is split into;
# Spark runs one task per slice on the cluster.
a = sc.parallelize([1, 2, 3, 4], 2)
y = a.glom()
y.collect()
# [[1, 2], [3, 4]]
```
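The placement of elements into partitions can be approximated in plain Python 3. The helper `slice_into_partitions` is hypothetical. It uses the `i * n // k` slice boundaries that Spark's `ParallelCollectionRDD` applies to a local collection. PySpark also batches elements before handing them to the JVM, so treat this as an approximation rather than a guarantee. Each inner list corresponds to what `glom()` would report for one partition.

```python
def slice_into_partitions(data, num_slices):
    """Hypothetical helper: contiguous slices using i * n // k boundaries."""
    n = len(data)
    return [data[i * n // num_slices:(i + 1) * n // num_slices] for i in range(num_slices)]


# Each inner list plays the role of one partition as glom() would show it.
print(slice_into_partitions([1, 2, 3, 4], 2))     # [[1, 2], [3, 4]]
print(slice_into_partitions([1, 2, 3, 4, 5], 2))  # [[1, 2], [3, 4, 5]]
print(slice_into_partitions([1, 2, 3, 4], 3))     # [[1], [2], [3, 4]]
```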
### mapPartitions
`mapPartitions(f, preservesPartitioning=False)`
Return a new RDD by applying a function to each partition of this RDD.
```python
xx = sc.parallelize([1, 2, 3, 4], 2)

def f(iterator):
    # Receives an iterator over one whole partition
    yield sum(iterator)

yy = xx.mapPartitions(f)
print('Original partitions of xx: {}'.format(xx.glom().collect()))
# Original partitions of xx: [[1, 2], [3, 4]]
print('Result of applying f to xx: {}'.format(yy.glom().collect()))
# Result of applying f to xx: [[3], [7]]
```
For how `yield` works, see my other note, *Python易筋经-yield when data is bigger*.
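Conceptually, mapPartitions calls f once per partition with an iterator over that partition's elements, and the new partition holds whatever f yields. That is why it suits per-partition setup work, such as opening one connection per partition rather than one per element. The sketch below is plain Python 3 and models partitions as nested lists. `map_partitions` is a hypothetical helper, not a PySpark function.

```python
def map_partitions(partitions, f):
    """Hypothetical model of mapPartitions: call f once per partition with an iterator."""
    return [list(f(iter(part))) for part in partitions]


calls = 0


def partition_sum(iterator):
    global calls
    calls += 1  # counts invocations: once per partition, not once per element
    yield sum(iterator)


print(map_partitions([[1, 2], [3, 4]], partition_sum))  # [[3], [7]]
print(calls)                                            # 2
```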
### mapPartitionsWithIndex
`mapPartitionsWithIndex(f, preservesPartitioning=False)`
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
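Like mapPartitions, it processes the data one partition at a time, but f takes two arguments: the first is the partition index, the second is an iterator over that partition's elements.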
```python
x = sc.parallelize([1, 2, 3, 4], 2)

def f(splitIndex, iterator):
    # splitIndex is the partition number; iterator walks that partition's elements
    yield (splitIndex, sum(iterator))

y = x.mapPartitionsWithIndex(f)
print('Original partitions of x: {}'.format(x.glom().collect()))
# Original partitions of x: [[1, 2], [3, 4]]
print('Result of applying f to x: {}'.format(y.glom().collect()))
# Result of applying f to x: [[(0, 3)], [(1, 7)]]
```
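A common use of the partition index is to treat the first partition specially, for example to drop a CSV header line. The sketch assumes the header is the first element of partition 0, which is typically the case when a single text file is read. It is plain Python 3 on nested lists, and `map_partitions_with_index` and `drop_header` are hypothetical names. In PySpark you would pass a function like `drop_header` to `rdd.mapPartitionsWithIndex`.

```python
from itertools import islice


def map_partitions_with_index(partitions, f):
    """Hypothetical model of mapPartitionsWithIndex: f gets (index, iterator)."""
    return [list(f(index, iter(part))) for index, part in enumerate(partitions)]


def drop_header(index, iterator):
    # Assumes the header line is the first element of partition 0.
    return islice(iterator, 1, None) if index == 0 else iterator


lines = [["name,age", "ann,31"], ["bob,25", "cy,40"]]
result = map_partitions_with_index(lines, drop_header)
print(result)  # [['ann,31'], ['bob,25', 'cy,40']]
```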
### getNumPartitions
`getNumPartitions()`
Returns the number of partitions in RDD
```python
rdd = sc.parallelize([1, 2, 3, 4], 2)
print('Number of partitions: {}'.format(rdd.getNumPartitions()))
# Number of partitions: 2
```
### distinct
`distinct(numPartitions=None)`
Return a new RDD containing the distinct elements in this RDD.
```python
res = sorted(sc.parallelize([1, 1, 1, 2, 3, 2, 3]).distinct().collect())
print('After deduplication: {}'.format(res))
# After deduplication: [1, 2, 3]
```
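distinct needs a shuffle because copies of the same value can sit in different partitions. In PySpark 2.x it is built roughly as `map(lambda x: (x, None))`, then `reduceByKey(lambda x, _: x, numPartitions)`, then `map(lambda x: x[0])`. The plain-Python 3 sketch below mimics that by hash-partitioning the elements. `distinct_model` is a hypothetical helper, and Python's built-in `hash` stands in for PySpark's `portable_hash`.

```python
def distinct_model(partitions, num_partitions=2):
    """Hypothetical model of distinct(): hash-shuffle (x, None) pairs, keep one per key."""
    buckets = [{} for _ in range(num_partitions)]
    for part in partitions:
        for x in part:
            # Every copy of x hashes to the same bucket, so duplicates meet there.
            buckets[hash(x) % num_partitions].setdefault(x, None)
    return [list(bucket) for bucket in buckets]  # like .map(lambda kv: kv[0])


result = distinct_model([[1, 1, 1], [2, 3, 2, 3]])
print(result)                                          # [[2], [1, 3]]
print(sorted(x for bucket in result for x in bucket))  # [1, 2, 3]
```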
### sample
`sample(withReplacement, fraction, seed=None)`
Return a sampled subset of this RDD.
Parameters:
- withReplacement: can elements be sampled multiple times (replaced when sampled out)
- fraction: expected size of the sample as a fraction of this RDD’s size
  - without replacement: probability that each element is chosen; fraction must be in [0, 1]
  - with replacement: expected number of times each element is chosen; fraction must be >= 0
- seed: seed for the random number generator

Samples the original RDD. withReplacement says whether sampling is done with replacement, fraction is the expected sample size as a fraction of the original RDD, and seed seeds the random number generator. With the same fraction and seed (on the same data and partitioning), the same sample is returned every time.
```python
rdd = sc.parallelize(range(100), 4)
y = rdd.sample(False, 0.1, 81)
y.collect()
# [4, 27, 40, 42, 43, 60, 76, 80, 86, 97]
y.count()
# 10
```
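sample does not promise exactly `fraction * count` elements. Without replacement, each element is kept independently with probability `fraction` (Bernoulli sampling). With replacement, each element appears a Poisson-distributed number of times with mean `fraction`. Getting exactly 10 of 100 elements above is a property of that particular seed. The sketch below models both modes in plain Python 3 using the standard `random` module rather than Spark's own samplers, so the chosen elements will not match Spark's. `sample_model` and `poisson` are hypothetical helpers.

```python
import math
import random


def poisson(rng, lam):
    """Draw from Poisson(lam) with Knuth's multiplication method."""
    threshold = math.exp(-lam)
    k, p = 0, rng.random()
    while p > threshold:
        k += 1
        p *= rng.random()
    return k


def sample_model(data, with_replacement, fraction, seed=None):
    """Hypothetical model of RDD.sample() on one partition (not Spark's samplers)."""
    rng = random.Random(seed)
    if not with_replacement:
        # Bernoulli sampling: keep each element independently with probability fraction.
        return [x for x in data if rng.random() < fraction]
    # Poisson sampling: emit each element k times, k ~ Poisson(fraction).
    out = []
    for x in data:
        out.extend([x] * poisson(rng, fraction))
    return out


data = list(range(100))
a = sample_model(data, False, 0.1, seed=81)
b = sample_model(data, False, 0.1, seed=81)
print(a == b)  # True: same input and seed give the same sample
print(len(a))  # 10 on average, but not fixed at 10
```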
### union (set union)
`union(other)`
Return the union of this RDD and another one. Duplicates are not removed, as the output below shows.
```python
rdd = sc.parallelize([1, 1, 2, 3])
rdd1 = sc.parallelize([5, 3, 4, 6])
print(rdd.union(rdd1).collect())
# [1, 1, 2, 3, 5, 3, 4, 6]
```
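### intersection (set intersection)
`intersection(other)`
Return the intersection of this RDD and another one. The output will not contain any duplicate elements, even if the input RDDs did. Note that this method performs a shuffle internally.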
```python
rdd = sc.parallelize([1, 1, 2, 3])
rdd1 = sc.parallelize([5, 3, 4, 6])
print(rdd.intersection(rdd1).collect())
# [3]
```
### sortByKey
`sortByKey(ascending=True, numPartitions=None, keyfunc=lambda x: x)`
Sorts this RDD, which is assumed to consist of (key, value) pairs.
```python
tmp = [('a', 1), ('f', 2), ('d', 3), ('c', 4), ('b', 5)]
rdd = sc.parallelize(tmp, 2)
print(rdd.glom().collect())
# [[('a', 1), ('f', 2)], [('d', 3), ('c', 4), ('b', 5)]]
sort1 = rdd.sortByKey(True, 1).glom().collect()  # ascending, 1 output partition
sort2 = rdd.sortByKey(True, 3).glom().collect()  # ascending, 3 output partitions
print(sort1)
# [[('a', 1), ('b', 5), ('c', 4), ('d', 3), ('f', 2)]]
print(sort2)
# [[('a', 1), ('b', 5)], [('c', 4), ('d', 3)], [('f', 2)]]
```
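Why does `sortByKey(True, 3)` split the keys into `[a, b] | [c, d] | [f]`? sortByKey range-partitions the data. It derives `numPartitions - 1` boundary keys from a sample of the keys, sends each pair to a partition by binary search over those boundaries, and then sorts inside each partition. The plain-Python 3 sketch below follows that scheme. As a simplifying assumption, it uses all keys instead of a random sample. For an RDD this small, PySpark's sampling fraction should be capped at 1.0, so the sample should contain every key anyway. `sort_by_key_model` is a hypothetical name.

```python
import bisect


def sort_by_key_model(pairs, num_partitions):
    """Hypothetical model of sortByKey(ascending=True, numPartitions=num_partitions)."""
    keys = sorted(k for k, _ in pairs)  # Spark samples keys; this sketch uses all of them
    # Boundary keys split the sorted key list into num_partitions roughly equal ranges.
    bounds = [keys[len(keys) * (i + 1) // num_partitions] for i in range(num_partitions - 1)]
    partitions = [[] for _ in range(num_partitions)]
    for k, v in pairs:
        partitions[bisect.bisect_left(bounds, k)].append((k, v))  # range partitioner
    return [sorted(p, key=lambda kv: kv[0]) for p in partitions]  # sort within each partition


tmp = [('a', 1), ('f', 2), ('d', 3), ('c', 4), ('b', 5)]
print(sort_by_key_model(tmp, 3))
# [[('a', 1), ('b', 5)], [('c', 4), ('d', 3)], [('f', 2)]]
print(sort_by_key_model(tmp, 1))
# [[('a', 1), ('b', 5), ('c', 4), ('d', 3), ('f', 2)]]
```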
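### sortBy
`sortBy(keyfunc, ascending=True, numPartitions=None)`
Similar to sortByKey, but the sort key is computed by keyfunc, for example by selecting a tuple field by index (`x[0]` or `x[1]`).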
```python
tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()  # sort by the first field
# [('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]
sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()  # sort by the second field
# [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
```
### cartesian
`cartesian(other)`
Returns the Cartesian product of two RDDs.

```python
rdd = sc.parallelize([1, 2])
rdd_1 = sc.parallelize([3, 4])
rdd.cartesian(rdd_1).collect()
# [(1, 3), (1, 4), (2, 3), (2, 4)]
```
### groupBy
`groupBy(f, numPartitions=None, partitionFunc=<function portable_hash>)`
Groups the elements by the result of applying f to each one.
```python
rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
result = rdd.groupBy(lambda x: x % 2).collect()  # group by parity
sorted([(x, sorted(y)) for (x, y) in result])
# [(0, [2, 8]), (1, [1, 1, 3, 5])]
```
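### groupByKey
`groupByKey(numPartitions=None, partitionFunc=<function portable_hash>)`
groupByKey is a grouping operation. Called on a dataset of (K, V) pairs, it returns a dataset of (K, Seq[V]) pairs, with all values for a key collected together. In PySpark each group is an iterable, which is why the example wraps it with `mapValues(len)` or `mapValues(list)`.
Note: if you are grouping in order to aggregate per key (for example a sum or an average), reduceByKey or aggregateByKey will give better performance.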
```python
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.groupByKey().mapValues(len).collect())
# [('a', 2), ('b', 1)]
sorted(rdd.groupByKey().mapValues(list).collect())
# [('a', [1, 1]), ('b', [1])]
```
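The performance advice comes down to how many records cross the network. groupByKey ships every (K, V) pair to the reducers. reduceByKey first combines values inside each partition (map-side combine), so it ships at most one record per key per partition. The plain-Python 3 count below compares the two; `shuffled_records` is a hypothetical helper. Real shuffle cost also depends on record size and serialization.

```python
from collections import defaultdict
from operator import add


def shuffled_records(partitions, combine=None):
    """Hypothetical model: the records each partition writes to the shuffle."""
    sent = []
    for part in partitions:
        if combine is None:
            sent.extend(part)  # groupByKey: every (K, V) pair is shuffled as-is
        else:
            local = {}
            for k, v in part:  # reduceByKey: combine values inside the partition first
                local[k] = combine(local[k], v) if k in local else v
            sent.extend(local.items())
    return sent


parts = [[("a", 1), ("b", 1), ("a", 1)], [("a", 1), ("a", 1), ("b", 1)]]
print(len(shuffled_records(parts)))       # 6 records shuffled for groupByKey
print(len(shuffled_records(parts, add)))  # 4 records shuffled for reduceByKey

# The reducer side still arrives at the same per-key totals.
totals = defaultdict(int)
for k, v in shuffled_records(parts, add):
    totals[k] += v
print(sorted(totals.items()))  # [('a', 4), ('b', 2)]
```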
### pipe
`pipe(command, env=None, checkCode=False)`
Returns an RDD created by piping elements to a forked external process.
Parameter: `checkCode`: whether to check the return value of the shell command.
```python
sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
# [u'1', u'2', u'', u'3']

rdd = sc.parallelize(['spark2.3.0', 'kafka', 'hbase'])
rdd2 = rdd.pipe('grep -i "ar"')  # keep only lines containing "ar" (case-insensitive)
print('Data after pipe: {}'.format(rdd2.collect()))
# Data after pipe: [u'spark2.3.0']
```
### foreach
`foreach(f)`
Applies a function to all elements of this RDD.
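```python
def f(x):
    # Runs on the executors: output goes to executor stdout
    # (in local mode this is usually the same terminal), not to the driver.
    print(x)

sc.parallelize([1, 2, 3, 4, 5]).foreach(f)
```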
### max, min, sum, count

```python
x = sc.parallelize(range(10))
x.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print('Max: {}'.format(x.max()))
# Max: 9
print('Min: {}'.format(x.min()))
# Min: 0
print('Sum: {}'.format(x.sum()))
# Sum: 45
print('Count: {}'.format(x.count()))
# Count: 10
```
### mean, variance, sampleVariance, stdev, sampleStdev

```python
# Outputs were recorded under Python 2.7, which prints floats with 12 significant digits.
x.collect()
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
print('Mean: {}'.format(x.mean()))
# Mean: 4.5
print('Population variance: {}'.format(x.variance()))
# Population variance: 8.25
print('Sample variance: {}'.format(x.sampleVariance()))
# Sample variance: 9.16666666667
print('Population standard deviation: {}'.format(x.stdev()))
# Population standard deviation: 2.87228132327
print('Sample standard deviation: {}'.format(x.sampleStdev()))
# Sample standard deviation: 3.0276503541
```
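The two families differ only in the denominator. variance()/stdev() divide by n (population), while sampleVariance()/sampleStdev() divide by n - 1 (Bessel's correction). For 0..9, the squared deviations from the mean 4.5 sum to 82.5, which gives 82.5 / 10 = 8.25 and 82.5 / 9 ≈ 9.1667. A plain-Python 3 check of that arithmetic against the standard `statistics` module:

```python
import math
import statistics

xs = list(range(10))
n = len(xs)
mean = sum(xs) / n
ss = sum((x - mean) ** 2 for x in xs)  # sum of squared deviations: 82.5

population_variance = ss / n    # corresponds to x.variance()
sample_variance = ss / (n - 1)  # corresponds to x.sampleVariance()

print(mean, ss, population_variance)                    # 4.5 82.5 8.25
print('{:.6f}'.format(sample_variance))                 # 9.166667
print('{:.6f}'.format(math.sqrt(population_variance)))  # 2.872281 (stdev)
print('{:.6f}'.format(math.sqrt(sample_variance)))      # 3.027650 (sampleStdev)

# The standard library uses the same two conventions.
print(math.isclose(statistics.pvariance(xs), population_variance))  # True
print(math.isclose(statistics.variance(xs), sample_variance))       # True
```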
### countByKey, countByValue
`countByKey()`
Count the number of elements for each key, and return the result to the master as a dictionary.
`countByValue()`
Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.
```python
rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.countByKey().items())
# [('a', 2), ('b', 1)]

sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
# [(1, 2), (2, 3)]
```
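### first, top, take, takeOrdered
`first()`
Return the first element in this RDD.
`top(num, key=None)`
Gets the top N elements from the RDD in descending order, or in the order determined by the optional key function.
`take(num)`
Take the first num elements of the RDD.
`takeOrdered(num, key=None)`
Gets N elements from the RDD in ascending order, or in the order determined by the optional key function.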
```python
rdd = sc.parallelize([10, 4, 2, 12, 3])
rdd.first()
# 10
rdd.top(1)
# [12]
rdd.top(2)
# [12, 10]
rdd.top(2, key=str)  # compares '10', '4', '2', '12', '3' as strings
# [4, 3]
rdd.take(3)
# [10, 4, 2]
rdd.takeOrdered(2)
# [2, 3]
rdd.takeOrdered(2, key=lambda x: -x)
# [12, 10]
```
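top and takeOrdered do not need a full sort. Each partition can keep only its N best candidates with a heap, and the per-partition candidates are then merged on the driver. A plain-Python 3 sketch of that idea with `heapq`; `top_model` and `take_ordered_model` are hypothetical helpers, and PySpark's own code differs in detail.

```python
import heapq


def top_model(partitions, num, key=None):
    """Hypothetical model of RDD.top(num, key): N largest, descending."""
    per_partition = [heapq.nlargest(num, part, key=key) for part in partitions]
    # Merge step on the "driver": the overall N largest among the candidates.
    return heapq.nlargest(num, (x for cand in per_partition for x in cand), key=key)


def take_ordered_model(partitions, num, key=None):
    """Hypothetical model of RDD.takeOrdered(num, key): N smallest, ascending."""
    per_partition = [heapq.nsmallest(num, part, key=key) for part in partitions]
    return heapq.nsmallest(num, (x for cand in per_partition for x in cand), key=key)


parts = [[10, 4], [2, 12, 3]]
print(top_model(parts, 2))                             # [12, 10]
print(top_model(parts, 2, key=str))                    # [4, 3]
print(take_ordered_model(parts, 2))                    # [2, 3]
print(take_ordered_model(parts, 2, key=lambda x: -x))  # [12, 10]
```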
### subtract
`subtract(other, numPartitions=None)`
Return each value in self that is not contained in other.
```python
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])
sorted(x.subtract(y).collect())  # whole tuples are compared, so only ("a", 3) is removed
# [('a', 1), ('b', 4), ('b', 5)]
```
### cache
`cache()`
Persist this RDD with the default storage level (MEMORY_ONLY).
### persist
`persist(storageLevel=StorageLevel.MEMORY_ONLY)`
Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY).
cache always uses the default storage level MEMORY_ONLY, whereas persist lets you choose a different storage level (for example MEMORY_AND_DISK) as needed.
### reduce
`reduce(f)`
Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.
```python
from operator import add

sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
# 15
sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
# 10
```
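reduce folds each partition locally and then combines the partial results. The function must therefore be commutative and associative, or the answer will depend on how the data happens to be partitioned. The plain-Python 3 sketch below shows `add` giving the same result for two partitionings while `sub` does not. `reduce_model` is a hypothetical helper, and `functools.reduce` stands in for the per-partition step.

```python
from functools import reduce
from operator import add, sub


def reduce_model(partitions, f):
    """Hypothetical model of RDD.reduce: reduce each partition, then merge the results."""
    partials = [reduce(f, part) for part in partitions if part]  # empty partitions skipped
    return reduce(f, partials)


one_partition = [[1, 2, 3, 4, 5]]
two_partitions = [[1, 2], [3, 4, 5]]

print(reduce_model(one_partition, add), reduce_model(two_partitions, add))  # 15 15
print(reduce_model(one_partition, sub), reduce_model(two_partitions, sub))  # -13 5
```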
### reduceByKey
`reduceByKey(func, numPartitions=None, partitionFunc=<function portable_hash>)`
Merge the values for each key using an associative and commutative reduce function.
This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. Default partitioner is hash-partition.
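The function passed to reduceByKey must take two arguments and is applied cumulatively: the running result is combined with the next value. Note that in reduceByKey the function is applied only to the values of tuples that share the same key.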
```python
from operator import add

rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
sorted(rdd.reduceByKey(add).collect())
# [('a', 2), ('b', 1)]

num = sc.parallelize([1, 4, 2, 3, 4, 4, 2, 4])
pairs = num.map(lambda x: (x, 1))
pairs.collect()
# [(1, 1), (4, 1), (2, 1), (3, 1), (4, 1), (4, 1), (2, 1), (4, 1)]
a = pairs.reduceByKey(lambda x, y: x + y + 1)
a.collect()
# [(2, 3), (4, 7), (1, 1), (3, 1)]
b = pairs.reduceByKey(lambda x, y: x + y + 2)
b.collect()
# [(2, 4), (4, 10), (1, 1), (3, 1)]
```
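Why do the custom functions give 7 and 10 for key 4? Key 4 has four values of 1, and reducing n values always takes n - 1 merges, however they are grouped. With f(x, y) = x + y + c each merge adds an extra c, so the result is n + (n - 1)c. That gives 4 + 3 × 1 = 7 and 4 + 3 × 2 = 10 for key 4, and 2 + 1 = 3 and 2 + 2 = 4 for key 2. Because x + y + c is itself associative and commutative, the answer does not depend on partitioning. The plain-Python 3 check below models map-side combine followed by a merge; `reduce_by_key_model` is a hypothetical helper.

```python
def reduce_by_key_model(partitions, f):
    """Hypothetical model of reduceByKey: combine per partition, then merge per key."""
    merged = {}
    for part in partitions:
        local = {}
        for k, v in part:  # map-side combine inside one partition
            local[k] = f(local[k], v) if k in local else v
        for k, v in local.items():  # reducer side: merge the partial results
            merged[k] = f(merged[k], v) if k in merged else v
    return merged


def plus_one(x, y):
    return x + y + 1


def plus_two(x, y):
    return x + y + 2


pairs = [(x, 1) for x in [1, 4, 2, 3, 4, 4, 2, 4]]
for parts in ([pairs], [pairs[:4], pairs[4:]]):  # one partition, then two
    print(sorted(reduce_by_key_model(parts, plus_one).items()))
    print(sorted(reduce_by_key_model(parts, plus_two).items()))
# Both partitionings print:
# [(1, 1), (2, 3), (3, 1), (4, 7)]
# [(1, 1), (2, 4), (3, 1), (4, 10)]
```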
[1] Laozi, *Tao Te Ching*, Chapter 30. Laozi's hometown: Luyi, China.