package com.mq.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.51.101:61616" ;
public static final String QUEUE_NAME = "queue01" ;
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
while (true ) {
TextMessage msg = (TextMessage) consumer.receive();
if (msg != null) {
System.out.println("消费者消费的消息为:" + msg.getText());
} else {
break ;
session.close();
connection.close();
System.out.println("****消费者消费消息完毕****" );
访问http://activeMQ的部署IP:8161
,之前生产者生产的三条消息被消费,测试成功。
订阅者或者接收者通过 MessageConsumer
的 setMessageListener(MessageListener listener)
注册一个消息监听器,当消息到达之后,系统自动调用监听器 MessageListener的onMessage(Message message)
方法。
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://192.168.51.101:61616" ;
public static final String QUEUE_NAME = "queue01" ;
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer consumer = session.createConsumer(queue);
//通过监听的方式来消费消息
consumer.setMessageListener(new MessageListener () {
public void onMessage(Message message) {
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者消费的消息为:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
System.in.read();
consumer.close();
session.close();
connection.close();
通过监听消费消息遇到的坑
使用监听来消费消息时,必须使用 System.in.read()
保证程序不会提出,否则消息消费者无法消费消息。
消费者的启动情况
假设现在有两个消费者,只有一个生产者,则情况有两种,第一种是先启动生产者,再启动两个消费者。第二种是先启动两个生产者,在启动消费者。
第一种,先启动生产者,等待生产者消息生产完毕之后依次启动消费者1和2,结果是消费者1消费所有消息,消费者2没有消费消息。
第二种,先依次启动消费者1和2,再启动生产者,结果是消费者1消费第1、3、5条消息,消费者2消费第2、4、6条消息。
JMS的编码步骤
1:创建一个 connection factory
2:通过 connection factory
创建 JMS connection
3:启动 JMS connection
4:通过 connection
创建 JMS session
5:创建 JMS destination
6:创建 JMS producer
或者创建 JMS message
并设置 destination
7:创建 JMS consumer
或者是注册一个 JMS message listener
8:发送或者接收 JMS message(s)
9 :关闭所有的 JMS
资源( connection、session、producer、consumer
等)
点对点消息传递域的特点
每个消息只能有一个消费者,类似于1对1的关系。
消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费者都可以提取消息。
消息被消费后队列中不再存储,所以消费者不会消费到已经被消费掉的消息。
基于Java的ActiveMQ编码(使用topic一对多模式)
topic(发布订阅)的特点
生产者将消息发布到topic中,每个消息可以有多个消费者,属于1:N的关系 。
生产者和消费者之间有时间上的相关性 。订阅某一个主题的消费者只能消费自它订阅之后发布的消息。
生产者生产时,topic不保存消息,是无状态的 是不落地的 ,假如无人订阅就去生产,那生产出来的消息就是一条废消息,所以,一般先启动消费者再启动生产者 。
JMS规定允许客户创建持久订阅,在这一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。
生产者编码
package com.mq.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProducer_Topic {
private static final String ACTIVEMQ_URL = "tcp://192.168.51.101:61616" ;
private static final String TOPIC_NAME = "topic01" ;
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
for (int i = 0; i < 6; i++) {
TextMessage msg = session.createTextMessage("TOPIC_NAME---" + i);
producer.send(msg);
producer.close();
session.close();
connection.close();
System.out.println("****消息发布到MQ完毕****" );
消费者编码
package com.mq.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class JmsConsumer_Topic {
private static final String ACTIVEMQ_URL = "tcp://192.168.51.101:61616" ;
private static final String TOPIC_NAME = "topic01" ;
public static void main(String[] args) throws JMSException, IOException {
System.out.println("2号消费者。。。" );
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.start();
Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer consumer = session.createConsumer(topic);
consumer.setMessageListener((message) -> {
if (message != null && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("消费者接收到的消息:" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
System.in.read();
consumer.close();
session.close();
connection.close();
此时先分别启动两个消费者1和2,在启动生产者,运行结果如下:
若先启动生产者,再启动消费者,消费者不能消费在消费者启动之前生产者生产的消息。
Queue的Topic的区别
Topic模式
Queue模式
“订阅发-发布”模式,如果当前没有订阅者,消息将会被丢弃。如果有多个订阅者,那么这些订阅者都会收到消息
“负载均衡”模式,如果当前没有消费者,消息也不会丢弃;如果有多个消费者,那么一条消息也只会发送给其中一个消费者,并且要求消费者ack信息。
Queue底层默认会在mq服务器上以文件形式保存,比如Active MQ一般保存在 $AMQ_HOME\data\kr-store\data
下面,也可以配置成DB存储
传递完整性
如果没有订阅者,消息会被丢弃
消息不会被丢弃
由于消息要按诈骗订阅者的数量进行复制,所以处理性能会随着订阅者的增加而明显降低,并且还要结合不同的协议自身的性能差异
由于一条消息只发送给一个消费者,所以就算消费者再多,性能也不或有明显降低,当然不同的消息协议具体性能也是有差异的。
JMS规范和落地产品
什么是JavaEE
JavaEE是一套使用Java进行企业级应用开发的大家一直遵循的13个核心规范工作标准。JavaEE平台提供了一个基于组件的方法来加快设计、开发、装配及部署企业应用程序。
JDBC:数据库连接
JNDI:Java的命名和目录接口
RMI:远程方法调用
Java IDL:接口定义语言/公用对象请求代理程序体系结构
Servlet
XML:可扩展的标记语言
JMS:Java消息服务
JTA:Java事务API
JTS:Java事务服务
JavaMail
什么是Java消息服务(JMS)
Java消息服务指的是两个应用程序之间进行异步通信的API,他为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序开发。在JavaEE中,当两个应用程序使用JMS进行通信时,他们之间并不是直接相连的,而是通过一个共同的消息收发服务组件关联起来以达到解耦/异步削峰的效果。
JMS的组成结构和特点
JMS provider:实现JMS接口和规范的消息中间件,也就是我们的MQ服务器。
JMS producer:消息生产者,创建和发送JMS消息的客户端应用。
JMS consumer:消息消费者,接收和处理JMS消息的客户端应用。
JMS message。
JMS message的消息组成
(1):消息头
使用message.setJMSXxx设置消息头的属性,如图:
其中几个重要的属性如下:
JMSDestination:消息发送的目的地,主要是指Queue和Topic。
JMSDeliveryMode:设置消息是否为持久模式。
(1):持久消息:应该被传送“一次仅仅一次”,这就意味着如果JMS提供者出现故障,该消息并不会丢失,他会在服务器恢复之后再次传递。
(2):非持久消息:最多会传送一次,这意味着服务器出现故障,该消息将永远丢失。
JMSExpiration:可以设置消息在一定时间后过期,默认是永久不过期。消息的过期时间等于Destination中send方法中的timeToLive值加上发送时刻的GMT时间值。如果timeToLive值等于0,则JMSExpiration被设为零,表示该消息永不过期。如果发送后,在消息过期时间之后消息还没有被发送到目的地,则该消息被清除。
JMSPriority:设置消息优先级。0~9一共九个优先级,0~4是普通消息,5~9是加急消息。JMS不要求MQ严格按照这十个优先级发送消息,但必须保证加急消息要先于普通消息到达,默认优先级是4.
JMSMessageID:唯一识别每个消息的标识,由MQ产生。
(2):消息体(封装具体的消息属性)
在ActiveMQ中,一共有五种消息体,发送和接收的消息体类型必须一致对应。
TextMessage:普通字符串消息,包含一个String。
MapMessage:一个Map类型的消息,Key为String类型,Value为Java的基本类型。
BytesMessage:二进制数组表示,包含一个byte[]。
StreamMessage:Java数据流消息,用标准流来顺序的填充和读取。
ObjectMessage:对象消息,包含一个可序列化的Java对象。
(3):消息属性
如果需要除消息头字段以外的值,那么可以使用消息属性。
在识别/去重/重点标注等操作非常有用。
消息属性定义:
消息属性是以属性名和属性值为一对的形式制定的。可以将属性视为消息头的扩展,属性指定一些消息头没有包括的附加信息,比如可以在属性里指定消息选择器。
ActiveMQ的消息可靠性
Persistent:持久化
队列(Queue)的持久化
非持久:当服务器宕机,消息不存在。
messageProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT)
持久:当服务器宕机,消息依然存在。
messageProducer.setDeliveryMode(DeliveryMode.PERSISTENT)
Queue的默认是持久化的,此模式保证这些消息只被传送一次和成功使用一次。对于这些消息,可靠性是优先考虑的因素。可靠性的另一个重要方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。
主题(Topic)的持久化
topic持久化的代码实现
public class JmsProduce_Topic_Persist {
private static final String ACTIVEMQ_URL = "tcp://192.168.51.101:61616" ;
private static final String TOPIC_NAME = "topic-persist" ;
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer producer = session.createProducer(topic);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);//设置持久化
connection.start();//先声明持久化,再启动 connection
for (int i = 0; i < 3; i++) {
TextMessage msg = session.createTextMessage("topic-persist---" + i);
producer.send(msg);
producer.close();
session.close();
connection.close();
System.out.println("****消息发布到MQ完毕****" );
public class JmsConsumer_Topic_Persist {
private static final String ACTIVEMQ_URL = "tcp://192.168.51.101:61616" ;
private static final String TOPIC_NAME = "topic-persist" ;
public static void main(String[] args) throws JMSException {
System.out.println("****z4****" );
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = factory.createConnection();
connection.setClientID("z4" );//创建订阅者z4
Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
//创建一个持久化的订阅
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "remark..." );
connection.start();
Message message = topicSubscriber.receive();
while (message != null) {
TextMessage textMessage = (TextMessage) message;
System.out.println("收到的持久化的topic为:" + textMessage.getText());
message = topicSubscriber.receive();
session.close();
connection.close();
先运行consumer再运行producer,然后访问192.168.51.101:6181
根据测试结果得出,先启动订阅者,订阅者是活跃的,再启动生产者,等到订阅者消费完生产者生产的消息并退出后,订阅者的状态为离线状态。
若先启动订阅者然后手动退出,订阅者处于离线状态,然后启动生产者,生产者生产完消息后自动退出,此时再启动订阅者,发现订阅者仍然可以消费。原因是因为订阅者第一次启动之后已经持久化的订阅了该生产者。
事务:transaction
对于生产者而言,创建session时可以设置是否开启事务,若设置为true,表示开启事务,则在session被close之前,一定要调用session的commit方法手动提交事务,或者使用session的rollback方法回滚事务。
对于消费者而言,创建session时设置事务是否开启,若设置为true,表示开启事务,但是在session被关闭之前没有调用session的commit方法,会使得消息重复消费。所以若消费者开启了事务,在session被关闭之前一定要调用commit方法。
签收:Acknowledge
对于消费者,没有开启事务的情况下,若创建 session
时设置为手动签收,即设置 Session.CLIENT_ACKNOWLEDGE
,则在关闭 session
之前需要调用 acknowledge
方法手动签收。
若开启了事务,并且设置签收为手动签收,则不需要调用acknowledge方法,因为开启事务后需要手动commit提交,此时默认签收方式为自动签收。
若开启事务,并且设置签收为手动签收,但是没有调用session的commit方法,而是调用了session的acknowledge方法,这种情况下会出现消息重复消费的情况。
签收和事务的关系
在事务性会话中,当一个事务被成功提交则消息被自动签收。如果事务回滚,则消息会被再次传递。
非事务性会话中,消息何时被确认取决于创建会话时的应答模式。
JMS总结
JMS的点对点总结
点对点模型是基于队列的,生产者发送消息到队列,消费者从队列中接收消息,队列的使得消息的异步传输成为可能。和我们平时给朋友发短信类似。
如果在Session关闭时有部分消息已被收到但没有签收(acknowledge),那么消费者下次连接到相同的队列时,这些消息还会被再次接收。
队列可以长久地保存消息直到消费者收到消息。消费者不需要因为担心消息会丢失而时刻和队列保持激活的连接状态,充分体现了异步传输模式的优势。
JMS发布订阅总结
JMS Pub/Sub模型中定义了如何向一个内容节点发布和订阅消息,这些节点被称作topic。主题可以被认为是消息的传递中介,发布者(publisher)发布消息到主题,订阅者(subscribe)从主题订阅消息。主题使得消息订阅者和消息发布者保持互相独立,不需要接触即可保证消息的传递。
非持久订阅只有当客户端处于激活状态,也就是和MQ保持连接状态才能接收到发送到某个主题的消息。如果消费者处于离线状态,生产者发送的主题消息将会丢失作废,消费者永远不会收到。
持久化订阅:客户端首先向MQ注册一个自己的身份ID识别号。当这个客户端处于离线状态时,生产者会为这个ID保存所有发送到主题的消息,当客户端再次连接到MQ时会根据消费者的ID得到所有当自己处于离线时发送到主题的消息。
非持久订阅状态下,不能恢复或者重新发送一个未签收的消息。
持久订阅状态下才能恢复或重新发送一个未签收的消息。
ActiveMQ的Broker
Broker相当于一个ActiveMQ实例。说白了,Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动。
import org.apache.activemq.broker.BrokerService;
public class EmbedBroker {
public static void main(String[] args) throws Exception {
BrokerService service = new BrokerService();
service.setUseJmx(true );
service.addConnector("tcp://localhost:61616" );
service.start();
Spring整合ActiveMQ
Spring整合ActiveMQ点对点模式(Queue)
在pom文件中导入依赖
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.5</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-all -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.xbean/xbean-spring -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-jms -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.3.20.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-pool -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.15.11</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-core -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-context -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-aop -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework/spring-orm -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-orm</artifactId>
<version>4.3.23.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.aspectj/aspectjrt -->
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjrt</artifactId>
<version>1.6.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/aspectj/aspectjweaver -->
<dependency>
<groupId>aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
<version>1.5.3</version>
</dependency>
<!-- https://mvnrepository.com/artifact/cglib/cglib -->
<dependency>
<groupId>cglib</groupId>
<artifactId>cglib</artifactId>
<version>2.1_2</version>
</dependency>
在resource文件夹下创建Spring配置文件applicationContext.xml
<?xml version="1.0" encoding="UTF-8" ?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-4.3.xsd" >
<context:component-scan base-package="com.mq.activemq" />
<!-- 配置生产者 -->
<bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop" >
<property name="connectionFactory" >
<bean class="org.apache.activemq.ActiveMQConnectionFactory" >
<property name="brokerURL" value="tcp://192.168.51.101:61616" />
</bean>
</property>
<property name="maxConnections" value="100" />
</bean>
<!-- 这个是队列的目的地,点对点的 -->
<bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue" >
<constructor-arg index="0" value="spring-active-queue" />
</bean>
<!-- spring提供的JMS工具类,它可以进行消息发送、接收等 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate" >
<property name="connectionFactory" ref="jmsFactory" />
<property name="defaultDestination" ref="destinationQueue" />
<property name="messageConverter" >
<bean class="org.springframework.jms.support.converter.SimpleMessageConverter" />
</property>
</bean>
</beans>
新建生产者类
@Service
public class SpringMQ_Producer {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml" );
SpringMQ_Producer producer = (SpringMQ_Producer) context.getBean("springMQ_Producer" );
producer.jmsTemplate.send(session -> session.createTextMessage("Spring和ActivateMQ的整合case..." ));
System.out.println("send task over..." );
新建消费者类
@Service
public class SpringMQ_Consumer {
@Autowired
private JmsTemplate jmsTemplate;
public static void main(String[] args) {
ApplicationContext context = new ClassPathXmlApplicationContext("applicationContext.xml" );
SpringMQ_Consumer consumer = (SpringMQ_Consumer) context.getBean("springMQ_Consumer" );
String retValue = (String) consumer.jmsTemplate.receiveAndConvert();
System.out.println("消费者收到的消息为:" + retValue);
Spring整合ActiveMQ发布订阅模式(topic)
在Spring配置文件applicationContext.xml文件中添加如下配置:
<!-- 这个是主题(Topic) -->
<bean id="destinationTopic" class="org.apache.activemq.command.ActiveMQTopic" >
<constructor-arg index="0" value="spring-active-topic" />
</bean>
修改配置文件中配置
在Spring中里面可以实现消费者不启动,直接通过配置监听器完成
新建MyMessageListener实现MessageListener
@Component
public class MyMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
if (null != message && message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("-------" + textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
在Spring配置文件applicationContext.xml文件中配置监听器
<!-- 配置监听器 -->
<bean id="jmsContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer" >
<property name="connectionFactory" ref="jmsFactory" />
<property name="destination" ref="destinationTopic" />
<property name="messageListener" ref="myMessageListener" />
</bean>
这样只需要启动消息生产者,不需要启动消息消费者,当有消息到达MQ时,马上可以监听到此消息。
SpringBoot整合ActiveMQ
SpringBoot整合ActiveMQ点对点模式(Queue)
SpringBoot整合ActiveMQ点对点模式(Queue)生产者
新建SpringBoot项目,然后添加相关依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.1.5.RELEASE</version>
</dependency>
修改SpringBoot配置文件application.yml
server:
port: 7777
spring:
activemq:
broker-url: tcp://192.168.51.101:61616
user: admin
password: admin
pub-sub-domain: false
myqueue: boot-activemq-queue
分别创建ConfigBean和Queue_Produce两个类
@Component
@EnableJms
public class ConfigBean {
@Value("${myqueue} " )
private String myQueue;
@Bean
public Queue queue () {
return new ActiveMQQueue(myQueue);
@Component
public class Queue_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Queue queue;
public void produceMsg () {
jmsMessagingTemplate.convertAndSend(queue, "****" + UUID.randomUUID().toString().substring(0, 6));
创建测试类
@SpringBootTest(classes = BootMqProduceApplication.class)
@RunWith(SpringJUnit4ClassRunner.class)
@WebAppConfiguration
public class BootMqProduceApplicationTests {
@Resource
private Queue_Produce queue_produce;
@Test
public void textSend() throws Exception {
queue_produce.produceMsg();
基于SpringBoot的间隔发送
在Queue_Produce类中添加如下代码
@Scheduled(fixedDelay = 3000)
public void produceMsgScheduled () {
jmsMessagingTemplate.convertAndSend(queue, "****Scheduled:" + UUID.randomUUID().toString().substring(0, 6));
System.out.println("produceMsgScheduled send OK" );
@Scheduled(fixedDelay = 3000)注解表示每三秒执行一次该方法。
然后在SpringBoot的主启动类上添加@EnableScheduling注解,表示开启Scheduled功能,然后直接启动主启动类。
SpringBoot整合ActiveMQ点对点模式(Queue)消费者
创建SpringBoot项目,添加相关依赖
配置application.yml文件,此时消费者的端口号不能和生产者相同。
创建类Queue_Consumer
@Component
public class Queue_Consumer {
@JmsListener(destination = "${myqueue} " )
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("******消费者收到的消息:" + textMessage.getText());
这里 @JmsListener(destination = "${myqueue}")
注解表示监听在配置文件中配置的名为myqueue的队列。
SpringBoot整合ActiveMQ发布订阅模式(Topic)
SpringBoot整合ActiveMQ发布订阅模式(Topic)生产者
创建SpringBoot项目,导入相关依赖。
配置application.yml文件。
server:
port: 6666
spring:
activemq:
broker-url: tcp://192.168.51.101:61616
user: admin
password: admin
pub-sub-domain: true
mytopic: boot-activemq-topic
分别创建config包和produce包,并分别在两个包中创建ConfigBean和Topic_Produce。
@Component
@EnableJms
public class ConfigBean {
@Value("${mytopic} " )
private String topicName;
@Bean
public Topic topic () {
return new ActiveMQTopic(topicName);
@Component
public class Topic_Produce {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Topic topic;
@Scheduled(fixedDelay = 3000)
public void produceTopic () {
jmsMessagingTemplate.convertAndSend(topic, "主题消息" + UUID.randomUUID().toString().substring(0, 6));
在SpringBoot启动主类中添加@EnableScheduling注解
SpringBoot整合ActiveMQ发布订阅模式(Topic)消费者
创建SpringBoot项目,导入相关依赖。
配置application.yml文件。
server:
port: 5555
spring:
activemq:
broker-url: tcp://192.168.51.101:61616
user: admin
password: admin
pub-sub-domain: true
mytopic: boot-activemq-topic
创建consumer包并创建Topic_Consumer类
@Component
public class Topic_Consumer {
@JmsListener(destination = "${mytopic} " )
public void receive(TextMessage textMessage) throws JMSException {
System.out.println("消费者收到订阅的主题:" + textMessage.getText());
创建完成之后先启动消费者主类,再启动生产者主类。
ActiveMQ的传输协议
ActiveMQ默认使用TCP协议,但是使用NIO协议性能要高于TCP。
通过查看activemq.xml文件可以知道ActiveMQ默认出厂时支持多种通信协议。而ActiveMQ默认的消息协议是openwire协议。
TCP(Transmission Control Protocol)协议
TCP是默认的Broker配置,TCP的Client监听端口为61616。
在网络传输数据前,必须要序列化数据,消息是通过一个叫wire protocol的协议来序列化成字节流。默认情况下ActiveMQ把wire protocol叫做OpenWire,它的目的是促使网络上的效率和数据快速交互。
NIO协议
NIO协议和TCP协议类似但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可以有更多的client调用和服务端有更多的负载。
适合使用NIO的场景
(1):可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的,因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
(2):可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
这样的配置表示这个端口使用以TCP协议为基础的NIO网络IO模型。但是这样的设置方式,只能使这个端口支持Openwire协议。如果想让这个端口既支持NIO网络IO模型,又支持多个协议,需要用到auto。
vim编辑activemq.xml文件,添加如下配置并重启ActiveMQ。
ActiveMQ的消息存储和持久化
ActiveMQ的持久化策略
为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一般都采用持久化机制。
ActiveMQ的消息持久化机制有JDBC、AMQ、KahaDB、LevelDB,无论是那种持久化方式,消息的存储逻辑都是一致的。
就是在发送者将消息发送出去后,消息中心首先将消息存储到本地数据文件、内存储数据或者远程数据库等再试图将消息发送给接受者。若成功则将消息从存储中删除,失败则继续尝试发送。
AMQ Message Store
AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储在一个文件中,文件的默认大小为32M,当一个存储文件中的消息已经被全部消费,那么这个文件被标记为可删除文件,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。
KahaDB消息存储(默认)
在ActiveMQ5.4之后默认使用KahaDB对ActiveMQ进行持久化,可以从activemq.xml配置文件中得出。KahaDB是基于日志文件进行持久化的,类似于Redis的AOF。
由上图不难看出,使用KahaDB作为ActiveMQ的持久化机制时,数据被存放在ActiveMQ安装目录下的data目录下。
KahaDB是目前默认的存储方式,可用于任何场景,提高了性能和恢复能力。
消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
KahaDB是一个专门针对消息持久化的解决方案,他对典型的消息使用模式进行了优化。
数据被追加到data logs中,当不再需要log文件中的数据的时候,log文件会被丢弃。
KahaDB的存储原理
KahaDB在消息保存目录中只有四类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比非常简洁。
db-.log KahaDB:存储消息到预定义大小的数据记录文件中,文件名为db.log。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如db-1.log、db-2.log...。当不再有引用到数据文件的任何消息时,文件会被删除或者归档。
db.data:该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,他是消息的索引文件,本质上是B树。使用B-Tree作为索引指向db-.log里面存储的信息。
db.free:当前db.data文件里那些页面是空闲的,文件具体内容是所有空闲页的ID。
db.redo:用来进行消息回复,如果KahaDB消息存储在强制退出后启动,用于恢复B-Tree索引。
lock:文件锁,表示当前获得KahaDB读写权限的broker。
LevelDB消息存储
这种文件存储是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库存储形式,但是它提供比KahaDB更快的持久型。
LevelDB不使用自定义B-Tree实现来索引,而是使用了基于LevelDB的索引。
JDBC持久化
导入jdbc的驱动包,放入ActiveMQ安装目录下的lib目标中,根据自己安装的mysql版本。
修改conf目录下的activemq.xml文件,设置持久化方式为JDBC持久化。
配置dataSource,该bean放在activemq.xml中的broker标签下面,import标签上面即可。
这里使用的是dbcp数据库连接池,默认使用dbcp数据库连接池,若想使用其它数据库连接池,还需要导入相关连接池jar包到lib下。
配置完成之后,启动mysql,创建一个名为activemq的数据库,之后重启ActiveMQ,可以看到ActiveMQ会自动的帮我们创建三张表,在activemq库中。
消息表,默认表名为ACTIVEMQ MSGS,queue和topic都存放在里面,结构如下图。
- activemq_acks表:用于存储订阅关系。如果是持久化Topic,订阅者和服务器的订阅关系在这个表保存,数据库字段如下图
- activemq_lock表:在集群环境中才有用,只有一个Broker可以获得消息,称为Master Broker,其它的只能作为备份等待Master。主要记录哪一个Broker是当前的Master。
在配置过程中遇到的坑:配置完成之后,发现ActiveMQ启动一直失败,查看日志文件并看了有关博客后发现是因为我的mysql没有开放远程登录的权限。
解决方案:执行下图所示的SQL语句即可
在点到点类型中(queue)
当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中,不会保存到mysql数据库中。
当DeliveryMode设置为PERSISTENCE时,消息保存在broker的响应的文件或者数据库中。
而且点对点类型中消息一旦被Consumer消费就会从broker中删除。
在发布订阅(topic)中
一般是先启动消费者再启动生产者,这种情况下,会将生产者生产的消息放入activemq_msge表中,并且消费者消费之后不删除这些消息,将消费者的订阅信息放入activemq_acks表中。
JDBC Message store with ActiveMQ Journal
单纯的使用JDBC,每次有消息过来时,JDBC都要去写库和读库,使用ActiveMQ Journal高速缓存,大大提高了性能。当消费者的消费速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件中,如果消费者的消费速度很快,在journal文件还没有同步到DB之前,消费者已经消费了900条消息,那么这个时候只需要同步剩余的100条消息到DB即可。如果消费者的消费速度很慢,这个时候journal文件可以使消息以批量方式写到DB中。
修改activemq.xml文件即可,将之前的仅有JDBC的持久化方案注释掉,配置新的带高速缓存的JDBC持久化方案。
ActiveMQ的多节点集群
引入消息队列之后如何保证其高可用?
基于Zookeeper和LevelDB搭建ActiveMQ集群。集群仅提供主备方式的高可用集群功能,避免单点故障。
ActiveMQ集群对比
基于sharedFileSystem共享文件系统的Master/Salve
基于JDBC Master/Slave
基于可复制的LevelDB Master/Slave
Zookeeper+replicated-leveled-store的主从集群
使用Zookeeper集群注册所有的ActiveMQ Broker但是只有其中的一个Broker可以提供服务,它被视为Master,其它的Broker处于待机状态被视为Slave。
如果Master因故障而不能提供服务Zookeeper会从Slave中选举出一个Broker充当Master。
Slave连接Master并同步它们的存储状态,Slave不接受客户端连接。所有的存储操作都被复制到连接至Master的Slave。如果Master宕机得到了最新更新的Slave会成为Master。故障节点在恢复后会重新加入到集群中并连接Master进入Slave模式。
三台虚拟机,要求已经配置好Zookeeper集群。
ActiveMQ的客户端只能访问Master的Broker,其他处于Slave的Broker不能访问,所以客户端连接的Broker应该使用failover协议(失败转移)。
当一个ActiveMQ节点挂掉或者一个Zookeeper挂掉,ActiveMQ服务仍然正常运转,如果仅剩一个ActiveMQ节点,由于不能选举Master,所以ActiveMQ不能正常运行。
如果Zookeeper仅剩一个节点活动,不管ActiveMQ各节点存活,ActiveNQ也不能正常提供服务(ActiveMQ的高可用依赖于Zookeeper集群的高可用)。
ActiveMQ的高级特性
异步投递:Async Sends
ActiveMQ支持同步、异步两种发送的模式将消息发送到Broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。
ActiveMQ默认使用异步发送的模式:除非明确指出使用同步发送的方式或者在未使用事务的前提下发送持久化的消息,这两种情况都是同步发送。
如果没有使用事务且发送的事持久化的消息,每一次发送都是同步发送的且会阻塞producer直到broker返回一个确认,表示消息已经被安全的持久化到磁盘。确认机制提供了消息安全的保障,但同时会阻塞客户端带来了很大的延时。
很多高性能的应用,允许在失败的情况下有少量的数据丢失。如果你的应用满足这个特点,你可以使用异步发送来提高生产率,即使发送的是持久化的消息。
异步发送可以最大化的producer端的发送效率。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能。不过也带来了额外的问题。
使用异步发送需要消耗较多的Client端的内存同时也会导致Broker端性能消耗增加。此外它不能有效的确保消息发的发送成功。在useAsyncSend=true的情况下客户端需要容忍消息丢失的可能。
new ActiveMQConnectionFactory("tcp://localhost:61616?jms.useAsyncSend=true");
connectionFactory.setUseAsyncSend(true);
connection.setUseAsyncSend(true);
异步投递怎么确认发送成功
异步发送丢失消息的场景是:生产者设置UseAsyncSend=true,使用producer.send(msg)持续发送消息,有序消息不阻塞,生产者会认为所有send的消息均成功被发送至MQ,如果MQ突然宕机,此时生产者端内存中尚未被发送至MQ的消息都会丢失,所以正确的异步发送方法是需要接收回调的。
同步发送和异步发送的区别就在于:同步发送等send不阻塞了就表示一定发送成功了,而异步发送需要接收回执并由客户端再判断一次是否发送成功。
public class JmsProduce_AsyncSend {
public static final String ACTIVEMQ_URL = "failover:(tcp://192.168.51.101:61616,tcp://192.168.51.102:61616,tcp://192.168.51.103:61616)?randomize=false" ;
public static final String QUEUE_NAME = "jdbc01" ;
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
activeMQConnectionFactory.setUseAsyncSend(true );//开启异步发送
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage msg = null;
for (int i = 1; i <= 3; i++) {
msg = session.createTextMessage("jdbc msg---" + i);
msg.setJMSMessageID(UUID.randomUUID().toString() + "---order" );
String msgID = msg.getJMSMessageID();
producer.send(msg, new AsyncCallback () {
@Override
public void onSuccess () {//发送成功的回调函数
System.out.println(msgID + "has bean ok send" );
@Override
public void onException(JMSException e) {//发送失败的回调函数
System.out.println(msgID + "file to send to mq" );
session.close();
connection.close();
System.out.println("****消息发送到MQ完成****" );
延时投递和定时投递
如何开启?
在activemq.xml中配置schedulerSupport为true。
Java代码中封装的辅助消息类型:ScheduledMessage
public class JmsProduce_DelayAndSchedule {
public static final String ACTIVEMQ_URL = "failover:(tcp://192.168.51.101:61616,tcp://192.168.51.102:61616,tcp://192.168.51.103:61616)?randomize=false" ;
public static final String QUEUE_NAME = "jdbc01" ;
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false , Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
long delay = 3 * 1000;
long period = 4 * 1000;
int repeat = 5;
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
for (int i = 1; i <= 6; i++) {
TextMessage msg = session.createTextMessage("jdbc msg---" + i);
//表示延迟投递的时间为3s,一共投递5次,每次投递的时间间隔为4s
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
msg.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
msg.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(msg);
session.close();
connection.close();
System.out.println("****消息发送到MQ完成****" );
ActiveMQ的消费重试机制
具体哪些情况会引起消息重发
Client使用了事务并且在Session中调用了rollback()。
Client使用了事务并且在调用commit之前关闭或者没有调用commit。
Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()。
消息重发的时间间隔和重发次数
默认:间隔1s,重复6次
有毒消息Poison ACK
一个消息被重发超过默认的最大重发次数(默认6次)时,消费端会给MQ发送一个"poison ack"表示这个消息有毒。告诉broker不要再发了,这个时候broker会把这个消息放到DLQ(死信队列)中。
重发机制的有关属性配置
collisionAvoidanceFactor
设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围,默认为0.15
maximumRedeliveries
最大重传次数,达到最大重练次数后抛出异常。为-1时表示不限制次数,为0时表示不进行重传,默认为6
maximumRedeliveryDelay
最大传送延迟,只在useExponentialBackOff为true时有效,假设首次重连间隔为10ms,倍数为2,则第二次重连时间间隔为20ms,第三次重连时间间隔为40ms,当重连时间间隔大于最大重练时间间隔时,以后每次重练时间间隔都为最大重连时间间隔,默认为-1
initialRedeliveryDelay
初始重发延迟时间,默认为1000L
redeliveryDelay
重发延迟时间,当initialRedeliveryDelay=0时生效,默认为1000L
useCollisionAvoidance
启动防止冲突功能,默认false
useExponentialBackOff
启动指数倍数递增的方式增加延迟时间,默认为false
backOffMultiplier
重连时间间隔递增倍数,只有值大于1和启动useExponentialBackOff参数时才生效,默认为5
ActiveMQ中引入了"死信队列"(Dead Letter Queue)的概念,即一条消息再被重发了多次后(默认为6次,redeliveryCounter=6),将会被ActiveMQ移入"死信队列"。开发人员可以在这个Queue中查看处理出错的消息。
死信队列的策略
SharedDeadLetterStrategy:将所有的DeadLetter保存在一个共享的队列中,这是ActiveMQ Broker端的默认的策略。
IndividualDeadLetterStrategy:把DeadLetter放入各自的死信通道中。
对于Queue而言,死信通道的前缀默认为"ActiveMQ.DLQ.Queue."。
对于Topic而言,死信通道的前缀默认为"AxtiveMQ.DLQ.Topic."
默认情况下,无论是Topic还是Queue,broker都将使用Queue来保存DeadLeader,即死信队列通常为Queue,不过开发者也可以指定为Topic。
将队列Order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.Order为Topic。“useQueueForTopicMessages”属性表示是否将Topic的DeadLetter保存在Queue中,默认为true。
有时需要直接删除过期的消息而不需要放入死信队列中,“processExpired”表示是否将过期的消息放入死信队列中,默认为true。
默认情况下,ActiveMQ是不会吧非持久的死消息发送到死信队列中的。processNonPersistent表示是否将非持久化的消息放入死信队列中,默认为false。
如何防止消息不被重复消费?幂等性等问题
网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能造成重复消费。
如果消息时做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
准备一个第三方做消息记录,以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那么消费者开始消费前,先去redis中查询有没有消费记录即可。
saojiatete
学生党 @ 春招再向我招手
粉丝