Reading Kafka data in Spark Structured Streaming:
// Create DataFrame representing the stream of input lines from kafka
val lines = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka1:9092,kafka2:9092,kafka3:9092")
  .option("subscribe", "log_active")
  .load()
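For context, here is a minimal sketch of how the stream above is typically consumed. The `key` and `value` columns are part of the fixed schema of the Kafka source; the console sink is an illustrative assumption, not part of the original job:

```scala
// Kafka records arrive as binary key/value columns; cast them to strings
val messages = lines
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

// Illustrative sink: print each micro-batch to the console
val query = messages.writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
```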
Error message:
21/02/01 10:39:50 WARN consumer.ConsumerConfig: The configuration max.poll.records = 1 was supplied but isn't a known config.
21/02/01 10:39:50 INFO utils.AppInfoParser: Kafka version : 0.9.0-kafka-2.0.2
21/02/01 10:39:50 INFO utils.AppInfoParser: Kafka commitId : unknown
21/02/01 10:39:50 ERROR streaming.StreamExecution: Query [id = 3a0fd490-4f78-4d4f-ac33-a245b04e363f, runId = 2c5f1322-2c8e-4e5a-b992-5b859cb0bdd6] terminated with error
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:63)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Exception in thread "stream execution thread for [id = 3a0fd490-4f78-4d4f-ac33-a245b04e363f, runId = 2c5f1322-2c8e-4e5a-b992-5b859cb0bdd6]" java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
at org.apache.spark.sql.kafka010.SubscribeStrategy.createConsumer(ConsumerStrategy.scala:63)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.createConsumer(KafkaOffsetReader.scala:297)
at org.apache.spark.sql.kafka010.KafkaOffsetReader.<init>(KafkaOffsetReader.scala:78)
at org.apache.spark.sql.kafka010.KafkaSourceProvider.createSource(KafkaSourceProvider.scala:88)
at org.apache.spark.sql.execution.datasources.DataSource.createSource(DataSource.scala:243)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:158)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2$$anonfun$applyOrElse$1.apply(StreamExecution.scala:155)
at scala.collection.mutable.MapLike$class.getOrElseUpdate(MapLike.scala:194)
at scala.collection.mutable.AbstractMap.getOrElseUpdate(Map.scala:80)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:155)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$2.applyOrElse(StreamExecution.scala:153)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
at org.apache.spark.sql.catalyst.trees.TreeNode.transform(TreeNode.scala:256)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan$lzycompute(StreamExecution.scala:153)
at org.apache.spark.sql.execution.streaming.StreamExecution.logicalPlan(StreamExecution.scala:147)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:276)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:206)
Exception in thread "main" org.apache.spark.sql.streaming.StreamingQueryException: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe(Ljava/util/Collection;)V
This is covered in the official documentation: https://www.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html#running_jobs
Solution 1: the error log shows which Kafka client version was actually loaded at runtime:
Kafka version : 0.9.0-kafka-2.0.2,
while the jar declared in pom.xml targets Kafka 0.10, so the two versions conflict. On CDH, set SPARK_KAFKA_VERSION so Spark picks up the 0.10 client libraries:
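For reference, the pom.xml dependency that pulls in the 0.10 connector looks roughly like the following; the Spark version shown is an assumption and should match your cluster:

```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.2.0</version>
</dependency>
```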
# Set the environment variable for the duration of your shell session:
export SPARK_KAFKA_VERSION=0.10
spark-submit arguments
# Or:
# Set the environment variable for the duration of a single command:
SPARK_KAFKA_VERSION=0.10 spark-submit arguments
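Filling in the `arguments` placeholder, a full invocation might look like the sketch below; the class name and jar name are hypothetical placeholders for your own application:

```shell
# Force the Spark 2 Kafka integration to use the 0.10 client libraries
SPARK_KAFKA_VERSION=0.10 spark-submit \
  --class com.example.LogActiveStream \
  --master yarn \
  my-streaming-app.jar
```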
Solution 2: follow the Cloudera documentation:
https://docs.cloudera.com/documentation/spark2/latest/topics/spark2_kafka.html#running_jobs
Spark 2 + Kafka error: java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.KafkaConsumer.subscribe