package src.main.java.org.kududb.examples.sample;
import com.google.common.collect.ImmutableList;
import org.apache.kudu.ColumnSchema;
import org.apache.kudu.Schema;
import org.apache.kudu.Type;
import org.apache.kudu.client.*;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
 * 数据刷新策略对比
public class InsertFlushData {
    // 缓冲大小,也就是数据的条数
    private final static int OPERATION_BATCH = 2000;
     * mode形式:
     * SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND
     * SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC
     * SessionConfiguration.FlushMode.MANUAL_FLUSH
    // 支持三个模式的测试用例
    public static void insertTestGeneric(KuduSession session, KuduTable table, SessionConfiguration.FlushMode mode, int recordCount) throws Exception {
        session.setFlushMode(mode);
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC != mode) {
            session.setMutationBufferSpace(OPERATION_BATCH);
        int commit = 0;
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addInt("value1", 16);
            row.addLong("value2", 16);
            Long gtmMillis;
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gtmMillis = localTimestamp.getTime();
            Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
            row.addLong("timestamp", shanghaiTimezoneMillis);
            session.apply(insert);
            if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode) {
                commit = commit + 1;
                if (commit > OPERATION_BATCH / 2) {
                    session.flush();
                    commit = 0;
        // 对于手工提交, 保证完成最后的提交
        if (SessionConfiguration.FlushMode.MANUAL_FLUSH == mode && commit > 0) {
            session.flush();
        // 对于后台自动提交, 必须保证完成最后的提交, 并保证有错误时能抛出异常
        if (SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND == mode) {
            session.flush();
            RowErrorsAndOverflowStatus error = session.getPendingErrors();
            // 检查错误收集器是否有溢出和是否有行错误
            if (error.isOverflowed() || error.getRowErrors().length > 0) {
                if (error.isOverflowed()) {
                    throw new Exception("kudu overflow exception occurred.");
                StringBuilder errorMessage = new StringBuilder();
                if (error.getRowErrors().length > 0) {
                    for (RowError errorObj : error.getRowErrors()) {
                        errorMessage.append(errorObj.toString());
                        errorMessage.append(";");
                throw new Exception(errorMessage.toString());
    // 支持手动flush的测试用例
    public static void insertTestManualFlush(KuduSession session, KuduTable table, int recordCount) throws Exception {
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
        session.setFlushMode(mode);
        session.setMutationBufferSpace(OPERATION_BATCH);
        int commit = 0;
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addInt("value1", 17);
            row.addLong("value2", 17);
            Long gtmMillis;
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gtmMillis = localTimestamp.getTime();
            Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
            row.addLong("timestamp", shanghaiTimezoneMillis);
            session.apply(insert);
            // 对于手工提交, 需要buffer在未满的时候flush,这里采用了buffer一半时即提交
            commit = commit + 1;
            if (commit > OPERATION_BATCH / 2) {
                session.flush();
                commit = 0;
        // 对于手工提交, 保证完成最后的提交
        if (commit > 0) {
            session.flush();
    // 自动flush的测试案例
    public static void insertTestAutoFlushSync(KuduSession session, KuduTable table, int recordCount) throws Exception {
        SessionConfiguration.FlushMode mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
        session.setFlushMode(mode);
        for (int i = 0; i < recordCount; i++) {
            Insert insert = table.newInsert();
            PartialRow row = insert.getRow();
            UUID uuid = UUID.randomUUID();
            row.addString("id", uuid.toString());
            row.addInt("value1", 18);
            row.addLong("value2", 18);
            Long gtmMillis;
             * System.currentTimeMillis() 是从1970-01-01开始算的毫秒数(GMT), kudu API是采用纳秒数, 所以需要时间*1000
             * 另外, 考虑到我们是东8区时间, 所以转成Long型需要再加8个小时, 否则存到Kudu的时间是GTM, 比东8区晚8个小时
            // 第一步: 获取当前时间对应的GTM时区unix毫秒数
            gtmMillis = System.currentTimeMillis();
            // 第二步: 将timestamp转成对应的GTM时区unix毫秒数
            Timestamp localTimestamp = new Timestamp(System.currentTimeMillis());
            gtmMillis = localTimestamp.getTime();
            // 将GTM的毫秒数转成东8区的毫秒数量
            Long shanghaiTimezoneMillis = gtmMillis + 8 * 3600 * 1000;
            row.addLong("timestamp", shanghaiTimezoneMillis);
            // 对于AUTO_FLUSH_SYNC模式, apply()将立即完成数据写入,但是不是批处理
            session.apply(insert);
     * 测试案例
    public static void testStrategy() throws KuduException {
        KuduClient client = new KuduClient.KuduClientBuilder("hadoop01").build();
        KuduSession session = client.newSession();
        KuduTable table = client.openTable("bigData2");
        SessionConfiguration.FlushMode mode;
        long d1;
        long d2;
        long timeMillis;
        long seconds;
        int recordCount = 200000;
        try {
            // 自动刷新策略(默认的刷新策略)
            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_SYNC;
            System.out.println(mode + " is start!");
            d1 = System.currentTimeMillis();
            insertTestAutoFlushSync(session, table, recordCount);
            d2 = System.currentTimeMillis();
            timeMillis = d2 - d1;
            System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
            // 后台刷新策略
            mode = SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND;
            System.out.println(mode + " is start!");
            d1 = System.currentTimeMillis();
            insertTestGeneric(session, table, mode, recordCount);
            d2 = System.currentTimeMillis();
            timeMillis = d2 - d1;
            System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
            // 手动刷新
            mode = SessionConfiguration.FlushMode.MANUAL_FLUSH;
            System.out.println(mode + " is start!");
            d1 = System.currentTimeMillis();
            insertTestManualFlush(session, table, recordCount);
            d2 = System.currentTimeMillis();
            timeMillis = d2 - d1;
            System.out.println(mode.name() + "花费毫秒数: " + timeMillis);
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (!session.isClosed()) {
                session.close();
    public static void createTable() {
        String tableName = "bigData2";
        KuduClient client = new KuduClient.KuduClientBuilder("hadoop01").defaultAdminOperationTimeoutMs(60000).build();
        KuduSession session = client.newSession();
        session.setTimeoutMillis(60000);
        try {
            // 测试,如果table存在的情况下,就删除该表
            if (client.tableExists(tableName)) {
                client.deleteTable(tableName);
                System.out.println("delete the table is success!");
            List<ColumnSchema> columns = new ArrayList();
            // 创建列
            columns.add(new ColumnSchema.ColumnSchemaBuilder("id", Type.STRING).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value1", Type.INT32).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("value2", Type.INT64).key(true).build());
            columns.add(new ColumnSchema.ColumnSchemaBuilder("timestamp", Type.INT64).key(true).build());
            // 创建schema
            Schema schema = new Schema(columns);
            // id相当于联合主键
            ImmutableList<String> hashKeys = ImmutableList.of("id", "timestamp");
            CreateTableOptions tableOptions = new CreateTableOptions();
            // 设置hash分区,包括分区数量、副本数目
            tableOptions.addHashPartitions(hashKeys, 20);
            tableOptions.setNumReplicas(1);
            System.out.println("create the table is success! ");
            // 创建table,并设置partition
            client.createTable(tableName, schema, tableOptions);
        } catch (Exception e) {
            e.printStackTrace();
    public static void main(String[] args) {
        try {
            createTable();
            testStrategy();
        } catch (KuduException e) {
            e.printStackTrace();
                    课程链接: http://edu.51cto.com/course/15174.htmlpackage src.main.java.org.kududb.examples.sample;import com.google.common.collect.ImmutableList;import org.apache.kudu.ColumnSchema;import org.apache.k...
2.三种Fulsh Mode
AUTO_FLUSH_SYNC 默认,自动flush,同步方法,调用 kuduSession.apply() 时立即写入,在写入完成后才会返回一个 OperationResponse 对象,也只有模式下才会返回该对象,其他的都是返回 null,调用kuduSession.flush() 不会有任何操作。
AUTO_FLUSH_BACKGROUND 自动后...
				
AUTO_FLUSH_SYNC(默认): 所有的写入都将被发送到带有Apply()函数应用程序的服务器上,Flush()函数不会产生任何效果,因为每个Apply()调用都已经刷新了缓冲区。AUTO_FLUSH_BACKGROUND: 每一个应用的apply()函数都会返回的非常快,但是写操作会被发送到后台进程,可能与来自同一会话的其他写入一起进行批处理。 由于写入是在后台应用的,因此任何错误都
java.lang.NoClassDefFoundError: com/google/common/collect/ImmutableMap 这边可能存在着两个原因: 第一就是maven的问题, 第二就是项目结构添加库的问题。 解决方案: 首先检查项目结构添加库 有没有出现错误 打开项目结构 分别添加上面两个库 添加完成后,运行程序 这样就测试成功了。 如果这样还是不行的话,
环境:cdh6.3.2环境下,spark on yarn,client/cluster模式运行 报错:sparkStreaming 消费kafka数据kudu中报错如下: 每批数据大概4000条 21/03/12 10:13:24 ERROR executor.Executor: Exception in task 1.1 in stage 0.0 (TID 4) org.apache.kudu.client.NonRecoverableException: MANUAL_FLUSH is enabled
2021-02-01 17:11:13 ERROR TaskSetManager:73 - Task 0 in stage 4.0 failed 1 times; aborting job 2021-02-01 17:11:13 INFO TaskSchedulerImpl:57 - Removed TaskSet 4.0, whose tasks have all completed, from pool 2021-02-01 17:11:13 INFO TaskSchedulerI kudukudu-1.9.0+cdh6.2.0-967373.el6.x86_64.rpm 使用kudu的java api执行插入操作的时候,一次性插入10000条,报错: org.apache.kudu.client.NonRecoverableException: MANUAL_FLUSH is enabled but the buffer is too big at or... Kudu 提供了 table 的概念。用户可以建立多个 table,每个 table 都有一个预先定义好的 schema。Schema 里面定义了这个 table 多个 column,每个 column 都有名字,类型,是否允许 null 等。一些 columns 组成了 primary key。 # 连接到MySQL数据库 url = "jdbc:mysql://localhost:3306/mydb" properties = {"user": "root", "password": "password", "driver": "com.mysql.jdbc.Driver"} # 读取MySQL中的数据 df = spark.read.jdbc(url=url, table="mytable", properties=properties) # 将DataFrame写入Kudu中 df.write.format("org.apache.kudu.spark.kudu").option("kudu.master", "kudu.master:7051").option("kudu.table", "mykudutable").mode("append").save() # 关闭SparkSession对象 spark.stop() 在这个例子中,我们使用了MySQL数据库中的“mytable”,并将其写入了名为“mykudutable”的Kudu中。我们还指定了Kudu主节点的地址和端口号。 希望这可以帮助你开始使用Spark SQL读取MySQL数据并将其写入Kudu