package com.zhbr.dataImport.test
import kafka.serializer.StringDecoder
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Kafka_to_Hive {
def main(args: Array[String]): Unit = {
//获取sparkSession
val spark = SparkSession.builder().appName(this.getClass.getSimpleName.filter(!_.equals('$')))
.master("local[4]").config("spark.streaming.receiver.writeAheadLog.enable","true").getOrCreate()
//获取sparkContext
val sc = spark.sparkContext
//设置日志级别
sc.setLogLevel("WARN")
val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
//设置检查点,通常生产环境当中,为了保证数据不丢失,将数据放到hdfs之上,hdfs的高容错,多副本特征
ssc.checkpoint("./kafka-chk2")
//设置kafkaParams
val kafkaParams=Map("metadata.broker.list"->"node01:9092,node02:9092,node03:9092","group.id"->"group1")
//设置topics
val topics=Set("hive2kafka2")
//获取数据
val data: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
//获取真正的数据,数据在元组的第二位
val realData: DStream[String] = data.map(x=>x._2)
realData.map(record => record.toString).foreachRDD(rdd => {
import spark.implicits._
val df = spark.read.json(spark.createDataset(rdd))
//存入MySQL
df.write.mode(SaveMode.Append).format("jdbc")
.option(JDBCOptions.JDBC_URL,"jdbc:mysql://localhost:3306/test11")
.option("user","root")
.option("password","123")
.option(JDBCOptions.JDBC_TABLE_NAME,"lsb_copy")
.save()
//存入hive
//df.createTempView("df_tmp")
//spark.sql("insert into table df_copy select * from df_tmp")
//开启流式计算
ssc.start()
ssc.awaitTermination()
欢迎各位大神提出更简单、更快捷的解决思路。
Spark Streaming实时解析flume和kafka传来的josn数据写入mysql
注意,以下文件不提供
配置c3p0-config.xml链接,链接数据库
配置log4j.properties、my.properties
另,还需将您的spark和hadoop安装文件下的core-site.xml、hdfs-site.xml和hive-site.xml拷贝到src\main\resources目录下
一.启动相关服务1.启动Zookeeper服务2.启动kafka相关服务二.代码演示(非常简单,写一个KafkaProducer)import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRec...
1.写在前面在spark streaming+kafka对流式数据处理过程中,往往是spark streaming消费kafka的数据写入hdfs中,再进行hive映射形成数仓,当然也可以利用sparkSQL直接写入hive形成数仓。对于写入hdfs中,如果是普通的rdd则API为saveAsTextFile(),如果是PairRDD则API为saveAsHadoopFile()。当然高版本的sp...
1.1.1 bootstrap.servers
指定broker的地址清单,不需要包含所有的broker地址,生产者会从给定的broker里找到其它broker的信息,建议最少提供两个broker的信息。
1.1.2 key.serializer
broker希望接收到的消息的键和值都是字节数组。
1.1.3 value.serializer
指定的类会将值序列化。
1.2 创建新的生产者示例
private Properties kaf
问题:UDF时对一行的处理,批量导入就会涉及多行的问题,怎么将多行数据放到一个udf中?
解决思路:用collect_list函数将多行转成集合,在udf中循环遍历,发送到kafka
package cn.kobold;
import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.had.
Kafka是高吞吐低延迟的高并发、高性能的消息中间件,在大数据领域有极为广泛的运用。配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。那么Kafka到底是如何做到这么高的吞吐量和性能的呢?
一、页缓存技术 + 磁盘顺序写
首先Kafka每次接收到数据都会往磁盘上去写,如下图所示。
那么在这里我们不禁有一个疑问了,如果把数据基于磁盘来存储,频繁的往磁盘文件里写数据,这个性能会不会很差?大家肯定都觉得磁盘写性能是极差的。
没错,要是真的跟上面那个图那么简单的话,那确实这个性能是比较差的。.
什么是zookeeper?
zookeeper 主要是服务于分布式服务,可以用zookeeper来做:统一配置管理,统一命名服务,分布式锁,集群管理。使用分布式系统就无法避免对节点管理的问题(需要是实时感知节点的状态,对接点进行统一管理等等),而由于这些问题处理起来
spark streaming是spark中用来处理流式数据的,用来对接各类消息队列是极好的。spark streaming并不是真正实时的流式处理,它本质上还是批处理,只是每一个批次间隔的时间很短。
我是用java来写的。跟大佬们的scala不能比,没有scala简洁。。
先是maven需要依赖的spark-kafka包:
<dependency>
<gro...
pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">.
在虚拟机上写一个脚本,为了制造假数据,通过flume下沉到Kafka。再通过Java代码从Kafka的Topic中获取数据临时保存到本地文件中,再将本地文件上传到HDFS上
1.虚拟机启动 zookeeper、Kafka。
2.在启动一个生产者、一个消费者。
注:脚本文件:/root/log
[root@hdp-1 log]# ./makelog.sh
while tru...