I. The error message is as follows:
java.lang.NoSuchMethodError: org.apache.kafka.clients.producer.internals.TransactionalRequestResult.<init>(Ljava/lang/String;)V
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.enqueueNewPartitions(FlinkKafkaInternalProducer.java:301)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.flushNewPartitions(FlinkKafkaInternalProducer.java:275)
at org.apache.flink.streaming.connectors.kafka.internal.FlinkKafkaInternalProducer.flush(FlinkKafkaInternalProducer.java:189)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1022)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:939)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:99)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:321)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1035)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:120)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:101)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:90)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:186)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:156)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:314)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointStreamOperator(SubtaskCheckpointCoordinatorImpl.java:614)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.buildOperatorSnapshotFutures(SubtaskCheckpointCoordinatorImpl.java:540)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:507)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:266)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$5(StreamTask.java:892)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:882)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:850)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:113)
at org.apache.flink.streaming.runtime.io.CheckpointBarrierAligner.processBarrier(CheckpointBarrierAligner.java:198)
at org.apache.flink.streaming.runtime.io.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:93)
at org.apache.flink.streaming.runtime.io.StreamTaskNetworkInput.emitNext(StreamTaskNetworkInput.java:158)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:67)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:345)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxStep(MailboxProcessor.java:191)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:181)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:558)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:530)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
II. Ways to fix it
1. Refer to this blog post:
https://blog.csdn.net/Abandon_Sun/article/details/82589996
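2. First, ask why you need this dependency at all
I was running the Flink job locally in IDEA and had added the kafka-clients dependency. At the time I did not know what it was for and simply assumed it was required. The Flink Kafka connector already pulls in its own kafka-clients version transitively, so declaring a different version explicitly overrides it. When I commented the dependency out and restarted the job, the error no longer appeared in the log and the problem was solved.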
<!--<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.1</version>
</dependency>-->
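To confirm which kafka-clients jar actually ends up on the classpath, a small reflection check along these lines may help. It is only a sketch: the class name and the (String) constructor are taken from the stack trace above, while KafkaClientsCheck, hasConstructor and locationOf are hypothetical helper names made up for this example.

```java
import java.lang.reflect.Constructor;
import java.security.CodeSource;

final class KafkaClientsCheck {

    /** Returns true if the class can be loaded and declares a constructor with these parameter types. */
    static boolean hasConstructor(String className, Class<?>... paramTypes) {
        try {
            Class<?> cls = Class.forName(className);
            Constructor<?> ctor = cls.getDeclaredConstructor(paramTypes);
            return ctor != null;
        } catch (ClassNotFoundException | NoSuchMethodException | LinkageError e) {
            // Class missing, constructor missing, or class failed to link.
            return false;
        }
    }

    /** Returns the jar or directory the class was loaded from, or "unknown" if that cannot be determined. */
    static String locationOf(String className) {
        try {
            CodeSource src = Class.forName(className).getProtectionDomain().getCodeSource();
            return (src == null || src.getLocation() == null) ? "unknown" : src.getLocation().toString();
        } catch (ClassNotFoundException | LinkageError e) {
            return "unknown";
        }
    }

    public static void main(String[] args) {
        // Class and constructor signature as reported by the NoSuchMethodError.
        String cls = "org.apache.kafka.clients.producer.internals.TransactionalRequestResult";
        System.out.println("loaded from: " + locationOf(cls));
        System.out.println("has (String) constructor: " + hasConstructor(cls, String.class));
    }
}
```

Run it with the same classpath as the job. If the reported jar is not the version the connector expects, or the constructor check prints false, the explicit kafka-clients dependency is the likely culprit.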
3. When I deployed the job to the test cluster, a different error appeared:
java.lang.AbstractMethodError: Method com/meiyijia/pd/flink/meta/source/JSONKeyValueDeserializationSchema.deserialize(Lorg/apache/flink/kafka/shaded/org/apache/kafka/clients/consumer/ConsumerRecord;)Ljava/lang/Object; is abstract
at com.meiyijia.pd.flink.meta.source.JSONKeyValueDeserializationSchema.deserialize(JSONKeyValueDeserializationSchema.java)
at org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema.deserialize(KafkaDeserializationSchema.java:79)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.partitionConsumerRecordsHandler(KafkaFetcher.java:183)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:146)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:823)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:201)
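After digging deeper, I found that the kafka-clients 2.2.1 library is not compatible with the transactional code path of the Flink Kafka connector I was using. The first stack trace shows the connector calling a TransactionalRequestResult(String) constructor, which 2.2.1 does not have; as far as I could tell, only kafka-clients 2.4 and later work here. (Kafka itself has supported transactions since 0.11, so this is a client version mismatch rather than a missing feature.) I had set an isolation level and exactly-once delivery, both of which go through that transactional path, so the job failed. My fix was to stop setting the isolation level and to drop the exactly-once and at-least-once guarantees, using Semantic.NONE for the producer: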
public static FlinkKafkaProducer<JSONObject> getFlinkToKafkaSink() {
    Properties conf = new Properties();
    conf.setProperty("bootstrap.servers", "xxx:9092");
    // Only needed for transactional (exactly-once) writes, so it stays disabled:
    // conf.setProperty("transaction.timeout.ms", 10 * 60 * 1000 + "");
    return new FlinkKafkaProducer<>(
            "default", // default topic, used when a record does not name one
            new KafkaSerializationSchema<JSONObject>() {
                @Override
                public ProducerRecord<byte[], byte[]> serialize(JSONObject s, @Nullable Long timestamp) {
                    // kafka_topic is a field defined elsewhere in the enclosing class
                    return new ProducerRecord<>(kafka_topic, null,
                            s.toString().getBytes(StandardCharsets.UTF_8));
                }
            },
            conf,
            FlinkKafkaProducer.Semantic.NONE); // no transactions, no delivery guarantee
}
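4. If nothing else works, write your own custom Flink Source and Flink Sink and produce/consume data with the native Kafka API directly. After all, the Flink Kafka connector's version compatibility is sometimes not great.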