RabbitMQ 系列(2) —— 用 java 连接 RabbitMQ
RabbitMQ 的相关概念
RabbitMQ 作为一个消息中间件,整体上采用了生产者与消费者模型,主要负责接收,存储和转发消息。
生产者和消费者
RabbitMQ 从宏观上可以视为
其中:
- Producer: 生产者,负责创建消息,并将消息发布到 RabbitMQ 中
- Broker: 消息中间件服务节点
- Consumer: 消费者负责订阅队列 并从队列上接收消息。
其详细的工作流程可视为:
RabbitMQ 的架构模型
RabbitMQ 的整体架构可以入下图所示
队列
队列是 Rabbit MQ 的内部对象,用于存储消息。多个消费者可以订阅同一队列
交换器
交换器主要负责将生成者消息投递到队列中。
在 RabbitMQ 中,要想使用 交换器将消息头送到正确的队列上,就需要使用 BindingKey 和 RoutingKey。 BindingKey 就是 交换器和队列之间的固定通路,而 RoutingKey 就是消息选择那些通路进行投送的规则。
交换器的类型
- fanout: 将消息发送到所有与该交换器绑定的队列上
- deirect: 指定某一条BindingKey,将消息直接发送到队列上
- topic: 根据自设定的路由规则将消息投送到队列中
- headers: 不依赖路由键投递消息而是根据消息的内容进行消息投送。
使用 java 连接 RabbitMQ 的简答案例
前期准备
默认情况下 Rabbit MQ 默认的用户名和密码为 “guest”,但是该账户只能通过本地访问,因此需要创建 一个远程访问的用户,并设置权限
# 为 RabbitMQ 创建一个新的用户
# 用户名为 root 密码为 root123
rabbitmqctl add_user root root123
# 为 root 用户设置所有权限
rabbitmqctl set_permission -p/ root ".*" ".*" ".*"
# 设置 root 用户为管理员
rabbitmqctl set_user_tags root administrator
生产者与消费的Demo
Step1: 通过 maven 引入相关包
首先需要引入 rabbitmq-client 和 rabbitmq 客户端所依赖的 slf4j 包
<dependencies>
<!-- https://mvnrepository.com/artifact/com.rabbitmq/amqp-client -->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.7.1</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-api -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.26</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.26</version>
<scope>test</scope>
</dependency>
</dependencies>
生产者相关代码
public class RabbitProducer {
private static final String EXCHANGE_NAME = "exchange_demo";
private static final String ROUTING_KEY = "routingkey_demo";
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "192.168.0.0"; // 服务器所在id即可
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException,InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("root");
factory.setPassword("root123");
// 创建连接
Connection connection = factory.newConnection();
// 创建信道
Channel channel = connection.createChannel();
// 创建一个持久化,非排他的、非自动删除的队列
channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
channel.queueDeclare(QUEUE_NAME,true,false,false,null);
System.out.println(channel.isOpen());
// 将交换器与队列通过路由键绑定
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
String message = "Hello World!";
channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
channel.close();
connection.close();
}
Step3: 消费者相关代码
public class RabbitConsumer {
private static final String QUEUE_NAME = "queue_demo";
private static final String IP_ADDRESS = "192.168.0.0";
private static final int PORT = 5672;
public static void main(String[] args) throws IOException, TimeoutException,InterruptedException {
Address[] addresses = new Address[]{
new Address(IP_ADDRESS,PORT)
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("root");
factory.setPassword("root123");
Connection connection = factory.newConnection(addresses);
final Channel channel = connection.createChannel();
Consumer consumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("recv: message: " + new String(body));
TimeUnit.SECONDS.sleep(1);
}catch (InterruptedException e){
e.printStackTrace();
channel.basicAck(envelope.getDeliveryTag(),false);
channel.basicConsume(QUEUE_NAME,consumer);