import java.util.Properties
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
* 连接数据库工具类
object ConnOracleUtile {
* 读取Oracle数据库
* @param spark
* @param dbtable
* @return
def readPMSGSOracleData(spark:SparkSession,dbtable:String)={
val data: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:oracle:thin:@//10.212.242.XXX:11521/pmsgs")
.option("dbtable", dbtable)
.option("user", "qyw")
.option("password", "XXX")
.load()
* 读取mysql数据库
* @param spark
* @param dbtable
* @return
def readUSAPPData(spark:SparkSession,dbtable:String)={
val data: DataFrame = spark.read.format("jdbc")
.option("url", "jdbc:mysql://21.76.120.XX:3306/us_app")
.option("dbtable", dbtable)
.option("user", "root")
.option("password", "XXX")
.load()
* 将数据写入到Mysql数据库(追加写入)
* @param result
* @param dbtable
def writeData(result:DataFrame,dbtable:String): Unit ={
result.write.mode(SaveMode.Append).format("jdbc")
.option(JDBCOptions.JDBC_URL,"jdbc:mysql://21.76.120.XX:3306/us_app?rewriteBatchedStatement=true")
.option("user","root")
.option("password","XXX")
.option(JDBCOptions.JDBC_TABLE_NAME,dbtable)
.option(JDBCOptions.JDBC_TXN_ISOLATION_LEVEL,"NONE") //不开启事务
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE,500) //设置批量插入
.save()
接口trait:
package com.zhbr.`trait`
import org.apache.spark.SparkContext
import org.apache.spark.sql.sparkSession
trait ProcessDataSC {
* 处理数据,实现不同的etl操作
* @param spark
def process(sc: SparkContext,spark:SparkSession)
mianClass:
package com.zhbr.mainClass
import com.zhbr.process._
import org.apache.spark.sql.SparkSession
object DataProcessMain {
def main(args: Array[String]): Unit = {
//获取sparkSession
val sparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.filter(!_.equals('$'))).getOrCreate()
//获取sparkContext
val sparkContext = sparkSession.sparkContext
//设置日志级别
sparkContext.setLogLevel("WARN")
//计算程序
DLZZ.process(sparkContext,sparkSession)
//释放资源
sparkContext.stop()
sparkSession.stop()
计算程序:
package com.zhbr.process.run_date
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import com.zhbr.`trait`.{ProcessDataSC}
import com.zhbr.utils.{ConnOracleUtile}
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql._
1. 运行数据
object DLZZ extends ProcessDataSC{
* 处理数据,实现不同的etl操作
* @param spark
override def process(sc:SparkContext,spark:SparkSession): Unit = {
//读取增量数据的条件
val tableName = "(select * from edc_exchange.T_EDC_TRAN_CURRENT_LINE where DATA_DATE=trunc(sysdate)-1) tableTemp"
//读取电流日曲线表T_EDC_TRAN_CURRENT_LINE,获取数据
val DLRQX_Data: DataFrame = ConnOracleUtile.readPMSGSOracleData(spark,tableName)
//读取数据(终端停复电台区表)
val ZDTFDTQData: DataFrame = ConnOracleUtile.readUSAPPData(spark,"pdwqy_qxzh_zdtfd_tg")
//分别创建临时表
DLRQX_Data.registerTempTable("DLRQX_Table")
ZDTFDTQData.registerTempTable("ZDTFDTQData_table")
//计算过程(示意)
val DLZZData = spark.sql("select dl.*,zd.* from DLRQX_Table dl join ZDTFDTQData_table zd on dl.ssds = zd.ssdsid")
//将最后的数据保存到表中(电流转置表)
DLZZData.write.mode(SaveMode.Append).format("jdbc")
.option(JDBCOptions.JDBC_URL,"jdbc:mysql://21.76.120.XX:3306/us_app?rewriteBatchedStatement=true")
.option("user","root")
.option("password","XXX")
.option(JDBCOptions.JDBC_TABLE_NAME,"tempfinal_pdwqy_qxzh_dlzz")
.option(JDBCOptions.JDBC_TXN_ISOLATION_LEVEL,"NONE") //不开启事务
.option(JDBCOptions.JDBC_BATCH_INSERT_SIZE,500) //设置批量插入
.save()
以上计算步骤关键点:(以时间为参考做增量更新,每天凌晨抽取计算前一天的数据)
val tableName = "(select * from edc_exchange.T_EDC_TRAN_CURRENT_LINE where DATA_DATE=trunc(sysdate)-1) tableTemp"
- 查询增量数据的SQL需要重新命名,否则报错。如代码中的:
val tableName = "(select * from edc_exchange.T_EDC_TRAN_CURRENT_LINE where DATA_DATE=trunc(sysdate)-1) tableTemp"
- 增量更新的写入操作中,SaveMode必须为Append追加。
SparkSQL的全量更新非常简单,其实只需要在写入环节将SaveMode的模式修改为Overwrite即可。
但是这样也存在问题:
即OverWrite模式会先将原来的表删掉,然后spark根据字段推测各字段类型,重写生成新表。但新表的字段类型往往与原表有较大出入,如果是对字段的类型有着比较严格的要求,那最好不要使用OverWrite模式。
如本人公司项目原本的目标表字段类型:

如粗暴地使用OverWrite做全量更新,数据的字段类型会变为:

因此,如需要严格保持原表的字段类型还需要使用Append模式:
然后在程序执行前,执行一条删除原表数据的SQL语句:
truncate table 表名;
这样,现将原表数据删除,在追加新的数据,也可以完成全量更新。
在sparksql中,保存数据到数据,只有Append,Overwrite,ErrorIfExists,Ignore四种模式,不满足项目需求,此处大概说一下我们需求,当业务库有数据发生变化,需要更新、插入、删除数仓中ods层的数据,因此需要改造源码。......
在前几年,整体移动网络环境相比现在差很多,加之流量费用又相对较高,因此每当我们发布新版本的时候,一些用户升级并不是很积极,这就造成了新版本的升级率并不高。而google为了解决了这个问题,提出了SmartAppUpdate,即增量更新(也叫做差分升级)。尽管现在网络环境有了很大的提升,但一个不争的事实就是应用越做越大,因此,增量更新在目前的仍然是一种解决APP更新包过大的有效方案。今天,我们就来聊聊增量更新。增量更新的关键在于如何理解增量一词。来想想平时我们的开发过程,往往都是今天在昨天的基础上修改一些代码,app的更新也是类似的:往往都是在旧版本的app上进行修改。这样看来,增量更新就是原有
全量增量用于数据采集的差异:
全量抽取简单,但是数据量大;增量抽取,相对复杂,要求对数据差异准确性高,对业务系统的性能不能有太大压力。
增量与全量用于数据同步的差异:
全量,就是每天定时(一般是夜里,避开业务高峰期)或者周期性全量把数据从一个地方拷贝到另外一个地方;可以采用直接用新数据全部覆盖旧数据的方式;或者覆盖前判断下如果新旧不一致就更新,如果不一致则不更新;这里面有一个隐藏的问题:如果采用异步写,数据源物理删除了,怎么直接通过全量数据同步?这就需要借助一些中间操作日志文件,
Spark Streaming
在流式计算模型中,永远都拿不到全量数据去计算,因为输出是持续的,在时间上也可以认为是无界的.同时计算结果持续输出即计算结果在时间上也是无界的.流失计算对实时性要求较高,一般先定义目标计算,然后数据到来之后将计算逻辑应用于数据,为了提高计算效果,往往尽可能采用增量计算代替全量计算.
而在批处理模型中,先要有全量数据集,然后定义计算逻辑,并将计算应用于全量数据,计...
Spark内部针对DataSource表的查询做了缓存优化,使得在同一任务中多次访问同一张DataSource表场景下可以跳过重复的获取表meta数据过程,以提升表读取性能。缓存的内容是表名和其对应的LogicalRelation。
缓存机制:
SQL语法解析后进行Analyzer的过程,因为我们关注表的缓存机制,所以只看表分析中的一个关键Rule:ResolveRelations。Analyzer 对Parsed Logical Plan进行遍历,发现UnresolvedRelation后,就对它启动合规
我在 hive 创建了一个 sql 对应的数据表,用于模拟增量抽取,其中存放了 2333 条数据;对数据进行增量抽取。利用左连接的特性:收集左表的全部数据,以及右表符合连接条件的数据,也就是。懂动态分区以及SQL左连接,不会可以看这两篇文章,因为之前那个版本写的比较捞,所以有了这篇文章。结果验证:直接上 hive,执行。进行抽取,原理很简单。值的数据,这样就找出了所有新。,其余不相等的会自动填充。我这里是通过左连接与。中的数据表,根据字段。,查看数量是否变多。............
有两个k-v格式的RDD需要union之后再进行reduceByKey操作(如:要将每日增量几十万的数据更新到全量几亿的数据)
优化方案:先将两个RDD公共部分提取出来,然后将公共部分先union再进行reduceByKey,最后将结果和前面没有交集的RDD相加(union)
具体做法:将较小的RDD1的key放入set并广播broadcast,然后将大的RDD2过滤filter出含该key较...
如果要按条件读表,按官网的解释可以用query参数,官网也说dbtable和query参数不能同时使用,但是我测试发现只用query会报错,说没有dbtable参数。
requirement failed: Option ‘dbtable’ is required
可以用dbtable参数查询,注意写法,要加一个表的别名。
```py
jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://1
Spark Streaming可以用于实时流项目的开发,实时流项目的数据源除了可以来源于日志、文件、网络端口等,常常也有这种需求,那就是实时分析处理MySQL中的增量数据。面对这种需求当然我们可以通过JDBC的方式定时查询Mysql,然后再对查询到的数据进行处理也能得到预期的结果,但是Mysql往往还有其他业务也在使用,这些业务往往比较重要,通过JDBC方式频繁查询会对Mysql造成大量无形的压力...