@Autowired
private KafkaProperties kafkaProperties;

/** Topic "test" carries plain strings. */
public static final String TOPIC_TEST = "test";

/** Topic "model" carries JSON. */
public static final String TOPIC_MODEL = "model";

public static final String KAFKA_JSON_LISTENER_CONTAINER_FACTORY = "kafkaJsonListenerContainerFactory";

@Bean(name = KAFKA_JSON_LISTENER_CONTAINER_FACTORY)
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaJsonListenerContainerFactory(
        ConsumerFactory<String, String> consumerFactory) {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory);
    factory.setConcurrency(10);
    factory.setMessageConverter(new StringJsonMessageConverter());
    // Emit an idle event once per hour (60000 ms * 60).
    factory.getContainerProperties().setIdleEventInterval(60000L * 60);
    factory.getContainerProperties().setPollTimeout(10000);
    return factory;
}

private Map<String, Object> consumerProps() {
    Map<String, Object> props = new HashMap<>(20);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaProperties.getConsumer().getGroupId());
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, "512000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 35000);
    props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 35000);
    props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 60000);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 5000);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1000);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return props;
}

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps());
}
The configuration file is as follows:
spring:
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: sboot.kfk
      enable-auto-commit: true
      fetch-max-wait: 35000
      auto-commit-interval: 5000
      max-poll-records: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      concurrency: 4
      poll-timeout: 10000
The application then failed to start with this error:
Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
The following candidates were found but could not be injected:
- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans of type 'org.springframework.kafka.core.ConsumerFactory' consumerFactory
Action:
Consider revisiting the entries above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.
The error says no ConsumerFactory could be found in the configuration, yet I had genuinely declared that @Bean. Eventually I found that the ConsumerFactory in the Spring Boot source looks like this:
@Bean
@ConditionalOnMissingBean(ConsumerFactory.class)
public ConsumerFactory<?, ?> kafkaConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());
}
Look closely: what is different?
It is ConsumerFactory<?, ?>, not ConsumerFactory<String, String>: the generics do not match. Spring does find the custom bean (that is why the auto-configured kafkaConsumerFactory backs off), but when it validates the injection point the generic types fail to line up, hence the error. The same caution applies when customizing a RedisTemplate: RedisTemplate<Object, Object> and RedisTemplate<String, Object> are different injection targets, so check which one the injection point actually requires.
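The generic types Spring compares here are the ones declared on the @Bean method's return type, which the container reads via reflection. A minimal, Spring-free sketch of that mechanism (GenericCheckDemo, stringBean, and objectBean are hypothetical names invented for this illustration):

```java
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.util.List;

// Illustration only: the concrete generic arguments declared on a method's
// return type survive compilation and are visible through reflection, which
// is how a container can tell List<String> and List<Object> (or
// ConsumerFactory<String, String> and ConsumerFactory<Object, Object>) apart
// when matching beans to injection points.
public class GenericCheckDemo {
    static List<String> stringBean() { return List.of(); }
    static List<Object> objectBean() { return List.of(); }

    /** Returns the name of the first generic argument of a method's return type. */
    static String firstTypeArg(String methodName) throws Exception {
        Method m = GenericCheckDemo.class.getDeclaredMethod(methodName);
        ParameterizedType t = (ParameterizedType) m.getGenericReturnType();
        return t.getActualTypeArguments()[0].getTypeName();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(firstTypeArg("stringBean")); // java.lang.String
        System.out.println(firstTypeArg("objectBean")); // java.lang.Object
    }
}
```

So even though both methods erase to plain List at runtime, the declared generics are still there for the container to match against, and a mismatch makes the bean invisible to that injection point.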
Conclusion:
When defining a custom Spring bean, make sure its generic types match what the injection points require.
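Following that conclusion, one possible fix is a sketch under the assumption (suggested by the error text above) that Spring Boot's kafkaListenerContainerFactory injects a ConsumerFactory<Object, Object>: declare the custom bean with those generics, and type the custom container factory's parameter to match.

```java
// Sketch, not the definitive fix: assumes the auto-configuration's
// injection point wants ConsumerFactory<Object, Object>. Check the
// Spring Boot version you run against before adopting it.
@Bean
public ConsumerFactory<Object, Object> consumerFactory() {
    return new DefaultKafkaConsumerFactory<>(consumerProps());
}
```

The custom kafkaJsonListenerContainerFactory parameter would then also need to take ConsumerFactory<Object, Object> (or a compatible wildcard) so both injection points resolve the same bean.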