Connect to a YARN cluster in client or cluster mode depending on the value of `--deploy-mode`. The cluster location will be found based on the HADOOP_CONF_DIR or YARN_CONF_DIR variable.
If a Spark job performs many RDD persistence operations, this parameter's value can be raised somewhat so that the persisted data fits in memory. Otherwise, when memory cannot hold all of the cached data, part of it must be written to disk, which hurts performance. Conversely, if the job has many shuffle operations but little persistence, it is better to lower this value. In addition, if the job runs slowly because of frequent GC (per-job GC time is visible in the Spark web UI), the memory available for running user code in each task is insufficient, and lowering this value is likewise recommended.
If a Spark job has few RDD persistence operations but many shuffle operations, lower the memory fraction reserved for persistence and raise the fraction for shuffle. This makes it less likely that shuffle data overflows memory and must spill to disk, which degrades performance. Again, if frequent GC is slowing the job down, the memory for task user code is short, and lowering this value also helps. There is no single fixed value for these resource parameters; tune them according to your actual situation, including the number of shuffle operations in the job, the number of RDD persistence operations, and the GC behavior shown in the Spark web UI.
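As a rough illustration of the tuning described above, the legacy (pre-Spark 1.6, static memory management) fractions could be adjusted at submit time along these lines. The 0.3/0.5 values and the script name are placeholders, not recommendations:

```shell
# Sketch: lower the storage fraction and raise the shuffle fraction
# for a shuffle-heavy job with few persisted RDDs.
# These configs apply to legacy static memory management (Spark < 1.6);
# the concrete values are illustrative placeholders only.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --conf spark.storage.memoryFraction=0.3 \
  --conf spark.shuffle.memoryFraction=0.5 \
  my_job.py
```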
Return a new RDD by applying a function to each partition of this RDD, while tracking the index of the original partition. Like mapPartitions, this processes one partition at a time, but the function takes two arguments: the first is the partition index, and the second is an iterator over that partition's elements.
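The two-argument contract described above can be sketched in plain Python, assuming the data has already been split into partitions (no Spark needed; `map_partitions_with_index` is an illustrative helper, not a Spark API):

```python
# Plain-Python sketch of mapPartitionsWithIndex semantics.
def map_partitions_with_index(partitions, func):
    """Apply func(index, iterator) to each partition and concatenate results."""
    out = []
    for idx, part in enumerate(partitions):
        out.extend(func(idx, iter(part)))
    return out

# Tag each element with the index of the partition it came from.
partitions = [[1, 2], [3, 4], [5]]
result = map_partitions_with_index(
    partitions, lambda idx, it: [(idx, x) for x in it]
)
print(result)  # [(0, 1), (0, 2), (1, 3), (1, 4), (2, 5)]
```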
Return a new RDD containing the distinct elements in this RDD.
>>> res = sorted(sc.parallelize([1, 1, 1, 2, 3, 2, 3]).distinct().collect())
>>> print('De-duplicated result: {}'.format(res))
De-duplicated result: [1, 2, 3]
sample
sample(withReplacement, fraction, seed=None)
Return a sampled subset of this RDD.
Parameters:
withReplacement – can elements be sampled multiple times (replaced when sampled out)
fraction – expected size of the sample as a fraction of this RDD's size. Without replacement: the probability that each element is chosen; fraction must be in [0, 1]. With replacement: the expected number of times each element is chosen; fraction must be >= 0.
seed – seed for the random number generator
This samples the original RDD: withReplacement indicates whether sampling is done with replacement, fraction is the expected sample size as a proportion of the original RDD, and seed seeds the random number generator. With the same fraction and seed, each call returns the same data.
>>> rdd = sc.parallelize(range(100), 4)
>>> y = rdd.sample(False, 0.1, 81)
>>> y.collect()
[4, 27, 40, 42, 43, 60, 76, 80, 86, 97]
>>> y.count()
10
union (set union)
union(other)
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd1 = sc.parallelize([5, 3, 4, 6])
>>> print(rdd.union(rdd1).collect())
[1, 1, 2, 3, 5, 3, 4, 6]
intersection (set intersection)
intersection(other)
Return the intersection of this RDD and another one; the output will not contain any duplicate elements, even if the input RDDs did. Note that this method performs a shuffle internally.
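No example accompanies intersection above; a plain-Python sketch of its semantics (elements common to both inputs, with duplicates removed from the output, sorted here only for a stable display) might look like:

```python
# Plain-Python sketch of RDD.intersection semantics: keep elements that
# appear in both inputs, and de-duplicate the output.
def intersection(left, right):
    return sorted(set(left) & set(right))

print(intersection([1, 1, 2, 3], [5, 3, 4, 6, 3]))  # [3]
```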
cache
cache()
Persist this RDD with the default storage level (MEMORY_ONLY).
persist
persist()
Set this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. If no storage level is specified, it defaults to MEMORY_ONLY.
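The "computed once, reused afterwards" behaviour described above can be sketched in plain Python with a memoizing wrapper. This is an illustration of the semantics only, not Spark's implementation; `LazyDataset` is a hypothetical class:

```python
# Plain-Python sketch of persist() semantics: the first action computes the
# data and caches it; later actions reuse the cached values instead of
# recomputing the lineage.
class LazyDataset:
    def __init__(self, compute):
        self._compute = compute      # the "lineage": how to produce the data
        self._cache = None
        self._persisted = False
        self.compute_count = 0       # how many times the lineage actually ran

    def persist(self):
        self._persisted = True
        return self

    def collect(self):
        if self._persisted and self._cache is not None:
            return self._cache
        self.compute_count += 1
        data = self._compute()
        if self._persisted:
            self._cache = data
        return data

ds = LazyDataset(lambda: [x * x for x in range(5)]).persist()
ds.collect()
ds.collect()
print(ds.compute_count)  # 1 – computed once, served from the cache afterwards
```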
Merge the values for each key using an associative and commutative reduce function. This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. Output will be partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified. The default partitioner is hash partitioning.
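The map-side "combiner" behaviour described above can be sketched in plain Python (no Spark needed): each partition first merges its own values per key, and only the partial results are then merged across partitions. `reduce_by_key` is an illustrative helper, not the Spark API:

```python
# Plain-Python sketch of reduceByKey with map-side combining:
# each "mapper" partition pre-merges values per key, then the partial
# results are merged again on the "reduce" side.
def reduce_by_key(partitions, func):
    # Map side: combine locally within each partition.
    combined = []
    for part in partitions:
        local = {}
        for k, v in part:
            local[k] = func(local[k], v) if k in local else v
        combined.append(local)
    # Reduce side: merge the per-partition partial results.
    result = {}
    for local in combined:
        for k, v in local.items():
            result[k] = func(result[k], v) if k in result else v
    return result

partitions = [[("a", 1), ("b", 1), ("a", 1)], [("a", 1), ("b", 1)]]
print(reduce_by_key(partitions, lambda x, y: x + y))  # {'a': 3, 'b': 2}
```

Pre-merging shrinks the data each mapper ships to the reducers, which is exactly why shuffle-heavy reduceByKey jobs are usually cheaper than groupByKey followed by a reduce.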