To load and partition the incoming data in Spark, I am using the following syntax:
val dataframe = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", query)
.option("partitionColumn", partitionColumn)
.option("lowerBound", lowerBound_value)
.option("upperBound", upperBound_value)
.option("numPartitions", numPartitions)
.option("fetchsize", 15000)
.load()
The parameters partitionColumn, lowerBound, upperBound, and numPartitions are used to optimise the performance of the job.
I have a table of 1000 records & an integer column that has serial numbers from 1 to 1000.
I first run min and max on that column, then assign the min value to lowerBound and the max value to upperBound. The numPartitions parameter is set to 3 so that the incoming data is split evenly (or close to evenly) into 3 partitions.
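To make the splitting concrete, here is a minimal sketch of how the bounds and numPartitions could translate into per-partition WHERE clauses. It is a simplified model, not Spark's actual code: the object and function names are hypothetical, and the exact stride arithmetic differs between Spark versions. One thing the Spark documentation does state is that lowerBound and upperBound only decide the partition stride; they do not filter rows.

```scala
object StrideSketch {
  // Simplified model (an assumption, not Spark's source) of how a numeric
  // range is cut into numPartitions WHERE clauses.
  def partitionPredicates(column: String,
                          lowerBound: Long,
                          upperBound: Long,
                          numPartitions: Int): Seq[String] = {
    require(numPartitions >= 2, "this sketch only covers two or more partitions")
    require(lowerBound < upperBound, "lowerBound must be below upperBound")

    // Width of each partition's slice of the range.
    val stride = upperBound / numPartitions - lowerBound / numPartitions

    (0 until numPartitions).map { i =>
      val lo = lowerBound + i * stride
      val hi = lo + stride
      if (i == 0) s"$column < $hi OR $column IS NULL"      // open at the bottom
      else if (i == numPartitions - 1) s"$column >= $lo"   // open at the top
      else s"$column >= $lo AND $column < $hi"
    }
  }
}
```

For the 1 to 1000 example with 3 partitions, `StrideSketch.partitionPredicates("id", 1L, 1000L, 3)` yields `id < 334 OR id IS NULL`, `id >= 334 AND id < 667`, and `id >= 667`. Because the first and last clauses are open-ended, rows outside the given bounds are still read; they simply pile up in the first or last partition.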
The above design works well when there is less data. But I have a scenario as below.
I have a table of 203 billion records with no integer column that contains unique/serial integers. There is, however, a date column with data spread across the years 2016-2021.
In order to move the data faster, I am moving one month's data of each year at a time.
This is the query I am using:
val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString}' and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"
So the above query becomes:
select * from table where date_column >= '2016-01-01' and date_time <= '2016-01-31 23:59:59.999'
and so on with first and last day of each month for every year.
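As an aside, a half-open range (greater than or equal to the first day of the month, strictly less than the first day of the next month) avoids hard-coding an end-of-day literal such as 23:59:59.999. A minimal sketch, assuming a single date column is used for both comparisons; the object, function, and parameter names are hypothetical:

```scala
import java.time.YearMonth

object MonthQuery {
  // Builds the subquery for one month as [first day, first day of next month).
  def monthQuery(table: String, dateColumn: String, year: Int, month: Int): String = {
    val ym = YearMonth.of(year, month)
    val start = ym.atDay(1)                       // e.g. 2016-01-01
    val endExclusive = ym.plusMonths(1).atDay(1)  // e.g. 2016-02-01
    s"(select * from $table where $dateColumn >= '$start' " +
      s"and $dateColumn < '$endExclusive') as datadf"
  }
}
```

For example, `MonthQuery.monthQuery("table", "date_column", 2016, 1)` produces `(select * from table where date_column >= '2016-01-01' and date_column < '2016-02-01') as datadf`.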
This is a rough description of how my loop is:
import java.time.YearMonth

(2016 to 2021).foreach { year =>
  (1 to 12).foreach { month =>
    // Subquery restricted to a single month of a single year
    val query = s"(select * from table where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString}' and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as datadf"
    val dataframe = spark.read.format("jdbc")
      .option("url", url)
      .option("driver", driver)
      .option("user", user)
      .option("password", password)
      .option("dbtable", query)
      .option("partitionColumn", partitionColumn)
      .option("lowerBound", lowerBound_value)
      .option("upperBound", upperBound_value)
      .option("numPartitions", numPartitions)
      .option("fetchsize", 15000)
      .load()
    // the dataframe is then written to HDFS
  }
}
To find out bounds, I am using the same filters of month and year as below:
val bounds = spark.read.format("jdbc")
.option("url", url)
.option("driver", driver)
.option("user", user)
.option("password", password)
.option("dbtable", s"(select min($partitionColumn) as mn, max($partitionColumn) as mx from tablename where date_column >= '${YearMonth.of(year.toInt, month).atDay(1).toString}' and date_time <= '${YearMonth.of(year.toInt, month).atEndOfMonth().toString} 23:59:59.999') as boundsDF")
.load()
val lowerBound_value = bounds.select("mn").head.getInt(0)
val upperBound_value = bounds.select("mx").head.getInt(0)
The issue comes here with finding the lower and upper bounds of the filtered data.
Because of the huge data size, the query that runs min & max on the partitionColumn with the given filters takes far more time than writing the actual dataframe to HDFS.
I tried giving random values there, but observed data skew in the partitions while the tasks were running.
Is it mandatory to give the min and max of the partitionColumn as lower and upper bounds for better data distribution?
If not, is there any way to specify lower and upper bounds without running a min & max query on the data?
Any help is much appreciated.
With 200+ billion rows, I do hope your table is partitioned in your DB on the same date column on which you are accessing the data. Without that, queries will be quite hopeless.
But have you tried using the integer equivalents of date/timestamp values as the lower and upper bounds? Check this reference for how Spark converts these bound values to timestamps:
The JDBC options lowerBound and upperBound are converted to
TimestampType/DateType values in the same way as casting strings to
TimestampType/DateType values. The conversion is based on Proleptic
Gregorian calendar, and time zone defined by the SQL config
spark.sql.session.timeZone. In Spark version 2.4 and below, the
conversion is based on the hybrid calendar (Julian + Gregorian) and on
default system time zone.
As you mentioned, there is no pre-existing integer column that can be used here. With your loops, however, the lower and upper bounds for each month are static, and hence convertible to static numeric values. Internally, Spark divides the range between the lower and upper bounds into sub-ranges and issues multiple queries to the DB, each fetching a single partition's data. This also means that partitioning the table on the relevant column, or having appropriate indexes in the source DB, is really significant for performance.
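As a minimal sketch of the static-bounds idea, the month boundaries themselves can serve as lowerBound and upperBound, so no min/max query is needed. This assumes the date column is a date or timestamp type and a Spark version that accepts such columns as partitionColumn with string bounds, which the conversion note quoted above suggests; the object and function names are hypothetical.

```scala
import java.time.YearMonth

object StaticBounds {
  // Derives JDBC partitioning options for one month from the calendar alone.
  def jdbcPartitionOptions(partitionColumn: String,
                           year: Int,
                           month: Int,
                           numPartitions: Int): Map[String, String] = {
    val ym = YearMonth.of(year, month)
    Map(
      "partitionColumn" -> partitionColumn,
      "lowerBound"      -> s"${ym.atDay(1)} 00:00:00",               // start of the month
      "upperBound"      -> s"${ym.plusMonths(1).atDay(1)} 00:00:00", // start of the next month
      "numPartitions"   -> numPartitions.toString
    )
  }
}
```

The resulting map could be passed to the reader with `.options(StaticBounds.jdbcPartitionOptions("date_column", 2016, 1, 30))` alongside the url, driver, and dbtable options. Each partition then covers an equal slice of time, so the distribution is only as even as the rows are spread across the month.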
You will need to ensure that the placeholders for the upper and lower bounds are placed appropriately in the query you provide. As a heads-up, the actual numeric values may vary depending on the database system in use. If that scenario comes up, i.e. the database system's integer-to-date conversion differs from Spark's, then you will need to provide the values accepted by the database rather than by Spark. From the same docs:
Parameters:
connectionFactory - a factory that returns an open Connection. The RDD takes care of closing the connection.
sql - the text of the query. The query must contain two ? placeholders for parameters used to partition the results. For example, select title, author from books where ? <= id and id <= ?
lowerBound - the minimum value of the first placeholder
upperBound - the maximum value of the second placeholder
The lower and upper bounds are inclusive.
From the same docs, it is also clear that <= and >= are used, so both the upper and lower bounds are inclusive, a point of confusion I have observed in other questions.
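To illustrate what inclusive bounds mean for the two ? placeholders, the sketch below splits an inclusive range into contiguous, non-overlapping sub-ranges, one pair of placeholder values per partition. It is modeled on the behaviour described in the quoted parameters, not copied from Spark, and the object and function names are hypothetical.

```scala
object InclusiveRanges {
  // Splits [lowerBound, upperBound] (both inclusive) into numPartitions
  // sub-ranges, each returned as an inclusive (start, end) pair.
  def split(lowerBound: Long, upperBound: Long, numPartitions: Int): Seq[(Long, Long)] = {
    require(lowerBound <= upperBound, "lowerBound must not exceed upperBound")
    require(numPartitions >= 1, "need at least one partition")

    // BigInt avoids overflow when the range spans most of the Long domain.
    val length = BigInt(upperBound) - BigInt(lowerBound) + 1
    (0 until numPartitions).map { i =>
      val start = BigInt(lowerBound) + (BigInt(i) * length) / BigInt(numPartitions)
      val end   = BigInt(lowerBound) + (BigInt(i + 1) * length) / BigInt(numPartitions) - 1
      (start.toLong, end.toLong)
    }
  }
}
```

For bounds 1 and 1000 with 3 partitions, `InclusiveRanges.split(1L, 1000L, 3)` gives `(1, 333)`, `(334, 666)`, and `(667, 1000)`. Every value in the range falls into exactly one pair, and nothing outside the bounds is fetched.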