一:创建两个连接工厂

@Bean(name = "firstConnectionFactory")
    public CachingConnectionFactory baseRabbitMqConnectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        connectionFactory.setChannelCacheSize(25);
        connectionFactory.setChannelCheckoutTimeout(0);
        connectionFactory.setPublisherReturns(false);
        connectionFactory.setPublisherConfirms(false);
        return connectionFactory;
    @Bean(name = "secondConnectionFactory")
    public CachingConnectionFactory baseRabbitMqConnectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.2:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        connectionFactory.setChannelCacheSize(25);
        connectionFactory.setChannelCheckoutTimeout(0);
        connectionFactory.setPublisherReturns(false);
        connectionFactory.setPublisherConfirms(false);
        return connectionFactory;

可以看到,上文我们创建了两个连接工厂,并且注册了bean。

二:绑定RabbitListenerContainerFactory

由于我们是使用@RabbitListener注解接收信息,每一个注解的方法都会由这个RabbitListenerContainerFactory创建一个MessageListenerContainer,负责接收消息。因此,我们需要将相应的连接绑定到这个类。同样,也是创建两个。

@Bean(name = "firstMultiListenerContainer")
    public SimpleRabbitListenerContainerFactory baseMultiListenerContainer(@Qualifier("firstConnectionFactory") ConnectionFactory firstConnectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(firstConnectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(5);
        factory.setMaxConcurrentConsumers(5);
        factory.setPrefetchCount(2);
        factory.setDefaultRequeueRejected(true);
        factory.setErrorHandler(new MqErrorHandler());
        return factory;
    @Bean(name = "secondMultiListenerContainer")
    public SimpleRabbitListenerContainerFactory baseMultiListenerContainer(@Qualifier("secondConnectionFactory") ConnectionFactory secondConnectionFactory){
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(secondConnectionFactory);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        factory.setConcurrentConsumers(5);
        factory.setMaxConcurrentConsumers(5);
        factory.setPrefetchCount(2);
        factory.setDefaultRequeueRejected(true);
        factory.setErrorHandler(new MqErrorHandler());
        return factory;
 

这里由于有多个ConnectionFactory 的Bean,因此我们需要用到**@Qualifier**注解来指定方法引入哪一个bean。

三:消费者监听消息

@RabbitListener(queues = "queueName", containerFactory = "firstMultiListenerContainer", errorHandler = "rabbitListenerErrorHandler")
    public void handleMessage(Message msg, Channel channel) throws Exception{
        //业务逻辑
    @RabbitListener(queues = "queueName", containerFactory = "secondMultiListenerContainer", errorHandler = "rabbitListenerErrorHandler")
    public void handleMessage1(Message msg, Channel channel) throws Exception{
        //业务逻辑

看起来好像没什么问题,测试一下!

***************************
    APPLICATION FAILED TO START
    ***************************
    Description:
    Parameter 1 of method simpleRabbitListenerContainerFactory in org.springframework.boot.autoconfigure.amqp.RabbitAnnotationDrivenConfiguration required a single bean, but 2 were found:
    	- firstConnectionFactory: defined by method 'baseRabbitMqConnectionFactory' in class path resource [com/rabbitmq/sender/config/RabbitmqConfig.class]
    	- secondConnectionFactory: defined by method 'bizRabbitMqConnectionFactory' in class path resource [com/rabbitmq/sender/config/RabbitmqConfig.class]
    Action:
    Consider marking one of the beans as @Primary, updating the consumer to accept multiple beans, or using @Qualifier to identify the bean that should be consumed

可以看到,应用启动失败,问题描述说RabbitAnnotationDrivenConfiguration 类的**simpleRabbitListenerContainerFactory **方法里面,有一个参数只需要一个bean,但是我们声明了两个,导致该方法不知道使用哪一个。下面来看一下这个方法:

simpleRabbitListenerContainerFactory方法

@Bean(
        name = {"rabbitListenerContainerFactory"}
    @ConditionalOnMissingBean(
        name = {"rabbitListenerContainerFactory"}
    @ConditionalOnProperty(
        prefix = "spring.rabbitmq.listener",
        name = {"type"},
        havingValue = "simple",
        matchIfMissing = true
    public SimpleRabbitListenerContainerFactory simpleRabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        configurer.configure(factory, connectionFactory);
        return factory;

可以看到,该方法是在没有**name=“rabbitListenerContainerFactory”**的Bean的时候,会调用这个方法创建一个Bean,方法里面的参数确实只需要一个ConnectionFactory 。

那么问题就来了,我们给它创建一个name=“rabbitListenerContainerFactory”的Bean,不让它调用这个方法不就行了?试试看!

方案一:创建一个name=“rabbitListenerContainerFactory”的Bean

@Bean(name = "rabbitListenerContainerFactory")
        public SimpleRabbitListenerContainerFactory baseMultiListenerContainer(@Qualifier("firstConnectionFactory") ConnectionFactory firstConnectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(firstConnectionFactory);
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            factory.setConcurrentConsumers(5);
            factory.setMaxConcurrentConsumers(5);
            factory.setPrefetchCount(2);
            factory.setDefaultRequeueRejected(true);
            factory.setErrorHandler(new MqErrorHandler());
            return factory;
        @Bean(name = "secondMultiListenerContainer")
        public SimpleRabbitListenerContainerFactory baseMultiListenerContainer(@Qualifier("secondConnectionFactory") ConnectionFactory secondConnectionFactory){
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(secondConnectionFactory);
            factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
            factory.setConcurrentConsumers(5);
            factory.setMaxConcurrentConsumers(5);
            factory.setPrefetchCount(2);
            factory.setDefaultRequeueRejected(true);
            factory.setErrorHandler(new MqErrorHandler());
            return factory;

上面我们将第一个Bean的name改一下,再将对应的消费者监听信息改一下,再试一次!

    2019-07-17 09:36:56.698  INFO 11424 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [127.0.0.1:5672]
    2019-07-17 09:36:56.725  INFO 11424 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: firstConnectionFactory#6d511b5f:0/SimpleConnection@4e868ef5 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64602]
    2019-07-17 09:36:56.768  INFO 11424 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [127.0.1.1:5672]
    2019-07-17 09:36:56.772  INFO 11424 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: secondConnectionFactory#1c807b1d:0/SimpleConnection@6872f9c8 [delegate=amqp://guest@127.0.1.1:5672/, localPort= 64603]

可以看到,这个真的是可以的!那么,还有其他方法吗?答案是肯定的!

方案二:使用@Primary

我们在问题的解决方案里,看到提示使用@Primary注解,该注解代表:自动装配时当出现多个Bean候选者时,被注解为@Primary的Bean将作为首选者,否则将抛出异常。这样不就解决了刚才方法不知道使用哪个Bean的问题了。

    @Bean(name = "firstConnectionFactory")
    @Primary
    public CachingConnectionFactory baseRabbitMqConnectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses("127.0.0.1:5672");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        connectionFactory.setCacheMode(CachingConnectionFactory.CacheMode.CHANNEL);
        connectionFactory.setChannelCacheSize(25);
        connectionFactory.setChannelCheckoutTimeout(0);
        connectionFactory.setPublisherReturns(false);
        connectionFactory.setPublisherConfirms(false);
        return connectionFactory;

我们将第一个Bean设置@Primary,再次启动!

2019-07-17 09:58:18.559  INFO 7276 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [127.0.1.1:5672]
    2019-07-17 09:58:18.585  INFO 7276 --- [           main] o.s.a.r.c.CachingConnectionFactory       : Created new connection: secondConnectionFactory#547e29a4:0/SimpleConnection@189b5fb1 [delegate=amqp://guest@127.0.1.1:5672/, localPort= 64874]
    2019-07-17 09:58:18.590  INFO 7276 --- [ntContainer#0-1] o.s.a.r.c.CachingConnectionFactory       : Attempting to connect to: [127.0.0.1:5672]
    2019-07-17 09:58:18.643  INFO 7276 --- [ntContainer#0-1] o.s.a.r.c.CachingConnectionFactory       : Created new connection: firstConnectionFactory#7a0e1b5e:0/SimpleConnection@48048382 [delegate=amqp://guest@127.0.0.1:5672/, localPort= 64875]

可以看到,应用启动成功,该方法也可行!

方案三:使用@Primary

如果不依赖于amqp,启动类排除

@EnableAutoConfiguration(exclude={RabbitAnnotationDrivenConfiguration.class})
                    前言之前在项目中使用了rabbitmq,后面由于需求改动,需要在一个项目中使用两个不同ip的数据源,接收不同的消息队列,因此这篇文章做一个简单记录。一:创建两个连接工厂@Bean(name = "firstConnectionFactory")    public CachingConnectionFactory baseRabbitMqConnectionFactory(){        CachingConnectionFactory connectionFactory = new Cac
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2020-06-10 08:23:09.734 [//] [main] ERROR o.s.b.d.LoggingFailureAna
				
一、知识点 1、RabbitListenerEndpoint : Endpoint为终端,像电脑、手机都是终端,他们都可以接受外部信息并响应,如手机来短信了就有提示。这里也用了终端的概念,例如:被@RabbitListener注解修饰方法也有终端的特点 可以接受外部信息并响应。 public interface RabbitListenerEndpoint { * the id of this endpoint String getId(); /**...
Parameter 0 of method sqlSessionFactory in com.yky.ykyAdmin.farmework.config.MyBatisConfig required a bean of type 'javax.sql.DataSource' that could not be found. The following candidates were found but could not be injected: - Bean method 'dataSource' i
之前在项目中使用了rabbitmq,然后简单整理了一下相关的信息,相关文章spring boot整合rabbitmq。 后面由于需求改动,需要在一个项目中使用两个不同ip的数据源,接收不同的消息队列,因此这篇文章做一个简单记录。 在之前的文章中,我们已经知道,spring boot整合rabbitmq,简单来说其实就是创建一个连接工厂(ConnectionFactory),然后在这个的...
无侵入式引入多源rabbitMq场景(主源自动装配,副源手动配置) springboot接入单个rabbitmq可以通过springboot自动装配原理,简单地在配置文件中设置好必要信息即可; 当需要接入多个rabbitmq源时,度娘找到的多数处理方法是给每个源进行手动配置好ConnectionFactory、RabbitTemplate、SimpleRabbitListenerContainerFactory、RabbitAdmin等等并把其中的主源使用@Primary进行标志;当主源配置信息比较复杂时
1. 可靠性:RabbitMQ 提供了丰富的机制来保证消息的可靠性,如持久化、确认机制等。 2. 灵活性:RabbitMQ 支持多种消息传递模式,如点对点、发布/订阅、请求/响应等,能够满足各种场景的需求。 3. 可扩展性:RabbitMQ 的集群架构能够实现高可用性和高性能的消息传递,同时支持动态扩展节点。 4. 多语言支持:RabbitMQ 提供了多种客户端库,支持多种编程语言,如Java、Python、Ruby、C#等,方便不同语言的应用接入。 5. 可视化管理界面:RabbitMQ 提供了易于使用的 Web 管理界面,方便管理员对消息队列进行管理和监控。 总之,RabbitMQ 是一款功能强大、灵活性高、可靠性好的消息代理软件,广泛应用于企业级系统中,能够有效地处理大量的异步消息传递。