[Spark] Querying Greenplum in Parallel with Spark

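This article is organized as follows:

  • A look at a few Spark SQL properties
  • Parallel queries with Spark

    Spark can connect to relational databases over JDBC. The connection looks like this: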

    // Note: JDBC loading and saving can be achieved via either the load/save or jdbc methods
    import java.util.Properties

    // Loading data from a JDBC source
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .load()
    val connectionProperties = new Properties()
    connectionProperties.put("user", "username")
    connectionProperties.put("password", "password")
    val jdbcDF2 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Specifying the custom data types of the read schema
    connectionProperties.put("customSchema", "id DECIMAL(38, 0), name STRING")
    val jdbcDF3 = spark.read
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Saving data to a JDBC source
    jdbcDF.write
      .format("jdbc")
      .option("url", "jdbc:postgresql:dbserver")
      .option("dbtable", "schema.tablename")
      .option("user", "username")
      .option("password", "password")
      .save()
    jdbcDF2.write
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    // Specifying create table column data types on write
    jdbcDF.write
      .option("createTableColumnTypes", "name CHAR(64), comments VARCHAR(1024)")
      .jdbc("jdbc:postgresql:dbserver", "schema.tablename", connectionProperties)
    

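    By default, when Spark reads from a relational database over JDBC it queries the whole table with a single task, which is slow when the table is large.

    In that case, you can have multiple tasks connect to Greenplum in parallel to speed up the read.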

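    2. A look at a few Spark SQL properties

    How do we get multiple tasks? The first place to look is what Spark SQL itself supports.

    In the official Spark SQL documentation, go to the section JDBC To Other Databases:

    The property list there contains the solution: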

    Property Name
        Meaning

    dbtable
        The JDBC table that should be read. Note that anything that is valid in a FROM clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses.

    driver
        The class name of the JDBC driver to use to connect to this URL.

    partitionColumn, lowerBound, upperBound
        These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.

    numPartitions
        The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.

    For the full list of options, see: JDBC To Other Databases

    Three of these options deserve a closer look:

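  • dbtable: the table name. It can be a real table or a derived table produced by a query and named with AS. Anything that may follow FROM in a SQL statement is valid as dbtable, because Spark builds the query by string concatenation and places dbtable after FROM.
  • numPartitions: the maximum number of partitions for reading and writing, which also caps the number of database connections opened. There is one caveat: if numPartitions is set to a value greater than 1 but no partitioning rule is given, the query still runs in a single task.
  • partitionColumn, lowerBound, upperBound: the partitioning rule used when reading. These three options require numPartitions, and they cannot appear individually: specify all of them or none. lowerBound and upperBound are not filter conditions; they only determine the partition stride. When partitioning, Spark splits the range between lowerBound and upperBound into numPartitions ranges and runs the resulting queries in parallel.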
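    To make the stride idea concrete, here is a minimal sketch of how per-partition WHERE predicates could be derived from lowerBound, upperBound and numPartitions. It is a simplification written for illustration, not Spark's actual implementation, and the names StridePredicates and predicates are hypothetical. The first and last ranges are left open-ended, which is why the bounds do not filter out any rows.

```scala
object StridePredicates {
  // Simplified sketch: one WHERE predicate per partition.
  // The bounds only set the stride; the first and last predicates are open-ended,
  // so rows outside [lowerBound, upperBound] are still read.
  def predicates(column: String, lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[String] = {
    require(numPartitions >= 1, "numPartitions must be at least 1")
    require(lowerBound < upperBound, "lowerBound must be smaller than upperBound")
    val stride = (upperBound - lowerBound) / numPartitions
    (0 until numPartitions).map { i =>
      val lo = lowerBound + i * stride
      val hi = lo + stride
      val first = i == 0
      val last = i == numPartitions - 1
      if (first && last) "1 = 1"                               // single partition: no restriction
      else if (first) s"$column < $hi OR $column IS NULL"      // open below, also picks up NULLs
      else if (last) s"$column >= $lo"                         // open above
      else s"$column >= $lo AND $column < $hi"                 // half-open middle range
    }
  }
}
```

    With bounds 0 and 100 and 4 partitions the stride is 25, and rows with a person_id below 0 or above 100 still land in the first or last partition.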
    3. Parallel queries with Spark

    With that background, we can raise the number of tasks to speed up access to relational data. There are roughly two ways to do it:

    3.1 Option 1: numPartitions, partitionColumn, lowerBound, upperBound

    import org.apache.spark.sql.SparkSession

    // warehouseLocation, lowerBound, upperBound and numPartitions are assumed
    // to be defined by the caller.
    val spark = SparkSession
      .builder()
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .appName("load data from gp test")
      .getOrCreate()
    // Start time
    val startTime = System.currentTimeMillis()
    val gpRDF = spark.read
      .format("jdbc")
      .option("driver", "com.pivotal.jdbc.GreenplumDriver")
      .option("url", "jdbc:pivotal:greenplum://192.168.11.72:5432;DatabaseName=testdb")
      // Partitioning rule: split person_id into numPartitions strides
      .option("partitionColumn", "person_id")
      .option("lowerBound", lowerBound)
      .option("upperBound", upperBound)
      .option("numPartitions", numPartitions)
      .option("dbtable", "public.t_timing_face_person")
      .option("user", "gpadmin")
      .option("password", "gpadmin")
      .load()
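
    The snippet above leaves lowerBound and upperBound undefined. One common way to choose them is to ask the database for the minimum and maximum of the partition column, again using a subquery as dbtable. The sketch below only builds that subquery string; the helper name boundsSubquery and the aliases lo, hi and bounds are assumptions for illustration, and the Spark call is shown as a comment because it needs a live Greenplum connection.

```scala
object BoundsQuery {
  // Builds a parenthesized subquery usable as the dbtable option.
  // It yields a single row holding the min and max of the partition column.
  def boundsSubquery(table: String, column: String): String =
    s"(SELECT min($column) AS lo, max($column) AS hi FROM $table) AS bounds"
}

// Hypothetical usage with an existing SparkSession named spark
// (url, user and password are placeholders supplied by the caller):
//   val row = spark.read.format("jdbc")
//     .option("url", url).option("user", user).option("password", password)
//     .option("dbtable", BoundsQuery.boundsSubquery("public.t_timing_face_person", "person_id"))
//     .load()
//     .head()
//   // read the two columns from row and pass them on as lowerBound / upperBound
```

    This costs one extra round trip to the database, but it keeps the strides aligned with the actual range of person_id instead of a guessed one.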
    

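    3.2 Option 2: dbtable

    Instead of numPartitions, partitionColumn, lowerBound and upperBound, you can build subqueries through dbtable, run several queries in parallel to get several DataFrames, and finally merge them into a single DataFrame with reduce.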

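    import org.apache.spark.sql.SparkSession

    // dataNums (row count), numPartitions and warehouseLocation are assumed
    // to be defined by the caller.
    // Convert to Double first: integer division would truncate before Math.ceil runs.
    val stride = Math.ceil(dataNums.toDouble / numPartitions).toInt
    val spark = SparkSession
      .builder()
      .config("spark.sql.warehouse.dir", warehouseLocation)
      .appName("load data from gp")
      .getOrCreate()
    // Create numPartitions tasks, one DataFrame per person_id range.
    // The ranges only cover every row if person_id lies in (0, stride * numPartitions].
    val registerDF = Range(0, numPartitions)
      .map(index => {
        spark
          .read
          .format("jdbc")
          .option("driver", "com.pivotal.jdbc.GreenplumDriver")
          .option("url", "jdbc:pivotal:greenplum://192.168.11.72:5432;DatabaseName=testdb")
          .option("dbtable", s"(SELECT feature FROM public.t_timing_face_person WHERE person_id > ${stride * index} AND person_id <= ${stride * (index + 1)}) AS t_tmp_${index}")
          .option("user", "gpadmin")
          .option("password", "gpadmin")
          .load()
      })
      // Merge the per-range DataFrames into one
      .reduce((df1, df2) => df1.union(df2))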
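
    The range arithmetic in the snippet above can be checked without a database. The following small sketch builds the subquery string for each index, under the same assumption that person_id values run from 1 to dataNums. The object name RangeSubqueries and its parameters are hypothetical. It converts the row count to Double before rounding up, because with integer division 10 rows over 3 partitions would give a stride of 3 and leave row 10 uncovered.

```scala
object RangeSubqueries {
  // Ceiling division done in floating point so the remainder is not lost.
  def stride(dataNums: Long, numPartitions: Int): Long =
    math.ceil(dataNums.toDouble / numPartitions).toLong

  // One parenthesized subquery per partition index, each covering the range (lo, hi].
  def subqueries(table: String, column: String, dataNums: Long, numPartitions: Int): Seq[String] = {
    require(numPartitions >= 1, "numPartitions must be at least 1")
    val step = stride(dataNums, numPartitions)
    (0 until numPartitions).map { index =>
      val lo = step * index
      val hi = step * (index + 1)
      s"(SELECT * FROM $table WHERE $column > $lo AND $column <= $hi) AS t_tmp_$index"
    }
  }
}
```

    Each string can be passed as the dbtable option of one read. Because the ranges are half-open and adjacent, no row is read twice, and the last range may extend past dataNums without harm.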