相关文章推荐
强健的生姜  ·  MySQL ...·  4 周前    · 
呐喊的白开水  ·  git ...·  3 月前    · 
傲视众生的白开水  ·  numpy.void ...·  1 年前    · 

RabbitMQ学习-----高级特性2

RabbitAdmin

将RabbitAdmin注入Spring容器后,可以很方便地操作RabbitMQ。配置如下:

@Configuration
public class RabbitmqConfig {
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory();
        cachingConnectionFactory.setAddresses("172.0.0.0:5672");
        cachingConnectionFactory.setUsername("root");
        cachingConnectionFactory.setPassword("root");
        cachingConnectionFactory.setVirtualHost("/");
        return cachingConnectionFactory;
     * rabbitAdmin操作RabbbitMQ
     * @param connectionFactory
     * @return
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        //必须设置成true,否则spring容器不会加载rabbitmq
        //rabbitadmin底层实现就是从spring 容器中获取Exchange/Binding/RountingKey以及Queue的@Bean声明
        //然后使用RabbitTemplate的execute()方法执行对应的修改(添加一个交换机/删除一个绑定/清空一个队列)等一些列rabbitmq操作
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
}
  • rabbitAdmin声明和绑定
public class TestController {
   @Autowired
   private RabbitAdmin rabbitAdmin; 
   // 每个模组都带一个测试服务 用于验证
   public void test() {
      //声明exchange
      rabbitAdmin.declareExchange(new DirectExchange("test_direct", false, false));
      rabbitAdmin.declareExchange(new TopicExchange("test_topic", false, false));
      rabbitAdmin.declareExchange(new FanoutExchange("test_fanout", false, false));
      //声明queue
      rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));
      rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));
      rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));
      //声明队列绑定,将队列绑定到哪个exchange上
      //绑定的要素:队列,exchange,routingKey 
      //两种绑定方式:new Binding()和BindingBuilder
      rabbitAdmin.declareBinding(new Binding("test.direct.queue", Binding.DestinationType.QUEUE, "test_direct", "direct", new HashMap<>()));
      rabbitAdmin.declareBinding(BindingBuilder
            .bind(new Queue("test.topic.queue", false))   //queue可以先声明,也可以在binding的时候声明
            .to(new TopicExchange("test_topic", false, false))
            .with("uer.#")); //指定路由key
      //清空队列数据
      rabbitAdmin.purgeQueue("test.topic.queue", false);
}

消息模板 rabbitTemplate

在与spring整合时需要实例化,在和springboot整合时添加配置即可
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage() throws Exception {
	//1 创建消息
	MessageProperties messageProperties = new MessageProperties();
	messageProperties.getHeaders().put("desc", "信息描述..");
	messageProperties.getHeaders().put("type", "自定义消息类型..");
	Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);
        //参数4可以对传进的消息进行加工设置
	rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
	@Override
	public Message postProcessMessage(Message message) throws AmqpException {
			System.err.println("------添加额外的设置---------");
			message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
			message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
			return message;
}

消息监听容器SimpleMessageListenerContainer

    • 设置事务特性,事务管理器,事务属性,事务容量(并发),是否开启事务,回滚消息;
    • 设置消费者数量,最小最大数量,批量消费;
    • 设置消息确认模式,是否重回队列,异常捕获handler函数;
    • 设置消费者标签生成策略,是否独占模式,消费者属性等;
    • 设置监听器,消息转换器;
    • 在运行中修改消费者数量的大小,接收消息模式等;
    • 消息监听配置;
