4.5 数据分区(进阶)
对数据集在节点间的分区进行控制:
- 如果给定 RDD 只需要被扫描一次,我们完全没有必要对其预先进行分区处理。
- 当数据集多次在诸如连接这种基于键的操作中使用时,分区才会有帮助。
示例
- Java
- Scala
- Python
4.5.1 获取RDD的分区方式
- 在 Scala 中,你可以使用 RDD 的 partitioner 属性来获取 RDD 的分区方式。
- 在 Java 中,你可以使用 RDD 的 partitioner() 方法来获取 RDD 的分区方式。
它会返回一个 scala.Option 对象,这是 Scala 中用来存放可能存在的对象的容器类。
可以对这个 Option 对象调用 isDefined() 来检查其中是否有值,调用 get() 来获取其中的值。
如果存在值的话,这个值会是一个 spark.Partitioner 对象。
4.5.2 从分区中获益的操作
Spark 的许多操作都引入了将数据根据键跨节点进行混洗的过程。
所有这些操作都会从数据分区中获益:
- cogroup()
- groupWith()
- join()
- leftOuterJoin()
- rightOuterJoin()
- groupByKey()
- reduceByKey()
- combineByKey()
- lookup()
对于像 reduceByKey() 这样只作用于单个 RDD 的操作,运行在未分区的 RDD 上的时候会
导致每个键的所有对应值都在每台机器上进行本地计算,只需要把本地最终归约出的结
果值从各工作节点传回主节点,所以原本的网络开销就不算大。
对于诸如 cogroup() 和
join() 这样的二元操作,预先进行数据分区会导致其中至少一个 RDD(使用已知分区器的那个 RDD)不发生数据混洗。
4.5.3 影响分区方式的操作
Spark 内部知道各操作会如何影响分区方式,并将会对数据进行分区的操作的结果 RDD 自动设置为对应的分区器。
转化操作的结果并不一定会按已知的分区方式分区,这时输出的 RDD 可能就会没有设置分区器。
- 所有会为生成的结果 RDD 设好分区方式的操作:
- cogroup()
- groupWith()
- join()
- leftOuterJoin()
- rightOuterJoin()
- groupByKey()
- reduceByKey()
- combineByKey()
- partitionBy()
- sort()
- mapValues() (如果父 RDD 有分区方式的话)
- flatMapValues() (如果父 RDD 有分区方式的话)
- filter() (如果父 RDD 有分区方式的话)
- 对于二元操作,输出数据的分区方式取决于父 RDD 的分区方式。
默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一样。4.5.4 示例:PageRank
PageRank 是一种从 RDD 分区中获益的更复杂的算法,用来根据外部文档指向一个文档的链接,对集合中每个文档的重要程度赋一个度量值。
该算法可以用于对网页进行排序,当然,也可以用于排序科技文章或社交网络中有影响的用户。介绍
PageRank 是执行多次连接的一个迭代算法,因此它是 RDD 分区操作的一个很好的用例。
算法会维护两个数据集:
一个由 (pageID, linkList) 的元素组成,包含每个页面的相邻页面的列表;
另一个由 (pageID, rank) 元素组成,包含每个页面的当前排序值。
步骤
它按如下步骤进行计算:
- 将每个页面的排序值初始化为 1.0 。
- 在每次迭代中,对页面 p ,向其每个相邻页面(有直接链接的页面)发送一个值为 rank(p)/numNeighbors(p) 的贡献值。
- 将每个页面的排序值设为 0.15 + 0.85 * contributionsReceived 。
最后两步会重复几个循环,在此过程中,算法会逐渐收敛于每个页面的实际 PageRank 值。
最小化通信开销
- links RDD 在每次迭代中都会和 ranks 发生连接操作。
由于 links 是一个静态数据集,所以我们在程序一开始的时候就对它进行了分区操作,这样就不需要把它通过网络进行数据混洗了。 - 调用 links 的 persist() 方法,将它保留在内存中以供每次迭代使用。
- 第一次创建 ranks 时,我们使用 mapValues() 而不是 map() 来保留父 RDD( links )的分区方式,这样对它进行的第一次连接操作就会开销很小。
- 在循环体中,我们在 reduceByKey() 后使用 mapValues() ;
因为 reduceByKey() 的结果已经是哈希分区的了,这样一来,下一次循环中将映射操作的结果再次与 links 进行连接操作时就会更加高效。4.5.5 自定义分区方式
虽然 Spark 提供的 HashPartitioner 与 RangePartitioner 已经能够满足大多数用例,
但 Spark 还是允许你通过提供一个自定义的 Partitioner 对象来控制 RDD 的分区方式。 这可以让你利用领域知识进一步减少通信开销。实现自定义的分区器
要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。
- numPartitions: Int :返回创建出来的分区数。
- getPartition(key: Any): Int :返回给定键的分区编号(0 到 numPartitions-1 )。
需要十分谨慎,确保 getPartition() 永远返回一个非负数。 - equals() :Java 判断相等性的标准方法。
这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。