为了在spark中加载和分割传入的数据,我使用了以下语法。
val dataframe = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", query)
.option("partitionColumn", partitionColumn)
.option("lowerBound", lowerBound_value)
.option("upperBound", upperBound_value)
.option("numPartitions", numPartitions)
.option("fetchsize", 15000)
.load()
参数partitionColumn
,lowerBound
,upperBound
,numPartitions
是用来优化工作的性能的。
我有一个有1000条记录的表和一个有1到1000的序列号的整数列。
我首先在该列上运行min
和max
,然后将min
的值分配给lowerBound
,将max
的值分配给upperBound
。numPartitions参数给定为3,以便将传入的数据平均分成3个不同的分区(或接近平均)。
当数据较少时,上述设计工作良好。但我有一个如下的情况。
我有一个2030亿条记录的表,没有包含唯一/序列整数的整数列。然后有一个日期列,它的数据分布在5年内,即2016-2021年。
为了更快地移动数据,我每次都要移动每年的一个月的数据。
这是我正在使用的查询。
val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"
所以上面的查询变成了。
select * from table where date_column >= '2016-01-01' and date_time <= '2016-01-31 23:59:59.999''
以此类推,每年每个月的第一天和最后一天。
这是对我的循环方式的粗略描述。
(2016 to 2021) { year =>
(1 to 12) { month =>
val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"
val dataframe = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", query)
.option("partitionColumn", partitionColumn)
.option("lowerBound", lowerBound_value)
.option("upperBound", upperBound_value)
.option("numPartitions", numPartitions)
.option("fetchsize", 15000)
.load()
为了找出界限,我使用了与下面相同的月份和年份的过滤器。
val bounds = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", "(select min(partitionColumn) as mn, max(partitionColum) as from tablename where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString} and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as boundsDF")
.load()
val lowerBound_value = bounds.select("mn").head.getInt(0)
val upperBound_value = bounds.select("mx").head.getInt(0)
问题出在寻找过滤后的数据的下限和上限。
由于数据量很大,在partitionColumn上用给定的过滤器运行min & max的查询比把实际的数据帧写进hdfs要花更多的时间。
我试着在那里给出随机值,但在任务运行时观察到分区中的数据偏移。
是否必须给分区Column的最小和最大值作为下限和上限,以获得更好的数据分布?
如果不是,有什么方法可以指定下限和上限,而不是在数据上运行一个最小和最大的查询?
非常感谢任何帮助。