环境
java 11
gradle
spring boot 2.1.1
RabbitMQ 服务
需要
STOMP
插件安装
如果你需要定时消息,需要安装
rabbitmq_delayed_message_exchange
依赖包安装:
implementation ('org.springframework.boot:spring-boot-starter-thymeleaf')
implementation('org.springframework.boot:spring-boot-starter-security') // 安全模块
implementation('org.springframework.security:spring-security-messaging')
implementation('org.springframework.boot:spring-boot-starter-websocket') // websocket
implementation('org.springframework.session:spring-session-data-redis')// spring session data redis 已支持认证多项目共享
implementation('io.lettuce:lettuce-core')
代理设置类SessionSocketsStompConfig
@Configuration
@EnableWebSocketMessageBroker
public class SessionSocketsStompConfig extends AbstractSessionWebSocketMessageBrokerConfigurer<Session> {
* 设置一个SockJS端点链接
* /socket
* setAllowedOrigins 跨域设置
* @param registry 端点注册器
public void configureStompEndpoints(StompEndpointRegistry registry) {
registry.addEndpoint("/socket").setAllowedOrigins("*").withSockJS();
* 消息规则,可以理解成消息 订阅地址前缀的
* 总要 (red) registry.setPathMatcher(new AntPathMatcher("."));
* 因为 rabbitmq是不支持URL "/" 分隔符,所以我们要自定义它的分隔符为 ".",如果不定义就会报错.
* 比如订阅 /amq/queue/delayed.trade 过期订单
* .setUserDestinationBroadcast("/topic/unresolved.user.destination")
* .setUserRegistryBroadcast("/topic/registry.broadcast")
* 多项目设置,比如A 端点给 B端点用户发送,需要设置的
* <a>https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#websocket-stomp-enable</a>
* @param registry 端点设置
@Override
public void configureMessageBroker(MessageBrokerRegistry registry) {
registry.setPathMatcher(new AntPathMatcher("."));
registry.enableStompBrokerRelay("/topic", "/queue", "/exchange", "/amq/queue")
// 其他服务访问桥连,一但这里没有找到可以去这里推送给其他用户
.setUserDestinationBroadcast("/topic/unresolved.user.destination")
.setUserRegistryBroadcast("/topic/registry.broadcast")
.setAutoStartup(true);
registry.setApplicationDestinationPrefixes("/app");
registry.setPreservePublishOrder(true);
AbstractSessionWebSocketMessageBrokerConfigurer启用Session的管理,它很简单就是管理用户认证后Session的生命周期.
Sockets安全配置类SocketSecurityConfig
public class SocketSecurityConfig extends AbstractSecurityWebSocketMessageBrokerConfigurer {
@Autowired
private FindByIndexNameSessionRepository<? extends Session> sessionRepository;
* 因为我们采用spring-session 头x-auth-token 认证
* 那么传统的web认证就无法,获取到消息认证信息了.
* 所以我们要从它的session获取认证信息手动增加到消息上.
* 这样消息就具有认证信息了.
* 我们可以参考spring
* https://docs.spring.io/spring/docs/current/spring-framework-reference/web.html#websocket-stomp-authentication-token-based
* @param registration 消息请求体
@Override
protected void customizeClientInboundChannel(ChannelRegistration registration) {
registration.interceptors(new ChannelInterceptor() {
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
// 获取消息头
StompHeaderAccessor accessor =
MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor.class);
// 获取 x-auth-token session id
List<String> sessions = accessor != null ? accessor.getNativeHeader("x-auth-token") : null;
if (sessions == null) return message;
//根据 session id 获取它的认证信息
Session session = sessionRepository.findById(sessions.get(0));
if (session == null) return message;
//根据 session id 获取它的认证信息 SPRING_SECURITY_CONTEXT
SecurityContext securityContext = session.getAttribute("SPRING_SECURITY_CONTEXT");
if (securityContext == null) return message;
//根据 SPRING_SECURITY_CONTEXT 获取用户的认证信息 Authentication
Authentication user = securityContext.getAuthentication();
//判断用户是否已认证,一般获取到的是已认证的用户,也就是只要存在,用户肯定被认证了.
//但是 如果你设置了用户过期,没有设置清理session 用户还是存在的.这时候就必须去判断用户的认证状态,不然会报无权限错误
if (user.isAuthenticated()) {
// 重要这里 我们获取的认证信息,手动赋值给它.
accessor.setUser(user);
/*根据spring session sockets API 将x-auth-token 放入sockets 会话
让spring sockets 代理托管我们的session 会话生命周期.若果用户没有断开链接
session 是不会过期的具体查看 @SessionSocketsStompConfig 代码*/
Map<String, Object> attributes = accessor.getSessionAttributes();
if (attributes != null) {
attributes.put("SPRING.SESSION.ID", sessions.get(0));
accessor.setSessionAttributes(attributes);
} else {
throw new BadCredentialsException("Your authentication information has expired please login again.");
return message;
* sockets 权限消息
* @param messages 消息体
@Override
protected void configureInbound(MessageSecurityMetadataSourceRegistry messages) {
messages
.simpSubscribeDestMatchers("/user/queue/errors", "").permitAll()
.simpDestMatchers("/app/**").hasRole("USER")
.simpSubscribeDestMatchers("/user/**", "/queue/**").hasRole("USER");
* 关闭Sockets 的CSRF 保护
* @return true
@Override
protected boolean sameOriginDisabled() {
return true;
我没有写客户端实现,由于时间问题,暂时没有实现.
这是一个最基本的代理服务链接RabbitMQ的实现,它可以跨域,也集成了安全,重要的知识点都涵盖了.
关于定时这个消息是rabbitmq_delayed_message_exchange这个插件实现的,后面我会详细介绍.
对于不明白的可以加我QQ询问.
示例代码 github gitee