@Autowired private KafkaProperties kafkaProperties ; /** 名为test的topic 传输的是字符串. */ public static final String TOPIC_TEST = "test" ; /** 名为model的topic 传输的是json. */ public static final String TOPIC_MODEL = "model" ; /** 自定义接受json的listener. */ public static final String KAFKA_JSON_LISTENER_CONTAINER_FACTORY = "kafkaJsonListenerContainerFactory" ; @Bean ( name = KAFKA_JSON_LISTENER_CONTAINER_FACTORY ) public KafkaListenerContainerFactory < ConcurrentMessageListenerContainer < String , String > > kafkaJsonListenerContainerFactory ( ConsumerFactory < String , String > consumerFactory ) { ConcurrentKafkaListenerContainerFactory < String , String > factory = new ConcurrentKafkaListenerContainerFactory < > ( ) ; factory . setConsumerFactory ( consumerFactory ) ; factory . setConcurrency ( 10 ) ; factory . setMessageConverter ( new StringJsonMessageConverter ( ) ) ; factory . getContainerProperties ( ) . setIdleEventInterval ( 60000 L * 60 ) ; factory . getContainerProperties ( ) . setPollTimeout ( 10000 ) ; return factory ; /** */ private Map < String , Object > consumerProps ( ) { Map < String , Object > props = new HashMap < > ( 20 ) ; props . put ( ConsumerConfig . BOOTSTRAP_SERVERS_CONFIG , kafkaProperties . getBootstrapServers ( ) ) ; /** groupId */ props . put ( ConsumerConfig . GROUP_ID_CONFIG , kafkaProperties . getConsumer ( ) . getGroupId ( ) ) ; /** 消费者是否自动提交偏移 量,默认值是 true */ props . put ( ConsumerConfig . ENABLE_AUTO_COMMIT_CONFIG , true ) ; /** 服务器从每个分区里返回给消费者的最大字节数 默认值是 1MB */ props . put ( ConsumerConfig . MAX_PARTITION_FETCH_BYTES_CONFIG , "512000" ) ; /** 消费者在被认为死亡之前可以与服务器断开连接的时间,默认是 3s */ props . put ( ConsumerConfig . SESSION_TIMEOUT_MS_CONFIG , 35000 ) ; /** 指定 broker 的等待时间,默认是 500ms */ props . put ( ConsumerConfig . FETCH_MAX_WAIT_MS_CONFIG , 35000 ) ; /** 请求超时配置 */ props . put ( ConsumerConfig . REQUEST_TIMEOUT_MS_CONFIG , 60000 ) ; /** 多长时间自动提交一次 */ props . put ( ConsumerConfig . AUTO_COMMIT_INTERVAL_MS_CONFIG , 5000 ) ; /** 控制单次调用 call() 方法能够返回的记录数量 */ props . put ( ConsumerConfig . MAX_POLL_RECORDS_CONFIG , 1000 ) ; props . put ( ConsumerConfig . KEY_DESERIALIZER_CLASS_CONFIG , StringDeserializer . class ) ; props . put ( ConsumerConfig . VALUE_DESERIALIZER_CLASS_CONFIG , StringDeserializer . class ) ; return props ; // 如果spring.kafka属性不足 则需要自定义ConsumerFactory @Bean public ConsumerFactory < String , String > consumerFactory ( ) { return new DefaultKafkaConsumerFactory < > ( consumerProps ( ) ) ;

配置文件如下

spring
  kafka:
    bootstrap-servers: localhost:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: sboot.kfk
      enable-auto-commit: true
      fetch-max-wait: 35000
      auto-commit-interval: 5000
      max-poll-records: 1000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      concurrency: 4
      poll-timeout: 10000

结果启动报错

Description:
Parameter 1 of method kafkaListenerContainerFactory in org.springframework.boot.autoconfigure.kafka.KafkaAnnotationDrivenConfiguration required a bean of type 'org.springframework.kafka.core.ConsumerFactory' that could not be found.
The following candidates were found but could not be injected:
	- Bean method 'kafkaConsumerFactory' in 'KafkaAutoConfiguration' not loaded because @ConditionalOnMissingBean (types: org.springframework.kafka.core.ConsumerFactory; SearchStrategy: all) found beans of type 'org.springframework.kafka.core.ConsumerFactory' consumerFactory
Action:
Consider revisiting the entries above or defining a bean of type 'org.springframework.kafka.core.ConsumerFactory' in your configuration.

说在配置中找不到ConsumerFactory,但是我实实在在的写了这个@Bean了。最后发现源码里面的ConsumerFactory是这样的。

	@Bean
	@ConditionalOnMissingBean(ConsumerFactory.class)
	public ConsumerFactory<?, ?> kafkaConsumerFactory() {
		return new DefaultKafkaConsumerFactory<>(this.properties.buildConsumerProperties());

仔细看,有什么不同?
ConsumerFactory<?, ?> 而不是ConsumerFactory<String, String>,泛型错误,spring的源码中应该有地方是把这个自定义的bean拿到,但是校验的时候发现泛型对不上,因此报错。以此类推比如要自定义RedisTemplate的时候也注意泛型 RedisTemplate<Object,Object>和RedisTemplate<String,Object>的区别,看看到底要求注入的是哪个。

结论:
注意自定义spring bean的时候看好泛型是否能对的上

开发spring-kafka自定义 consumerFactory@Configurationpublic class KafkaConfig { @Autowired private KafkaProperties kafkaProperties; /** 名为test的topic 传输的是字符串. */ public static final String TOPIC_TEST = "test"; /** 名为model的topic 传输的是json. */ public
Consider revisiting the entries above or defining a bean of type ‘org.springframework.data.redis.con
公司最近需要做一个soap请求数据接口,由于没有webservice的服务端,而系统项目使用的是springboot框架,所以索性用springboot集成一个webservice框架用作发布服务,以便方便为后面的soap接口提供数据。 如果这篇文章不是您想找的,请看这篇: WebService soap报文请求返回xml格式以及自定义soap模板 二、问题原因 上午运行得好好的代码,啥都没改下午程序突然就挂了,死活起不来,就报如图所示错误。 让程序猿忍不住怀疑人生,开始相信运行程序之前需要洗手焚香的传说…重点是啥都没改。。然后原因是,本微服务(gateway)中引用了自己写的工具类commons-util工具类,如图所示: 而commons-util工具类中引用了redis依赖没有排除,与gateway微服务中引用的redis-reactive依赖造成冲突。 有时候A依赖覆盖B依赖,这时候程序可以正常运行。 有时候B依赖占
如何在Windows环境安装和启动Active MQ请看上一篇博文: https://blog.csdn.net/h_j_c_123/article/details/108454607 首先我们使用idea新建一个spring boot工程,不会新建工程的请看该篇博文: https://blog.csdn.net/h_j_c_123/article/details/93477170 然后我们开始说明如何继承: 首先添加依赖: <!--ActiveMq--> <dependen
Consider revisiting the entries above or defining a bean of type ‘org.springframework.data.redis.connection.RedisConnectionFactory’ in your configuration. Action: Consider revisiting the entries above...
16:58:47.845 ERROR 11524 --- [ main] o.s.b.d.LoggingFailureAnalysisReporter : *************************** APPLICATION FAILED TO START *************************** Description: A component required a bean of type 'xxx' that coul 也就是在这个Java类里面的注入的接口需要一个Bean实现, Parameter 1 of constructor in com.abc.auth.controller.TokenController required a bean of type 'org.springframework.cache.CacheManager' that could not be found. The following candidates were f
怎么解决这个报错Consider revisiting the entries above or defining a bean of type 'org.springframework.security.core.userdetails.UserDetailsService' in your configuration.
这个报错表示在配置中没有指定一个类型为org.springframework.security.core.userdetails.UserDetailsService的bean。解决这个问题可以按照下面的步骤进行: 1. 在配置文件中添加一个bean,指定类型为org.springframework.security.core.userdetails.UserDetailsService。 2. 确保该bean的实现类正常工作,并且已经注册到Spring容器中。 3. 确保该bean的配置文件和Spring容器的配置文件都已经正确加载,并且classpath设置正确。 4. 确保你的代码正确地使用了该bean,即使用正确的名称来调用它。 如果你按照以上步骤进行了操作,但是问题仍未解决,请检查是否还有其他的配置错误。