0%

“天长地久。
天地所以能长且久者,以其不自生,故能长生。
是以圣人后其身而身先,外其身而身存。
非以其无私耶?
故能成其私。”1

jdk-SPI (Service Provider Interface)

SPI (Service Provider Interface),一种服务发现机制,为某个接口寻找服务实现的机制。dubbo-SPI就是基于此并有所加强。
在dubbo的源码中,像如下代码段会随处可见:

1
ExtensionLoader.getExtensionLoader(WrappedExt.class).getExtension("XXX");

这便是duboo-SPI的使用方式。

dubbo-SPI

特性:

  • 自动包装
  • 自动装配
  • 自适应
  • 自动激活

代码结构如下:

1
com.alibaba.dubbo.common.extension  
2
 |  
3
 |--factory  
4
 |     |--AdaptiveExtensionFactory     
5
 |     |--SpiExtensionFactory    
6
 |     |--SpringExtensionFactory      
7
 |  
8
 |--support  
9
 |     |--ActivateComparator  
10
 |  
11
 |--Activate  #自动激活加载扩展的注解  
12
 |--Adaptive  #自适应扩展点的注解  
13
 |--ExtensionFactory  #扩展点对象生成工厂接口  
14
 |--ExtensionLoader   #扩展点加载器,扩展点的查找,校验,加载等核心逻辑的实现类  
15
 |--SPI   #扩展点注解

需要我们最为关心的就是ExtensionLoader,几乎所有特性都在这个类中实现。
ExtensionLoader没有提供public的构造方法,但是提供了一个public static的getExtensionLoader,这个方法就是获取ExtensionLoader实例的工厂方法。其public成员方法中有三个比较重要的方法:

  • getActivateExtension :根据条件获取当前扩展可自动激活的实现
  • getExtension : 根据名称获取当前扩展的指定实现
  • getAdaptiveExtension : 获取当前扩展的自适应实现

    ExtensionLoader

    想要获取某个扩展的实现,首先要获取到该扩展对应的ExtensionLoader实例。
    1
    public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    2
            if (type == null)
    3
                throw new IllegalArgumentException("Extension type == null");
    4
            if (!type.isInterface()) { //接口判断
    5
                throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
    6
            }
    7
            if (!withExtensionAnnotation(type)) { //注解判断
    8
                throw new IllegalArgumentException("Extension type(" + type +
    9
                        ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
    10
            }
    11
            //从静态缓存中获取
    12
            ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    13
            if (loader == null) {
    14
                //为Extension类型创建ExtensionLoader实例
    15
                EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
    16
                loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    17
            }
    18
            return loader;
    19
    }
    上述方法需要一个Class类型的参数,该参数表示希望加载的扩展点类型,该参数必须是接口,且该接口必须被@SPI注解注释,否则拒绝处理。检查通过之后首先会检查ExtensionLoader缓存中是否已经存在该扩展对应的ExtensionLoader,如果有则直接返回,否则创建一个新的ExtensionLoader负责加载该扩展实现,同时将其缓存起来。所以对于每一个扩展,dubbo中只会有一个对应的ExtensionLoader实例。
    1
    private ExtensionLoader(Class<?> type) {
    2
            this.type = type;
    3
            objectFactory = (type == ExtensionFactory.class ? null : ExtensionLoader.getExtensionLoader(ExtensionFactory.class).getAdaptiveExtension());
    4
    }
    从ExtensionLoader的构造函数中可以看到,如果要加载的扩展点类型是ExtensionFactory是,objectFactory字段被设置为null。由于ExtensionLoader的使用范围有限(基本上局限在ExtensionLoader中),因此对他做了特殊对待:在需要使用ExtensionFactory的地方,都是通过对应的自适应实现来代替。
    objectFactory属性是一个ExtensionFactory类型,ExtensionFactory主要用于加载扩展的实现:
    1
    public interface ExtensionFactory {
    2
    3
        /**
    4
         * Get extension.
    5
         *
    6
         * @param type object type.
    7
         * @param name object name.
    8
         * @return object instance.
    9
         */
    10
        <T> T getExtension(Class<T> type, String name);
    11
    12
    }
    ExtensionFactory被@SPI注解注释,说明他也是一个扩展点,dubbo内部提供了三个实现类:SpiExtensionFactory 、AdaptiveExtensionFactory以及SpringExtensionFactory,不同的实现可以已不同的方式来完成扩展点实现的加载。
    默认的ExtensionFactory实现中,AdaptiveExtensionFactotry被@Adaptive注解,它就是ExtensionFactory对应的自适应扩展实现(每个扩展点最多只能有一个自适应实现,如果所有实现中没有被@Adaptive注释的,那么dubbo会动态生成一个自适应实现类),也就是说,所有对ExtensionFactory调用的地方,实际上调用的都是AdpativeExtensionFactory,那么我们看下他的实现代码:
    1
    @Adaptive
    2
    public class AdaptiveExtensionFactory implements ExtensionFactory {
    3
    4
        private final List<ExtensionFactory> factories;
    5
    6
        public AdaptiveExtensionFactory() {
    7
            ExtensionLoader<ExtensionFactory> loader = ExtensionLoader.getExtensionLoader(ExtensionFactory.class);
    8
            List<ExtensionFactory> list = new ArrayList<ExtensionFactory>();
    9
            // 将所有ExtensionFactory实现保存起来  
    10
            for (String name : loader.getSupportedExtensions()) {
    11
                list.add(loader.getExtension(name));
    12
            }
    13
            factories = Collections.unmodifiableList(list);
    14
        }
    15
    16
        public <T> T getExtension(Class<T> type, String name) {
    17
            // 依次遍历各个ExtensionFactory实现的getExtension方法,一旦获取到Extension即返回  
    18
            // 如果遍历完所有的ExtensionFactory实现均无法找到Extension,则返回null
    19
            for (ExtensionFactory factory : factories) {
    20
                T extension = factory.getExtension(type, name);
    21
                if (extension != null) {
    22
                    return extension;
    23
                }
    24
            }
    25
            return null;
    26
        }
    27
    28
    }
    他会遍历当前系统中所有的ExtensionFactory实现来获取指定的扩展实现,获取到扩展实现或遍历完所有的ExtensionFactory实现。这里调用了ExtensionLoader的getSupportedExtensions方法来获取ExtensionFactory的所有实现,又回到了ExtensionLoader类。

