Spark SQL有两种方法将RDD转为DataFrame。

1. 使用反射机制,推导包含指定类型对象RDD的schema。这种基于反射机制的方法使代码更简洁,而且如果你事先知道数据schema,推荐使用这种方式;

2. 编程方式构建一个schema,然后应用到指定RDD上。这种方式更啰嗦,但如果你事先不知道数据有哪些字段,或者数据schema是运行时读取进来的,那么你很可能需要用这种方式。

利用反射推导schema

  • Scala
  • Python
  • Spark SQL的Scala接口支持自动将包含case class对象的RDD转为DataFrame。对应的case class定义了表的schema。case class的参数名通过反射,映射为表的字段名。case class还可以嵌套一些复杂类型,如Seq和Array。RDD隐式转换成DataFrame后,可以进一步注册成表。随后,你就可以对表中数据使用SQL语句查询了。

    // sc 是已有的 SparkContext 对象
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // 为了支持RDD到DataFrame的隐式转换
    import sqlContext.implicits._
    // 定义一个case class.
    // 注意:Scala 2.10的case class最多支持22个字段,要绕过这一限制,
    // 你可以使用自定义class,并实现Product接口。当然,你也可以改用编程方式定义schema
    case class Person(name: String, age: Int)
    // 创建一个包含Person对象的RDD,并将其注册成table
    val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
    people.registerTempTable("people")
    // sqlContext.sql方法可以直接执行SQL语句
    val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
    // SQL查询的返回结果是一个DataFrame,且能够支持所有常见的RDD算子
    // 查询结果中每行的字段可以按字段索引访问:
    teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
    // 或者按字段名访问:
    teenagers.map(t => "Name: " + t.getAs[String]("name")).collect().foreach(println)
    // row.getValuesMap[T] 会一次性返回多列,并以Map[String, T]为返回结果类型
    teenagers.map(_.getValuesMap[Any](List("name", "age"))).collect().foreach(println)
    // 返回结果: Map("name" -> "Justin", "age" -> 19)

    编程方式定义Schema

  • Scala
  • Python
  • 如果不能事先通过case class定义schema(例如,记录的字段结构是保存在一个字符串,或者其他文本数据集中,需要先解析,又或者字段对不同用户有所不同),那么你可能需要按以下三个步骤,以编程方式的创建一个DataFrame:

  • 从已有的RDD创建一个包含Row对象的RDD
  • 用StructType创建一个schema,和步骤1中创建的RDD的结构相匹配
  • 把得到的schema应用于包含Row对象的RDD,调用这个方法来实现这一步:SQLContext.createDataFrame
  • For example:

    // sc 是已有的SparkContext对象
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    // 创建一个RDD
    val people = sc.textFile("examples/src/main/resources/people.txt")
    // 数据的schema被编码与一个字符串中
    val schemaString = "name age"
    // Import Row.
    import org.apache.spark.sql.Row;
    // Import Spark SQL 各个数据类型
    import org.apache.spark.sql.types.{StructType,StructField,StringType};
    // 基于前面的字符串生成schema
    val schema =
      StructType(
        schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))
    // 将RDD[people]的各个记录转换为Rows,即:得到一个包含Row对象的RDD
    val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))
    // 将schema应用到包含Row对象的RDD上,得到一个DataFrame
    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)
    // 将DataFrame注册为table
    peopleDataFrame.registerTempTable("people")
    // 执行SQL语句
    val results = sqlContext.sql("SELECT name FROM people")
    // SQL查询的结果是DataFrame,且能够支持所有常见的RDD算子
    // 并且其字段可以以索引访问,也可以用字段名访问
    results.map(t => "Name: " + t(0)).collect().foreach(println)