4.5 数据分区(进阶)

对数据集在节点间的分区进行控制:

它会返回一个 scala.Option 对象,这是 Scala 中用来存放可能存在的对象的容器类。
可以对这个 Option 对象调用 isDefined() 来检查其中是否有值,调用 get() 来获取其中的值。
如果存在值的话,这个值会是一个 spark.Partitioner 对象。

4.5.2 从分区中获益的操作

Spark 的许多操作都引入了将数据根据键跨节点进行混洗的过程。
所有这些操作都会从数据分区中获益:

对于像 reduceByKey() 这样只作用于单个 RDD 的操作,运行在未分区的 RDD 上的时候会 导致每个键的所有对应值都在每台机器上进行本地计算,只需要把本地最终归约出的结 果值从各工作节点传回主节点,所以原本的网络开销就不算大。
对于诸如 cogroup() 和 join() 这样的二元操作,预先进行数据分区会导致其中至少一个 RDD(使用已知分区器的那个 RDD)不发生数据混洗。

4.5.3 影响分区方式的操作

Spark 内部知道各操作会如何影响分区方式,并将会对数据进行分区的操作的结果 RDD 自动设置为对应的分区器。
转化操作的结果并不一定会按已知的分区方式分区,这时输出的 RDD 可能就会没有设置分区器。

  1. 所有会为生成的结果 RDD 设好分区方式的操作:
    • cogroup()
    • groupWith()
    • join()
    • leftOuterJoin()
    • rightOuterJoin()
    • groupByKey()
    • reduceByKey()
    • combineByKey()
    • partitionBy()
    • sort()
    • mapValues() (如果父 RDD 有分区方式的话)
    • flatMapValues() (如果父 RDD 有分区方式的话)
    • filter() (如果父 RDD 有分区方式的话)
  2. 对于二元操作,输出数据的分区方式取决于父 RDD 的分区方式。
    默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一样。

    4.5.4 示例:PageRank

    PageRank 是一种从 RDD 分区中获益的更复杂的算法,用来根据外部文档指向一个文档的链接,对集合中每个文档的重要程度赋一个度量值。
    该算法可以用于对网页进行排序,当然,也可以用于排序科技文章或社交网络中有影响的用户。

    介绍

    PageRank 是执行多次连接的一个迭代算法,因此它是 RDD 分区操作的一个很好的用例。

算法会维护两个数据集:
一个由 (pageID, linkList) 的元素组成,包含每个页面的相邻页面的列表;
另一个由 (pageID, rank) 元素组成,包含每个页面的当前排序值。

步骤

它按如下步骤进行计算:

  1. 将每个页面的排序值初始化为 1.0 。
  2. 在每次迭代中,对页面 p ,向其每个相邻页面(有直接链接的页面)发送一个值为 rank(p)/numNeighbors(p) 的贡献值。
  3. 将每个页面的排序值设为 0.15 + 0.85 * contributionsReceived 。

最后两步会重复几个循环,在此过程中,算法会逐渐收敛于每个页面的实际 PageRank 值。

最小化通信开销

  1. links RDD 在每次迭代中都会和 ranks 发生连接操作。
    由于 links 是一个静态数据集,所以我们在程序一开始的时候就对它进行了分区操作,这样就不需要把它通过网络进行数据混洗了。
  2. 调用 links 的 persist() 方法,将它保留在内存中以供每次迭代使用。
  3. 第一次创建 ranks 时,我们使用 mapValues() 而不是 map() 来保留父 RDD( links )的分区方式,这样对它进行的第一次连接操作就会开销很小。
  4. 在循环体中,我们在 reduceByKey() 后使用 mapValues() ;
    因为 reduceByKey() 的结果已经是哈希分区的了,这样一来,下一次循环中将映射操作的结果再次与 links 进行连接操作时就会更加高效。

    4.5.5 自定义分区方式

    虽然 Spark 提供的 HashPartitioner 与 RangePartitioner 已经能够满足大多数用例,
    但 Spark 还是允许你通过提供一个自定义的 Partitioner 对象来控制 RDD 的分区方式。 这可以让你利用领域知识进一步减少通信开销。

    实现自定义的分区器

    要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法。

    • numPartitions: Int :返回创建出来的分区数。
    • getPartition(key: Any): Int :返回给定键的分区编号(0 到 numPartitions-1 )。
      需要十分谨慎,确保 getPartition() 永远返回一个非负数。
    • equals() :Java 判断相等性的标准方法。
      这个方法的实现非常重要,Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个 RDD 的分区方式是否相同。