最近遇到这样的一个场景:

存在两个Hadoop集群,需要将一个集群中的hive数据传输到另一个集群的hive中。且源端hive为其他公司数据源,涉及到的一定的安全和保密性。

现大致思路为:

Java读取源端hive—>我司kafka—>sparkStreaming读取kafka—>目标端hive

代码示例:

Java获取其他公司hive表数据:
package com.zhbr.dataImport.test;
import com.alibaba.fastjson.JSON;
import com.zhbr.dataImport.rdbms.ImportRDBMSData;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.sql.*;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
 * @ClassName GW_to_Kafka_test
 * @Description TODO
 * @Autor yanni
 * @Date 2020/3/25 9:07
 * @Version 1.0
public class GW_to_Kafka_test2 {
    private static String brokerList = "192.168.72.141:9092,192.168.72.142:9092,192.168.72.143:9092";
    // public static final String topic="topic-demo";
    private static String topic = "hive2kafka2";
    public static void main(String[] args) throws SQLException {
    	//自定义的JDBC方式读取
        Connection conn  = ImportRDBMSData.getConn();
        Statement stmt  = ImportRDBMSData.getStatement(conn);
        String querySQL = "select * from lsb_copy";
        ResultSet res = stmt.executeQuery(querySQL);
        //创建ListBuffer集合
        ArrayList<Map<String, Object>> list = new ArrayList<Map<String, Object>>();
        //获得结果集结构信息(元数据)
        ResultSetMetaData metaData = res.getMetaData();
        //ResultSet列数
        int columnCount = metaData.getColumnCount();
        //配置生产者客户端参数
        //将配置序列化
        Properties properties = new Properties();
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //内存缓冲
        properties.put("buffer.memory", 67108864);
        //批处理大小
        properties.put("batch.size", 131072);
        //发送间隔
        properties.put("linger.ms", 100);
        //消息的最大大小
        properties.put("max.request.size", 10485760);
        //失败重试
        properties.put("retries", 3);
        properties.put("retry.backoff.ms", 20000);
        //ack级别(1代表保证leader收到)
        properties.put("acks", "1");
        properties.put("bootstrap.servers", brokerList);
        properties.put("compression.type", "gzip");
        //创建KafkaProducer 实例
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        try {
        // ResultSet转List<Map>数据结构
        // next用于移动到ResultSet的下一行,使下一行成为当前行
        while (res.next()) {
            //创建map集合
            HashMap<String, Object> map = new HashMap<String, Object>();
            // 遍历获取对当前行的每一列的键值对,put到map中
            for (int i = 1;i<=columnCount;i++) {
                // 获取当前行某一列字段的字段名
                String allColumnName = metaData.getColumnName(i).toLowerCase();
                // rs.getObject(i) 获得当前行某一列字段的值
                Object columnValue = res.getObject(i);
                map.put(allColumnName,columnValue);
            //将数据添加到list集合
            list.add(map);
            //当list集合容量为5000时,发送一次
            if(list.size()==5000){
                String str = JSON.toJSONString(list);
                //构建待发送的消息
                ProducerRecord<String,String> record=new ProducerRecord<String, String>(topic,str);
                //尝试发送消息
                kafkaProducer.send(record);
                //打印发送成功
                System.out.println("batchSize 5000 send success from producer");
                //清空list集合
                list.clear();
        //将剩下的不满5000条的数据发送
        if(list.size()>0){
            String str = JSON.toJSONString(list);
            //构建待发送的消息
            ProducerRecord<String,String> record=new ProducerRecord<String, String>(topic,str);
            //尝试发送消息
            kafkaProducer.send(record);
            //打印发送成功
            System.out.println("batchSize "+list.size()+" send success from producer");
            //清空list集合
            list.clear();
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            //关闭生产者客户端实例
            kafkaProducer.close();
            ImportRDBMSData.closeAllConn(stmt,conn);

分批次写入,避免因为性能问题导致数据丢失及服务器宕机,如此可基本保证hive表大数据量的写入工作。

\

sparkStreaming实时消费kafka,将数据保存到hive
package com.zhbr.dataImport.test
import kafka.serializer.StringDecoder
import org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
object Kafka_to_Hive {
    def main(args: Array[String]): Unit = {
      //获取sparkSession
      val spark = SparkSession.builder().appName(this.getClass.getSimpleName.filter(!_.equals('$')))
        .master("local[4]").config("spark.streaming.receiver.writeAheadLog.enable","true").getOrCreate()
      //获取sparkContext
      val sc = spark.sparkContext
      //设置日志级别
      sc.setLogLevel("WARN")
      val ssc: StreamingContext = new StreamingContext(sc,Seconds(5))
      //设置检查点,通常生产环境当中,为了保证数据不丢失,将数据放到hdfs之上,hdfs的高容错,多副本特征
      ssc.checkpoint("./kafka-chk2")
      //设置kafkaParams
      val kafkaParams=Map("metadata.broker.list"->"node01:9092,node02:9092,node03:9092","group.id"->"group1")
      //设置topics
      val topics=Set("hive2kafka2")
      //获取数据
      val data: InputDStream[(String, String)] = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topics)
      //获取真正的数据,数据在元组的第二位
      val realData: DStream[String] = data.map(x=>x._2)
      realData.map(record => record.toString).foreachRDD(rdd => {
        import spark.implicits._
        val df = spark.read.json(spark.createDataset(rdd))
        //存入MySQL
        df.write.mode(SaveMode.Append).format("jdbc")
                  .option(JDBCOptions.JDBC_URL,"jdbc:mysql://localhost:3306/test11")
                  .option("user","root")
                  .option("password","123")
                  .option(JDBCOptions.JDBC_TABLE_NAME,"lsb_copy")
                  .save()
        //存入hive
        //df.createTempView("df_tmp")
        //spark.sql("insert into table df_copy select * from df_tmp")
      //开启流式计算
      ssc.start()
      ssc.awaitTermination()

欢迎各位大神提出更简单、更快捷的解决思路。

Spark Streaming实时解析flume和kafka传来的josn数据写入mysql 注意,以下文件不提供 配置c3p0-config.xml链接,链接数据库 配置log4j.properties、my.properties 另,还需将您的spark和hadoop安装文件下的core-site.xml、hdfs-site.xml和hive-site.xml拷贝到src\main\resources目录下 一.启动相关服务1.启动Zookeeper服务2.启动kafka相关服务二.代码演示(非常简单,写一个KafkaProducer)import java.util.Properties;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRec... 1.写在前面在spark streaming+kafka对流式数据处理过程中,往往是spark streaming消费kafka数据写入hdfs中,再进行hive映射形成数仓,当然也可以利用sparkSQL直接写入hive形成数仓。对于写入hdfs中,如果是普通的rdd则API为saveAsTextFile(),如果是PairRDD则API为saveAsHadoopFile()。当然高版本的sp... 1.1.1 bootstrap.servers 指定broker的地址清单,不需要包含所有的broker地址,生产者会从给定的broker里找到其它broker的信息,建议最少提供两个broker的信息。 1.1.2 key.serializer broker希望接收到的消息的键和值都是字节数组。 1.1.3 value.serializer 指定的类会将值序列化。 1.2 创建新的生产者示例 private Properties kaf 问题:UDF时对一行的处理,批量导入就会涉及多行的问题,怎么将多行数据放到一个udf中? 解决思路:用collect_list函数将多行转成集合,在udf中循环遍历,发送到kafka package cn.kobold; import org.apache.hadoop.hive.ql.exec.Description; import org.apache.had. Kafka是高吞吐低延迟的高并发、高性能的消息中间件,在大数据领域有极为广泛的运用。配置良好的Kafka集群甚至可以做到每秒几十万、上百万的超高并发写入。那么Kafka到底是如何做到这么高的吞吐量和性能的呢? 一、页缓存技术 + 磁盘顺序写 首先Kafka每次接收到数据都会往磁盘上去写,如下图所示。 那么在这里我们不禁有一个疑问了,如果把数据基于磁盘来存储,频繁的往磁盘文件里写数据,这个性能会不会很差?大家肯定都觉得磁盘写性能是极差的。 没错,要是真的跟上面那个图那么简单的话,那确实这个性能是比较差的。. 什么是zookeeper? zookeeper 主要是服务于分布式服务,可以用zookeeper来做:统一配置管理,统一命名服务,分布式锁,集群管理。使用分布式系统就无法避免对节点管理的问题(需要是实时感知节点的状态,对接点进行统一管理等等),而由于这些问题处理起来 spark streamingspark中用来处理流式数据的,用来对接各类消息队列是极好的。spark streaming并不是真正实时的流式处理,它本质上还是批处理,只是每一个批次间隔的时间很短。 我是用java来写的。跟大佬们的scala不能比,没有scala简洁。。 先是maven需要依赖的spark-kafka包: &lt;dependency&gt; &lt;gro... pom.xml <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"&gt. 在虚拟机上写一个脚本,为了制造假数据,通过flume下沉Kafka。再通过Java代码从Kafka的Topic中获取数据临时保存到本地文件中,再将本地文件上传到HDFS上 1.虚拟机启动 zookeeper、Kafka。 2.在启动一个生产者、一个消费者。 注:脚本文件:/root/log [root@hdp-1 log]# ./makelog.sh while tru...