如何加快数据的读取过程
利用SparkSQL读取数据库数据的时候,如果数据量很大,那么在读取数据的时候就会花费大量的时间,因此,怎么让数据并行读取加快读取数据的速度呢?

在SparkSQL中,读取数据的时候可以分块读取。例如下面这样,指定了partitionColumn,lowerBound,upperBound,numPartitions等读取数据的参数。

关于这四个参数的意思,SparkSQL官方解释是:

Property Name Meaning
partitionColumn, lowerBound, upperBound These options must all be specified if any of them is specified. In addition, numPartitions must be specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned. This option applies only to reading.
numPartitions The maximum number of partitions that can be used for parallelism in table reading and writing. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, we decrease it to this limit by calling coalesce(numPartitions) before writing.
public static Dataset<Row> sparkLoad(SparkSession spark, String url, String fullTable, 
            String partitionColumn, long lowerBound, long upperBound, int numPartitions) {
        DataFrameReader reader = spark.read().format("jdbc").option("url", url)
                .option("dbtable", fullTable).option("user", "postgres")
                .option("driver","org.postgresql.Driver")
                .option("password", "webgis327");
        if(partitionColumn != null){
            reader = reader.option("partitionColumn", partitionColumn)
                          .option("lowerBound", lowerBound)
                          .option("upperBound", upperBound)
                          .option("numPartitions", numPartitions);
        return reader.load();

从上面的解释来看,分区列得是数字类型;所谓的并行读取其实就是开了多个数据库连接,分块读取的。另外需要注意的是:

Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned.

也就是说,这些参数的设置不会过滤数据,所以sql中读取了多少数据,那么返回的就是多少条数据,lowerBound和upperBound并不会过滤数据。那么如果说设置的lowerBound偏大(可能读取的数据中分区列的值比这个小),或者设置的upperBound数值设置的大小偏小(可能读取的数据中分区列中最大的值比upperBound大),这个时候数据是怎么读取和返回的呢?

举个例子:

如果一个表分10个分区读取,id为分区列,其值从0-101,但是设置的lowerBound是1,upperBound是100,那么读取的规则如下:
第一个分区:select * from tablename where id<=10;
第二个分区:select * from tablename where id >=10 and id<20;
第三个分区:select * from tablename where id >=20 and id <30;
……
第十个分区:select * from tablename where id >=90;

这样查询不会遗漏结果,但是至于上面的边界条件,等于号是在左边还是在右边这个还不太清楚,笔者也没有研究。其实这个如果要研究的话,也很简单,用一个测试表分区后,分别读取每个分区的数据条数并打印输出,就可以知道了。
思路如下:
读取每个partition的数据数量
但是笔者觉得没有研究的必要,如果对项目没有任何bug上的影响,就不需要研究这么细节,对技术没有什么帮助。

如何加快数据的读取过程 利用SparkSQL读取数据库数据的时候,如果数据量很大,那么在读取数据的时候就会花费大量的时间,因此,怎么让数据并行读取加快读取数据的速度呢?在SparkSQL中,读取数据的时候可以分块读取。例如下面这样,指定了partitionColumn,lowerBound,upperBound,numPartitions等读取数据的参数。关于这四个参数的意思,Spark
如何理解SparkSQL中的partitionColumn, lowerBound, upperBound, numPartitions 在SparkSQL中,读取数据的时候可以分块读取。例如下面这样,指定了partitionColumnlowerBoundupperBoundnumPartitions等读取数据的参数。简单来说,就是并行读取。 关于这四个参数的意思,SparkSQL官方解释是:
ROW_NUMBER() OVER(PARTITION BY COLUMN ORDER BY COLUMN DESC)函数的作用是指定COLUMN(列)进行分区,在分区内指定COLUMN(列)进行排序,其中PARTITION BY COLUMN 为分区函数。代码具体实现如下 不进行分区的排序方式 --对商品价格的升序排序 SELECT A.*, ROW_NUMBER() OVE...
新博客文章地址下文以 mysql 为例在spark中使用jdbc在 spark-env.sh 文件中加入:export SPARK_CLASSPATH=/path/mysql-connector-java-5.1.34.jar 任务提交时加入:--jars /path/mysql-connector-java-5.1.34.jar 1. 单partition(无并发)调用函数def jdbc(url
前几天复习二分然后百度了下lower_bound()和upper_bound()函数,现在将我的理解说下,先贴出我写的代码! #include using namespace std; int lower_bound(int *a, int n, int value) //二分查找第一个大于或等于value的下标key int l = 0; int r = n - 1; while (
以前用这两个函数的时候,简单看了几句别人的博客,记住了大概,用的时候每用一次就弄混一次,相当难受,今天对照着这两个函数的源码和自己的尝试发现:其实这两个函数只能用于 “升序” 序列。 为什么还要加个引号呢?因为比较规则可以自定义,如果你非要把比较规则定义成 5 比 3 小,那么 降序序列也是可以用的,否则不可以,直接看下例子(这是升序序列的 ): #include&lt;bits/stdc+...
jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)[source] Construct a DataFrame representing the database table named table ac...
一步步学spark之一scala高级特性中Lower bounds(下界)与Upper bounds(上界) Upper bounds(上界):我们使用一个对象或者一个类必须是什么类型的,也就是说必须是什么类型或者什么类型的子类。 Lower bounds(下界):我们使用一个对象或者一个类必须是什么类型的父类,也可以是接口。 Upper bounds 语法
转载自:http://blog.csdn.net/lw_ghy/article/details/51419877 有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能会比期望差很多。数据倾斜调优,就是使用各种技术方案解决不同类型的数据倾斜问题,以保证Spark作业的性能。 数据倾斜发生时的现象   1、绝大多数task执行得都非常快,但个别ta
在测试从Spark Shell读取MySQL一张Large Table时,发生了Out of memory和connection timeout问题,记录一下处理的过程: MySQL Table资料笔数:1400万笔左右 Spark Cluster配置:Master * 1,Slave * 3,皆为1 core 8G  Spark版本:2.1.1 星火配置配置: spark-en
    STL中的每个算法都非常精妙,接下来的几天我想集中学习一下STL中的算法。   ForwardIter lower_bound(ForwardIter first, ForwardIter last,const _Tp&amp; val)算法返回一个非递减序列[first, last)中的第一个大于等于值val的位置。      ForwardIter upper_bound(Forw...