ActiveMQ
1.什么是 ActiveMQ
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个 完全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现,尽管 JMS 规范出台已经是很久 的事情了,但是 JMS 在当今的 J2EE 应用中间仍然扮演着特殊的地位。
2.什么是消息
“消息”是在两台计算机间传送的数据单位。消息可以非常简单,例如只包含文本字符串; 也可以更复杂,可能包含嵌入对象。
3.什么是队列
队列就是事物按照排好的顺序等待接受服务或者处理。
4.什么是消息队列
“消息队列”是在消息的传输过程中保存消息的容器。
5.常用消息服务应用
①ActiveMQ
ActiveMQ 是 Apache 出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完
全支持 JMS1.1 和 J2EE 1.4 规范的 JMS Provider 实现。
②RabbitMQ
RabbitMQ 是一个在 AMQP 基础上完成的,可复用的企业消息系统。他遵循 Mozilla Public
License 开源协议。开发语言为 Erlang。
③RocketMQ
由阿里巴巴定义开发的一套消息队列应用服务。
6.消息服务的应用场景
消息队列的主要特点是异步处理,主要目的是减少请求响应时间和解耦。所以主要的使 用场景就是将比较耗时而且不需要即时(同步)返回结果的操作作为消息放入消息队列。同 时由于使用了消息队列,只要保证消息格式不变,消息的发送方和接收方并不需要彼此联系, 也不需要受对方的影响,即解耦和。
例1用户注册:
1.注册处理以及写数据库
2.发送注册成功的手机短信
3.发送注册成功的邮件信息
如果用消息中间件:则可以创建两个线程来做这些事情,直接发送消息给消息中间件,
然后让邮件服务和短信服务自己去消息中间件里面去取消息,然后取到消息后再自己做对应
的业务操作。就是这么方便
例2订单处理:
1.在购物车中点击结算
2.完成支付
3.创建订单
4.调用库存系统
订单完成后,订单系统并不去直接调用库存系统,而是发送消息到消息中间件,写入一
个订单信息。库存系统自己去消息中间件上去获取,然后做发货处理,并更新库存,这样能
够实现互联网型应用追求的快这一个属性。而库存系统读取订单后库存应用这个操作也是非
常快的,所以有消息中间件对解耦来说也是一个不错的方向。
例3秒杀功能:
1.用户点击秒杀
2.发送请求到秒杀应用
3.在请求秒杀应用之前将请求放入到消息队列
4.秒杀应用从消息队列中获取请求并处理。
比如,系统举行秒杀活动,热门商品。流量蜂拥而至 100 件商品,10 万人挤进来怎么
办?10 万秒杀的操作,放入消息队列。秒杀应用处理消息队列中的 10 万个请求中的前 100
个,其他的打回,通知失败。流量峰值控制在消息队列处,秒杀应用不会瞬间被怼死.
7.什么是 JMS
JMS(Java Messaging Service)是 Java 平台上有关面向消息中间件的技术规范,它便于 消息系统中的 Java 应用程序进行消息交换,并且通过提供标准的产生、发送、接收消息的接 口,简化企业应用的开发。
JMS 模型 :
1.点对点模型(Point To Point):
生产者发送一条消息到 queue,只有一个消费者能收到。
2.发布订阅模型(Publish/Subscribe)
发布者发送到 topic 的消息,只有订阅了 topic 的订阅者才会收到消息
8.ActiveMQ 安装
下载地址:ActiveMQ 官网: http://activemq.apache.org
版本说明:
ActiveMQ5.10.x 以上版本必须使用 JDK1.8 才能正常使用。
ActiveMQ5.9.x 及以下版本使用 JDK1.7 即可正常使用。
1.使用文件上传工具将ActiveMQ上传到linux服务器
2.使用tar -zxf 压缩包名 命令进行解压
目录介绍:
bin 存放的是脚本文件
conf 存放的是基本配置文件
data 存放的是日志文件
docs 存放的是说明文档
examples 存放的是简单的实例
lib 存放的是 activemq 所需 jar 包
webapps 用于存放项目的目录
3.将解压后的文件拷贝/复制到/usr/local目录下
启动测试:
命令:/usr/local/activemq/bin/activemq start
浏览器访问: http:// ip:8161/admin/
用户名: admin
密码: admin
修改访问端口:
修改 ActiveMQ 配置文件: /usr/local/activemq/conf/jetty.xml
修改用户名和密码:
修改 conf/users.properties 配置文件.内容为: 用户名=密码
配置文件 /usr/local/activemq/conf/jactivemq.xml :
配置文件中,配置的是 ActiveMQ 的核心配置信息. 是提供服务时使用的配置. 可以修改
启动的访问端口. 即 java 编程中访问 ActiveMQ 的访问端口.
默认端口为 61616.
使用协议是: tcp 协议.
9.ActiveMQ 术语
Destination: 目的地,JMS Provider(消息中间件)负责维护,用于对 Message 进行管
理对象。MessageProducer 需要指定 Destination 才能发送消息,MessageReceiver 需要指定 Destination才能接收消息。
Producer:
消息生成者,负责发送 Message 到目的地。
Consumer | Receiver:
消息消费者,负责从目的地中消费【处理|监听|订阅】Message。
Message:
消息,消息封装一次通信的内容。
10.ActiveMQ 常用 API
ConnectionFactory
链接工厂, 用于创建链接的工厂类型.
Connection
链接. 用于建立访问 ActiveMQ 连接的类型, 由链接工厂创建.
Session
会话, 一次持久有效有状态的访问. 由链接创建.
Destination & Queue
目的地, 用于描述本次访问 ActiveMQ 的消息访问目的地. 即 ActiveMQ 服务中的具体队
列. 由会话创建.
interface Queue extends Destination
MessageProducer
消息生成者, 在一次有效会话中, 用于发送消息给 ActiveMQ 服务的工具. 由会话创建.
MessageConsumer
消息消费者【消息订阅者,消息处理者】, 在一次有效会话中, 用于从 ActiveMQ 服务中
获取消息的工具. 由会话创建.
Message
消息, 通过消息生成者向 ActiveMQ 服务发送消息时使用的数据载体对象或消息消费者
从 ActiveMQ 服务中获取消息时使用的数据载体对象. 是所有消息【文本消息,对象消息等】
具体类型的顶级接口. 可以通过会话创建或通过会话从 ActiveMQ 服务中获取.
11.ActiveMQ应用demo
1.处理文本消息demo
消息生产者producer:
public static void SendHelloWordActiveMQ(String msgText) {
// 链接工厂
ConnectionFactory connectionFactory = null;
// 链接
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息生产者
MessageProducer messageProducer = null;
// 消息
Message message = null;
try {
* userName访问消息服务用户名,默认admin 在jetty-ream.xml文件中进行配置
* password访问消息服务密码,默认admin 在jetty-ream.xml文件中进行配置
* brokerURL访问消息服务的url, 协议+ip+端口
// 创建链接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.75.137:61616");
// 创建链接
connection = connectionFactory.createConnection();
// 启动链接
connection.start();
* transacted 是否使用事务 true时 配合事务提交批量把消息插入到消息队列中 acknowledgeMode必须取值
* Session.SESSION_TRANSACTED 表示把事务管理交给session来管理 false时
* 每次都会提交到消息队列中,不使用事务 acknowledgeMode取值: Session.AUTO_ACKNOWLEDGE
* 自动消息确认机制 消费者消费时自动向MQ中发送一条信息,MQ会将这条消息移除 Session.CLIENT_ACKNOWLEDGE
* 客户端确认机制 需要在客户端调用API来完成 Session.DUPS_OK_ACKNOWLEDGE 有副本的客户端确认机制
* 收到消息时不会在MQ中进行删除会一直缓存在MQ中 acknowledgeMode 事务确认机制
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地的名称即队列的名称,消费者需要此名称访问对应的队列
destination = session.createQueue("helloword-destination");
// 创建消息生产者
messageProducer = session.createProducer(destination);
// 创建文本消息
message = session.createTextMessage(msgText);
// 发送消息
messageProducer.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (messageProducer != null) {
try {
messageProducer.close();
} catch (JMSException e) {
e.printStackTrace();
if (session != null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
消息消费者consumer:
public static void readHelloWordActiveMQ() {
// 链接工厂
ConnectionFactory connectionFactory = null;
// 链接
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息消费者
MessageConsumer messageConsumer = null;
// 消息
Message message = null;
try {
* userName访问消息服务用户名,默认admin 在jetty-ream.xml文件中进行配置
* password访问消息服务密码,默认admin 在jetty-ream.xml文件中进行配置
* brokerURL访问消息服务的url, 协议+ip+端口
// 创建链接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.75.137:61616");
// 创建链接
connection = connectionFactory.createConnection();
// 启动链接
connection.start();
* transacted 是否使用事务 true时 配合事务提交批量把消息插入到消息队列中 acknowledgeMode必须取值
* Session.SESSION_TRANSACTED 表示把事务管理交给session来管理 false时
* 每次都会提交到消息队列中,不使用事务 acknowledgeMode取值: Session.AUTO_ACKNOWLEDGE
* 自动消息确认机制 消费者消费时自动向MQ中发送一条信息,MQ会将这条消息移除 Session.CLIENT_ACKNOWLEDGE
* 客户端确认机制 需要在客户端调用API来完成 Session.DUPS_OK_ACKNOWLEDGE 有副本的客户端确认机制
* 收到消息时不会在MQ中进行删除会一直缓存在MQ中 acknowledgeMode 事务确认机制
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地的名称即队列的名称,消费者需要此名称访问对应的队列
destination = session.createQueue("helloword-destination");
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
// 接收消息
message = messageConsumer.receive();
String msg = ((TextMessage) message).getText();
System.out.println(msg);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (messageConsumer != null) {
try
{
messageConsumer.close();
} catch (JMSException e) {
e.printStackTrace();
if (session != null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
2.处理对象消息demo
消息生产者producer:
public static void SendHelloWordActiveMQ(Users users) {
// 链接工厂
ConnectionFactory connectionFactory = null;
// 链接
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息生产者
MessageProducer messageProducer = null;
// 消息
Message message = null;
try {
* userName访问消息服务用户名,默认admin 在jetty-ream.xml文件中进行配置
* password访问消息服务密码,默认admin 在jetty-ream.xml文件中进行配置
* brokerURL访问消息服务的url, 协议+ip+端口
// 创建链接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.75.137:61616");
// 创建链接
connection = connectionFactory.createConnection();
// 启动链接
connection.start();
* transacted 是否使用事务 true时 配合事务提交批量把消息插入到消息队列中 acknowledgeMode必须取值
* Session.SESSION_TRANSACTED 表示把事务管理交给session来管理 false时
* 每次都会提交到消息队列中,不使用事务 acknowledgeMode取值: Session.AUTO_ACKNOWLEDGE
* 自动消息确认机制 消费者消费时自动向MQ中发送一条信息,MQ会将这条消息移除 Session.CLIENT_ACKNOWLEDGE
* 客户端确认机制 需要在客户端调用API来完成 Session.DUPS_OK_ACKNOWLEDGE 有副本的客户端确认机制
* 收到消息时不会在MQ中进行删除会一直缓存在MQ中 acknowledgeMode 事务确认机制
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地的名称即队列的名称,消费者需要此名称访问对应的队列
destination = session.createQueue("users");
// 创建消息生产者
messageProducer = session.createProducer(destination);
message = session.createObjectMessage(users);
// 发送消息
messageProducer.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (messageProducer != null) {
try {
messageProducer.close();
} catch (JMSException e) {
e.printStackTrace();
if (session != null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
消息消费者consumer:
public static void readHelloWordActiveMQ() {
// 链接工厂
ConnectionFactory connectionFactory = null;
// 链接
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息消费者
MessageConsumer messageConsumer = null;
// 消息
Message message = null;
try {
* userName访问消息服务用户名,默认admin 在jetty-ream.xml文件中进行配置
* password访问消息服务密码,默认admin 在jetty-ream.xml文件中进行配置
* brokerURL访问消息服务的url, 协议+ip+端口
// 创建链接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.75.137:61616");
// 创建链接
connection = connectionFactory.createConnection();
// 启动链接
connection.start();
* transacted 是否使用事务 true时 配合事务提交批量把消息插入到消息队列中 acknowledgeMode必须取值
* Session.SESSION_TRANSACTED 表示把事务管理交给session来管理 false时
* 每次都会提交到消息队列中,不使用事务 acknowledgeMode取值: Session.AUTO_ACKNOWLEDGE
* 自动消息确认机制 消费者消费时自动向MQ中发送一条信息,MQ会将这条消息移除 Session.CLIENT_ACKNOWLEDGE
* 客户端确认机制 需要在客户端调用API来完成 Session.DUPS_OK_ACKNOWLEDGE 有副本的客户端确认机制
* 收到消息时不会在MQ中进行删除会一直缓存在MQ中 acknowledgeMode 事务确认机制
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地的名称即队列的名称,消费者需要此名称访问对应的队列
destination = session.createQueue("users");
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
// 接收消息
message = messageConsumer.receive();
ObjectMessage obj = (ObjectMessage) message;
Users users = (Users) obj.getObject();
System.out.println(users);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (messageConsumer != null) {
try {
messageConsumer.close();
} catch (JMSException e) {
e.printStackTrace();
if (session != null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
常见异常处理:
异常原因:
在接收消息时,获取到的对象无法进行强制类型转换成另外一个类
如果两个项目的包名一致,可以生成序列id进行解决
在消费者项目中的pom文件中加入对生产者pojo的依赖
3.队列服务监听消息
消息生产者producer:
public static void SendHelloWordActiveMQ(String msgText) {
// 链接工厂
ConnectionFactory connectionFactory = null;
// 链接
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息生产者
MessageProducer messageProducer = null;
// 消息
Message message = null;
try {
* userName访问消息服务用户名,默认admin 在jetty-ream.xml文件中进行配置
* password访问消息服务密码,默认admin 在jetty-ream.xml文件中进行配置
* brokerURL访问消息服务的url, 协议+ip+端口
// 创建链接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.75.137:61616");
// 创建链接
connection = connectionFactory.createConnection();
// 启动链接
connection.start();
* transacted 是否使用事务 true时 配合事务提交批量把消息插入到消息队列中 acknowledgeMode必须取值
* Session.SESSION_TRANSACTED 表示把事务管理交给session来管理 false时
* 每次都会提交到消息队列中,不使用事务 acknowledgeMode取值: Session.AUTO_ACKNOWLEDGE
* 自动消息确认机制 消费者消费时自动向MQ中发送一条信息,MQ会将这条消息移除 Session.CLIENT_ACKNOWLEDGE
* 客户端确认机制 需要在客户端调用API来完成 Session.DUPS_OK_ACKNOWLEDGE 有副本的客户端确认机制
* 收到消息时不会在MQ中进行删除会一直缓存在MQ中 acknowledgeMode 事务确认机制
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地的名称即队列的名称,消费者需要此名称访问对应的队列
destination = session.createQueue("helloword-destination");
// 创建消息生产者
messageProducer = session.createProducer(destination);
// 创建文本消息
message = session.createTextMessage(msgText);
// 发送消息
messageProducer.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (messageProducer != null) {
try {
messageProducer.close();
} catch (JMSException e) {
e.printStackTrace();
if (session != null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
消息消费者consumer:
public static void readHelloWordActiveMQ() {
// 链接工厂
ConnectionFactory connectionFactory = null;
// 链接
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息消费者
MessageConsumer messageConsumer = null;
// 消息
Message message = null;
try {
* userName访问消息服务用户名,默认admin 在jetty-ream.xml文件中进行配置
* password访问消息服务密码,默认admin 在jetty-ream.xml文件中进行配置
* brokerURL访问消息服务的url, 协议+ip+端口
// 创建链接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.75.137:61616");
// 创建链接
connection = connectionFactory.createConnection();
// 启动链接
connection.start();
* transacted 是否使用事务 true时 配合事务提交批量把消息插入到消息队列中 acknowledgeMode必须取值
* Session.SESSION_TRANSACTED 表示把事务管理交给session来管理 false时
* 每次都会提交到消息队列中,不使用事务 acknowledgeMode取值: Session.AUTO_ACKNOWLEDGE
* 自动消息确认机制 消费者消费时自动向MQ中发送一条信息,MQ会将这条消息移除 Session.CLIENT_ACKNOWLEDGE
* 客户端确认机制 需要在客户端调用API来完成 Session.DUPS_OK_ACKNOWLEDGE 有副本的客户端确认机制
* 收到消息时不会在MQ中进行删除会一直缓存在MQ中 acknowledgeMode 事务确认机制
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地的名称即队列的名称,消费者需要此名称访问对应的队列
destination = session.createQueue("helloword-destination");
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
// 监听消息队列,接收消息
messageConsumer.setMessageListener(new MessageListener() {
// ActiveMQ 回调的方法。通过该方法将消息传递到 consumer
@Override
public void onMessage(Message message) {
String msg = "";
try {
msg = ((TextMessage) message).getText();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println(msg);
} catch (Exception e) {
e.printStackTrace();
}
4.topic模型实现
消息生产者producer:
public static void SendHelloWordActiveMQ(String msgText) {
// 链接工厂
ConnectionFactory connectionFactory = null;
// 链接
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息生产者
MessageProducer messageProducer = null;
// 消息
Message message = null;
try {
* userName访问消息服务用户名,默认admin 在jetty-ream.xml文件中进行配置
* password访问消息服务密码,默认admin 在jetty-ream.xml文件中进行配置
* brokerURL访问消息服务的url, 协议+ip+端口
// 创建链接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.75.137:61616");
// 创建链接
connection = connectionFactory.createConnection();
// 启动链接
connection.start();
* transacted 是否使用事务 true时 配合事务提交批量把消息插入到消息队列中 acknowledgeMode必须取值
* Session.SESSION_TRANSACTED 表示把事务管理交给session来管理 false时
* 每次都会提交到消息队列中,不使用事务 acknowledgeMode取值: Session.AUTO_ACKNOWLEDGE
* 自动消息确认机制 消费者消费时自动向MQ中发送一条信息,MQ会将这条消息移除 Session.CLIENT_ACKNOWLEDGE
* 客户端确认机制 需要在客户端调用API来完成 Session.DUPS_OK_ACKNOWLEDGE 有副本的客户端确认机制
* 收到消息时不会在MQ中进行删除会一直缓存在MQ中 acknowledgeMode 事务确认机制
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地的名称即队列的名称,消费者需要此名称访问对应的队列
destination = session.createTopic("topic");
// 创建消息生产者
messageProducer = session.createProducer
(destination);
// 创建文本消息
message = session.createTextMessage(msgText);
// 发送消息
messageProducer.send(message);
} catch (Exception e) {
e.printStackTrace();
} finally {
if (messageProducer != null) {
try {
messageProducer.close();
} catch (JMSException e) {
e.printStackTrace();
if (session != null) {
try {
session.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
if (connection != null) {
try {
connection.close();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
消息消费者consumer:
public void readHelloWordActiveMQ() {
// 链接工厂
ConnectionFactory connectionFactory = null;
// 链接
Connection connection = null;
// 目的地
Destination destination = null;
// 会话
Session session = null;
// 消息消费者
MessageConsumer messageConsumer = null;
// 消息
Message message = null;
try {
* userName访问消息服务用户名,默认admin 在jetty-ream.xml文件中进行配置
* password访问消息服务密码,默认admin 在jetty-ream.xml文件中进行配置
* brokerURL访问消息服务的url, 协议+ip+端口
// 创建链接工厂
connectionFactory = new ActiveMQConnectionFactory("admin", "admin", "tcp://192.168.75.137:61616");
// 创建链接
connection = connectionFactory.createConnection();
// 启动链接
connection.start();
* transacted 是否使用事务 true时 配合事务提交批量把消息插入到消息队列中 acknowledgeMode必须取值
* Session.SESSION_TRANSACTED 表示把事务管理交给session来管理 false时
* 每次都会提交到消息队列中,不使用事务 acknowledgeMode取值: Session.AUTO_ACKNOWLEDGE
* 自动消息确认机制 消费者消费时自动向MQ中发送一条信息,MQ会将这条消息移除 Session.CLIENT_ACKNOWLEDGE
* 客户端确认机制 需要在客户端调用API来完成 Session.DUPS_OK_ACKNOWLEDGE 有副本的客户端确认机制
* 收到消息时不会在MQ中进行删除会一直缓存在MQ中 acknowledgeMode 事务确认机制
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建目的地,目的地的名称即队列的名称,消费者需要此名称访问对应的队列
destination = session.createTopic("topic");
// 创建消息消费者
messageConsumer = session.createConsumer(destination);
// 接收消息
messageConsumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
String msg = "";
try {
msg = ((TextMessage) message).getText();
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
System.out.println("接收服务端发送的消息:" + msg + "1");
} catch (Exception e) {
e.printStackTrace();
}
12.Spring整合ActiveMQ相关配置
①ActiveMQ相关jar依赖
<!-- ActiveMQ客户端完整jar包依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.9.0</version>
</dependency>
<!-- ActiveMQ和Spring整合配置文件标签处理jar包依赖 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>4.5</version>
</dependency>
<!-- Spring-JMS插件相关jar包依赖 -->
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>4.1.6.RELEASE</version>
</dependency>
<!-- Spring-JMS插件相关jar包依赖 -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-pool</artifactId>
<version>5.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-jms-pool</artifactId>
<version>5.9.0</version>
</dependency>
②producer生成者applicationContext-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<!-- 需要创建一个连接工厂,连接ActiveMQ. ActiveMQConnectionFactory. 需要依赖ActiveMQ提供的amq标签 -->
<!-- amq:connectionFactory 是bean标签的子标签, 会在spring容器中创建一个bean对象. 可以为对象命名.
类似: <bean id="" class="ActiveMQConnectionFactory"></bean> -->
<amq:connectionFactory brokerURL="tcp://192.168.75.137:61616"
userName="admin" password="admin" id="amqConnectionFactory" />
<!-- 配置池化的ConnectionFactory。 为连接ActiveMQ的connectionFactory提供连接池 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
<property name="connectionFactory" ref="amqConnectionFactory"></property>
<property name="maxConnections" value="10"></property>
</bean>
<!-- spring管理JMS相关代码的时候,必须依赖jms标签库. spring-jms提供的标签库. -->
<!-- 定义Spring-JMS中的连接工厂对象 CachingConnectionFactory - spring框架提供的连接工厂对象.
不能真正的访问MOM容器. 类似一个工厂的代理对象. 需要提供一个真实工厂,实现MOM容器的连接访问. -->
<!-- 配置有缓存的ConnectionFactory,session的缓存大小可定制。 -->
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"></property>
<property name="sessionCacheSize" value="3"></property>
</bean>
<!-- JmsTemplate配置 -->
<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate">
<!-- 给定连接工厂, 必须是spring创建的连接工厂. -->
<property name="connectionFactory" ref="connectionFactory"></property>
<!-- 可选 - 默认目的地命名 -->
<property name="defaultDestinationName" value="test-spring"></property>
</bean>
</beans>
发送消息到activeMQ:
@Service
public class MessageServiceImpl implements MessageService {
@Autowired
private JmsTemplate JmsTemplate;
@Override
public void addMsg(Users user) {
//设置目的地名称
// JmsTemplate.setDefaultDestinationName("test");
JmsTemplate.send(new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
return session.createObjectMessage(user);
}
③consumer消费者applicationContext-jms.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:jms="http://www.springframework.org/schema/jms"
xmlns:amq="http://activemq.apache.org/schema/core"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- 需要创建一个连接工厂,连接ActiveMQ. ActiveMQConnectionFactory. 需要依赖ActiveMQ提供的amq标签 -->
<!-- amq:connectionFactory 是bean标签的子标签, 会在spring容器中创建一个bean对象.
可以为对象命名. 类似: <bean id="" class="ActiveMQConnectionFactory"></bean>
<amq:connectionFactory brokerURL="tcp://192.168.75.137:61616"
userName="admin" password="admin" id="amqConnectionFactory"/>
<!-- 配置池化的ConnectionFactory。 为连接ActiveMQ的connectionFactory提供连接池 -->
<bean id="pooledConnectionFactory" class="org.apache.activemq.pool.PooledConnectionFactoryBean">
<property name="connectionFactory" ref="amqConnectionFactory"></property>
<property name="maxConnections" value="10"></property>
</bean>
<!-- spring管理JMS相关代码的时候,必须依赖jms标签库. spring-jms提供的标签库. -->
<!-- 定义Spring-JMS中的连接工厂对象
CachingConnectionFactory - spring框架提供的连接工厂对象. 不能真正的访问MOM容器.
类似一个工厂的代理对象. 需要提供一个真实工厂,实现MOM容器的连接访问.
<bean id="connectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<property name="targetConnectionFactory" ref="pooledConnectionFactory"></property>
<property name="sessionCacheSize" value="3"></property>
</bean>
<!-- 注册监听器 -->
<!-- 开始注册监听.
需要的参数有:
acknowledge - 消息确认机制
container-type - 容器类型 default|simple
simple:SimpleMessageListenerContainer最简单的消息监听器容器,只能处理固定数量的JMS会话,且不支持事务。
default:DefaultMessageListenerContainer是一个用于异步消息监听器容器 ,且支持事务
destination-type - 目的地类型. 使用队列作为目的地.
connection-factory - 连接工厂, spring-jms使用的连接工厂,必须是spring自主创建的
不能使用三方工具创建的工程. 如: ActiveMQConnectionFactory.
<jms:listener-container acknowledge="auto" container-type="default"
destination-type="queue" connection-factory="connectionFactory" >
<!-- 在监听器容器中注册某监听器对象.
destination - 设置目的地命名
ref - 指定监听器对象
<jms:listener destination="test-spring" ref="myListener"/>
</jms:listener-container>
</beans>
消息监听器 从activeMQ从获取消息进行消费
@Component(value="myListener")
public class MyMessageListener implements MessageListener{
@Autowired
private DisposeMsg disposeMsg;
@Override
public void onMessage(Message message) {
try {
Serializable obj = ((ObjectMessage)message).getObject();
disposeMsg.showMsg((Users)obj);