6.2 累加器
累加器,提供了将工作节点中的值聚合到驱动器程序中的简单语法。
累加器的一个常见用途是在调试时对作业执行过程中的事件进行计数。
代码
例如,假设我们在从文件中读取呼号列表对应的日志,同时也想知道输入文件中有多少空行(也许不希望在有效输入中看到很多这样的行)。
注意,只有在运行 collect() 行动操作后才能看到正确的计数。
因为行动操作前的转化操作 flatMap() 是惰性的,所以作为计算副产品的累加器只有在惰性的转化操作 flatMap() 被行动操作强制触发时才会开始求值。
累加器的用法
- 通过在驱动器中调用 SparkContext.accumulator(initialValue) 方法,创建出存有初始值的累加器。
返回值为 org.apache.spark.Accumulator[T] 对象,其中 T 是初始值 initialValue 的类型。 - Spark闭包里的执行器代码可以使用累加器的 += 方法(在Java中是 add )增加累加器的值。
- 驱动器程序可以调用累加器的 value 属性(在 Java 中使用 value() 或 setValue() )来访问累加器的值。
注意,工作节点上的任务不能访问累加器的值。
从这些任务的角度来看,累加器是一个只写变量。
在这种模式下,累加器的实现可以更加高效,不需要对每次更新操作进行复杂的通信。
累加器的值只有在驱动器程序中可以访问,所以检查也应当在驱动器程序中完成。
6.2.1 累加器与容错性
Spark 会自动重新执行失败的或较慢的任务来应对有错误的或者比较慢的机器。
因此最终结果就是同一个函数可能对同一个数据运行了多次,这取决于集群发生了什么。
这种情况下累加器要怎么处理呢?
实际结果是,对于要在行动操作中使用的累加器,Spark 只会把每个任务对各累加器的修改应用一次。
因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在 foreach() 这样的行动操作中。
对于在 RDD 转化操作中使用的累加器,就不能保证有这种情况了。
转化操作中累加器可能会发生不止一次更新。
6.2.2 自定义累加器
Spark 也引入了自定义累加器和聚合操作的 API(比如找到要累加的值中的最大值,而不是把这些值加起来)。
自定义累加器需要扩展 AccumulatorParam ,这在 Spark API 文档中有所介绍。
只要该操作同时满足交换律和结合律,就可以使用任意操作来代替数值上的加法。