相关文章推荐
坚强的手链  ·  使用 FFmpeg ...·  2 年前    · 
大力的面包  ·  sql - Error with ...·  2 年前    · 
活泼的足球  ·  influxDB-查询操作 - ...·  2 年前    · 

Spring MVC整合WebSocket通信

============================================================================

1、使用 Spring 的低层级 WebSocket API

2、使用JSR356定义的WebSocket规范

3、使用消息代理注解@EnableWebSocketMessageBroker

============================================================================

转载: http://www.cnblogs.com/yangchongxing/p/8474256.html

WebSocket是HTML5开始提供的一种浏览器与服务器间进行全双工通讯的网络技术。依靠这种技术可以实现客户端和服务器端的长连接,双向实时通信。
特点:事件驱动、异步,使用ws或者wss协议的客户端socket,能够实现真正意义上的推送功能
缺点:少部分浏览器不支持,浏览器支持的程度与方式有区别。

参考资料: https://developer.mozilla.org/zh-CN/docs/Web/API/WebSocket

WebSocket(url[, protocols])  返回一个 WebSocket 对象
名称                  值    作用
WebSocket.CONNECTING  0    正尝试与服务器建立连接
WebSocket.OPEN        1    与服务器已经建立连接
WebSocket.CLOSING     2    正在关闭与服务器的连接
WebSocket.CLOSED      3    已经关闭了与服务器的连接

WebSocket实例的readyState属性对照判断状态

binaryType      使用二进制的数据类型连接
bufferedAmount  只读 未发送至服务器的字节数
extensions      只读 服务器选择的扩展
onclose         用于指定连接关闭后的回调函数
onerror         用于指定连接失败后的回调函数
onmessage       用于指定当从服务器接受到信息时的回调函数
onopen          用于指定连接成功后的回调函数
protocol        只读 服务器选择的下属协议
readyState      只读 当前的链接状态
url             只读 WebSocket 的绝对路径

WebSocket.close([code[, reason]]) 关闭当前链接

WebSocket.send(data) 向服务器发送数据


Java服务端:
JSR356定义了WebSocket的规范,JSR356 的 WebSocket 规范使用 javax.websocket.*的 API,可以将一个普通 Java 对象(POJO)使用 @ServerEndpoint 注释作为 WebSocket 服务器的端点。

1、使用 Spring 的低层级 WebSocket API

实现WebSocketHandler接口、该接口包含5个方法,用于处理不同的事件

