Java整合RabbitMQ
依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
1.基本API
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//连接IP
connectionFactory.setHost("");
//连接端口
connectionFactory.setPort(5672);
//虚拟主机(默认为/)
connectionFactory.setVirtualHost("/");
//账号 默认是guest
connectionFactory.setUsername("root");
//密码 默认是guest
connectionFactory.setPassword("123456");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//业务操作...
//关闭信道与连接
channel.close();
connection.close();
2.创建交换机
//第一个参数是:交换机名字,第二个是:交换机类型,第三个是:是否持久化,第四个是:是否自动删除,第五个是:是否内部交换机,第六个是:创建参数
channel.exchangeDeclare("shanguoyu", BuiltinExchangeType.DIRECT,false,false,false,null);
3.创建备用交换机
//创建备用交换机
channel.exchangeDeclare("shanguoyu2", BuiltinExchangeType.DIRECT,false,false,false,null);
//创建主交换机并用备用交换机绑定到主交换机上
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("alternate-exchange","shanguoyu2");
channel.exchangeDeclare("shanguoyu3", BuiltinExchangeType.DIRECT,false,false,false,argsMap);
4.创建队列
//第一个参数为:队列名,第二个参数为:是否持久化,第三个参数为:是否为排他队列,第四个参数:是否为自动删除队列,第五个参数为:创建参数
channel.queueDeclare("shanguoyu",false,false,false,null);
5.创建队列指定死信交换机
//创建死信交换机(注意:没有为死信交换机指定绑定)
channel.exchangeDeclare("shanguoyu4", BuiltinExchangeType.DIRECT,false,false,false,null);
//创建队列是的时候绑定私信交换机
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-dead-letter-exchange","shanguoyu4");
channel.queueDeclare("shanguoyu2",false,false,false,argsMap);
6.TTL+DLX完成延时队列
/**
* 演示TTL+DLX完成延时队列
public class QueueTest3 {
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//连接IP
connectionFactory.setHost("192.168.1.108");
//连接端口
connectionFactory.setPort(5672);
//虚拟主机(默认为/)
connectionFactory.setVirtualHost("/");
//账号 默认是guest
connectionFactory.setUsername("root");
//密码 默认是guest
connectionFactory.setPassword("123456");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
//创建死信交换器 与队列等
channel.exchangeDeclare("DEAD_LETTER_EXCHANGE", BuiltinExchangeType.DIRECT,false,false,false,null);
channel.queueDeclare("DEAD_LETTER_QUEUE", false, false, false, null);
channel.queueBind("DEAD_LETTER_QUEUE", "DEAD_LETTER_EXCHANGE", "DL");
//创建交换器队列等并绑定死信交换器并指定队列中的消息的过期时间为5秒
channel.exchangeDeclare("shanguoyu", BuiltinExchangeType.DIRECT,false,false,false,null);
Map<String,Object> map=new HashMap<>();
map.put("x-message-ttl", TimeUnit.SECONDS.toMillis(20));
map.put("x-dead-letter-exchange","DEAD_LETTER_EXCHANGE");
map.put("x-dead-letter-routing-key","DL");
channel.queueDeclare("shanguoyu", false, false, false, map);
channel.queueBind("shanguoyu", "shanguoyu", "shanguoyu");
* 生产者生产消息
class Product{
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//连接IP
connectionFactory.setHost("192.168.1.108");
//连接端口
connectionFactory.setPort(5672);
//虚拟主机(默认为/)
connectionFactory.setVirtualHost("/");
//账号 默认是guest
connectionFactory.setUsername("root");
//密码 默认是guest
connectionFactory.setPassword("123456");
//创建连接
Connection connection = connectionFactory.newConnection();
//创建信道
Channel channel = connection.createChannel();
channel.basicPublish("shanguoyu", "shanguoyu", null, "Hello".getBytes());
channel.close();
connection.close();
}
7.创建具有过期时间的队列
//创建队列的时候指定队列过期时间(单位为毫秒,我这里为5秒过期时间)
Map<String, Object> argsMap = new HashMap<>();
argsMap.put("x-expires", 5000);
channel.queueDeclare("shanguoyu3",false,false,false,argsMap);
8.创建绑定关系
//将队列绑定到交换机上
//第一个参数是:队列名,第二个参数是:交换机名字,第三个名字是:路由键,第四个参数是:创建参数
channel.queueBind("shanguoyu","shanguoyu","shanguoyu",null);
9.生产者的发送消息
//生产者发送消息
//第一个参数:交换机名字,第二个参数:路由键,第三个参数:是否开启失败通知,第四个参数为:不支持的参数,第五个参数为:消息的头帧,第六个参数为:消息体
channel.basicPublish("shanguoyu","shanguoyu",false,false,null,"Hello".getBytes(StandardCharsets.UTF_8));
10.生产者的失败通知
//生产者发送消息(注意点:不要提前关闭信道与连接因为失败通知可能还没回来,找不到交换机不会触发失败通知,只有在找不到队列的时候才触发失败通知)
//第一个参数:交换机名字,第二个参数:路由键,第三个参数:是否开启失败通知,第四个参数为:不支持的参数,第五个参数为:消息的头帧,第六个参数为:消息体
channel.addReturnListener((replycode, replyText, exchange, routingKey, basicProperties, bytes) -> {
System.out.println("交换机为:" + exchange);
System.out.println("路由键为:" + routingKey);
System.out.println("消息体为:" + new String(bytes));
System.out.println("消息头帧为:" + basicProperties);
System.out.println("错误文本为:" + replyText);
System.out.println("错误码为:" + replycode);
//故意写成shanguoyu1,因为没有这个路由键 好触发失败通知
channel.basicPublish("shanguoyu", "shanguoyu1", true, false, null, "Hello".getBytes(StandardCharsets.UTF_8));
11.发送方确认的一般确认
//在该信道上开启发送者确认
channel.confirmSelect();
channel.basicPublish("shanguoyu","asdsadsa",false,false,null,"Hello".getBytes());
boolean b = channel.waitForConfirms();
if (b) {
System.out.println("发送成功");
}else{
System.out.println("发送失败");
}
12.发送方确认的批量确认
//开启发送方确认
channel.confirmSelect();
// 启用发送者确认模式(批量确认)
channel.waitForConfirmsOrDie();
for (int i = 0; i < 10; i++) {
channel.basicPublish("shanguoyu","asdsadsa",false,false,null,"Hello".getBytes());
}
13.发送方的异步确认
//开启发送方确认
channel.confirmSelect();
//添加异步确认
channel.addConfirmListener(
((deliveryTag,multiple)->System.out.println("发送成功标签是:"+deliveryTag+",multiple:"+multiple)),
((deliveryTag,multiple)->System.out.println("发送失败标签是:"+deliveryTag+",multiple:"+multiple))
for (int i = 0; i < 10; i++) {
channel.basicPublish("shanguoyu","asdsadsa",false,false,null,"Hello".getBytes());
}
14.消费者消费消息(订阅推送的方式)
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"UTF-8");
System.out.println("消息体为:"+msg);
System.out.println("路由键为:"+envelope.getRoutingKey());
System.out.println("交换机为:"+envelope.getExchange());
System.out.println("标签为:"+envelope.getDeliveryTag());
//第一个参数是:订阅的队列名称,第二个参数是:是否自动确认消息,第三个参数是:消费者标签,第四个参数是:不支持的参数,第五个参数是:是否独占,第六个是:创建参数,第七个是:消费者
channel.basicConsume("shanguoyu",false,"",false,false,null,consumer);
15.消费者手动ACK
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"UTF-8");
System.out.println("消息体为:"+msg);
System.out.println("路由键为:"+envelope.getRoutingKey());
System.out.println("交换机为:"+envelope.getExchange());
System.out.println("标签为:"+envelope.getDeliveryTag());
//手动ACK,第二个参数是 是否批量处理
channel.basicAck(envelope.getDeliveryTag(),false);
//第一个参数是:订阅的队列名称,第二个参数是:是否自动确认消息,第三个参数是:消费者标签,第四个参数是:不支持的参数,第五个参数是:是否独占,第六个是:创建参数,第七个是:消费者
channel.basicConsume("shanguoyu",false,"",false,false,null,consumer);
16.消费者自动ACK
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"UTF-8");
System.out.println("消息体为:"+msg);
System.out.println("路由键为:"+envelope.getRoutingKey());
System.out.println("交换机为:"+envelope.getExchange());
System.out.println("标签为:"+envelope.getDeliveryTag());
//第一个参数是:订阅的队列名称,第二个参数是:是否自动确认消息,第三个参数是:消费者标签,第四个参数是:不支持的参数,第五个参数是:是否独占,第六个是:创建参数,第七个是:消费者
channel.basicConsume("shanguoyu",true,"",false,false,null,consumer);
17.消息的单挑拒绝
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"UTF-8");
System.out.println("消息体为:"+msg);
System.out.println("路由键为:"+envelope.getRoutingKey());
System.out.println("交换机为:"+envelope.getExchange());
System.out.println("标签为:"+envelope.getDeliveryTag());
//消息的拒绝Reject,第二个参数是是否允许重新投递
channel.basicReject(envelope.getDeliveryTag(),true);
//第一个参数是:订阅的队列名称,第二个参数是:是否自动确认消息,第三个参数是:消费者标签,第四个参数是:不支持的参数,第五个参数是:是否独占,第六个是:创建参数,第七个是:消费者
channel.basicConsume("shanguoyu",false,"",false,false,null,consumer);
18.消息的批量拒绝
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"UTF-8");
System.out.println("消息体为:"+msg);
System.out.println("路由键为:"+envelope.getRoutingKey());
System.out.println("交换机为:"+envelope.getExchange());
System.out.println("标签为:"+envelope.getDeliveryTag());
//消息的拒绝Nack,第二个参数是:是否批量拒绝,第三个参数是是否允许重新投递
channel.basicNack(envelope.getDeliveryTag(),false,true);
//第一个参数是:订阅的队列名称,第二个参数是:是否自动确认消息,第三个参数是:消费者标签,第四个参数是:不支持的参数,第五个参数是:是否独占,第六个是:创建参数,第七个是:消费者
channel.basicConsume("shanguoyu",false,"",false,false,null,consumer);
19.消费端的QOS
//第一个参数是缓冲区大小,第二个参数是最多多少条消息,第三个参数是级别
channel.basicQos(0, 1, false);
//业务操作....
Consumer consumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String msg=new String(body,"UTF-8");
System.out.println("消息体为:"+msg);
System.out.println("路由键为:"+envelope.getRoutingKey());
System.out.println("交换机为:"+envelope.getExchange());
System.out.println("标签为:"+envelope.getDeliveryTag());
//消息的拒绝Nack,第二个参数是:是否批量拒绝,第三个参数是是否允许重新投递
channel.basicAck(envelope.getDeliveryTag(),false);
//第一个参数是:订阅的队列名称,第二个参数是:是否自动确认消息,第三个参数是:消费者标签,第四个参数是:不支持的参数,第五个参数是:是否独占,第六个是:创建参数,第七个是:消费者
channel.basicConsume("shanguoyu",false,"",false,false,null,consumer);
20.延时队列
Map<String, Object> argss = new HashMap<String, Object>();
argss.put("x-delayed-type", "direct");
channel.exchangeDeclare("dea.letter.exchange", "x-delayed-message",false,false,false,argss);
channel.queueDeclare("dead.letter.queue", false, false, false, null);
channel.queueBind("dead.letter.queue", "dea.letter.exchange", "DL");