3.3 RDD操作

RDD 支持两种操作:

Python 实现 filter() 转化操作:

>>> lines = sc.textFile('pyspark')
>>> pythonLines = lines.filter(lambda line: "Python" in line)
>>> pythonLines
PythonRDD[16] at RDD at PythonRDD.scala:49
>>> sparkLines = lines.filter(lambda line: "spark" in line)
>>> allLines = pythonLines.union(sparkLines)
>>> allLines.count()
15

Scala 实现 filter() 转化操作:

scala> val lines = sc.textFile("spark-shell")
lines: org.apache.spark.rdd.RDD[String] = spark-shell MapPartitionsRDD[4] at textFile at <console>:24

scala> val scalalines = lines.filter(line => line.contains("scala"))
scalalines: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:25

Java 实现 filter() 转化操作:

JavaRDD<String> lines = sc.textFile("README.md");
JavaRDD<String> javaRDD = lines.filter(
        new Function<String, Boolean>() {
            @Override
            public Boolean call(String s) throws Exception {
                return s.contains("Java");
            }
        }
);
JavaRDD<String> javaRDD = lines.filter(line -> line.contains("Java"));

注意, filter() 操作不会改变已有的 inputRDD 中的数据。
实际上,该操作会返回一个全新的 RDD。

3.3.2 行动操作

行动操作会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。
由于行动操作需要生成实际的输出,它们会强制执行那些求值必须用到的 RDD 的转化操作。

【注】

在 Python 中使用行动操作进行计数

pythonLines = lines.filter(lambda line: "Python" in line)
for line in pythonLines.take(pythonLines.count()):
    print(line)

在 Scala 中使用行动操作进行计数

val scalalines = lines.filter(line => line.contains("Scala"))
scalalines.take(scalalines.count().toInt).foreach(println)

在 Java 中使用行动操作进行计数

JavaRDD<String> javaRDD = lines.filter(line -> line.contains("Java"));
for (String line:javaRDD.take((int) javaRDD.count())){
    System.out.println(line);
}
或
javaRDD.take((int) javaRDD.count()).forEach(System.out::println);
javaRDD.collect().forEach(System.out::println);

####3.3.3 惰性求值 RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前 Spark 不会开始计算。