如何为spark read语句设计查找lowerBound和upperBound的方法来分割输入的数据?

0 人关注

为了在spark中加载和分割传入的数据,我使用了以下语法。

val dataframe = spark.read.format("jdbc")
          .option("url", url)
          .option("driver", driver)
          .option("user", user)
          .option("password", password)
          .option("dbtable", query)
          .option("partitionColumn", partitionColumn)
          .option("lowerBound", lowerBound_value)
          .option("upperBound", upperBound_value)
          .option("numPartitions", numPartitions)
          .option("fetchsize", 15000)
          .load()

参数partitionColumn,lowerBound,upperBound,numPartitions 是用来优化工作的性能的。

我有一个有1000条记录的表和一个有1到1000的序列号的整数列。 我首先在该列上运行minmax ,然后将min 的值分配给lowerBound ,将max 的值分配给upperBound 。numPartitions参数给定为3,以便将传入的数据平均分成3个不同的分区(或接近平均)。

当数据较少时,上述设计工作良好。但我有一个如下的情况。

我有一个2030亿条记录的表,没有包含唯一/序列整数的整数列。然后有一个日期列,它的数据分布在5年内,即2016-2021年。 为了更快地移动数据,我每次都要移动每年的一个月的数据。 这是我正在使用的查询。

val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"

所以上面的查询变成了。 select * from table where date_column >= '2016-01-01' and date_time <= '2016-01-31 23:59:59.999'' 以此类推,每年每个月的第一天和最后一天。

这是对我的循环方式的粗略描述。

(2016 to 2021) { year =>
   (1 to 12) { month =>
           val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"
           val dataframe = spark.read.format("jdbc")
              .option("url", url)
              .option("driver", driver)
              .option("user", user)
              .option("password", password)
              .option("dbtable", query)
              .option("partitionColumn", partitionColumn)
              .option("lowerBound", lowerBound_value)
              .option("upperBound", upperBound_value)
              .option("numPartitions", numPartitions)
              .option("fetchsize", 15000)
              .load()

为了找出界限,我使用了与下面相同的月份和年份的过滤器。

val bounds = spark.read.format("jdbc")
          .option("url", url)
          .option("driver", driver)
          .option("user", user)
          .option("password", password)
          .option("dbtable", "(select min(partitionColumn) as mn, max(partitionColum) as from tablename where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as boundsDF")
          .load()
val lowerBound_value = bounds.select("mn").head.getInt(0)
val upperBound_value = bounds.select("mx").head.getInt(0)

问题出在寻找过滤后的数据的下限和上限。 由于数据量很大,在partitionColumn上用给定的过滤器运行min & max的查询比把实际的数据帧写进hdfs要花更多的时间。

我试着在那里给出随机值,但在任务运行时观察到分区中的数据偏移。

是否必须给分区Column的最小和最大值作为下限和上限,以获得更好的数据分布? 如果不是,有什么方法可以指定下限和上限,而不是在数据上运行一个最小和最大的查询?

非常感谢任何帮助。

apache-spark
apache-spark-sql
Metadata
Metadata
发布于 2021-06-10
1 个回答
xenodevil
xenodevil
发布于 2021-06-14
0 人赞同

面对2000多亿行的数据,我希望你的表在你的数据库中是在你访问数据的同一日期列上分区的。没有这一点,查询将是相当无望的。

但是你有没有试过将日期/时间戳值的下限和上限进行整数等价?看看 这个参考资料 ,了解Spark将整数值转换为时间戳的方法。

JDBC选项lowerBound和upperBound被转换为 TimestampType/DateType值,其方式与将字符串转换为 TimestampType/DateType值。该转换是基于Proleptic 公历,以及由SQL配置定义的时区。 spark.sql.session.timeZone。在Spark 2.4及以下版本中,转换是基于混合历法的。 转换是基于混合历法(Julian + Gregorian)和 默认的系统时区。

正如你提到的,这里没有预先存在的整数列可以使用。所以在你的循环中,上界和下界是静态的,因此可以转换为静态的上界和下界数值。基于Spark的内部结构,上下限值被划分为数字范围,并 向DB抛出多个查询 ,以获取每个查询的单一分区的数据。这也意味着在相关的列上进行表的分区或在源数据库中有适当的索引对性能来说是非常重要的。

你将需要确保上下限的占位符在你提供的查询中被适当地放置。作为提醒,实际的数值可能会根据使用的数据库系统而有所不同。如果出现这种情况,即数据库系统对日期的整数转换是不同的,那么你将需要提供数据库所接受的值,而不是Spark。来自 同一个文档

connectionFactory - 一个返回开放连接的工厂。RDD负责关闭连接。 sql - 查询的文本。该查询必须包含两个用于划分结果的参数的占位符。例如
select title, author from books where ? <= id and id <= ?
lowerBound - the minimum value of the first placeholder