方法代码分析

  • getExtension
    1
    getExtension(name)  
    2
        -> createExtension(name) #如果无缓存则创建  
    3
            -> getExtensionClasses().get(name) #获取name对应的扩展类型  
    4
            -> 实例化扩展类  
    5
            -> injectExtension(instance) # 扩展点注入  
    6
            -> instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance)) #循环遍历所有wrapper实现,实例化wrapper并进行扩展点注入
  • getAdaptiveExtension
    1
    public T getAdaptiveExtension()  
    2
        -> createAdaptiveExtension() #如果无缓存则创建  
    3
            -> getAdaptiveExtensionClass().newInstance() #获取AdaptiveExtensionClass  
    4
                -> getExtensionClasses() # 加载当前扩展所有实现,看是否有实现被标注为@Adaptive  
    5
                -> createAdaptiveExtensionClass() #如果没有实现被标注为@Adaptive,则动态创建一个Adaptive实现类  
    6
                    -> createAdaptiveExtensionClassCode() #动态生成实现类java代码  
    7
                    -> compiler.compile(code, classLoader) #动态编译java代码,加载类并实例化  
    8
            -> injectExtension(instance)

能在本地把代码跑起来,debug状态能更好的跟踪代码的执行,dubbo源代码中提供了非常详细的测试用例,基本涵盖了所有的使用场景,可以本地试试,大有裨益!

1
com.alibaba.dubbo.common.extensionloader.ExtensionLoaderTest

1:老子《道德经》第七章,老子故里,中国鹿邑。

“将欲取天下而为之,吾见其不得已。
天下神器,不可为也。
为者败之,执者失之。
故物或行或随;或嘘或吹,或强或羸,或挫或隳。
是以圣人去甚,去奢,去泰。”1

本文以spark2.3.0版本(on YARN)为主,可以移步spark2.3.0官方了解更多。
你可以通过:

spark2-submit –help

来查看详细的参数配置说明。

