Reading data from Kafka with Spark:

    // Create DataFrame representing the stream of input lines from kafka
    val lines = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092")
      .option("subscribe", "log_active")
      .load()
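For context, the snippet above only builds the source DataFrame; nothing runs until a streaming query is started. A minimal end-to-end sketch might look like the following (the app name and the console sink are illustrative assumptions for debugging; it requires a live Spark session, a reachable Kafka cluster, and the matching `spark-sql-kafka-0-10` connector on the classpath):

```scala
import org.apache.spark.sql.SparkSession

// Illustrative session setup; in the original job `spark` already exists.
val spark = SparkSession.builder()
  .appName("KafkaLogReader")
  .getOrCreate()

// Same source definition as above.
val lines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092")
  .option("subscribe", "log_active")
  .load()

// Kafka rows expose binary key/value columns, so cast value to a string.
val values = lines.selectExpr("CAST(value AS STRING) AS value")

// Console sink for debugging only; a real job would write to files,
// another Kafka topic, or some other durable sink.
val query = values.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
```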

Error message:

21/02/01 10:39:50 WARN consumer.ConsumerConfig: The configuration max.poll.records = 1 was supplied but isn't a known config.
21/02/01 10:39:50 INFO utils.AppInfoParser: Kafka version : 0.9.0-kafka-2.0.2
21/02/01 10:39:50 INFO utils.AppInfoParser: Kafka commitId : unknown
21/02/01 10:39:50 ERROR streaming.StreamExecution: Query [id = 3a0fd490-4f78-4d4f-ac33-a245b04e363f, runId = 2c5f1322-2c8e-4e5a-b992-5b859cb0bdd6] terminated with error
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
        at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:63)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
        at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
        at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Exception in thread "stream execution thread for [id = 3a0fd490-4f78-4d4f-ac33-a245b04e363f, runId = 2c5f1322-2c8e-4e5a-b992-5b859cb0bdd6]" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
        at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:63)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
        at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
        at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
        at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
        at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
        at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
        at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
        at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
        at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
        at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V

This is covered in the official documentation: https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html#running_jobs

Solution 1: The error output shows which Kafka client is actually on the classpath (Kafka version : 0.9.0-kafka-2.0.2), while the jar referenced in pom.xml targets Kafka 0.10, so the two are inconsistent. On CDH, the SPARK_KAFKA_VERSION environment variable selects which Kafka client jars Spark 2 uses:

# Set the environment variable for the duration of your shell session:
export SPARK_KAFKA_VERSION=0.10
spark-submit arguments
# Or:
# Set the environment variable for the duration of a single command:
SPARK_KAFKA_VERSION=0.10 spark-submit arguments
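The environment variable above selects the platform-provided 0.10 jars at runtime. The mismatch can also be addressed in the build itself by aligning pom.xml with the Kafka client line actually deployed on the cluster. This fragment is illustrative only (the original pom is not shown, and the versions are placeholders):

```xml
<!-- Illustrative only: align the Structured Streaming Kafka connector
     (and the kafka-clients it pulls in transitively) with the Kafka
     client version present on the cluster. -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
  <version>${spark.version}</version>
</dependency>
```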

Solution 2: see https://docs.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html#running_jobs

spark2+kafka error: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe