黑猴子的家:Spark RDD 之 MySql的输入输出(数据读取与保存的主要方式之一)

1、mysql数据准备

mysql> create table rddtable (id int,name varchar(20));
mysql> flush privileges;
mysql> insert into rddtable(id,name) \
values (1,"abc"), (2,"dddd"), (3,"cccc"), (4,"mmmm"), \
(5,"Female"), (6,"Female"), (7,"Female"), (8,"Female"), \
(9,"Male"), (10,"Female"), (11,"Female");
mysql> flush privileges;

2、Mysql读取

(1)REPL

scala> val rdd = new org.apache.spark.rdd.JdbcRDD (
     | sc,
     | () => {
     | Class.forName ("com.mysql.jdbc.Driver").newInstance()
     | java.sql.DriverManager.getConnection ("jdbc:mysql://hadoop102:3306/rdd", "root", "heiHouZi2018!")},
     | "select * from rddtable where id >= ? and id <= ?;",
     | 10,
     | r => (r.getInt(1),r.getString(2))
rdd: org.apache.spark.rdd.JdbcRDD[(Int, String)] = JdbcRDD[0] at JdbcRDD at <console>:24
scala> println(rdd.count) 
scala> rdd.foreach(println _)
(1,abc)
(2,dddd)
(3,cccc)
(4,mmmm)
(5,Female)
(6,Female)
(7,Female)
(8,Female)
(9,Male)
(10,Female)

(2)code

def main (args: Array[String] ) {
  val sparkConf = new SparkConf ().setMaster ("local[2]").setAppName ("JdbcApp")
  val sc = new SparkContext (sparkConf)
  val rdd = new org.apache.spark.rdd.JdbcRDD (
    () => {
      Class.forName ("com.mysql.jdbc.Driver").newInstance()
      java.sql.DriverManager.getConnection ("jdbc:mysql://master01:3306/rdd", "root", "hive")
    "select * from rddtable where id >= ? and id <= ?;",
    1, //一个分区
    r => (r.getInt(1), r.getString(2)))  //转化为元组
  println (rdd.count () )
  rdd.foreach (println (_) )
  sc.stop ()

3、Mysql写入

(1)REPL

scala> :paste
// Entering paste mode (ctrl-D to finish)
def insertData(iterator: Iterator[String]): Unit = {
    Class.forName ("com.mysql.jdbc.Driver").newInstance()
    val conn = java.sql.DriverManager.getConnection("jdbc:mysql://hadoop102:3306/rdd", "root", "heiHouZi2018!")
    iterator.foreach(data => {
    val ps = conn.prepareStatement("insert into rddtable(name) values (?)")
    ps.setString(1, data) 
    ps.executeUpdate()
// Exiting paste mode, now interpreting.
insertData: (iterator: Iterator[String])Unit
scala> val data = sc.parallelize(List("Female", "Male","Female"))
data: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> data.foreachPartition(insertData)

4、JdbcRDD 接收这样几个参数

首先,要提供一个用于对数据库创建连接的函数。这个函数让每个节点在连接必要的配置后,创建自己读取数据的连接。

接下来,要提供一个可以读取一定范围内数据的查询,以及查询参数中lowerBound和 upperBound 的值。这些参数可以让 Spark 在不同机器上查询不同范围的数据,这样就不会因尝试在一个节点上读取所有数据而遭遇性能瓶颈。

这个函数的最后一个参数是一个可以将输出结果从转为对操作数据有用的格式的函数。如果这个参数空缺,Spark会自动将每行结果转为一个对象数组。

Cassandra数据库和ElasticSearch集成

Infrastructure Projects
Spark Job Server – REST interface for managing and submiting Spark jobs on the same cluster (see blog post for details)
SparkR – R frontend for Spark
MLbase – Machine Learning research project on top of Spark
Apache Mesos – Cluster managerment system that supports running Spark
Alluxio(nee tachyon) – Memory speed virtual distributed storage system that supports running Spark
Spark Cassandra Connector – Easily load your Cassandra data into Spark and Spark SQL;from Datastax
FiloDB – a Spark integrated analytical/columnar database, with in-memory option capable of sub-second concurrent queries
ElasticSearch- Spark SQL Integration
Spark – Scalding – Easily transition Cascading/Scalding code to Spark
Zeppelin – an IPython –like notebook for Spark.There is also ISpark, and the Spark NoteBooke
IBM Spectrum Conductor with Spark – cluster managerment software that integrates with Spark