1
spark2-submit  \
2
  --master yarn \
3
  --deploy-mode cluster \
4
  --num-executors 48 \
5
  --driver-memory 2g \
6
  --executor-memory 7g \
7
  --executor-cores 3 \
8
  /home/data/demo/spark/sparkwordcount.jar \
9
  --class com.cgoshine.sh.demo.SparkWordCount  \
10
  [application-arguments]

–master

master url 含义
local 使用1个worker线程在本地运行spark应用程序
local[k] 使用k个worker线程在本地运行spark应用程序
local[*] 使用剩余可用的所有worker线程在本地运行spark应用程序
spark://host:port 连接到Spark Standalone集群,以便在该集群上运行Spark应用程序
yarn Connect to a YARN cluster in client or cluster mode depending on the value of –deploy-mode. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.

Spark使用下面几种URI来处理文件的传播:

  • file:// 使用file://和绝对路径,是由driver的HTTP server来提供文件服务,各个executor从driver上拉回文件。
  • hdfs:, http:, https:, ftp: executor直接从URL拉回文件
  • local: executor本地本身存在的文件,不需要拉回;也可以是通过NFS网络共享的文件。

其他参数说明:

参数 含义
–name 应用程序名称
–proxy-user 模拟提交作业的用户
–conf 以key=value的方式对其他参数进行配置,例子见文末。
–queue yarn-only,提交应用程序给哪个YARN的队列,默认是default队列
–num-executors yarn-only,启动executor的数量,默认2个。
–executor-cores yarn-only,每个executor使用的内核数,默认1个。
–archives ARCHIVES yarn-only,被每个executor提取到工作目录的档案列表,用逗号隔开。提交python作业的时候其依赖的包可以用这种形式。
–driver-memory driver内存大小,默认512M
–executor-memory executor内存大小,默认1G
–verbose 打印debug信息,生成更详细的运行信息以做参考,可以知道配置是如何加载的。

资源参数调优

Spark资源参数调优,其实主要就是对Spark运行过程中各个使用资源的地方,通过调节各种参数,来优化资源使用的效率,从而提升Spark作业的执行性能。

参数 说明 优化建议
num-executors 该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照你的设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。 参数调优建议:每个Spark作业的运行一般设置50~100个左右的Executor进程比较合适,设置太少或太多的Executor进程都不好。设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。
executor-memory 参数说明:该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常,也有直接的关联。 参数调优建议:每个Executor进程的内存设置4G-8G较为合适。但是这只是一个参考值,具体的设置还是得根据不同部门的资源队列来定。可以看看自己团队的资源队列的最大内存限制是多少,num-executors乘以executor-memory,是不能超过队列的最大内存量的。此外,如果你是跟团队里其他人共享这个资源队列,那么申请的内存量最好不要超过资源队列最大总内存的1/3~1/2,避免你自己的Spark作业占用了队列所有的资源,导致别的同学的作业无法运行。
executor-cores 该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。 Executor的CPU core数量设置为24个较为合适。同样得根据不同部门的资源队列来定,可以看看自己的资源队列的最大CPU core限制是多少,再依据设置的Executor数量,来决定每个Executor进程可以分配到几个CPU core。同样建议,如果是跟他人共享这个队列,那么num-executors * executor-cores不要超过队列总CPU core的1/31/2左右比较合适,也是避免影响其他同学的作业运行。
driver-memory 该参数用于设置Driver进程的内存。 Driver的内存通常来说不设置,或者设置1G左右应该就够了。唯一需要注意的一点是,如果需要使用collect算子将RDD的数据全部拉取到Driver上进行处理,那么必须确保Driver的内存足够大,否则会出现OOM内存溢出的问题。
spark.default.parallelism 该参数用于设置每个stage的默认task数量。这个参数极为重要,如果不设置可能会直接影响你的Spark作业性能。 Spark作业的默认task数量为5001000个较为合适。很多同学常犯的一个错误就是不去设置这个参数,那么此时就会导致Spark自己根据底层HDFS的block数量来设置task的数量,默认是一个HDFS block对应一个task。通常来说,Spark默认设置的数量是偏少的(比如就几十个task),如果task数量偏少的话,就会导致你前面设置好的Executor的参数都前功尽弃。试想一下,无论你的Executor进程有多少个,内存和CPU有多大,但是task只有1个或者10个,那么90%的Executor进程可能根本就没有task执行,也就是白白浪费了资源!因此Spark官网建议的设置原则是,设置该参数为num-executors * executor-cores的23倍较为合适,比如Executor的总CPU core数量为300个,那么设置1000个task是可以的,此时可以充分地利用Spark集群的资源。
spark.storage.memoryFraction 该参数用于设置RDD持久化数据在Executor内存中能占的比例,默认是0.6。也就是说,默认Executor 60%的内存,可以用来保存持久化的RDD数据。根据你选择的不同的持久化策略,如果内存不够时,可能数据就不会持久化,或者数据会写入磁盘 如果Spark作业中,有较多的RDD持久化操作,该参数的值可以适当提高一些,保证持久化的数据能够容纳在内存中。避免内存不够缓存所有的数据,导致数据只能写入磁盘中,降低了性能。但是如果Spark作业中的shuffle类操作比较多,而持久化操作比较少,那么这个参数的值适当降低一些比较合适。此外,如果发现作业由于频繁的gc导致运行缓慢(通过spark web ui可以观察到作业的gc耗时),意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。
spark.shuffle.memoryFraction 该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。 如果Spark作业中的RDD持久化操作较少,shuffle操作较多时,建议降低持久化操作的内存占比,提高shuffle操作的内存占比比例,避免shuffle过程中数据过多时内存不够用,必须溢写到磁盘上,降低了性能。此外,如果发现作业由于频繁的gc导致运行缓慢,意味着task执行用户代码的内存不够用,那么同样建议调低这个参数的值。资源参数的调优,没有一个固定的值,需要根据自己的实际情况(包括Spark作业中的shuffle操作数量、RDD持久化操作数量以及spark web ui中显示的作业gc情况),
合理地设置上述参数。

