rabbitmq官方手册
RabbitMQ Tutorials — RabbitMQ
(可以对照手册进行对比)
在编写连接方式之前, 我们先做一些前提的操作,用于我们编写连接代码时更方便
a). 创建一个连接rabbitmq的工具类
public class RabbitmqUtil {
private static ConnectionFactory factory;
static{
factory = new ConnectionFactory();
factory.setHost("虚拟机的ip地址");
factory.setPort(5672);
factory.setVirtualHost("用户的模块");
factory.setUsername("用户名");
factory.setPassword("密码");
public static Connection getConnection(){
try {
return factory.newConnection();
}catch (Exception e){
e.printStackTrace();
return null;
public static void closeConnectionAndChannel(Connection connection, Channel channel){
try {
if (connection != null) connection.close();
if (channel != null)channel.close();
}catch (Exception e){
// e.printStackTrace();
图中画红色框框的就是用户的模块
导入相应的rabbitmq依赖
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.12.0</version>
</dependency>
下面我们进入正题
方式一: Hello World
服务端(provider)代码
如果不是springboot项目,这里方法可以替换中主函数进行运行
如果没有配置rabbitmq 就需要把注释掉的代码解开
public class Provider {
@Test
public void sendMessage() throws IOException, TimeoutException {
// ConnectionFactory factory = new ConnectionFactory();
// //设置ip地址
// factory.setHost("虚拟机ip地址");
// //设置端口号
// factory.setPort(5672);
// //设置虚拟主机
// factory.setVirtualHost("虚拟主机");
设置连接用的用户名和密码
// factory.setUsername("用户名");
// factory.setPassword("密码");
// //通过factory获取连接对象
// Connection connection = factory.newConnection
Connection connection = RabbitmqUtil.getConnection();
Connection connection = RabbitmqUtil.getConnection();
//通过connection获取通道
Channel channel = connection.createChannel();
* 绑定对应的消息列
* 参数1 队列名称 如果不存在自动创建
* 参数2 队列是否持久化 false不持久化
* 参数3 是否独占队列 true 独占队列
* 参数4 消息传完是否自动删除队列 true自动删除
* 参数5 其他参数
channel.queueDeclare("hello",false,false,false,null);
* 发送消息
* 参数1 交换机名称
* 参数2 通道名称
* 参数3 传递消息额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN rabbitmq 重启后消息依然在,
* null 重启后消息消失
* 参数4 传递的信息
channel.basicPublish("","hello", MessageProperties.PERSISTENT_TEXT_PLAIN,"hello rabbit".getBytes());
//关闭连接
RabbitmqUtil.closeConnectionAndChannel(connection,channel);
客户端(customer)代码
public class Customer {
public static void main(String[] args) throws IOException, TimeoutException {
// //创建工厂并设置参数
// ConnectionFactory factory = new ConnectionFactory();
// factory.setHost("192.168.47.221");
// factory.setPort(5672);
// factory.setVirtualHost("/msg");
// factory.setUsername("long");
// factory.setPassword("123");
// Connection connection1 = factory.newConnection();
// 获取对象通过对象获取通道
Connection connection = RabbitmqUtil.getConnection();
Channel channel = connection.createChannel();
//绑定消息列
channel.queueDeclare("hello",false,false,false,null);
//读取信息
channel.basicConsume("hello",true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("new String(body) = " + new String(body));
方式二: work(以下方式都是通过工具类来创建connection对象)
有两种方式 第一种为平均分配
provider端代码
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitmqUtil.getConnection();
// 通过连接对象获取通道
Channel channel = connection.createChannel();
//绑定对应的消息栈
* 绑定对应的消息列
* 参数1 队列名称 如果不存在自动创建
* 参数2 队列是否持久化 false不持久化
* 参数3 是否独占队列 true 独占队列
* 参数4 消息传完是否自动删除队列 true自动删除
* 参数5 其他参数
channel.queueDeclare("work",true,false,false,null);
* 发送消息
* 参数1 交换机名称
* 参数2 通道名称
* 参数3 传递消息额外设置 MessageProperties.PERSISTENT_TEXT_PLAIN rabbitmq 重启后消息依然在,
* null 重启后消息消失
* 参数4 传递的信息
for(int i=1;i<=20;i++) {
channel.basicPublish("", "work", null, (i + "hello work").getBytes());
//关闭连接
RabbitmqUtil.closeConnectionAndChannel(connection,channel);
customer端代码(大家可以多写几个customer端代码进行测试,我这里就类举一个了)
public class Customer1 {
public static void main(String[] args) throws IOException {
//获取连接对象
Connection connection = RabbitmqUtil.getConnection();
//获取连接
Channel channel = connection.createChannel();
//绑定消息列
channel.queueDeclare("work",true,false,false,null);
//读取信息
* 参数一: 消息队列的名称
* 参数二: 消息自动确认
* true;消费整自动向 rabbitmq 确认信息消费
* false:不会自动确认消息
* 参数三: 从rabbitmq中获取消息
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("customer1 message :" + new String(body));
第二种为能者多劳方式
provider端代码一样,修改customer端代码即可
public class Customer1 {
public static void main(String[] args) throws IOException {
//获取连接对象
Connection connection = RabbitmqUtil.getConnection();
//获取连接
Channel channel = connection.createChannel();
//绑定消息列
channel.queueDeclare("work",true,false,false,null);
//读取信息
* 参数一: 消息队列的名称
* 参数二: 消息自动确认
* true;消费整自动向 rabbitmq 确认信息消费
* false:不会自动确认消息
* 参数三: 从rabbitmq中获取消息
channel.basicQos(1);// 每次只消费一个消息
channel.basicConsume("work",false,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("customer1 message :" + new String(body));
channel.basicAck(envelope.getDeliveryTag(),false);// 进行手动提交
方式三 Publish/Subscribe (以下所有连接方式都需要用到交换机)
服务端发送消息,所有的客户端都可以获取消息(类似于广播效果)
provider端代码
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitmqUtil.getConnection();
//通过连接对象获取通道
Channel channel = connection.createChannel();
//绑定交换机
* 参数一: 交换机的名称 如何rabbitmq中没有会自动创建
* 参数二: 交换机的类型 固定为fanout
channel.exchangeDeclare("logs","fanout");
* 参数一: 交换机的名称
* 参数二: 路由的key
* 参数三: 其他参数
* 参数四: 要传送的数据
channel.basicPublish("logs","",null,"hello fanout".getBytes());
//关闭连接
RabbitmqUtil.closeConnectionAndChannel(connection,channel);
customer端代码
public class Customer {
public static void main(String[] args) throws IOException{
// 获取当前连接对象
Connection connection = RabbitmqUtil.getConnection();
// 通过连接对象获取通道
Channel channel = connection.createChannel();
//声明交换机
channel.exchangeDeclare("logs","fanout");
//获取临时通道的名称
String queue = channel.queueDeclare().getQueue();
//创建临时对列绑定交换机
* 参数一: 临时通道的名称
* 参数二: 交换机的名称
* 参数三: 路由的key
channel.queueBind(queue,"logs","");
//获取信息
channel.basicConsume(queue,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("fanout message :" + new String(body));
方式四 Routeing
根据route对消息进行发布,provider只对对应的route发送消息 代码如下
provider端代码
public class Provider {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitmqUtil.getConnection();
// 通过连接对象获取通道
Channel channel = connection.createChannel();
String exchangeName = "logs_direct";
// 声明交换机
* 参数一: 交换机的名称
* 参数二: 交换机的类型
channel.exchangeDeclare(exchangeName,"direct");
String key = "Info";
//发布消息
channel.basicPublish(exchangeName,key,null,("provider发布的消息类型为:["+ key+"], hello direct").getBytes());
RabbitmqUtil.closeConnectionAndChannel(connection,channel);
customer端代码(customer端代码一样,大家可以多写几个进行试验)
public class Customer {
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitmqUtil.getConnection();
//通过连接对象获取通道
Channel channel = connection.createChannel();
String exchangeName = "logs_direct";
//声明交换机
channel.exchangeDeclare(exchangeName,"direct");
//声明虚拟队列
String queueName = channel.queueDeclare().getQueue();// 获取虚拟队列的名称
* 参数一: 队列名称
* 参数二: 交换机名称
* 参数三: 路由的key
channel.queueBind(queueName,exchangeName,"error");// 绑定交换机
channel.queueBind(queueName,exchangeName,"Info");// 绑定交换机
channel.queueBind(queueName,exchangeName,"Warn");// 绑定交换机
//获取消息
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("接受到的消息为"+ new String(body));
方式五:Topics
动态路由, 在方式四的基础上添加了统配符 通配符的种类有两种 如下
*: 代表任意一个单纯,如 user.* / *.user / *.user.* 以第一种为例:代表只接受 有user开头,route的个数是两个单纯的消息(单词和单词之间用 . 分割 这个的route是只接受两个单词的消息,如以user开头,有三个请求他是接受不到的,要注意!!!)
#:代表一个多多个单词,如user.# / #.user / #.user.# 以第一种为例: 代表只要是以user开头的消息都可以被接受到
provider端代码
public class Provider{
public static void main(String[] args) throws IOException {
// 获取连接对象
Connection connection = RabbitmqUtil.getConnection();
// 通过连接对象获取通道
Channel channel = connection.createChannel();
// 绑定交换机
String exchangeName = "topics";
channel.exchangeDeclare(exchangeName,"topic");
// 声明routekey 发布消息
String routeKey = "user.save";
channel.basicPublish(exchangeName,routeKey,null,("topic要发送参数了啊,routeKey["+routeKey+"]").getBytes());
RabbitmqUtil.closeConnectionAndChannel(connection,channel);
customer端代码
public class Customer {
public static void main(String[] args) throws IOException{
Connection connection = RabbitmqUtil.getConnection();
Channel channel = connection.createChannel();
String exchangeName = "topics";
channel.exchangeDeclare(exchangeName,"topic");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName,exchangeName,"user.*");
channel.basicConsume(queueName,true,new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("customer1接受到消息:"+ new String(body));
以上就是常用的五种rabbitmq的连接方式,希望可以对大家有所帮助 !!!
功能:一个生产者P发送消息到队列Q,一个消费者C接收
生产者实现思路:
创建连接工厂ConnectionFactory,设置服务地址,端口号,设置用户名、密码、virtual host,从连接工厂中获取连接connection,使用连接创建通道channel,使用通道channel创建队列queue,使用通道channel向队列中发送消息,关闭通道和连接。
public class RabbitMQPublishTest {
public static voi
为了避免污染宿主系统环境,于是在虚拟机中搭建了一个linux环境并且安装了rabbitmq-server。然后在远程连接的时候一直连接失败。官网上面给的例子都是在本地使用系统默认的guest用户连接的。没有给出远程连接的例子,于是阅读文档发现:
When the serverfirst starts running, and detects that its database is u...
之前我们写生产者消费者模型的时候,会有很多冗余代码,比如生产者和消费者都需要创建连接工厂、设置连接配置、以及关闭资源等操作。我们打算将这些冗余的操作封装成一个工具类,我们直接调用类得到相关的信息即可,不必每次都写一些连接的配置信息。使用连接工厂,使用单例模式,static只设置一次配置,创建获取连接的方法,创建关闭资源的方法。所以现在,在之前的生产者、消费者Hello World 模型的代码中可以省略很多代码了。
消息队列是在消息的传输过程中保存消息的容器,用于接收消息并以文件的方式存储,一个消息队列可以被一个也可以被多个消费者消费,服务之间最常见的通信方式是直接调用彼此来通信
插个广告,公司最近在招“.NET”开发(杭州),如果你现在还从事 .NET 开发(想用 .NET Core,但被公司不认可),想转 JAVA 开发(但又没有工作经验,惧怕面试),想用微服务架构重构(面对现有庞大的单体应用程序,有心无力),那么请关注右侧的公众号私信我,或许我可以帮到你一些~~~
一般情况下,我们会使用 rabbitmq_management 插件,通过 Web UI 的方式来监...