Flink 1.13.6
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:79)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.createProducer(FlinkKafkaProducer.java:1230)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initProducer(FlinkKafkaProducer.java:1346)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initNonTransactionalProducer(FlinkKafkaProducer.java:1342)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:990)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.beginTransaction(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.beginTransactionInternal(TwoPhaseCommitSinkFunction.java:403)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.initializeState(TwoPhaseCommitSinkFunction.java:394)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1195)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.ByteArraySerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:399)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:430)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:415)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:366)
... 26 more
Problem 1: solution
A cluster dependency conflict.
Running the code locally against the production Kafka cluster, data was received normally; only after packaging the job and running it on the server did the error above appear, which pointed to a dependency conflict on the cluster.
In the first several attempts, many Kafka-related dependencies were marked as provided in the pom file, for example:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.12</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
The jars for those provided dependencies were then added under flink/lib on the server, but the packaged job still failed with the same error.
What finally worked was changing Flink's class loading order: the classloader.resolve-order parameter in flink-conf.yaml was switched from the default child-first to parent-first (and the provided scope added earlier was removed from the pom dependencies again). After repackaging, the job ran normally.
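For reference, this is the single line that changes in conf/flink-conf.yaml (key and values as documented for Flink 1.13):

classloader.resolve-order: parent-first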
ERROR org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager$1 119 accept - Received uncaught exception. java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.<init>(BinlogSplitReader.java:84) ~[flink-connector-mysql-cdc-2.2.1.jar:2.2.1]
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:160) ~[flink-connector-mysql-cdc-2.2.1.jar:2.2.1]
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:73) ~[flink-connector-mysql-cdc-2.2.1.jar:2.2.1]
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58) ~[flink-connector-base-1.15.2.jar:1.15.2]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142) ~[flink-connector-base-1.15.2.jar:1.15.2]
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105) ~[flink-connector-base-1.15.2.jar:1.15.2]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_242]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) ~[?:1.8.0_242]
at java.util.concurrent.FutureTask.run(FutureTask.java) ~[?:1.8.0_242]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) ~[?:1.8.0_242]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ~[?:1.8.0_242]
at java.lang.Thread.run(Thread.java:748) ~[?:1.8.0_242]
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder
at java.net.URLClassLoader.findClass(URLClassLoader.java:382) ~[?:1.8.0_242]
at java.lang.ClassLoader.loadClass(ClassLoader.java:419) ~[?:1.8.0_242]
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352) ~[?:1.8.0_242]
at java.lang.ClassLoader.loadClass(ClassLoader.java:352) ~[?:1.8.0_242]
... 12 more
WARN org.apache.flink.runtime.taskmanager.Task 1104 transitionState - Source: MySQL_Source -> Sink: Print to Std. Out (4/12)#0 (4107a92305c74b8cab6fe2797b4a2420) switched from RUNNING to FAILED with failure cause: java.lang.RuntimeException: One or more fetchers have encountered exception
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:385)
at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:519)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:804)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:753)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:948)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:927)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:741)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:563)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava18/com/google/common/util/concurrent/ThreadFactoryBuilder
at com.ververica.cdc.connectors.mysql.debezium.reader.BinlogSplitReader.<init>(BinlogSplitReader.java:84)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.checkSplitOrStartNext(MySqlSplitReader.java:160)
at com.ververica.cdc.connectors.mysql.source.reader.MySqlSplitReader.fetch(MySqlSplitReader.java:73)
at org.apache.flink.connector.base.source.reader.fetcher.FetchTask.run(FetchTask.java:58)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava18.com.google.common.util.concurrent.ThreadFactoryBuilder
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:419)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
at java.lang.ClassLoader.loadClass(ClassLoader.java:352)
... 12 more
Problem 2: solution
Initially Flink 1.15.2 was in use, with flink-connector-jdbc also at 1.15.2; the flink-shaded-guava versions these dependencies rely on do not match: Flink 1.15 uses the shaded guava30, while the connector (flink-connector-mysql-cdc 2.2.1 in the stack trace above) still looks for the shaded guava18. I downgraded Flink to 1.13.6 and added the following dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-shaded-guava</artifactId>
    <version>18.0-13.0</version>
</dependency>
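To confirm which shaded guava versions a job actually pulls in, the Maven dependency tree is a quick check (standard Maven; the includes filter below is just one way to narrow the output):

mvn dependency:tree -Dincludes=org.apache.flink:flink-shaded-guava

If connectors built against different Flink releases bring in both an 18.x and a 30.x shaded guava, that is exactly the mismatch described above.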
Exception in thread "main" org.apache.flink.util.FlinkRuntimeException: Coordinator Provider for node Source: MySQL_Source -> Sink: Print to Std. Out is not serializable.
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:597)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain(StreamingJobGraphGenerator.java:454)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setChaining(StreamingJobGraphGenerator.java:375)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:176)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobGraph(StreamingJobGraphGenerator.java:114)
at org.apache.flink.streaming.api.graph.StreamGraph.getJobGraph(StreamGraph.java:959)
at org.apache.flink.client.StreamGraphTranslator.translateToJobGraph(StreamGraphTranslator.java:50)
at org.apache.flink.client.FlinkPipelineTranslationUtil.getJobGraph(FlinkPipelineTranslationUtil.java:39)
at org.apache.flink.client.deployment.executors.PipelineExecutorUtils.getJobGraph(PipelineExecutorUtils.java:56)
at org.apache.flink.client.deployment.executors.LocalExecutor.getJobGraph(LocalExecutor.java:104)
at org.apache.flink.client.deployment.executors.LocalExecutor.execute(LocalExecutor.java:82)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1956)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1847)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:69)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1833)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.scala:801)
at com.cdc.controller.MysqlToKafka.mainMethod(MysqlToKafka.scala:46)
at com.cdc.AppMainClass$.main(AppMainClass.scala:18)
at com.cdc.AppMainClass.main(AppMainClass.scala)
Caused by: java.io.NotSerializableException: com.cdc.controller.MysqlToKafka
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:624)
at org.apache.flink.util.SerializedValue.<init>(SerializedValue.java:62)
at org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createJobVertex(StreamingJobGraphGenerator.java:594)
... 18 more
Problem 3: solution
MysqlToKafka.scala was originally written as a class, and that class could not be serialized. Changing MysqlToKafka from a class to an object made the job run. (Methods on a Scala object are effectively static; they are not serialized as part of the job graph, so the error no longer occurs.)
It is also possible the cause was an inner class used for serialization inside the original MysqlToKafka class; whether moving it out of the class would also have fixed the problem was not tested.
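A minimal sketch of the pitfall, assuming the job is structured roughly like this (the names MysqlToKafka and mainMethod are taken from the stack trace; the body is illustrative plain DataStream code rather than the actual CDC pipeline, and the exact failure point differs, but the mechanism of capturing a non-serializable enclosing instance is the same):

import org.apache.flink.streaming.api.scala._

// Before (fails): a class. The map lambda reads the field `prefix`, so it
// captures the enclosing MysqlToKafka instance, and Flink then has to
// serialize that whole non-serializable instance while building the job graph.
// class MysqlToKafka {
//   private val prefix = "row: "
//   def mainMethod(args: Array[String]): Unit = {
//     val env = StreamExecutionEnvironment.getExecutionEnvironment
//     env.fromElements("a", "b").map(prefix + _).print()
//     env.execute("MysqlToKafka")
//   }
// }

// After (works): the same code on an object. Members of an object compile to
// static-like accesses, so the closure no longer drags a non-serializable
// outer instance into the serialized job graph.
object MysqlToKafka {
  private val prefix = "row: "
  def mainMethod(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.fromElements("a", "b").map(prefix + _).print()
    env.execute("MysqlToKafka")
  }
}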
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:440)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:291)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:318)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:303)
at org.apache.flink.streaming.connectors.kafka.internals.FlinkKafkaInternalProducer.<init>(FlinkKafkaInternalProducer.java:79)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.lambda$abortTransactions$3(FlinkKafkaProducer.java:1282)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
at java.util.HashMap$KeySpliterator.forEachRemaining(HashMap.java:1548)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:291)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool.helpComplete(ForkJoinPool.java:1870)
at java.util.concurrent.ForkJoinPool.externalHelpComplete(ForkJoinPool.java:2467)
at java.util.concurrent.ForkJoinTask.externalAwaitDone(ForkJoinTask.java:324)
at java.util.concurrent.ForkJoinTask.doInvoke(ForkJoinTask.java:405)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:734)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:160)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:174)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:583)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.abortTransactions(FlinkKafkaProducer.java:1263)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.initializeState(FlinkKafkaProducer.java:1189)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:189)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:171)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:118)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:290)
at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:441)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:585)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.executeRestore(StreamTask.java:565)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runWithCleanUpOnFail(StreamTask.java:650)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:540)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:759)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:566)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: class org.apache.kafka.common.serialization.StringSerializer is not an instance of org.apache.kafka.common.serialization.Serializer
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:399)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:430)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:415)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:366)
... 37 more
Problem 4: solution
A dependency conflict on flink-connector-kafka.jar.
<dependency>
    <groupId>org.apache.flink</groupId>