实时ETL开发之流计算程序【编程】


编写完成从Kafka消费数据,打印控制台上,其中创建SparkSession实例对象时,需要设置参数值。


package cn.itcast.logistics.etl.realtime
import cn.itcast.logistics.common.Configuration
import org.apache.commons.lang3.SystemUtils
import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.{DataFrame, SparkSession}
 * 编写StructuredStreaming程序,实时从Kafka消息数据(物流相关数据和CRM相关数据),打印控制台Console
     * 1. 初始化设置Spark Application配置
     * 2. 判断Spark Application运行模式进行设置
     * 3. 构建SparkSession实例对象
     * 4. 初始化消费物流Topic数据参数
     * 5. 消费物流Topic数据,打印控制台
     * 6. 初始化消费CRM Topic数据参数
     * 7. 消费CRM Topic数据,打印控制台
     * 8. 启动流式应用,等待终止
object LogisticsEtlApp {
    def main(args: Array[String]): Unit = {
        // step1. 构建SparkSession实例对象,设置相关属性参数值
        // 1. 初始化设置Spark Application配置
        val sparkConf = new SparkConf()
            .setAppName(this.getClass.getSimpleName.stripSuffix("$"))
            .set("spark.sql.session.timeZone", "Asia/Shanghai")
            .set("spark.sql.files.maxPartitionBytes", "134217728")
            .set("spark.sql.files.openCostInBytes", "134217728")
            .set("spark.sql.shuffle.partitions", "3")
            .set("spark.sql.autoBroadcastJoinThreshold", "67108864")
        // 2. 判断Spark Application运行模式进行设置
        if (SystemUtils.IS_OS_WINDOWS || SystemUtils.IS_OS_MAC) {
            //本地环境LOCAL_HADOOP_HOME
            System.setProperty("hadoop.home.dir", Configuration.LOCAL_HADOOP_HOME)
            //设置运行环境和checkpoint路径
            sparkConf
                .set("spark.master", "local[3]")
                .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_WIN_CHECKPOINT_DIR)
        } else {
            //生产环境
            sparkConf
                .set("spark.master", "yarn")
                .set("spark.sql.streaming.checkpointLocation", Configuration.SPARK_APP_DFS_CHECKPOINT_DIR)
        // 3. 构建SparkSession实例对象
        val spark: SparkSession = SparkSession.builder()
            .config(sparkConf)
            .getOrCreate()
        import spark.implicits._
        // step2. 从Kafka实时消费数据,设置Kafka Server地址和Topic名称
        // step3. 将ETL转换后数据打印到控制台,启动流式应用
        // 4. 初始化消费物流Topic数据参数
        val logisticsDF: DataFrame = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
            .option("subscribe", "logistics")
            .option("maxOffsetsPerTrigger", "100000")
            .load()
        // 5. 消费物流Topic数据,打印控制台
        logisticsDF.writeStream
            .queryName("query-logistics-console")
            .outputMode(OutputMode.Append())
            .format("console")
            .option("numRows", "10")
            .option("truncate", "false")
            .start()
        // 6. 初始化消费CRM Topic数据参数
        val crmDF: DataFrame = spark.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "node2.itcast.cn:9092")
            .option("subscribe", "crm")
            .option("maxOffsetsPerTrigger", "100000")
            .load()
        // 7. 消费CRM Topic数据,打印控制
        crmDF.writeStream
            .queryName("query-crm-console")
            .outputMode(OutputMode.Append())
            .format("console")
            .option("numRows", "10")
            .option("truncate", "false")
            .start()
        // step4. 流式应用启动以后,等待终止,关闭资源
        // 8. 启动流式应用,等待终止
        spark.streams.active.foreach(query => println("启动Query:" + query.name))
        spark.streams.awaitAnyTermination()
}


SparkSQL 参数调优设置:


  • 1)、设置会话时区:set("spark.sql.session.timeZone", "Asia/Shanghai")


  • 2)、设置读取文件时单个分区可容纳的最大字节数


set("spark.sql.files.maxPartitionBytes", "134217728")


  • 3)、设置合并小文件的阈值:set("spark.sql.files.openCostInBytes", "134217728")


  • 4)、设置 shuffle 分区数:set("spark.sql.shuffle.partitions", "4")


  • 5)、设置执行 join 操作时能够广播给所有 worker 节点的最大字节大小


set("spark.sql.autoBroadcastJoinThreshold", "67108864")

基于Apache-doris怎么构建数据中台(七)-数据指标管理
维度主要分为定性维度和定量维度,定性维度,主要是偏文字描述类如城市、性别、职业等;定量维度,主要是数值类描述如收入、年龄等,对定量维度需要做数值分组处理。
工作3年,月薪20k+的大数据开发人员,突然说我不想只做Hadoop、Spark、Flink层面的技术开发
“不管国内或全球“新冠”疫情有多严重、还得持续多久,我只想先保住我的工作,如果降薪,我也能在短时间找到待遇更好的下一个东家”。 ——《大数据就业特训营》23期学员李斌 2014年做大数据培训至今,已有5年之多,可以说大数据技术的发展变化速度之快,用“突飞猛进”来说毫不夸张。
SparkSQL ThriftServer 安全相关功能的现状分析
SparkSQL Thrift Server 是 Spark SQL基于 Apache Hive的 HiveServer2开发的,通过SparkSQL Thrift Server 可以使 Spark SQL支持 JDBC/ODBC 的连接方式,用户可以通过 JDBC and ODBC 协议,在Spark上执行 SQL。
《Spark与Hadoop大数据分析》——1.5 小结
本节书摘来自华章计算机《Spark与Hadoop大数据分析》一书中的第1章,第1.5节,作者 [美]文卡特·安卡姆(Venkat Ankam),译 吴今朝,更多章节内容可以访问云栖社区“华章计算机”公众号查看。