示例:

1
spark-submit \
2
  --master yarn \
3
  --deply-mode cluster \
4
  --num-executors 100 \
5
  --executor-memory 4G \
6
  --executor-cores 4 \
7
  --driver-memory 1G \
8
  --conf spark.default.parallelism=1000 \
9
  --conf spark.storage.memoryFraction=0.5 \
10
  --conf spark.shuffle.memoryFraction=0.3 \

1:老子《道德经》第二十九章,老子故里,中国鹿邑。

“以道佐人主者,不以兵强天下。
其事好远。
师之所处,荆棘生焉;大军之后,必有凶年。
善有果而已,不敢以取强。
果而勿矜,果而勿伐,果而勿骄。
果而不得已,果而勿强。
物壮则老,是谓不道,不道早已。”1

spark内置了非常多有用的算子(方法),通过对这些算子的组合就可以完成业务需要的功能,spark的编程归根结底就是对spark算子的使用,因此非常有必要对这些内置算子进行详细的归纳。
spark算子在大的方向上可以分为两类:

名称 说明
Transformation 变换、转换算子:不触发提交作业,只是完成作业中间过程处理;Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。Transformation参数类型为value或者key-value的形式。
Action 行动算子:触发SparkContext提交job作业。Action 算子会触发 Spark 提交作业(Job),并将数据输出 Spark系统。

上表中提到Transformation的参数类型分为两类:value和key-value形式,对其归纳如下:

value 类型

细类型 算子
输入分区与输出分区一对一型 map
flatMap
mapPartitions
glom
输入分区与输出分区多对一型 union
cartesain
输入分区与输出分区多对多型 groupBy
输出分区为输入分区子集型 filter
distinct
substract
sample
takeSample
Cache型 cache
persist
#### key-value类型
细类型 算子
:—— :——
输入分区与输出分区一对一 mapValues
对单个RDD或两个RDD聚集 单个RDD聚集: combineByKey reduceByKey partitionBy
两个RDD聚集: Cogroup
连接 join
leftOutJoin和 rightOutJoin

Action算子

细类型 算子
无输出 foreach
HDFS saveAsTextFile
saveAsObjectFile
Scala集合和数据类型 collect
collectAsMap
reduceByKeyLocally
lookup
count
top
reduce
fold
aggregate

