ActiveMQ安装和使用
一、 ActiveMQ 简介
1 什么是 ActiveMQ
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个 完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久 的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。
2 什么是消息
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串; 也可以更复杂,可能包含嵌入对象。
3 什么是队列
特点:先进先出
4 什么是消息队列
“消息队列”是在消息的传输过程中保存消息的容器。
5 常用消息服务应用
- ActiveMQ ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完 全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。
- RabbitMQ RabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。他遵循 Mozilla Public License 开源协议。开发语言为 Erlang。
- RocketMQ 由阿里巴巴定义开发的一套消息队列应用服务。
二、 消息服务的应用场景
消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使 用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同 时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系, 也不需要受对方的影响,即解耦和。
1 异步处理
- 用户注册 用户注册流程: 1)注册处理以及写数据库 2)发送注册成功的手机短信 3)发送注册成功的邮件信息 如果用消息中间件:则可以创建两个线程来做这些事情,直接发送消息给消息中间件, 然后让邮件服务和短信服务自己去消息中间件里面去取消息,然后取到消息后再自己做对应 的业务操作。就是这么方便
- 应用的解耦 例如:订单处理 生成订单流程: 1)在购物车中点击结算 2)完成支付 3)创建订单 4)调用库存系统 订单完成后,订单系统并不去直接调用库存系统,而是发送消息到消息中间件,写入一 个订单信息。库存系统自己去消息中间件上去获取,然后做发货处理,并更新库存,这样能 够实现互联网型应用追求的快这一个属性。而库存系统读取订单后库存应用这个操作也是非 常快的,所以有消息中间件对解耦来说也是一个不错的方向。
- 流量的削峰 例如:秒杀功能 秒杀流程: 1)用户点击秒杀 2)发送请求到秒杀应用 3)在请求秒杀应用之前将请求放入到消息队列 4)秒杀应用从消息队列中获取请求并处理。 比如,系统举行秒杀活动,热门商品。流量蜂拥而至 100 件商品,10 万人挤进来怎么 办?10 万秒杀的操作,放入消息队列。秒杀应用处理消息队列中的 10 万个请求中的前 100 个,其他的打回,通知失败。流量峰值控制在消息队列处,秒杀应用不会瞬间被怼死
三、 JMS
1 什么是 JMS
JMS(Java Messaging Service)是 Java 平台上有关面向消息中间件的技术规范,它便于 消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接 口,简化企业应用的开发。
2 JMS 模型
点对点模型(Point To Point)
生产者发送一条消息到 queue,只有一个消费者能收到。
发布订阅模型(Publish/Subscribe)
发布者发送到 topic 的消息,只有订阅了 topic 的订阅者才会收到消息。
四、ActiveMQ安装
- 下载资源 ActiveMQ 官网: http:// activemq.apache.org 版本说明 : ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。 ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。
- 上传至 Linux 服务器
- 解压安装文件 tar -zxf apache-activemq-5.9.0-bin.tar.gz
- 检查权限 ls -al apache-activemq-5.9.0/bin 如果权限不足,则无法执行,需要修改文件权限: chmod 755 activemq
- 复制应用至本地目录 cp -r apache-activemq-5.9.0 /usr/local/activemq
- 启动 ActiveMQ /usr/local/activemq/bin/activemq start
-
测试 ActiveMQ 检查进程 ps aux | grep activemq 查看是否有activemq进程 管理界面 使用浏览器访问 ActiveMQ 管理应用, 地址如下:
http://
ip:8161/admin/
用户名: admin 密码: admin ActiveMQ 使用的是 jetty 提供 HTTP 服务.启动稍慢,建议短暂等待再访问测试.
-
修改访问端口 修改 ActiveMQ 配置文件: /usr/local/activemq/conf/jetty.xml
<bean id="jettyPort" class="org.apache.activemq.web.WebConsolePort" init-method="start" >
<!-- the default port number for the web console -->
<property name="host" value="0.0.0.0" />
<property name="port" value="8161" />
</bean>
-
修改数据端口 修改 ActiveMQ 配置文件: /usr/local/activemq/conf/activemq.xml 配置文件中,配置的是 ActiveMQ 的核心配置信息. 是提供服务时使用的配置. 可以修改 启动的访问端口. 即 java 编程中访问 ActiveMQ 的访问端口. 默认端口为 61616. 使用协议是: tcp 协议.
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600" />
</transportConnectors>
-
修改用户名和密码 修改 conf/users.properties 配置文件.内容为: 用户名=密码 保存并重启 ActiveMQ 服务即可.
properties admin=admin
配置文件修改完毕,保存并重新启动 ActiveMQ 服务。 - ActiveMQ 目录介绍 从它的目录来说,还是很简单的:
- bin 存放的是脚本文件
- conf 存放的是基本配置文件
- data 存放的是日志文件
- docs 存放的是说明文档
- examples 存放的是简单的实例
- lib 存放的是 activemq 所需 jar 包
- webapps 用于存放项目的目录
五、 ActiveMQ 术语
1. Destination
目的地,JMS Provider(消息中间件)负责维护,用于对 Message 进行管理的对象。 MessageProducer 需要指定 Destination 才能发送消息,MessageReceiver 需要指定 Destination 才能接收消息。
2. Producer
消息生成者,负责发送 Message 到目的地。
3. Consumer | Receiver
消息消费者,负责从目的地中消费【处理|监听|订阅】Message。
4. Message
消息,消息封装一次通信的内容。
分为五种:TextMessage、ObjectMessage、MapMessage、BytesMessage、和StreamMessage
1. TextMessage TextMessage消息是一种最常用的文本消息,这种消息的使用最简单。 在Producer端,直接在创建好的Session上创建一条TextMessage消息即可,然后交给Producer进行发送。 在Consumer端,我们直接调用consumer的receiver方法,将接收到的内容保存为TextMessage类型的数据。调用其getText方法获取文本消息内容
2. ObjectMessage 这种消息类型应用也很广泛,使用这个类型的消息,需要支持对象的序列化。在Java中,只要这个类实现了Serializable接口即可。在ActiveMQ的5.12版本以后为了避免收到恶意代码,引入了安全机制,只允许指定的包里的对象能够被传输。使用Connection对象的setTrustAllPackages方法设置所有的类都能够被传输,或者使指定的包里的类能够被传输
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
factory.setTrustedPackages(new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(","))));
在Provider端,我们创建一个Serializable对象。将其转换为ObjectMessage消息进行发送即可。 在Consumer端,我们直接调用consumer的receiver方法,将接收到的内容保存为ObjectMessage类型的数据。使用期getObject方法获取对象,再强制转换类型,即可使用。
3. ByteMessage BytesMessage消息是一种字节码消息,它传输的是一个字节数组。使用上只要将我们的消息变成一个字节数组即可,然后直接在producer端调用send方法发送消息即可。 在接收端,我们直接调用consumer的receiver方法,将接收到的内容保存为BytesMessage类型的数据,然后通过类似于IO流的方式读取出来即可(也可以保存为字节数组,然后处理)。
4. MapMessage MapMessage消息传递的是一个Map集合,先创建一个MapMessage,调用其setxxx方法,将xxx类型的以key-value形式的数据插入其中,调用producer对象的send方法发送出去。 在Consumer端,使用receiver方法获取Message,转换为MapMessage类型调用其getxxx方法,使用key获取xxx类型的数据。
5. StreamMessage StreamMessage消息传递的是一个流,先创建一个StreamMessage,调用其writexxx方法,将xxx类型的以流形式的数据传递过去,调用producer对象的send方法发送出去。 在Consumer端,使用receiver方法获取Message,转换为StreamMessage类型调用其readxxx方法获取xxx类型的数据。
5.Broker( 经纪人; 掮客)
Broker是ActiveMQ的一个实例。
我们可以将ActiveMQ看成一个服务,是需要我们下载解压后才能使用(免安装)。
主要使用目的是为了将服务器和客户端解耦,用来做消息的传递。
而Broker是ActiveMQ的一个简易实现,我们只需要在代码中启动Broker(用跑代码的方式启动ActiveMQ),从而实现嵌入式的ACtiveMQ。
使用过程如下:
(1) 运行Broker启动程序
(2) 运行 sender(发送者) 代码,发送mq
(3) 运行consume(消费者) 代码,接收mq
其中broker的启动方式有两种:
1. Broker Service启动 Broker,
例子如下:
BrokerService broker = new BrokerService();
broker.setUseJma(true);
broker.addConnector("tcp://localhost:61616");
broker.start();
2. BrokerFactory方式启动 Broker,
例子如下:
String uri = "properties:broker.properties";
BrokerService broker = BrokerFactory.createBroker(new URI(uri));
broker.addConnector("tcp://localhost:61616");
broker.start();
其中,broker.properties 的内容如下所示:
useJmx=true
persistent=false
brokerName=Cheese
说白了,Broker其实就是实现了用代码的形式启动ActiveMq,将mq嵌入到java代码中,随时用随时启动,在用的时候再去启动。节省了资源,也保证了可靠性。
引用自: https:// blog.csdn.net/hqm12345q w/article/details/80679758
5. 事务
-
为什么要用事务?
消息事务是在生产者producer到broker或broker到consumer过程中同一个session中发生的,保证几条消息在发送过程中的原子性。
可以在connection的createSession方法中指定一个布尔值开启,如果消息确认机制是事务确认,那么在发送message的过程中session就会开启事务(实际上broker的),不用用户显示调用 beginTransaction,这时所有通过session发送的消息都被缓存下来,用户调用session.commit时会发送所有消息,当发送出现异常时用户可以调用rollback进行回滚操作,只有在开启事务状态下有效。 -
事务的过程
在支持事务的session中,producer发送message时在message中带有transaction ID。broker收到message后判断是否有transaction ID,如果有就把message保存在transaction store中,等待commit或者rollback消息。
commit:在事务模式下,在接收消息没有确认的情况也会出列。完成消息。
rollback:回滚的过程是消息先出列,然后重发,默认6次,超过次数后进入到死亡队列。
在没有开启事务的情况下,消息没有确认的情况,消息会停在消息队列中,等待着再次被监听,除非调用session.recover()方法,效果和开启事务并回滚一样会进入死亡队列。
也就是说,开启事务后消息永远不会出现停留在队列的情况,消息会回滚重发,最后到死亡队列中,而不开启事务的情况,只要不使用session.recover();消息会停留在队列中,不会重发,直至被确认出列。如果调用了recover就和回滚重发一样了。
6. 监听器
-
MessageListener:
MessageListener是最原始的消息监听器,它是JMS规范中定义的一个接口。其中定义了一个用于处理接收到的消息的onMessage方法,该方法只接收一个Message参数。我们前面在讲配置消费者的时候用的消息监听器就是MessageListener. -
SessionAwareMessageListener:
SessionAwareMessageListener是Spring为我们提供的,它不是标准的JMS MessageListener。MessageListener的设计只是纯粹用来接收消息的,假如我们在使用MessageListener处理接收到的消息时我们需要发送一个消息通知对方我们已经收到这个消息了,那么这个时候我们就需要在代码里面去重新获取一个Connection或Session。SessionAwareMessageListener的设计就是为了方便我们在接收到消息后发送一个回复的消息,它同样为我们提供了一个处理接收到的消息的onMessage方法,但是这个方法可以同时接收两个参数,一个是表示当前接收到的消息Message,另一个就是可以用来发送消息的Session对象, -
MessageListenerAdapter:
MessageListenerAdapter类实现了MessageListener接口和SessionAwareMessageListener接口,它的主要作用是:1.将接收到的消息进行类型转换,然后通过反射的形式把它交给一个普通的Java类进行处理。2.自定义反射类。3.自动回复发送者消息
六、 ActiveMQ 应用
1. ActiveMQ 常用 API 简介
下述 API 都是接口类型,由定义在 javax.jms 包中. 是 JMS 标准接口定义.
1. ConnectionFactory
链接工厂, 用于创建链接的工厂类型.
2. Connection
链接. 用于建立访问 ActiveMQ 连接的类型, 由链接工厂创建.
3. Session
会话, 一次持久有效有状态的访问. 由链接创建.
4. Destination & Queue
目的地, 用于描述本次访问 ActiveMQ 的消息访问目的地. 即 ActiveMQ 服务中的具体队 列. 由会话创建. interface Queue extends Destination
5. MessageProducer
消息生成者, 在一次有效会话中, 用于发送消息给 ActiveMQ 服务的工具. 由会话创建.
6. MessageConsumer
消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于从 ActiveMQ 服务中 获取消息的工具. 由会话创建.
7. Message
消息, 通过消息生成者向 ActiveMQ 服务发送消息时使用的数据载体对象或消息消费者 从 ActiveMQ 服务中获取消息时使用的数据载体对象. 是所有消息【文本消息,对象消息等】 具体类型的顶级接口. 可以通过会话创建或通过会话从 ActiveMQ 服务中获取.
2. 创建消息生产者
- 创建maven工程,导入activemq的jar包
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
- 创建生产者
public class ActiveMQ_Producer {
public void sendMessage2ActiveMQ(String msg) {
//连接工厂对象
ConnectionFactory connectionFactory=null;
//链接对象
Connection connection=null;
//会话对象
Session session=null;
//发送地址(即队列)
Destination destination=null;
//消息生产者
MessageProducer messageProducer=null;
//消息对象
Message message=null;
try {
* 用户名和密码不同于服务器登录账号密码,该账号密码在conf/jetty-realm.properties中设置,用于链接ActiveMQ,上传消息的
* 默认为admin:admin
* userName:
* password:
* 在conf/activmq.xml中配置
* brokerURL:协议://ip地址:端口
connectionFactory=new ActiveMQConnectionFactory("amdin","admin", "tcp://192.168.40.128:61616");
//创建链接
connection=connectionFactory.createConnection();
//启动链接
connection.start();
//创建session
* transacted:是否开启事务 true|false
* true:开启事务,但后面的acknowledgeMode必须为Session.SESSION_TRANSACTED
* false:不开启事务
* 区别:开启事务后,会保持原子性,通过commit和rollback方法,实现提交和回滚,
* acknowledgeMode: 服务器接收到消息并处理后返回ack,客户端接收服务器到服务器发送的ack后知道已经处理完毕,不需要再从夫发送
* Session.AUTO_ACKNOWLEDGE 消息自动签收
* Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
* Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送。在第二次重新传递消息的时候,消息头的
* JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息
* 的重复处理控制。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建地址,即队列的名称
destination=session.createQueue("MassageTest");
//创建生产者
messageProducer = session.createProducer(destination);
//创建消息(简单文本消息)
message = session.createTextMessage(msg);
//发送消息
messageProducer.send(message);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
//关闭资源
if(messageProducer!=null) {
try {
messageProducer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if(session!=null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if(connection!=null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
3. 创建消息消费者
- 创建maven工程,导入ActiveMQ的jar包
- 创建消费者
public class ActiveMQ_Consumer {
public void getMessageFromActiveMQ() {
//连接工厂对象
ConnectionFactory connectionFactory=null;
//链接对象
Connection connection=null;
//会话对象
Session session=null;
//发送地址(即队列)
Destination destination=null;
//消息消费者
MessageConsumer messageConsumer=null;
//消息对象
TextMessage message=null;
try {
connectionFactory=new ActiveMQConnectionFactory("amdin","admin", "tcp://192.168.40.128:61616");
//创建链接
connection=connectionFactory.createConnection();
//启动链接
connection.start();
//创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建地址,即队列的名称
destination=session.createQueue("MassageTest");
//创建消费者
messageConsumer = session.createConsumer(destination);
//接受消息,并转换为TextMessage文本消息
message = (TextMessage)messageConsumer.receive();
//输出消息
System.out.println("接收到:"+message.getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
//关闭资源
if(messageConsumer!=null) {
try {
messageConsumer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if(session!=null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if(connection!=null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
4. 测试
- 生产
public static void main(String[] args) {
ActiveMQ_Producer producer=new ActiveMQ_Producer();
producer.sendMessage2ActiveMQ("helloword");
}
- 消费
public static void main(String[] args) {
ActiveMQ_Consumer consumer=new ActiveMQ_Consumer();
consumer.getMessageFromActiveMQ();
}
5. 传输对象消息
消息生产者:
public class ActiveMQ_Producer2 {
public void sendMessage2ActiveMQ(Serializable obj) {
//连接工厂对象
ConnectionFactory connectionFactory=null;
//链接对象
Connection connection=null;
//会话对象
Session session=null;
//发送地址(即队列)
Destination destination=null;
//消息生产者
MessageProducer messageProducer=null;
//消息对象
Message message=null;
try {
* 用户名和密码不同于服务器登录账号密码,该账号密码在conf/jetty-realm.properties中设置,用于链接ActiveMQ,上传消息的
* 默认为admin:admin
* userName:
* password:
* 在conf/activmq.xml中配置
* brokerURL:协议://ip地址:端口
connectionFactory=new ActiveMQConnectionFactory("amdin","admin", "tcp://192.168.40.128:61616");
//创建链接
connection=connectionFactory.createConnection();
//启动链接
connection.start();
//创建session
* transacted:是否开启事务 true|false
* true:开启事务,但后面的acknowledgeMode必须为Session.SESSION_TRANSACTED
* false:不开启事务
* 区别:开启事务后,会保持原子性,通过commit和rollback方法,实现提交和回滚,
* acknowledgeMode: 服务器接收到消息并处理后返回ack,客户端接收服务器到服务器发送的ack后知道已经处理完毕,不需要再从夫发送
* Session.AUTO_ACKNOWLEDGE 消息自动签收
* Session.CLIENT_ACKNOWLEDGE 客户端调用acknowledge方法手动签收
* Session.DUPS_OK_ACKNOWLEDGE 不必必须签收,消息可能会重复发送。在第二次重新传递消息的时候,消息头的
* JmsDelivered会被置为true标示当前消息已经传送过一次,客户端需要进行消息
* 的重复处理控制。
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建地址,即队列的名称
destination=session.createQueue("ObjectMassageTest");
//创建生产者(重点)
messageProducer = session.createProducer(destination);
//创建消息(简单文本消息)
message = session.createObjectMessage(obj);
//发送消息
messageProducer.send(message);
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
//关闭资源
if(messageProducer!=null) {
try {
messageProducer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if(session!=null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if(connection!=null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
消息消费者:
public class ActiveMQ_Consumer2 {
public void getMessageFromActiveMQ() {
//连接工厂对象
ConnectionFactory connectionFactory=null;
//链接对象
Connection connection=null;
//会话对象
Session session=null;
//发送地址(即队列)
Destination destination=null;
//消息消费者
MessageConsumer messageConsumer=null;
//消息对象
Message message=null;
try {
connectionFactory=new ActiveMQConnectionFactory("amdin","admin", "tcp://192.168.40.128:61616");
//在ActiveMQ5.12以后为了避免收到恶意代码,引入了安全机制,只允许指定的包里的对象能够被传输。
//使用setTrustAllPackages方法设置所有的类都能够被传输
//或者使指定的包里的类能够被传输
* ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
factory.setTrustedPackages(new ArrayList(Arrays.asList("org.apache.activemq.test,org.apache.camel.test".split(","))));
((ActiveMQConnectionFactory) connectionFactory).setTrustAllPackages(true);
//创建链接
connection=connectionFactory.createConnection();
//启动链接
connection.start();
//创建session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
//创建地址,即队列的名称
destination=session.createQueue("ObjectMassageTest");
//创建消费者
messageConsumer = session.createConsumer(destination);
//接受消息
message = messageConsumer.receive();
//处理消息(重点)
ObjectMessage objMessage=(ObjectMessage)message;
User user = (User)objMessage.getObject();
System.out.println(user.toString());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
//关闭资源
if(messageConsumer!=null) {
try {
messageConsumer.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if(session!=null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if(connection!=null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
七、 实现队列服务监听处理消息
消息监听使用观察者设计模式。 原理:Consumer配置监听后在ActivMQ注册,然后监听本地端口,在ActiveMQ接受到消息后需要Consumer处理,将消息发送给已注册的Consumer。 修改Consumer端代码:
//创建监听,处理消息
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
//输出消息
try {
System.out.println("接收到:"+((TextMessage)message).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
});
八、Topic模型
即pub/sub发布订阅模型,所有订阅该topic的Consumer都能接收到消息。 Producer端: 使用session对象的createTopic(String topicName)方法创建地址。向该地址发送消息。 Cinsumer端: 使用session对象的createTopic(String topicName)方法创建地址。从该地址接受消息。
九、Spring整合ActiveMQ
1. 创建父项目(pom工程)和POJO工程(jar工程)
配置pom.xml
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.xyl</groupId>
<artifactId>parent-ActiveMQ</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>pom</packaging>
<!-- 对依赖的jar包的版本统一进行定义 -->
<properties>
<activemq.version>5.15.9</activemq.version>
<xbean.version>4.5</xbean.version>
<jms.version>4.3.22.RELEASE</jms.version>
<activemq-pool.version>5.15.9</activemq-pool.version>
<solrj.version>4.10.3</solrj.version>
<jedis.version>2.9.0</jedis.version>
<junit.version>4.12</junit.version>
<spring.version>4.3.22.RELEASE</spring.version>
<mybatis.version>3.2.8</mybatis.version>
<mybatis.spring.version>1.2.2</mybatis.spring.version>
<mysql.version>5.1.32</mysql.version>
<slf4j.version>1.6.4</slf4j.version>
<druid.version>1.0.9</druid.version>
<jstl.version>1.2</jstl.version>
<servlet-api.version>2.5</servlet-api.version>
<tomcat.version>2.2</tomcat.version>
<jsp-api.version>2.0</jsp-api.version>
<zkClient-version>0.10</zkClient-version>
<dubbo-version>2.5.4</dubbo-version>
<jackson.version>2.4.2</jackson.version>
<commons-net.version>3.3</commons-net.version>
<commons-fileupload.version>1.3.1</commons-fileupload.version>
</properties>
<!-- jar包的依赖注入 ,由于该工程是一个父工程,所以jar包在该pom文件中只是声明 -->
<dependencyManagement>
<dependencies>
<!-- ActiveMQ客户端完整jar包依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>${activemq.version}</version>
</dependency>
<!-- ActiveMQ和Spring整合配置文件标签处理jar包依赖 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>${xbean.version}</version>
</dependency>
<!-- Spring-JMS插件相关jar包依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>${jms.version}</version>
</dependency>
<!-- Spring-JMS插件相关jar包依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>${activemq-pool.version}</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
<version>${activemq-pool.version}</version>
</dependency>
<dependency>
<groupId>org.apache.solr</groupId>
<artifactId>solr-solrj</artifactId>
<version>${solrj.version}</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>${jedis.version}</version>
</dependency>
<!-- 单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
</dependency>
<!-- 日志处理 -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
</dependency>
<!-- Mybatis -->
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis</artifactId>
<version>${mybatis.version}</version>
</dependency>
<dependency>
<groupId>org.mybatis</groupId>
<artifactId>mybatis-spring</artifactId>
<version>${mybatis.spring.version}</version>
</dependency>
<!-- MySql -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!-- 连接池 -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<!-- Spring -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-beans</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aspects</artifactId>
<version>${spring.version}</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>${spring.version}</version>
</dependency>
<!-- JSP相关 -->
<dependency>
<groupId>jstl</groupId>
<artifactId>jstl</artifactId>
<version>${jstl.version}</version>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<version>${servlet-api.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>jsp-api</artifactId>
<version>${jsp-api.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.xml</include>
</includes>
</resource>
<resource>
<directory>src/main/resources</directory>
<includes>
<include>**/*.xml</include>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<!-- tomcat插件,由于子项目不一定每个都是web项目,所以该插件只是声明,并未开启 -->
<pluginManagement>
<plugins>
<!-- 配置Tomcat插件 -->
<plugin>
<groupId>org.apache.tomcat.maven</groupId>
<artifactId>tomcat7-maven-plugin</artifactId>
<version>${tomcat.version}</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>11</source>
<target>11</target>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
在POJO工程中创建一个pojo--User,注入到Producer和Conmsuer中
2. 创建Producer(war工程)
项目分析思路: 1. 使用SpringMVC+Spring创建用户添加页面,通过控制层获取User对象传递到Service层 2. 在Service层调用JmsTemplate对象的send方法,创建消息发送出去。 pom.xml配置文件参照父工程 Spring整合ActiveMQ的配置文件applicationContext-jsm-producer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="classpath:jms.properties" />
<!-- 需要创建一个连接工厂,连接ActiveMQ. ActiveMQConnectionFactory. 需要依赖ActiveMQ提供的amq标签 -->
<!-- amq:connectionFactory 是bean标签的子标签, 会在spring容器中创建一个bean对象. 可以为对象命名.
类似: <bean id="" class="ActiveMQConnectionFactory"></bean> -->
<amq:connectionFactory id="amqConnectionFactory" userName="${USERNAME}" password="${PASSWORD}" brokerURL="${BROKERURL}" />
<!-- 配置池化的ConnectionFactory。 为连接ActiveMQ的connectionFactory提供连接池 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="amqConnectionFactory"></property>
<property name="maxConnections" value="10"></property>
</bean>
<!-- spring管理JMS相关代码的时候,必须依赖jms标签库. spring-jms提供的标签库. -->
<!-- 定义Spring-JMS中的连接工厂对象 CachingConnectionFactory - spring框架提供的连接工厂对象.
不能真正的访问MOM容器. 类似一个工厂的代理对象. 需要提供一个真实工厂,实现MOM容器的连接访问. -->
<!-- 配置有缓存的ConnectionFactory,session的缓存大小可定制。 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"></property>
<property name="sessionCacheSize" value="3"></property>
</bean>
<!-- JmsTemplate配置 -->
<bean id="template" class="org.springframework.jms.core.JmsTemplate">
<!-- 给定连接工厂, 必须是spring创建的连接工厂. -->
<property name="connectionFactory" ref="connectionFactory"></property>
<!-- 可选 - 默认目的地命名 -->
<property name="defaultDestinationName" value="test-spring"></property>
</bean>
</beans>
用户名密码的参数从jms。properties文件中取
USERNAME=admin
PASSWORD=admin
BROKERURL=tcp://192.168.40.128:61616
在service层将消息发送出去
@Service
public class UserServiveImpl implements UserService {
@Autowired
private JmsTemplate jmsTemplate;
@Override
public void addUser(final User user) {
//可以通过JmsTemplate对象设置queue
//jmsTemplate.setDefaultDestinationName("aaa");
jmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
ObjectMessage message = session.createObjectMessage(user);
return message;
}
3. 创建Consumer(jar工程)
项目思路分析
1. 在Spring中配置消息监听器, 创建自定义的监听器。在监听器中调用service层,将接受到的消息传递到service层 1. 在Test中启动Spring,是线程堵塞,等待消息。 pom.xml配置文件参照父工程 Spring整合ActiveMQ的配置文件applicationContext-jsm-consmuer.xml:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="classpath:jms.properties"/>
<!-- 需要创建一个连接工厂,连接ActiveMQ. ActiveMQConnectionFactory. 需要依赖ActiveMQ提供的amq标签 -->
<!-- amq:connectionFactory 是bean标签的子标签, 会在spring容器中创建一个bean对象. 可以为对象命名.
类似: <bean id="" class="ActiveMQConnectionFactory"></bean> -->
<amq:connectionFactory id="amqConnectionFactory" userName="${USERNAME}" password="${PASSWORD}" brokerURL="${BROKERURL}"
trustAllPackages="true"/>
<!-- 配置池化的ConnectionFactory。 为连接ActiveMQ的connectionFactory提供连接池 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.jms.pool.PooledConnectionFactory">
<property name="connectionFactory" ref="amqConnectionFactory"></property>
<property name="maxConnections" value="10"></property>
</bean>
<!-- spring管理JMS相关代码的时候,必须依赖jms标签库. spring-jms提供的标签库. -->
<!-- 定义Spring-JMS中的连接工厂对象 CachingConnectionFactory - spring框架提供的连接工厂对象.
不能真正的访问MOM容器. 类似一个工厂的代理对象. 需要提供一个真实工厂,实现MOM容器的连接访问. -->
<!-- 配置有缓存的ConnectionFactory,session的缓存大小可定制。 -->
<bean id="connectionFactory" class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"></property>
<property name="sessionCacheSize" value="3"></property>
<!-- <property name="trustAllPackages" value="true"></property> -->
</bean>
<!-- 注册监听器 -->
<!-- 开始注册监听.
需要的参数有:
acknowledge - 消息确认机制
container-type - 容器类型 default|simple
simple:SimpleMessageListenerContainer最简单的消息监听器容器,只能处理固定数量的JMS会话,且不支持事务。
default:DefaultMessageListenerContainer是一个用于异步消息监听器容器 ,且支持事务
destination-type - 目的地类型. 使用队列作为目的地.
connection-factory - 连接工厂, spring-jms使用的连接工厂,必须是spring自主创建的
不能使用三方工具创建的工程. 如: ActiveMQConnectionFactory.
<jms:listener-container acknowledge="auto" container-type="default" destination-type="queue" connection-factory="connectionFactory" >
<!-- 在监听器容器中注册某监听器对象.
destination - 设置目的地命名
ref - 指定监听器对象
<jms:listener destination="test-spring" ref="queueMessageListener"/>
</jms:listener-container>
</beans>
自定义的监听器:
@Component
public class QueueMessageListener implements MessageListener{
@Autowired
private UserService userServiceImpl;
@Override
public void onMessage(Message message) {
try {
User user=(User)((ObjectMessage)message).getObject();