相关文章推荐
欢乐的篮球  ·  Apache NiFi 存储redis ...·  4 月前    · 
温暖的香烟  ·  c# .net ...·  1 年前    · 
聪明的猕猴桃  ·  Spring ...·  1 年前    · 
精彩文章免费看

SpringBoot整合RabbitMQ系列(二)Exchange的四种类型

Exchange是什么

我们可以将 Exchange 当做一个消息的中转站,所有的消息在发送到指定队列前都要经过这一层中转站。中转站再根据某些规则进行匹配来决定将消息分发到哪些队列中。

Exchange的几种类型

  • Direct(默认)
    只有当 Routing Key 等于 Binding Key 时,消息才会被路由到队列中。
  • Fanout
    实现了一种广播机制,将消息发送到所有与这个 Exchange 绑定的队列中。
  • Topic
    定义了一种匹配规则,同样根据 Routing Key 和 Binding Key 进行模糊匹配,更加灵活。
  • Headers
    Headers Exchange 会忽略 RoutingKey 而根据消息中的 Headers 和创建绑定关系时指定的 Arguments 来匹配决定路由到哪些 Queue,其性能比较差而且 Direct Exchange 完全可以代替它,所以不建议使用。
  • Exchange 的创建与绑定

        @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "队列名"),
                exchange = @Exchange(name = "交换机名称"), key = "Binding Key"))
        public void listen(String msg) {
            log.info("成功消费队列的消息: {}", msg);
    

    以上就是一个通过注解声明队列同时绑定Exchange的一个列子。
    如果仅声明一条队列,那么当它被创建时,后台会自动将这个队列绑定到一个名称为空的 Direct Exchange 上,绑定 Routing Key 与队列名称相同。

    package com.jiangzhe.rabbitmq.chapter2;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.ExchangeTypes;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.annotation.Exchange;
    import org.springframework.amqp.rabbit.annotation.Queue;
    import org.springframework.amqp.rabbit.annotation.QueueBinding;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableScheduling;
    import org.springframework.scheduling.annotation.Scheduled;
    @Slf4j
    @EnableScheduling
    @SpringBootApplication
    public class Chapter2Application {
        private static final String FANOUT_EXCHANGE_NAME = "amqp.fanout";
        private static final String TOPIC_EXCHANGE_NAME = "amqp.topic";
        private static final String DIRECT_EXCHANGE_NAME = "amqp.direct";
        @Autowired
        private RabbitTemplate rabbitTemplate;
        public static void main(String[] args) {
            SpringApplication.run(Chapter2Application.class, args);
        @RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct"),
                exchange = @Exchange(name = DIRECT_EXCHANGE_NAME), key = "direct.cn"))
        public void listenDirect(String msg, Message message) {
            String consumerQueue = message.getMessageProperties().getConsumerQueue();
            log.info("成功消费队列 {} 的消息: {}", consumerQueue, msg);
        @RabbitListener(bindings = {
                @QueueBinding(value = @Queue(value = "fanoutA"),
                        exchange = @Exchange(type = ExchangeTypes.FANOUT, name = FANOUT_EXCHANGE_NAME)),
                @QueueBinding(value = @Queue(value = "fanoutB"),
                        exchange = @Exchange(type = ExchangeTypes.FANOUT, name = FANOUT_EXCHANGE_NAME)),
                @QueueBinding(value = @Queue(value = "fanoutC"),
                        exchange = @Exchange(type = ExchangeTypes.FANOUT, name = FANOUT_EXCHANGE_NAME))})
        public void listenFanout(String msg, Message message) {
            String consumerQueue = message.getMessageProperties().getConsumerQueue();
            log.info("成功消费队列 {} 的消息: {}", consumerQueue, msg);
         * 其中*代表匹配任意的一个词,#代表匹配0或多个词
        @RabbitListener(bindings = {
                @QueueBinding(value = @Queue(name = "topicA"),
                        exchange = @Exchange(type = ExchangeTypes.TOPIC, name = TOPIC_EXCHANGE_NAME), key = "topic.cn.#"),
                @QueueBinding(value = @Queue(name = "topicB"),
                        exchange = @Exchange(type = ExchangeTypes.TOPIC, name = TOPIC_EXCHANGE_NAME), key = "topic.com.*"),
                @QueueBinding(value = @Queue(name = "topicC"),
                        exchange = @Exchange(type = ExchangeTypes.TOPIC, name = TOPIC_EXCHANGE_NAME), key = "topic.#"),
        public void listenTopic(String msg, Message message) {
            String consumerQueue = message.getMessageProperties().getConsumerQueue();
            log.info("成功消费队列 {} 的消息: {}", consumerQueue, msg);
        @Scheduled(initialDelay = 2000, fixedRate = 1000000)
        public void send() throws InterruptedException {
            log.info("----------------------------");
            log.info("direct");
            rabbitTemplate.convertAndSend(DIRECT_EXCHANGE_NAME, "direct.cn", "hello");
            Thread.sleep(1000);
            log.info("----------------------------");
            log.info("fanout");
            rabbitTemplate.convertAndSend(FANOUT_EXCHANGE_NAME, "这个值无效", "hello");
            Thread.sleep(1000);
            log.info("----------------------------");
            log.info("to topicC and topicC");
            rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, "topic.com.a", "hello");
            Thread.sleep(1000);
            log.info("----------------------------");
            log.info("to topicA and topicC");
            rabbitTemplate.convertAndSend(TOPIC_EXCHANGE_NAME, "topic.cn", "hello");
    
    2019-11-02 12:54:09.775  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : ----------------------------
    2019-11-02 12:54:09.775  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : direct
    2019-11-02 12:54:09.849  INFO 2756 --- [ntContainer#2-1] c.j.r.chapter2.Chapter2Application       : 成功消费队列 direct 的消息: hello
    2019-11-02 12:54:10.809  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : ----------------------------
    2019-11-02 12:54:10.809  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : fanout
    2019-11-02 12:54:10.839  INFO 2756 --- [ntContainer#1-1] c.j.r.chapter2.Chapter2Application       : 成功消费队列 fanoutA 的消息: hello
    2019-11-02 12:54:10.839  INFO 2756 --- [ntContainer#1-1] c.j.r.chapter2.Chapter2Application       : 成功消费队列 fanoutB 的消息: hello
    2019-11-02 12:54:10.839  INFO 2756 --- [ntContainer#1-1] c.j.r.chapter2.Chapter2Application       : 成功消费队列 fanoutC 的消息: hello
    2019-11-02 12:54:11.809  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : ----------------------------
    2019-11-02 12:54:11.809  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : to topicC and topicC
    2019-11-02 12:54:11.838  INFO 2756 --- [ntContainer#0-1] c.j.r.chapter2.Chapter2Application       : 成功消费队列 topicB 的消息: hello
    2019-11-02 12:54:11.838  INFO 2756 --- [ntContainer#0-1] c.j.r.chapter2.Chapter2Application       : 成功消费队列 topicC 的消息: hello
    2019-11-02 12:54:12.810  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : ----------------------------
    2019-11-02 12:54:12.810  INFO 2756 --- [   scheduling-1] c.j.r.chapter2.Chapter2Application       : to topicA and topicC
    2019-11-02 12:54:12.840  INFO 2756 --- [ntContainer#0-1] c.j.r.chapter2.Chapter2Application       : 成功消费队列 topicC 的消息: hello
    2019-11-02 12:54:12.840  INFO 2756 --- [ntContainer#0-1] c.j.r.chapter2.Chapter2Application       : 成功消费队列 topicA 的消息: hello
    
    最后编辑于:2019-11-08 22:38