接下来,以pyspark为编译环境对上述算子(不限于)进行详细解析。

1
Welcome to
2
      ____              __
3
     / __/__  ___ _____/ /__
4
    _\ \/ _ \/ _ `/ __/  '_/
5
   /__ / .__/\_,_/_/ /_/\_\   version 2.3.0.cloudera2
6
      /_/
7
8
Using Python version 2.7.5 (default, Aug  4 2017 00:39:18)
9
SparkSession available as 'spark'.

map

map(f, preservesPartitioning=False)

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD,RDD之间的元素是一对一关系。

1
>>> x = sc.parallelize([1,2,3,4])
2
>>> y = x.map(lambda x:(x**3))
3
>>> y.collect()
4
[1, 8, 27, 64]

filter

filter(f)

对RDD元素进行过滤,返回一个新的数据集,由经过func函数后返回值为true的原元素组成。

1
>>> y = x.filter(lambda x:x>2)
2
>>> y.collect()
3
[3, 4]

flatMap

flatMap(f, preservesPartitioning=False)

类似于map,但是每一个输入元素会被映射为0到多个输入元素,RDD之间的元素是一对多关系。

1
>>> y = x.flatMap(lambda x:(x,x*100,x**2))
2
>>> y.collect()
3
[1, 100, 1, 2, 200, 4, 3, 300, 9, 4, 400, 16]

glom

glom()

返回一个RDD,它将每个分区中的所有元素合并到一个列表中。

1
>>> a = sc.parallelize([1,2,3,4],2) //第二个参数2,表示数据集切分的份数(slices)。Spark将会在集群上为每一份数据起一个任务。
2
>>> y = a.glom()
3
>>> y.collect()
4
[[1, 2], [3, 4]]

mapPartitions

mapPartitions(f, preservesPartitioning=False)

Return a new RDD by applying a function to each partition of this RDD.

1
>>> xx = sc.parallelize([1,2,3,4], 2)
2
>>> def f(iter):
3
...     yield sum(iter)
4
...
5
>>> yy = xx.mapPartitions(f)
6
>>> print 'xx原来分区信息:{0}'.format(xx.glom().collect())
7
xx原来分区信息:[[1, 2], [3, 4]]                                                 
8
>>> print 'xx经过f计算后的结果:{}'.format(yy.glom().collect())
9
xx经过f计算后的结果:[[3], [7]]

关于yield的使用,请参考我的另一篇笔记《Python易筋经-yield when data is bigger》

mapPartitionsWithIndex

mapPartitionsWithIndex(f, preservesPartitioning=False)

Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition.
和mapPartitions类似,也是针对每个分区处理,但是func函数需要两个入参,第一个表示partition分区索引,第二个入参表示每个分区的迭代器。

1
>>> x = sc.parallelize([1, 2, 3, 4], 2)
2
>>> def f(splitIndex, iterator): yield (splitIndex, sum(iterator))
3
...
4
>>> y = x.mapPartitionsWithIndex(f)
5
>>> print 'x原来分区信息:{0}'.format(x.glom().collect())
6
x原来分区信息:[[1, 2], [3, 4]]                                                 
7
>>> print 'x经过f计算后的结果:{}'.format(y.glom().collect())
8
x经过f计算后的结果:[[(0, 3)], [(1, 7)]]

getNumsPartitions

getNumPartitions()

Returns the number of partitions in RDD

1
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
2
>>> print '分区有{}个'.format(rdd.getNumPartitions())
3
分区有2

distinct

distinct(numPartitions=None)

Return a new RDD containing the distinct elements in this RDD.

1
>>> res = sorted(sc.parallelize([1, 1, 1, 2, 3, 2, 3]).distinct().collect())
2
>>> print '去重后的结果:{}'.format(res)                                        
3
去重后的结果:[1, 2, 3]

sample

sample(withReplacement, fraction, seed=None)

Return a sampled subset of this RDD.
Parameters:
withReplacement – can elements be sampled multiple times (replaced when sampled out)

fraction – expected size of the sample as a fraction of this RDD’s size without replacement: probability that each element is chosen; fraction must be [0, 1] with replacement: expected number of times each element is chosen; fraction must be >= 0

seed – seed for the random number generator
对原RDD进行采样,其中withReplacement表示是否有放回的抽样,fraction表示采样大小是原RDD的百分比,seed表示随机数生成器。fraction和seed相同,则每次返回的数据相同

1
>>> rdd = sc.parallelize(range(100), 4)
2
>>> y = rdd.sample(False, 0.1, 81)
3
>>> y.collect()
4
[4, 27, 40, 42, 43, 60, 76, 80, 86, 97]                                         
5
>>> y.count()
6
10

union 并集

union(other)

Return the union of this RDD and another one.

1
>>> rdd = sc.parallelize([1, 1, 2, 3])
2
>>> rdd1 = sc.parallelize([5, 3, 4, 6])
3
>>> print rdd.union(rdd1).collect()
4
[1, 1, 2, 3, 5, 3, 4, 6]

intersection 交集

intersection(other)

Note that this method performs a shuffle internally.

1
>>> rdd = sc.parallelize([1, 1, 2, 3])
2
>>> rdd1 = sc.parallelize([5, 3, 4, 6])
3
>>> print rdd.intersection(rdd1).collect()
4
[3]

sortByKey

sortByKey(ascending=True, numPartitions=None, keyfunc=func)

Sorts this RDD, which is assumed to consist of (key, value) pairs.

1
>>> tmp = [('a', 1), ('f', 2), ('d', 3), ('c', 4), ('b', 5)]
2
>>> rdd = sc.parallelize(tmp, 2)
3
>>> print rdd.glom().collect()
4
[[('a', 1), ('f', 2)], [('d', 3), ('c', 4), ('b', 5)]]                          
5
>>> sort1 = rdd.sortByKey(True,1).glom().collect()
6
>>> sort2 = rdd.sortByKey(True,3).glom().collect()
7
>>> print sort1
8
[[('a', 1), ('b', 5), ('c', 4), ('d', 3), ('f', 2)]]
9
>>> print sort2
10
[[('a', 1), ('b', 5)], [('c', 4), ('d', 3)], [('f', 2)]]

sortBy

sortBy(keyfunc, ascending=True, numPartitions=None)

跟sortByKey类似,用索引的方式指定根据什么sort

1
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
2
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
3
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]                              
4
>>> sc.parallelize(tmp).sortBy(lambda x: x[1]).collect()
5
[('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]

cartesian

cartesian(other)

返回两个rdd的笛卡尔积。

1
>>> rdd = sc.parallelize([1, 2])
2
>>> rdd_1 = sc.parallelize([3,4])
3
>>> rdd.cartesian(rdd_1).collect()
4
[(1, 3), (1, 4), (2, 3), (2, 4)]

groupBy

groupBy(f, numPartitions=None, partitionFunc=)

根据f逻辑结果进行分组。

1
>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
2
>>> result = rdd.groupBy(lambda x: x % 2).collect()
3
>>> sorted([(x, sorted(y)) for (x, y) in result])
4
[(0, [2, 8]), (1, [1, 1, 3, 5])]

groupByKey

groupByKey(numPartitions=None, partitionFunc=)

groupByKey([numTasks])是数据分组操作,在一个由(K, V)键值对组成的数据集上调用,返回一个(K, Seq[V])对的数据集。

注意,如果要对每个键执行聚合(比如求和或平均值),使用reduceByKey或aggregateByKey将提供更好的性能。

1
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
2
>>> sorted(rdd.groupByKey().mapValues(len).collect())
3
[('a', 2), ('b', 1)]                                                            
4
>>> sorted(rdd.groupByKey().mapValues(list).collect())
5
[('a', [1, 1]), ('b', [1])]

pipe

pipe(command, env=None, checkCode=False

将由管道元素创建的RDD返回到一个forked外部进程。
参数:checkCode——是否检查shell命令的返回值。

1
>>> sc.parallelize(['1', '2', '', '3']).pipe('cat').collect()
2
[u'1', u'2', u'', u'3']
3
4
>>> rdd = sc.parallelize(['spark2.3.0', 'kafka', 'hbase'])
5
>>> rdd2 = rdd.pipe('grep -i "ar"')
6
>>> print '经过pipe处理过后的数据:{}'.format(rdd2.collect())
7
经过pipe处理过后的数据:[u'spark2.3.0']

foreach

foreach(f)

Applies a function to all elements of this RDD.

1
>>> def f(x): print(x)
2
...
3
>>> sc.parallelize([1, 2, 3, 4, 5]).foreach(f)

max, min, sum, count

1
>>> x = sc.parallelize(range(10))
2
>>> x.collect()
3
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
4
>>> print '最大值:{}'.format(x.max())
5
最大值:9                                                                       
6
>>> print '最小值:{}'.format(x.min())
7
最小值:0
8
>>> print '总和:{}'.format(x.sum())
9
总和:45
10
>>> print '总个数:{}'.format(x.count())
11
总个数:10

mean, variance, sampleVariance, stdev, sampleStdev

1
>>> x.collect()
2
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
3
>>> print '平均值:{}'.format(x.mean())
4
平均值:4.5                                                                     
5
>>> print '方差:{}'.format(x.variance())
6
方差:8.25
7
>>> print '样本方差:{}'.format(x.sampleVariance())
8
样本方差:9.16666666667
9
>>> print '总体标准偏差:{}'.format(x.stdev())
10
总体标准偏差:2.87228132327
11
>>> print '样本标准偏差:{}'.format(x.sampleStdev())
12
样本标准偏差:3.0276503541

countByKey, countByValue

countByKey()

Count the number of elements for each key, and return the result to the master as a dictionary.

countByValue()

Return the count of each unique value in this RDD as a dictionary of (value, count) pairs.

1
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
2
>>> sorted(rdd.countByKey().items())
3
[('a', 2), ('b', 1)]
4
5
>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
6
[(1, 2), (2, 3)]

first, top, take, takeOrdered

first()
top(num, key=None)

从按降序排列的RDD中获取前N个元素,或者有可选的key函数决定顺序。

take(num)

Take the first num elements of the RDD.

akeOrdered(num, key=None)

从按升序排列的RDD中获取N个元素,或者由可选key函数指定。

1
>>> rdd = sc.parallelize([10, 4, 2, 12, 3])
2
>>> rdd.first()
3
10                                                                              
4
>>> rdd.top(1)
5
[12]
6
>>> rdd.top(2)
7
[12, 10]
8
>>> rdd.top(2,key=str)
9
[4, 3]
10
>>> rdd.take(3)
11
[10, 4, 2]

subtract

subtract(other, numPartitions=None)

Return each value in self that is not contained in other.

1
>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
2
>>> y = sc.parallelize([("a", 3), ("c", None)])
3
>>> sorted(x.subtract(y).collect())
4
[('a', 1), ('b', 4), ('b', 5)]

cache

cache()

Persist this RDD with the default storage level (MEMORY_ONLY).

presist

persist()

Set this RDD’s storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified defaults to (MEMORY_ONLY).

cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。

reduce

reduce(f)

Reduces the elements of this RDD using the specified commutative and associative binary operator. Currently reduces partitions locally.

1
>>> from operator import add
2
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
3
15
4
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
5
10

reduceByKey

reduceByKey(func, numPartitions=None, partitionFunc=)

Merge the values for each key using an associative and commutative reduce function.
This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce.
Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. Default partitioner is hash-partition.

把一个函数作用在一个RDD上,这个函数必须接收两个参数,reduce把结果继续和RDD的下一个元素做累积计算。注意:reduceByKey的函数是针对具有相同key的二元组

1
>>> from operator import add
2
>>> rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
3
>>> sorted(rdd.reduceByKey(add).collect())
4
[('a', 2), ('b', 1)]
5
6
>>> num = sc.parallelize([1, 4, 2, 3, 4, 4, 2, 4])
7
>>> pairs = num.map(lambda x: (x, 1))
8
>>> pairs.collect()
9
[(1, 1), (4, 1), (2, 1), (3, 1), (4, 1), (4, 1), (2, 1), (4, 1)]                
10
>>> a = pairs.reduceByKey(lambda x, y: x+y+1)
11
>>> a.collect()
12
[(2, 3), (4, 7), (1, 1), (3, 1)]
13
>>> b = pairs.reduceByKey(lambda x, y: x+y+2)
14
>>> b.collect()
15
[(2, 4), (4, 10), (1, 1), (3, 1)

1:老子《道德经》第三十章,老子故里,中国鹿邑。

“夫唯兵者,不祥之器,物或恶之,故有道者不处。
君子居则贵左,用兵则贵右。
兵者不祥之器,非君子之器,不得已而用之,恬淡为上。
胜而不美,而美之者,是乐杀人。
夫乐杀人者,则不可得志于天下矣。
吉事尚左,凶事尚右。
偏将军居左,上将军居右,言以丧礼处之。
杀人之众,以悲哀泣之,战胜以丧礼处之。”1

Spark对RDD的持久化操作(cache()persist()checkpoint())是很重要的,可以将rdd存放在不同的存储介质中,方便后续的操作能重复使用。

cache()

persist()

cache和persist都是用于将一个RDD进行缓存,这样在之后使用的过程中就不需要重新计算,可以大大节省程序运行时间。
cache和persist的区别:cache只有一个默认的缓存级别MEMORY_ONLY,而persist可以根据情况设置其它的缓存级别
RDD的缓存级别:

1
查看 StorageLevel 类的源码
2
object StorageLevel {
3
  val NONE = new StorageLevel(false, false, false, false)
4
  val DISK_ONLY = new StorageLevel(true, false, false, false)
5
  val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
6
  val MEMORY_ONLY = new StorageLevel(false, true, false, true)
7
  val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
8
  val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
9
  val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
10
  val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
11
  val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
12
  val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
13
  val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
14
  val OFF_HEAP = new StorageLevel(false, false, true, false)
15
  ......
16
}

可以看到这里列出了12种缓存级别,但这些有什么区别呢?可以看到每个缓存级别后面都跟了一个StorageLevel的构造函数,里面包含了4个或5个参数,如下:

val MEMORY_ONLY = new StorageLevel(false, true, false, true)

1
查看其构造函数
2
class StorageLevel private(
3
    private var _useDisk: Boolean,
4
    private var _useMemory: Boolean,
5
    private var _useOffHeap: Boolean,
6
    private var _deserialized: Boolean,
7
    private var _replication: Int = 1)
8
  extends Externalizable {
9
  ......
10
  def useDisk: Boolean = _useDisk
11
  def useMemory: Boolean = _useMemory
12
  def useOffHeap: Boolean = _useOffHeap
13
  def deserialized: Boolean = _deserialized
14
  def replication: Int = _replication
15
  ......
16
}

可以看到StorageLevel类的主构造器包含了5个参数:

  • useDisk:使用硬盘(外存)
  • useMemory:使用内存
  • useOffHeap:使用堆外内存,这是Java虚拟机里面的概念,堆外内存意味着把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机)。这样做的结果就是能保持一个较小的堆,以减少垃圾收集对应用的影响。
  • deserialized:反序列化,其逆过程序列化(Serialization)是java提供的一种机制,将对象表示成一连串的字节;而反序列化就表示将字节恢复为对象的过程。序列化是对象永久化的一种机制,可以将对象及其属性保存起来,并能在反序列化后直接恢复这个对象
  • replication:备份数(在多个节点上备份)

理解了这5个参数,StorageLevel 的12种缓存级别就不难理解了。
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2) 就表示使用这种缓存级别的RDD将存储在硬盘以及内存中,使用序列化(在硬盘中),并且在多个节点上备份2份(正常的RDD只有一份)

checkpoint()

1
sc.sparkContext.setCheckpointDir('...')
2
......
3
......
4
rdd.cache()
5
rdd.checkpoint()
6
......

checkpoint接口是将RDD持久化到HDFS中,与persist的区别是checkpoint会切断此RDD之前的依赖关系,而persist会保留依赖关系。checkpoint的两大作用:一是spark程序长期驻留,过长的依赖会占用很多的系统资源,定期checkpoint可以有效的节省资源;二是维护过长的依赖关系可能会出现问题,一旦spark程序运行失败,RDD的容错成本会很高。

注意:checkpoint执行前要先进行cache,避免两次计算。

1:老子《道德经》第三十一章,老子故里,中国鹿邑。