public interface WebSocketHandler {
    void afterConnectionEstablished(WebSocketSession session) throws Exception;
    void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception;
    void handleTransportError(WebSocketSession session, Throwable exception) throws Exception;
    void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception;
    boolean supportsPartialMessages();

比实现接口更为简单的方式是扩展AbstractWebSocketHandler抽象类,下面是抽象类的代码

public abstract class AbstractWebSocketHandler implements WebSocketHandler {
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        if (message instanceof TextMessage) {
            handleTextMessage(session, (TextMessage) message);
        else if (message instanceof BinaryMessage) {
            handleBinaryMessage(session, (BinaryMessage) message);
        else if (message instanceof PongMessage) {
            handlePongMessage(session, (PongMessage) message);
        else {
            throw new IllegalStateException("Unexpected WebSocket message type: " + message);
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    protected void handleBinaryMessage(WebSocketSession session, BinaryMessage message) throws Exception {
    protected void handlePongMessage(WebSocketSession session, PongMessage message) throws Exception {
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
    @Override
    public boolean supportsPartialMessages() {
        return false;

我们以文本消息为例直接继承TextWebSocketHandler类
TextWebSocketHandler继承自AbstractWebSocketHandler类用来处理文本消息。
BinaryWebSocketHandler继承自AbstractWebSocketHandler类用来处理二进制消息。

public class CommoditySocket extends TextWebSocketHandler {
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        System.out.println("open...");
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
        System.out.println("Closed");
    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        super.handleTextMessage(session, message);
        System.out.println("收到消息:" + message.getPayload());

消息处理类完成了,来看配置文件

首先追加websocket命名空间

<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:task="http://www.springframework.org/schema/task"
    xmlns:websocket="http://www.springframework.org/schema/websocket"
    xsi:schemaLocation="http://www.springframework.org/schema/websocket http://www.springframework.org/schema/websocket/spring-websocket-4.3.xsd
        http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task-4.3.xsd
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-4.3.xsd
        http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-4.3.xsd
        http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-4.3.xsd">

 再追加如下配置,注意path的路径这个是请求的地址,ws://域名:端口/项目名/socket/text

<websocket:handlers>
    <websocket:mapping handler="textHandler" path="/socket/text"/>
</websocket:handlers>
<bean id="textHandler" class="cn.ycx.web.socket.TextHandler"></bean>

浏览器端使用标准的WebSocket

var url = 'ws://' + window.location.host + '/ycxxml/socket/text';  //配置文件中配的path有关
var socket = new WebSocket(url);
socket.onopen = function() {
    console.log("open...");
    socket.send("start talk...")
socket.onmessage = function(e) {
    console.log("服务器发来:" + e.data);
    document.write("" + e.data + "<br/>");
socket.onclose = function() {
    console.log("close...");

2、使用JSR356定义的WebSocket规范

@ServerEndpoint(value="/websocket/commodity/{userId}", configurator = SpringConfigurator.class)

特别注意:configurator = SpringConfigurator.class,若要进行对象注入此段代码必须加

表示将普通的Java对象注解为WebSocket服务端点,运行在ws://[Server端IP或域名]:[Server端口]/项目/websocket/commodity/{userId}的访问端点,客户端浏览器已经可以对WebSocket客户端API发起HTTP长连接了。
@OnOpen
在新连接建立时被调用。@PathParam可以传递url参数,满足业务需要。Session表明两个WebSocket端点对话连接的另一端,可以理解为类似HTTPSession的概念。
@OnClose
在连接被终止时调用。参数closeReason可封装更多细节,如为什么一个WebSocket连接关闭。
@OnMessage
注解的Java方法用于接收传入的WebSocket信息,这个信息可以是文本格式,也可以是二进制格式

服务器端代码:

package cn.ycx.web.socket;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.websocket.CloseReason;
import javax.websocket.OnClose;
import javax.websocket.OnError;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import org.springframework.web.socket.server.standard.SpringConfigurator;
import com.alibaba.fastjson.JSON;
 * @author 杨崇兴
@ServerEndpoint(value="/websocket/commodity/{fromUserId}/{toUserId}", configurator = SpringConfigurator.class)
public class WebSocketServer {
    // 已经建立链接的对象缓存起来
    private static ConcurrentMap<Integer, WebSocketServer> serverMap = new ConcurrentHashMap<Integer, WebSocketServer>();
    // 当前session
    private Session currentSession;
    @OnOpen
    public void onOpen(Session session, @PathParam("fromUserId") int fromUserId, @PathParam("toUserId") int toUserId) throws IOException {
        this.currentSession = session;
        serverMap.put(fromUserId, this);//建立链接时,缓存对象
    @OnClose
    public void onClose(Session session, CloseReason reason) {
        System.out.println(reason.toString());
        if (serverMap.containsValue(this)) {
            Iterator<Integer> keys = serverMap.keySet().iterator();
            int userId = 0;
            while(keys.hasNext()) {
                userId 




    
= keys.next();
                if (serverMap.get(userId) == this) {
                    serverMap.remove(userId, this);//关闭链接时,删除缓存对象
        this.currentSession = null;
        try {
            session.close();
        } catch (IOException e) {
            e.printStackTrace();
    @OnMessage()
    @SuppressWarnings("unchecked")
    public void onMessage(String json) {
        HashMap<String, String> map =  JSON.parseObject(json, HashMap.class);
        int fromUserId = Integer.parseInt(map.get("fromUserId"));
        int toUserId = Integer.parseInt(map.get("toUserId"));
        String content = map.get("content").toString();
        WebSocketServer server = serverMap.get(toUserId);//若存在则用户在线,否在用户不在线
        if (server != null && server.currentSession.isOpen()) {
            if (fromUserId != toUserId) {
                try {
                    server.currentSession.getBasicRemote().sendText(content);
                } catch (IOException e) {
                    e.printStackTrace();
    @OnError
    public void onError(Throwable t) {
        t.printStackTrace();

注意:修改了原来的问题,serverMap对象全局缓存了已经链接上的对象,通过这对象也能判断用户是否在线。

注意:使用spring boot是要定义ServerEndpointExporter

If you want to use @ServerEndpoint in a Spring Boot application that used an embedded container, you must declare a single ServerEndpointExporter @Bean, as shown in the following example:

如果想要在使用嵌入式容器的Spring Boot应用中使用@ServerEndpoint,则必须声明单个ServerEndpointExporter @Bean,如下例所示:

@Bean
public ServerEndpointExporter serverEndpointExporter() {
    return new ServerEndpointExporter();

The bean shown in the preceding example registers any @ServerEndpoint annotated beans with the underlying WebSocket container. When deployed to a standalone servlet container, this role is performed by a servlet container initializer, and the ServerEndpointExporter bean is not required.

前面示例中所示任何在WebSocket容器中使用@ServerEndpoint注解标注的beans。当部署到独立的servlet容器时,此角色由servlet容器初始值设定项执行,并且不需要 ServerEndpointExporter bean

浏览器端:

<%@ page language="java" contentType="text/html; charset=UTF-8"
    pageEncoding="UTF-8"%>
<!DOCTYPE html PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN" "http://www.w3.org/TR/html4/loose.dtd">
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8">
<title>socketjs</title>
</head>
发送者:<input id="fromUserId" value="2">
接收者:<input id="toUserId" value="3">
<button type="button" onclick="openClick();">打开</button>
<button type="button" onclick="closeClick();">关闭</button><br/>
<input id="message" value="---"/>
<button type="button" onclick="sendClick();">发送</button>
<div id="content"></div>
<script>
var socket;
var t;
function openClick() {
    connection();
function closeClick() {
    if (socket) {
        socket.close();
function sendClick() {
    var fromUserId = document.getElementById("fromUserId").value;
    var toUserId = document.getElementById("toUserId").value;
    var content = document.getElementById("message").value;
    var obj = {
            "fromUserId":fromUserId,
            "toUserId":toUserId,
            "content":content
    document.getElementById("content").innerHTML = document.getElementById("content").innerHTML + '<br/>' + fromUserId + "说:" + content;
    socket.send(JSON.stringify(obj));
    console.log(fromUserId + "说:" + JSON.stringify(content));
var connection = function() {
    console.log("connection()");
    




    
var fromUserId = document.getElementById("fromUserId");
    var toUserId = document.getElementById("toUserId");
    var url = 'ws://' + window.location.host + '/ycxcode/websocket/commodity/{' + fromUserId.value + '}/{' + toUserId.value + '}';
    socket = new WebSocket(url);
    socket.onopen = onopen;
    socket.onmessage = onmessage;
    socket.onclose = onclose;
    socket.onerror = onerror;
//重连
var reconnection = function() {
    //与服务器已经建立连接
    if (socket && socket.readyState == 1) {
        clearTimeout(t);
    } else {
        //已经关闭了与服务器的连接
        if (socket.readyState == 3) {
            connection();
        //0正尝试与服务器建立连接,2正在关闭与服务器的连接
        t = setTimeout(function() {
            reconnection();
            }, 1000);
var onopen = function() {
    console.log("onopen()");
var onclose = function() {
    console.log("onclose()");
    reconnection();
var onmessage = function(e) {
    console.log("onmessage()");
    if (e.data === "") return;
    var toUserId = document.getElementById("toUserId").value;
    document.getElementById("content").innerHTML = document.getElementById("content").innerHTML + '<br/>' + toUserId + "说:" + e.data;
    console.log(toUserId + "说:" + e.data);
var onerror = function() {
    console.log("error...");
    reconnection();
</script>
</body>
</html>

3、使用消息代理注解@EnableWebSocketMessageBroker

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-websocket</artifactId>
</dependency>
@Configuration
@EnableWebSecurity
public class WebSecurityConfig extends WebSecurityConfigurerAdapter {
    @Override
    public void configure(WebSecurity web) {
        //设置不拦截规则
        web.ignoring().antMatchers("/**");

websocket配置

@Configuration
@EnableWebSocket // 启用WebSocket
@EnableWebSocketMessageBroker // 启用消息代理
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    public static final String END_POINT = "/websocket-end-point";
    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        //允许使用socketJs方式访问,访问点为websocket,允许跨域
        registry.addEndpoint(END_POINT).setAllowedOrigins("*").withSockJS().setHeartbeatTime(5000)
                .setDisconnectDelay(3000).setStreamBytesLimit(512);

借助消息队列给前端发送

@Autowired
SimpMessagingTemplate messagingTemplate;
public static final String WEBSOCKET_FANOUT_EXCHANGE = "websocket-fanout-exchange";
public static final String QUEUE_SEND_WEBSOCKET_MESSAGE = "send_websocket_message";
// 广播交换器
@Bean
public FanoutExchange websocketFanoutExchange() {
    return new FanoutExchange(RabbitMQConstants.WEBSOCKET_FANOUT_EXCHANGE);
@Bean
public Queue sendWebsocketMessageQueue() {
    return new Queue(RabbitMQConstants.QUEUE_SEND_WEBSOCKET_MESSAGE);
@Bean
public Binding websocketMessageQueueBinding() {
    return BindingBuilder.bind(sendWebsocketMessageQueue()).to(websocketFanoutExchange());
@RabbitListener(queues = RabbitMQConstants.QUEUE_SEND_WEBSOCKET_MESSAGE)
public void handleSendWebsocketMessage(Map<String, Object> param) {
    messagingTemplate.convertAndSend("/subscribe/websocket/progress/", param);// 前端订阅地址
let sockJS = new SockJS('/websocket-end-point/');
let stompClient = Stomp.over(sockJS)
stompClient.connect({}, function () {
    stompClient.subscribe('/subscribe/websocket/progress/', function (response) {
        // 处理
sockJS.onclose = function () {