JMS规范和AMQP协议
JMS经典模式详解
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件
(MOM,Message oriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行
异步通信。与具体平台无关的API,绝大多数MOM提供商都支持。
它类似于JDBC(Java Database Connectivity)。
JMS规范文档下载地址:
https://download.oracle.com/otndocs/jcp/7195-jms-1.1-fr-spec-oth-JSpec/
JMS消息
消息是JMS中的一种类型对象,由两部分组成:报文头和消息主体。
报文头包括消息头字段和消息头属性。字段是JMS协议规定的字段,属性可以由用户按需添加。
JMS报文头全部字段:
JMSCorrelationID 客户端使用该字段的值与另一条消息关联。一个典型的场景是使用该字段 将响应消息与请求消息关联。JMSCorrelationID可以包含如下值: - 服务器规定的消息ID - 应用指定的字符串 - 服务器原生的byte[]值 JMSReplyTo 该字段包含了在客户端发送消息的时候指定的Destination。即对该消息的 响应应该发送到该字段指定的Destination。设置了该字段值的消息一般期 望收到一个响应。 JMSRedelivered 如果这个字段是true,则告知消费者应用这条消息已经发送过了,消费端 应用应该小心别重复处理了。 JMSType 消息发送的时候用于标识该消息的类型。具体有哪些类型,由JMS实现厂商 决定。 JMSExpiration 发送消息时,其到期时间将计算为send方法上指定的生存时间值与当前 GMT值之和。 从send方法返回时,消息的JMSExpiration标头字段包含此 值。 收到消息后,其JMSExpiration标头字段包含相同的值。 过期时间 JMSPriority JMS定义了一个十级优先级值,最低优先级为0,最高优先级为9。 此外, 客户端应将优先级0-4视为正常优先级,将优先级5-9视为快速优先级。 JMS不需要服务器严格执行消息的优先级排序; 但是,它应该尽力在普通 消息之前传递加急消息。消息主体则携带着应用程序的数据或有效负载。
根据有效负载的类型来划分,可以将消息分为几种类型:
简单文本(TextMessage)
可序列化的对象(ObjectMessage)
属性集合(MapMessage)
字节流(BytesMessage)
原始值流(StreamMessage)
无有效负载的消息(Message)。
JMS由以下元素组成:
JMS供应商产品 JMS接口的一个实现。该产品可以是Java的JMS实现,也可以是非Java的面向消息中间件的适 配器。
JMS Client
生产或消费基于消息的Java的应用程序或对象。
JMS Producer
创建并发送消息的JMS客户。
JMS Consumer
接收消息的JMS客户。
JMS Message
包括可以在JMS客户之间传递的数据的对象
JMS Queue
缓存消息的容器。消息的接受顺序并不一定要与消息的发送顺序相同。消息被消费后将从队列 中移除。
JMS Topic
Pub/Sub模式。
ConnectionFactory 接口(连接工厂)
用户用来创建到JMS提供者的连接的被管对象。JMS客户通过可移植的接口访问连接,这样当下层的实现改变
时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据
消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
Connection 接口(连接)
连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
Destination 接口(目标)
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
Session 接口(会话)
表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。
MessageConsumer 接口(消息消费者)
由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或(非阻塞)接收队列和主题类型的消息。
MessageProducer 接口(消息生产者)
由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。
Message 接口(消息)
是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。
一个消息有三个主要部分:
消息头(必须):
包含用于识别和为消息寻找路由的操作设置。
一组消息属性(可选):
包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
一个消息体(可选):
允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)。
点对点(队列模式)
在点对点或队列模型下 ,一个生产者向一个特定的 队列 发布消息,一个消费者从该队列中读取消息。这里,
生产者知道消费者的队列,并直接将消息发送到消费者的队列,概括为:
一条消息只有一个消费者获得
生产者无需在接收者消费该消息期间处于运行状态,接收者也同样无需在消息发送时处于运行状态。
每一个成功处理的消息要么自动确认,要么由接收者手动确认。
JMS有两种传递消息的方式。
标记为NON_PERSISTENT(非持久)的消息最多投递一次,而标记为PERSISTENT(持久)的消息将使用暂存后再转送的机理投递。
如果一个JMS服务下线,持久性消息不会丢失,等该服务恢复时再传递。默认的消息传递方式是非持久性的。使用非持久性消息可能降低内务和需要的存储器,当不需要接收所有消息时使用。
Apache ActiveMQ
RabbitMQ
RocketMQ
JBoss 社区所研发的 HornetQ
Joram
Coridan的MantaRay
The OpenJMS Group的OpenJMS
专有供应商包括
BEA的BEA WebLogic Server JMS
TIBCO Software的EMS
GigaSpaces Technologies的GigaSpaces
Softwired 2006的iBus
IONA Technologies的IONA JMS
SeeBeyond的IQManager(2005年8月被Sun Microsystems并购)
webMethods的JMS+-
my-channels的Nirvana
Sonic Software的SonicMQ
SwiftMQ的SwiftMQ
IBM的WebSphere MQ
JMS在应用集群中的问题
生产中应用基本上都是以集群部署的。在Queue模式下,消息的消费没有什么问题,因为不同节点的相同应用会抢占式地消费消息,这样还能分摊负载。 如果使用Topic广播模式?对于一个消息,不同节点的相同应用都会收到该消息,进行相应的操作,这样就重复消费了。。。
方案一:选择Queue模式,创建多个一样的Queue,每个应用消费自己的Queue。弊端:浪费空间,生产者还需要关注下游到底有几个消费者,违反了“解耦”的初衷。
方案二:选择Topic模式,在业务上做散列,或者通过分布式锁等方式来实现不同节点间的竞争。弊端:对业务侵入较大,不是优雅的解决方法。
ActiveMQ通过“虚拟主题”解决了这个问题。
生产中似乎需要结合这两种模式:即不同节点的相同应用间存在竞争,会部分消费(P2P),而不 同的应用都需要消费到全量的消息(Topic)模式。这样就可以避免重复消费。
JMS是JEE平台的标准消息传递API。它可以在商业和开源实现中使用。每个实现都包括一个JMS服务器,一个JMS客户端库,以及用于管理消息传递系统的其他特定于实现的组件。 JMS提供程序可以是消息传递服务的独立实现,也可以是非JMS消息传递系统的桥梁。
JMS客户端API是标准化的,因此JMS应用程序可在供应商的实现之间移植。但是:
底层消息传递实现未指定,因此JMS实现之间没有互操作性。除非存在桥接技术,否则想要共享消息传递的Java应用程序必须全部使用相同的JMS实现。
如果没有供应商特定的JMS客户端库来启用互操作性,则非Java应用程序将无法访问JMS。
AMQP 0-9-1是一种消息传递协议,而不是像JMS这样的API。任何实现该协议的客户端都可以访问支持AMQP 0-9-1的代理。
协议级的互操作性允许以任何编程语言编写且在任何操作系统上运行的AMQP 0-9-1客户端都可以参与消息传递系统,而无需桥接不兼容的服务器实现。
AMQP协议剖析
AMQP全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于JMS,兼容JMS协议。目前RabbitMQ主流支持AMQP 0-9-1,3.8.4版本支持AMQP 1.0。
Virtual host:
虚拟主机,一个Server下可以有多个虚拟主机,用于隔离不同项目,一个Virtual host通常包含多个Exchange、Message Queue。
Exchange:
交换器,接收Producer发送来的消息,把消息转发到对应的Message Queue中。
Routing key:
路由键,用于指定消息路由规则(Exchange将消息路由到具体的queue中),通常需要和具体的Exchange类型、Binding的Routing key结合起来使用。
Bindings:
指定了Exchange和Queue之间的绑定关系。Exchange根据消息的Routing key和Binding配置(绑定关系、Binding、Routing key等)来决定把消息分派到哪些具体的queue中。这依赖于Exchange类型。
Message Queue:
实际存储消息的容器,并把消息传递给最终的Consumer。
AMQP 传输层架构
AMQP是一个二进制的协议,信息被组织成数据帧,有很多类型。数据帧携带协议方法和其他信息。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。
我们假定有一个可靠的面向流的网络传输层(TCP/IP或等价的协议)。
在一个单一的socket连接中,可能有多个相互独立的控制线程,称为“channel”。每个数据帧使用通道号码编号。通过数据帧的交织,不同的通道共享一个连接。对于任意给定通道,数据帧严格按照序列传输。
我们使用小的数据类型来构造数据帧,如bit,integer,string以及字段表。数据帧的字段做了轻微的封装,不会让传输变慢或解析困难。根据协议规范机械地生成成数据帧层相对简单。
线级别的格式被设计为可伸缩和足够通用,以支持任意的高层协议(不仅是AMQP)。我们假定AMQP会扩展,改进以及随时间的其他变化,并要求wire-level格式支持这些变化。
AMQP客户端和服务端进行协议协商。意味着当客户端连接上之后,服务端会向客户端提出一些选项,客户端必须能接收或修改。如果双方都认同协商的结果,继续进行连接的建立过程。协议协商是一个很有用的技术手段,因为它可以让我们断言假设和前置条件。
在AMQP中,我们需要协商协议的一些特殊方面:
1、 真实的协议和版本。服务器可能在同一个端口支持多个协议。
2、 双方的加密参数和认证方式。这是功能层的一部分。
3、 数据帧最大大小,通道数量以及其他操作限制。
对限制条件的认同可能会导致双方重新分配key的缓存,避免死锁。每个发来的数据帧要么遵守认同的限制,也就是安全的,要么超过了限制,此时另一方出错,必须断开连接。出色地践行了“要么一切工作正常,要么完全不工作”的RabbitMQ哲学。
协商双方认同限制到一个小的值,如下:
服务端必须告诉客户端它加上了什么限制。
客户端响应服务器,或许会要求对客户端的连接降低限制。
RabbitMQ的JMS客户端用RabbitMQ Java客户端实现,既与JMS API兼容,也与AMQP 0-9-1协议兼容。
RabbitMQ JMS客户端不支持某些JMS 1.1功能:
JMS客户端不支持服务器会话。
XA事务支持接口未实现。
RabbitMQ JMS主题选择器插件支持主题选择器。队列选择器尚未实现。
支持RabbitMQ连接的SSL和套接字选项,但仅使用RabbitMQ客户端提供的(默认)SSL连接协议。