1. Converting an RDD to a DataFrame via reflection

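Because Scala supports implicit conversions, Spark SQL's Scala interface can automatically convert an RDD of case class instances into a DataFrame. The case class defines the schema: Spark SQL uses reflection to read the names of the case class's parameters and uses them as the column names.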

import spark.implicits._

// The case class defines the schema; its field names become the column names
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

// Parse one line of the form userId::movieId::rating::timestamp
def parseRating(str: String): Rating = {
  val fields = str.split("::")
  assert(fields.size == 4)
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat, fields(3).toLong)
}

val rdd = sc.textFile("/data/mllib/als/sample_movielens_ratings.txt")
// toDF() on an RDD is made available by spark.implicits._
val ratings = rdd.map(parseRating).toDF()
ratings.printSchema()
ratings.show()
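
To check that the column names really come from the case class parameters, here is a minimal standalone sketch. It assumes a local Spark installation; the object name `ReflectionSketch`, the helper `toRatings`, the `local[*]` master, and the two in-memory sample lines are illustrative choices, not part of the original example.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Defined at top level (not inside a method) so Spark can derive an encoder for it.
case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)

object ReflectionSketch {
  // Hypothetical helper: parses "userId::movieId::rating::timestamp" lines into a DataFrame.
  def toRatings(spark: SparkSession, lines: Seq[String]): DataFrame = {
    import spark.implicits._ // brings toDF() into scope for RDDs of case classes
    spark.sparkContext.parallelize(lines).map { line =>
      val f = line.split("::")
      Rating(f(0).toInt, f(1).toInt, f(2).toFloat, f(3).toLong)
    }.toDF()
  }

  def main(args: Array[String]): Unit = {
    // Assumption: run locally, not on a cluster.
    val spark = SparkSession.builder().master("local[*]").appName("reflection-sketch").getOrCreate()
    val df = toRatings(spark, Seq("0::2::3::1424380312", "0::3::1::1424380312"))
    // Column names are taken from the case class parameter names.
    println(df.columns.mkString(", ")) // prints: userId, movieId, rating, timestamp
    spark.stop()
  }
}
```

Using in-memory lines instead of a file keeps the sketch independent of where the sample data lives.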

2. Converting an RDD to a DataFrame programmatically

import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

val rdd = sc.textFile("/data/mllib/als/sample_movielens_ratings.txt")

// Define the schema explicitly: column name, data type, nullable
val schema = StructType(Array(
  StructField("userId", IntegerType, true),
  StructField("movieId", IntegerType, true),
  StructField("rating", FloatType, true),
  StructField("timestamp", LongType, true)
))

// Convert each line into a Row whose values match the schema
val rowRDD = rdd.map(_.split("::")).map(p => Row(p(0).toInt, p(1).toInt, p(2).toFloat, p(3).toLong))
val data = spark.createDataFrame(rowRDD, schema)
data.printSchema()

// Register a temporary view so the data can be queried with SQL
data.createOrReplaceTempView("test")
spark.sql("select * from test").show()
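
The programmatic approach is mainly useful when the columns are not known until runtime, so no case class can be written in advance. The following is a rough sketch of that situation, assuming the column names arrive as a space-separated string and every value is kept as a string. The names `RuntimeSchemaSketch`, `schemaFor`, and `toDataFrame`, the `local[*]` master, and the sample line are hypothetical and only serve the illustration.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object RuntimeSchemaSketch {
  // Build a schema from column names that are only known at runtime.
  // Assumption: every column is treated as a nullable string.
  def schemaFor(columnNames: String): StructType =
    StructType(columnNames.split(" ").map(name => StructField(name, StringType, nullable = true)))

  // Turn "::"-separated lines into Rows and attach the schema.
  def toDataFrame(spark: SparkSession, lines: Seq[String], schema: StructType): DataFrame = {
    val rowRDD = spark.sparkContext.parallelize(lines)
      .map(line => Row.fromSeq(line.split("::").toSeq))
    spark.createDataFrame(rowRDD, schema)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: run locally, not on a cluster.
    val spark = SparkSession.builder().master("local[*]").appName("runtime-schema-sketch").getOrCreate()
    val schema = schemaFor("userId movieId rating timestamp")
    val df = toDataFrame(spark, Seq("0::2::3::1424380312"), schema)
    df.printSchema()
    spark.stop()
  }
}
```

Keeping every column as a string keeps the sketch short; when numeric types are needed, the columns can be cast afterwards, or the schema can carry the concrete types as in the example above.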