5.5.1 MySQL 数据库连接

Cassandra 还没有使用 Spark SQL,不过它会返回由 CassandraRow 对象组成的 RDD,这些对象有一部分方法与 Spark SQL 的 Row 对象的方法相同。

Spark 的 Cassandra 连接器目前只能在 Java 和 Scala 中使用。

Maven 依赖

https://github.com/datastax/spark-cassandra-connector
Java 连接文档

<dependency> <!-- Cassandra -->
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector</artifactId>
    <version>${spark.cassandra.version}</version>
</dependency>
<dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector-java_2.11</artifactId>
    <version>1.6.0-M1</version>
</dependency>

连接

Java

SparkConf conf = new SparkConf(true).set("spark.cassandra.connection.host", DataBaseUtil.CASSANDRA_HOSTNAME);
JavaSparkContext sc = new JavaSparkContext(DataBaseUtil.SPARK_MASTER, DataBaseUtil.SPARK_APPNAME,conf);

Scala

val conf = new SparkConf(true).set("spark.cassandra.connection.host", DataBaseUtil.CASSANDRA_HOSTNAME)
val sc = new SparkContext(DataBaseUtil.SPARK_MASTER, DataBaseUtil.SPARK_APPNAME, conf)

保存

在 Scala 中保存数据到 Cassandra

val rdd = sc.parallelize(List(Seq("moremagic", 1)))
rdd.saveToCassandra("test" , "kv", SomeColumns("key", "value"))

代码

如果有用户名和密码的话,则需要分别设置 spark.cassandra.auth.username 和 spark.cassandra.auth.password 。
假定你只有一个 Cassandra 集群要连接,可以在创建 SparkContext 时就把这些都设好。

通过在 cassandraTable() 的调用中加上 where 子句,可以限制查询的数据。
例如 sc.cassandraTable(…).where(“key=?”, “panda”) 。