• spark-xml 库创建为 Maven 库 。 针对 Maven 坐标,请指定:

  • Databricks Runtime 7.x 及更高版本: com.databricks:spark-xml_2.12:<release>
  • 有关最新版 <release> ,请参阅 spark-xml 版本

  • 在群集上 安装库

    本部分中的示例使用 books XML 文件。

  • 检索 books XML 文件:

    $ wget https://github.com/databricks/spark-xml/raw/master/src/test/resources/books.xml
    
  • 将文件上传到 DBFS

    读取和写入 XML 数据

    /*Infer schema*/
    CREATE TABLE books
    USING xml
    OPTIONS (path "dbfs:/books.xml", rowTag "book")
    /*Specify column names and types*/
    CREATE TABLE books (author string, description string, genre string, _id string, price double, publish_date string, title string)
    USING xml
    OPTIONS (path "dbfs:/books.xml", rowTag "book")
    

    Scala

    // Infer schema
    import com.databricks.spark.xml._ // Add the DataFrame.read.xml() method
    val df = spark.read
      .option("rowTag", "book")
      .xml("dbfs:/books.xml")
    val selectedData = df.select("author", "_id")
    selectedData.write
      .option("rootTag", "books")
      .option("rowTag", "book")
      .xml("dbfs:/newbooks.xml")
    // Specify schema
    import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
    val customSchema = StructType(Array(
      StructField("_id", StringType, nullable = true),
      StructField("author", StringType, nullable = true),
      StructField("description", StringType, nullable = true),
      StructField("genre", StringType, nullable = true),
      StructField("price", DoubleType, nullable = true),
      StructField("publish_date", StringType, nullable = true),
      StructField("title", StringType, nullable = true)))
    val df = spark.read
      .option("rowTag", "book")
      .schema(customSchema)
      .xml("books.xml")
    val selectedData = df.select("author", "_id")
    selectedData.write
      .option("rootTag", "books")
      .option("rowTag", "book")
      .xml("dbfs:/newbooks.xml")
    
    # Infer schema
    library(SparkR)
    sparkR.session("local[4]", sparkPackages = c("com.databricks:spark-xml_2.12:<release>"))
    df <- read.df("dbfs:/books.xml", source = "xml", rowTag = "book")
    # Default `rootTag` and `rowTag`
    write.df(df, "dbfs:/newbooks.xml", "xml")
    # Specify schema
    customSchema <- structType(
      structField("_id", "string"),
      structField("author", "string"),
      structField("description", "string"),
      structField("genre", "string"),
      structField("price", "double"),
      structField("publish_date", "string"),
      structField("title", "string"))
    df <- read.df("dbfs:/books.xml", source = "xml", schema = customSchema, rowTag = "book")
    # In this case, `rootTag` is set to "ROWS" and `rowTag` is set to "ROW".
    write.df(df, "dbfs:/newbooks.xml", "xml", "overwrite")
    
  • path:XML 文件的位置。 接受标准 Hadoop 通配表达式。
  • rowTag:视为行的行标记。 例如,在此 XML <books><book><book>...</books> 中,该值为 book。 默认为 ROW
  • samplingRatio:推断架构的采样率 (0.0 ~ 1)。 默认值为 1。 可用类型为 StructTypeArrayTypeStringTypeLongTypeDoubleTypeBooleanTypeTimestampTypeNullType,除非提供架构。
  • excludeAttribute:是否排除元素中的属性。 默认值为 false。
  • nullValue:视为 null 值的值。 默认为 ""
  • mode:用于处理损坏记录的模式。 默认为 PERMISSIVE
  • PERMISSIVE:
    • 遇到损坏的记录时,会将所有字段设置为 null,并将格式错误的字符串放入 columnNameOfCorruptRecord 配置的新字段中。
    • 遇到数据类型错误的字段时,会将有问题的字段设置为 null
    • DROPMALFORMED:忽略损坏的记录。
    • FAILFAST:检测到损坏的记录时引发异常。
    • inferSchema:如果为 true,则尝试为生成的每个 DataFrame 列推理一个相应的类型,例如布尔、数字或日期类型。 如果为 false,则生成的所有列均为字符串类型。 默认为 true
    • columnNameOfCorruptRecord:存储格式错误的字符串的新字段的名称。 默认为 _corrupt_record
    • attributePrefix:属性的前缀,用于区分属性和元素。 这是字段名称的前缀。 默认为 _
    • valueTag:没有子元素的元素中有属性时用于值的标记。 默认为 _VALUE
    • charset:默认为 UTF-8,但可设置为其他有效字符集名称。
    • ignoreSurroundingSpaces:是否应跳过值周围的空格。 默认值为 false。
    • rowValidationXSDPath:XSD 文件的路径,用于验证每行的 XML。 未能通过验证的行视为上述解析错误。 XSD 不会以其他方式影响提供或推理的架构。 如果群集中的执行程序上未显示上述本地路径,则应使用 SparkContext.addFile 将 XSD 及其依赖的其他任何路径添加到 Spark 执行程序。 在这种情况下,若要使用本地 XSD /foo/bar.xsd,请调用 addFile("/foo/bar.xsd") 并将 "bar.xsd" 作为 rowValidationXSDPath 传递。
    • path:写入文件的位置。
    • rowTag:视为行的行标记。 例如,在此 XML <books><book><book>...</books> 中,该值为 book。 默认为 ROW
    • rootTag:视为根的根标记。 例如,在此 XML <books><book><book>...</books> 中,该值为 books。 默认为 ROWS
    • nullValue:写入 null 值的值。 默认值为字符串 "null"。 当为 "null" 时,它不会为字段写入属性和元素。
    • attributePrefix:属性的前缀,用于区分属性和元素。 这是字段名称的前缀。 默认为 _
    • valueTag:没有子元素的元素中有属性时用于值的标记。 默认为 _VALUE
    • compression:保存到文件时使用的压缩编解码器。 应是实现 org.apache.hadoop.io.compress.CompressionCodec 的类的完全限定名称,或是不区分大小写的短名称之一(bzip2gziplz4snappy)。 默认为不压缩。
    • 支持使用短名称;可以使用 xml 代替 com.databricks.spark.xml

      XSD 支持

      可以使用 rowValidationXSDPath 针对 XSD 架构验证各行。

      可以使用实用工具 com.databricks.spark.xml.util.XSDToSchema 从某些 XSD 文件中提取 Spark DataFrame 架构。 它仅支持简单类型、复杂类型和序列类型,仅支持基本 XSD 功能,且处于试验阶段。

      import com.databricks.spark.xml.util.XSDToSchema
      import java.nio.file.Paths
      val schema = XSDToSchema.read(Paths.get("/path/to/your.xsd"))
      val df = spark.read.schema(schema)....xml(...)
      

      分析嵌套 XML

      尽管 from_xml 方法主要用于将 XML 文件转换为 DataFrame,但也可以使用它在现有的 DataFrame 中解析字符串值列中的 XML,然后将其添加为新列,并将解析结果作为结构:

      import com.databricks.spark.xml.functions.from_xml
      import com.databricks.spark.xml.schema_of_xml
      import spark.implicits._
      val df = ... /// DataFrame with XML in column 'payload'
      val payloadSchema = schema_of_xml(df.select("payload").as[String])
      val parsed = df.withColumn("parsed", from_xml($"payload", payloadSchema))
      
    • mode:
      • 如果设置为默认值 PERMISSIVE,则解析模式默认为 DROPMALFORMED。 如果在 from_xml 的架构中加入与 columnNameOfCorruptRecord 匹配的列,则 PERMISSIVE 模式会将格式错误的记录输出到生成的结构中的该列。
      • 如果设置为 DROPMALFORMED,则未正确解析的 XML 值将为列生成 null 值。 不删除任何行。
      • from_xml 将包含 XML 的字符串数组转换为已解析结构的数组。 请改用 schema_of_xml_array
      • from_xml_string 是在 UDF 中使用的替代方法,可直接对字符串(而不是列)进行操作。
      • 由于 DataFrame 和 XML 之间存在结构差异,因此对于从 XML 数据转换为 DataFrame 以及从 DataFrame 转换为 XML 数据来说,有一些转换规则。 可以使用选项 excludeAttribute 禁止处理某些属性。

        将 XML 转换为 DataFrame

      • 属性:属性会转换为具有 attributePrefix 选项中指定的前缀的字段。 如果 attributePrefix_,则以下文档

        <one myOneAttrib="AAAA">
            <two>two</two>
            <three>three</three>
        

        将生成以下架构:

        |-- _myOneAttrib: string (nullable = true) |-- two: string (nullable = true) |-- three: string (nullable = true)
      • 如果元素有属性但没有子元素,则属性值将位于 valueTag 选项中指定的独立字段。 如果 valueTag_VALUE,则以下文档

        <two myTwoAttrib="BBBBB">two</two> <three>three</three>

        将生成以下架构:

        |-- two: struct (nullable = true) | |-- _VALUE: string (nullable = true) | |-- _myTwoAttrib: string (nullable = true) |-- three: string (nullable = true)

        将 DataFrame 转换为 XML

        从具有 ArrayType 字段且其元素为 ArrayType 的 DataFrame 写入 XML 文件时,将为该元素提供额外的嵌套字段。 读写 XML 数据时不会发生这种情况,但写入从其他源读取的 DataFrame 时会。 因此,读写和写读 XML 文件具有同一结构,但写入从其他源读取的 DataFrame 可能具有另一结构。

        如果 DataFrame 具有以下架构:

         |-- a: array (nullable = true)
         |    |-- element: array (containsNull = true)
         |    |    |-- element: string (containsNull = true)
        

        和以下数据:

        +------------------------------------+
        |                                   a|
        +------------------------------------+
        |[WrappedArray(aa), WrappedArray(bb)]|
        +------------------------------------+
        

        则将生成以下 XML 文件:

        <item>aa</item> <item>bb</item>
  •