相关文章推荐
活泼的蚂蚁  ·  大数据量 ...·  2 年前    · 
另类的饼干  ·  Unit Testing ...·  2 年前    · 
import java.util.Properties import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession} * 连接数据库工具类 object ConnOracleUtile { * 读取Oracle数据库 * @param spark * @param dbtable * @return def readPMSGSOracleData(spark:SparkSession,dbtable:String)={ val data: DataFrame = spark.read.format("jdbc") .option("url", "jdbc:oracle:thin:@//10.212.242.XXX:11521/pmsgs") .option("dbtable", dbtable) .option("user", "qyw") .option("password", "XXX") .load() * 读取mysql数据库 * @param spark * @param dbtable * @return def readUSAPPData(spark:SparkSession,dbtable:String)={ val data: DataFrame = spark.read.format("jdbc") .option("url", "jdbc:mysql://21.76.120.XX:3306/us_app") .option("dbtable", dbtable) .option("user", "root") .option("password", "XXX") .load() * 将数据写入到Mysql数据库(追加写入) * @param result * @param dbtable def writeData(result:DataFrame,dbtable:String): Unit ={ result.write.mode(SaveMode.Append).format("jdbc") .option(JDBCOptions.JDBC_URL,"jdbc:mysql://21.76.120.XX:3306/us_app?rewriteBatchedStatement=true") .option("user","root") .option("password","XXX") .option(JDBCOptions.JDBC_TABLE_NAME,dbtable) .option(JDBCOptions.JDBC_TXN_ISOLATION_LEVEL,"NONE") //不开启事务 .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE,500) //设置批量插入 .save()

接口trait:

package com.zhbr.`trait`
import org.apache.spark.SparkContext
import org.apache.spark.sql.sparkSession
trait ProcessDataSC {
    * 处理数据,实现不同的etl操作
    * @param spark
  def process(sc: SparkContext,spark:SparkSession)

mianClass:

package com.zhbr.mainClass
import com.zhbr.process._
import org.apache.spark.sql.SparkSession
object DataProcessMain {
  def main(args: Array[String]): Unit = {
    //获取sparkSession
    val sparkSession = SparkSession.builder().appName(this.getClass.getSimpleName.filter(!_.equals('$'))).getOrCreate()
    //获取sparkContext
    val sparkContext = sparkSession.sparkContext
    //设置日志级别
    sparkContext.setLogLevel("WARN")
	//计算程序
    DLZZ.process(sparkContext,sparkSession)
    //释放资源
    sparkContext.stop()
    sparkSession.stop()

计算程序:

package com.zhbr.process.run_date
import java.sql.Timestamp
import java.text.SimpleDateFormat
import java.util.{Calendar, Date}
import com.zhbr.`trait`.{ProcessDataSC}
import com.zhbr.utils.{ConnOracleUtile}
import org.apache.spark.SparkContext
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql._
 1. 运行数据
object DLZZ extends ProcessDataSC{
    * 处理数据,实现不同的etl操作
    * @param spark
  override def process(sc:SparkContext,spark:SparkSession): Unit = {
	//读取增量数据的条件
    val tableName = "(select * from edc_exchange.T_EDC_TRAN_CURRENT_LINE where DATA_DATE=trunc(sysdate)-1) tableTemp"
	//读取电流日曲线表T_EDC_TRAN_CURRENT_LINE,获取数据
    val DLRQX_Data: DataFrame = ConnOracleUtile.readPMSGSOracleData(spark,tableName)
	//读取数据(终端停复电台区表)
    val ZDTFDTQData: DataFrame = ConnOracleUtile.readUSAPPData(spark,"pdwqy_qxzh_zdtfd_tg")
    //分别创建临时表
    DLRQX_Data.registerTempTable("DLRQX_Table")
    ZDTFDTQData.registerTempTable("ZDTFDTQData_table")
   	//计算过程(示意)
   	val DLZZData = spark.sql("select dl.*,zd.* from DLRQX_Table dl join ZDTFDTQData_table zd on dl.ssds = zd.ssdsid")
    //将最后的数据保存到表中(电流转置表)
    DLZZData.write.mode(SaveMode.Append).format("jdbc")
      .option(JDBCOptions.JDBC_URL,"jdbc:mysql://21.76.120.XX:3306/us_app?rewriteBatchedStatement=true")
      .option("user","root")
      .option("password","XXX")
      .option(JDBCOptions.JDBC_TABLE_NAME,"tempfinal_pdwqy_qxzh_dlzz")
      .option(JDBCOptions.JDBC_TXN_ISOLATION_LEVEL,"NONE")    //不开启事务
      .option(JDBCOptions.JDBC_BATCH_INSERT_SIZE,500)   //设置批量插入
      .save()

以上计算步骤关键点:(以时间为参考做增量更新,每天凌晨抽取计算前一天的数据)

val tableName = "(select * from edc_exchange.T_EDC_TRAN_CURRENT_LINE where DATA_DATE=trunc(sysdate)-1) tableTemp"
  1. 查询增量数据的SQL需要重新命名,否则报错。如代码中的:
val tableName = "(select * from edc_exchange.T_EDC_TRAN_CURRENT_LINE where DATA_DATE=trunc(sysdate)-1) tableTemp"
  1. 增量更新的写入操作中,SaveMode必须为Append追加。

SparkSQL的全量更新

SparkSQL的全量更新非常简单,其实只需要在写入环节将SaveMode的模式修改为Overwrite即可。
但是这样也存在问题:

即OverWrite模式会先将原来的表删掉,然后spark根据字段推测各字段类型,重写生成新表。但新表的字段类型往往与原表有较大出入,如果是对字段的类型有着比较严格的要求,那最好不要使用OverWrite模式。

如本人公司项目原本的目标表字段类型:
在这里插入图片描述
如粗暴地使用OverWrite做全量更新,数据的字段类型会变为:
在这里插入图片描述
因此,如需要严格保持原表的字段类型还需要使用Append模式:

然后在程序执行前,执行一条删除原表数据的SQL语句:
truncate table 表名;
这样,现将原表数据删除,在追加新的数据,也可以完成全量更新。

sparksql中,保存数据到数据,只有Append,Overwrite,ErrorIfExists,Ignore四种模式,不满足项目需求,此处大概说一下我们需求,当业务库有数据发生变化,需要更新、插入、删除数仓中ods层的数据,因此需要改造源码。...... 在前几年,整体移动网络环境相比现在差很多,加之流量费用又相对较高,因此每当我们发布新版本的时候,一些用户升级并不是很积极,这就造成了新版本的升级率并不高。而google为了解决了这个问题,提出了SmartAppUpdate,即增量更新(也叫做差分升级)。尽管现在网络环境有了很大的提升,但一个不争的事实就是应用越做越大,因此,增量更新在目前的仍然是一种解决APP更新包过大的有效方案。今天,我们就来聊聊增量更新增量更新的关键在于如何理解增量一词。来想想平时我们的开发过程,往往都是今天在昨天的基础上修改一些代码,app的更新也是类似的:往往都是在旧版本的app上进行修改。这样看来,增量更新就是原有 全量增量用于数据采集的差异: 全量抽取简单,但是数据量大;增量抽取,相对复杂,要求对数据差异准确性高,对业务系统的性能不能有太大压力。 增量全量用于数据同步的差异: 全量,就是每天定时(一般是夜里,避开业务高峰期)或者周期性全量把数据从一个地方拷贝到另外一个地方;可以采用直接用新数据全部覆盖旧数据的方式;或者覆盖前判断下如果新旧不一致就更新,如果不一致则不更新;这里面有一个隐藏的问题:如果采用异步写,数据源物理删除了,怎么直接通过全量数据同步?这就需要借助一些中间操作日志文件, Spark Streaming 在流式计算模型中,永远都拿不到全量数据去计算,因为输出是持续的,在时间上也可以认为是无界的.同时计算结果持续输出即计算结果在时间上也是无界的.流失计算对实时性要求较高,一般先定义目标计算,然后数据到来之后将计算逻辑应用于数据,为了提高计算效果,往往尽可能采用增量计算代替全量计算. 而在批处理模型中,先要有全量数据集,然后定义计算逻辑,并将计算应用于全量数据,计... Spark内部针对DataSource表的查询做了缓存优化,使得在同一任务中多次访问同一张DataSource表场景下可以跳过重复的获取表meta数据过程,以提升表读取性能。缓存的内容是表名和其对应的LogicalRelation。 缓存机制: SQL语法解析后进行Analyzer的过程,因为我们关注表的缓存机制,所以只看表分析中的一个关键Rule:ResolveRelations。Analyzer 对Parsed Logical Plan进行遍历,发现UnresolvedRelation后,就对它启动合规 我在 hive 创建了一个 sql 对应的数据表,用于模拟增量抽取,其中存放了 2333 条数据;对数据进行增量抽取。利用左连接的特性:收集左表的全部数据,以及右表符合连接条件的数据,也就是。懂动态分区以及SQL左连接,不会可以看这两篇文章,因为之前那个版本写的比较捞,所以有了这篇文章。结果验证:直接上 hive,执行。进行抽取,原理很简单。值的数据,这样就找出了所有新。,其余不相等的会自动填充。我这里是通过左连接与。中的数据表,根据字段。,查看数量是否变多。............ 有两个k-v格式的RDD需要union之后再进行reduceByKey操作(如:要将每日增量几十万的数据更新全量几亿的数据) 优化方案:先将两个RDD公共部分提取出来,然后将公共部分先union再进行reduceByKey,最后将结果和前面没有交集的RDD相加(union) 具体做法:将较小的RDD1的key放入set并广播broadcast,然后将大的RDD2过滤filter出含该key较... 如果要按条件读表,按官网的解释可以用query参数,官网也说dbtable和query参数不能同时使用,但是我测试发现只用query会报错,说没有dbtable参数。 requirement failed: Option ‘dbtable’ is required 可以用dbtable参数查询,注意写法,要加一个表的别名。 ```py jdbcDF = spark.read .format("jdbc") .option("url", "jdbc:mysql://1 Spark Streaming可以用于实时流项目的开发,实时流项目的数据源除了可以来源于日志、文件、网络端口等,常常也有这种需求,那就是实时分析处理MySQL中的增量数据。面对这种需求当然我们可以通过JDBC的方式定时查询Mysql,然后再对查询到的数据进行处理也能得到预期的结果,但是Mysql往往还有其他业务也在使用,这些业务往往比较重要,通过JDBC方式频繁查询会对Mysql造成大量无形的压力...