@Configuration
public class RabbitmqConfig {
@Bean  
    public TopicExchange exchange001() {  
        return new TopicExchange("topic001", true, false);  
    @Bean  
    public Queue queue001() {  
        return new Queue("queue001", true); //队列持久  
    @Bean  
    public Binding binding001() {  
        return BindingBuilder.bind(queue001()).to(exchange001()).with("spring.*");  
    @Bean  
    public TopicExchange exchange002() {  
        return new TopicExchange("topic002", true, false);  
    @Bean  
    public Queue queue002() {  
        return new Queue("queue002", true); //队列持久  
    @Bean  
    public Binding binding002() {  
        return BindingBuilder.bind(queue002()).to(exchange002()).with("rabbit.*");  
    @Bean  
    public Queue queue003() {  
        return new Queue("queue003", true); //队列持久  
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
       //设置监听队列
        container.setQueues(queue001(), queue002(), queue003());                                        
        container.setConcurrentConsumers(1);                          //消费者数量
        container.setMaxConcurrentConsumers(5);                       //最大消费者数量
        container.setDefaultRequeueRejected(false);                   //重回队列
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);           //签收模式
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {  //消费端标签策略
            @Override
            public String createConsumerTag(String queue) {
                return queue+"_"+ UUID.randomUUID();
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                //做消息的处理
                String messageBody = new String(message.getBody());
                System.out.println(messageBody);
        return container;
}

消息监听适配器(messageListenerAdapter)

  • 适配自定义的MessageDelegate类。我们就可以不采用监听的方式,采用适配的方式-----适配不同的消息处理方法
// Custom message delegate
/**
 * Delegate invoked by a MessageListenerAdapter; receives the raw message body.
 */
public class MessageDelegate {

    /**
     * Handles a message body. Per the original note, the method must be named
     * handleMessage to be picked up by the adapter without extra configuration.
     *
     * @param messageBody raw message bytes (note: a byte array, not a String)
     */
    public void handleMessage(byte[] messageBody) {
        System.out.println("handleMessage默认方法,消息内容 String:" + new String(messageBody));
    }
}
//rabbitmq配置
@Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(new Queue("test.direct.queue", false));                                        //设置监听队列
        container.setConcurrentConsumers(1);
        container.setMaxConcurrentConsumers(5);
        container.setDefaultRequeueRejected(false);                   //重回队列
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);           //签收模式
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {  //消费端标签策略
            @Override
            public String createConsumerTag(String queue) {
                return queue+"_"+ UUID.randomUUID();
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {
                //做消息的处理
                String messageBody = new String(message.getBody());
                System.out.println(messageBody);
        //配置适配器
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("handleMessage");   //将message传递给MessageDelegate中的方法
        container.setMessageListener(adapter);
       //全局的转换器:
	ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();
	TextMessageConverter textConvert = new TextMessageConverter();
	convert.addDelegate("text", textConvert);
	convert.addDelegate("html/text", textConvert);
	convert.addDelegate("xml/text", textConvert);
	convert.addDelegate("text/plain", textConvert);
	Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
	convert.addDelegate("json", jsonConvert);
	convert.addDelegate("application/json", jsonConvert);
	ImageMessageConverter imageConverter = new ImageMessageConverter();
	convert.addDelegate("image/png", imageConverter);
	convert.addDelegate("image", imageConverter);
	PDFMessageConverter pdfConverter = new PDFMessageConverter();
	convert.addDelegate("application/pdf", pdfConverter);
	adapter.setMessageConverter(convert);
        return container;
    }



消息转化messageConvert

messageListenerAdapter使用不匹配的listenerMethod时报异常,需要使用MessageConverter将消息进行转换

  • 自定义方法名
// Custom MessageDelegate class
/**
 * Delegate whose handler uses a custom method name instead of handleMessage.
 */
public class MessageDelegate {

    /**
     * Prints the message body to standard error.
     *
     * @param messageBody raw message bytes
     */
    public void consumeMessage(byte[] messageBody) {
        System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
    }
}
  • TextMessageConverter转换器
//自定义MessageConverter 
public class TextMessageConvert implements MessageConverter {
    //java对象转化为message对象
    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        return new Message(object.toString().getBytes(), messageProperties);
    //message对象转化为java对象
    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        String contenType = message.getMessageProperties().getContentType();
        if (StringUtils.isNotBlank(contenType) && contenType.contains("text")) {
            return new String(message.getBody());
        return message.getBody();
// Changes to the RabbitMQConfig class:
// add the message converter when configuring the adapter
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// Must name a method that exists on MessageDelegate; the delegate in this
// section defines consumeMessage, not handleMessage.
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(new TextMessageConvert());
container.setMessageListener(adapter);

QueueOrTagToMethodName

将队列名称和方法名称进行一一适配,不同的队列使用不同的方法进行监听

// Changes to the RabbitMQConfig class
// Map each queue name to the delegate method that should handle it
Map<String, String> methodByQueue = new HashMap<>();
methodByQueue.put("queue001", "method1");
methodByQueue.put("queue002", "method2");

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setMessageConverter(new TextMessageConverter());
// Key step: the adapter reads the queue from this map and dispatches to the matching method
adapter.setQueueOrTagToMethodName(methodByQueue);
container.setMessageListener(adapter);
//---------------------------------分割线--------------------------------------------//
// Changes to MessageDelegate
/**
 * Delegate with one handler method per queue, selected by the adapter's
 * queue-to-method-name mapping.
 */
public class MessageDelegate {

    /**
     * Prints a message dispatched to method1.
     *
     * @param messageBody message body already converted to text
     */
    public void method1(String messageBody) {
        System.err.println("method1 收到消息内容:" + messageBody);
    }

    /**
     * Prints a message dispatched to method2.
     *
     * @param messageBody message body already converted to text
     */
    public void method2(String messageBody) {
        System.err.println("method2 收到消息内容:" + messageBody);
    }
}

消息转换器MessageConverter

正常情况下,消息是以二进制的数据形式进行转换的,如果需要将二进制数据转化为其他形式,就需要使用到MessageConverter----------实现该接口,重写toMessage和fromMessage.

  • Jackson2JsonMessageConverter(Java对象转换)
// Changes to RabbitMQConfig
// 1.1 converter with JSON support
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
// Key point: plug in the JSON converter; a JSON payload arrives at the delegate as a Map
adapter.setMessageConverter(new Jackson2JsonMessageConverter());
adapter.setDefaultListenerMethod("consumeMessage");
container.setMessageListener(adapter);
//----------------------------分割线---------------------------------------//
// Changes to the MessageDelegate used by the message listener container
/**
 * Delegate that receives JSON payloads as a Map.
 */
public class MessageDelegate {

    /**
     * Prints the received payload.
     *
     * @param messageBody the JSON payload as a Map
     */
    public void consumeMessage(Map messageBody) {
        System.err.println("map方法, 消息内容:" + messageBody);
    }
}
//------------------------------------分割线:修改生产者----------------------//
/**
 * Serializes an Order to JSON and sends it to exchange "topic001" with
 * routing key "spring.order" and content type application/json.
 *
 * @throws Exception if JSON serialization fails
 */
public void testSendJsonMessage() throws Exception {
    Order order = new Order();
    order.setId("001");
    order.setName("消息订单");
    order.setContent("描述信息");

    ObjectMapper mapper = new ObjectMapper();
    String json = mapper.writeValueAsString(order);
    System.err.println("order 4 json: " + json);

    MessageProperties messageProperties = new MessageProperties();
    // The content type must be set to application/json
    messageProperties.setContentType("application/json");
    // Encode explicitly as UTF-8: the payload contains non-ASCII text and the
    // platform default charset may not match what the consumer decodes with.
    Message message = new Message(json.getBytes(java.nio.charset.StandardCharsets.UTF_8), messageProperties);
    rabbitTemplate.send("topic001", "spring.order", message);
}
  • DefaultJackson2JavaTypeMapper(java对象映射)
  • 多对象映射转换
//1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter: map JSON payloads onto several Java types
MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");

// Each key is a label bound to a concrete class: once bound, the label
// "order" means the payload is converted into the Order class.
Map<String, Class<?>> idClassMapping = new HashMap<>();
idClassMapping.put("order", com.cp.spring.entity.Order.class);
idClassMapping.put("packaged", com.cp.spring.entity.Packaged.class);
DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();
javaTypeMapper.setIdClassMapping(idClassMapping);

// Nest the pieces: type mapper -> JSON converter -> adapter -> container
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
adapter.setMessageConverter(jackson2JsonMessageConverter);
container.setMessageListener(adapter);


public class MessageDelegate {
 //json对应Map对象
 public void consumeMessage(Order order) {
        System.err.println("order对象, 消息内容, id: " + order.getId() + 
 ", name: " + order.getName() + 
 ", content: "+ order.getContent());
 public void consumeMessage(Packaged pack) {
        System.err.println("package对象, 消息内容, id: " + pack.getId() + 
 ", name: " + pack.getName() + 
 ", content: "+ pack.getDescription());
}
  • 全局转化
修改RabbitMQConfig类
//1.4 ext convert
// Global converter: every specialised converter is registered inside this one
ContentTypeDelegatingMessageConverter delegating = new ContentTypeDelegatingMessageConverter();

// Text content types go through the text converter
TextMessageConverter textDelegate = new TextMessageConverter();
for (String textType : new String[] {"text", "html/text", "xml/text", "text/plain"}) {
    delegating.addDelegate(textType, textDelegate);
}

// JSON content types go through the JSON converter
Jackson2JsonMessageConverter jsonDelegate = new Jackson2JsonMessageConverter();
for (String jsonType : new String[] {"json", "application/json"}) {
    delegating.addDelegate(jsonType, jsonDelegate);
}

// Images go through the image converter
ImageMessageConverter imageDelegate = new ImageMessageConverter();
for (String imageType : new String[] {"image/png", "image"}) {
    delegating.addDelegate(imageType, imageDelegate);
}

// PDFs go through the PDF converter
delegating.addDelegate("application/pdf", new PDFMessageConverter());

MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
adapter.setDefaultListenerMethod("consumeMessage");
adapter.setMessageConverter(delegating);
//--------------------------分割线---------------------imageConvert-----------------------//
public class ImageMessageConverter implements MessageConverter {
	@Override
	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
		throw new MessageConversionException(" convert error ! ");
	@Override
	public Object fromMessage(Message message) throws MessageConversionException {
		System.err.println("-----------Image MessageConverter----------");
		Object _extName = message.getMessageProperties().getHeaders().get("extName");
		String extName = _extName == null ? "png" : _extName.toString();
		byte[] body = message.getBody();
		String fileName = UUID.randomUUID().toString();
		String path = "d:/010_test/" + fileName + "." + extName;
		File f = new File(path);
		try {
			Files.copy(new ByteArrayInputStream(body), f.toPath());
		} catch (IOException e) {