我们在设计产品的时候通常都会遇到消息通知的时候,比如用户下单支付成功,比如用户有站内信来可以实时通知。而http是单向的,客户端请求,服务端返回,这次请求就已经结束。而websocket可以保持连接实现长连接,在遇到通知时往往使用websocket来达到服务端主动向客户端发送消息的目的。
我们的目标是实现服务端主动向某个用户发消息。所以要做到一下四步。
建立连接(保持连接)
断开连接(删除连接)
维护连接(心跳检测)
我们这里使用到 github.com/gorilla/websocket 包。
首先是定义一个客户端连接的结构,先有了连接的结构才能保存连接,ID是一个客户端连接的id,而Socket是真正的客户端连接
// 客户端连接信息
type Client struct {
ID string // 连接ID
AccountId string // 账号id, 一个账号可能有多个连接
Socket *websocket.Conn // 连接
HeartbeatTime int64 // 前一次心跳时间
然后定义一个客户端管理,来管理所有的客户端连接,并且实例化为一个全局的变量。
// 消息类型
const (
MessageTypeHeartbeat = "heartbeat" // 心跳
MessageTypeRegister = "register" // 注册
HeartbeatCheckTime = 9 // 心跳检测几秒检测一次
HeartbeatTime = 20 // 心跳距离上一次的最大时间
ChanBufferRegister = 100 // 注册chan缓冲
ChanBufferUnregister = 100 // 注销chan大小
// 客户端管理
type ClientManager struct {
Clients map[string]*Client // 保存连接
Accounts map[string][]string // 账号和连接关系,map的key是账号id即:AccountId,这里主要考虑到一个账号多个连接
mu *sync.Mutex
// 定义一个管理Manager
var Manager = ClientManager{
Clients: make(map[string]*Client), // 参与连接的用户,出于性能的考虑,需要设置最大连接数
Accounts: make(map[string][]string), // 账号和连接关系
mu: new(sync.Mutex),
var (
RegisterChan = make(chan *Client, ChanBufferRegister) // 注册
unregisterChan = make(chan *Client, ChanBufferUnregister) // 注销
这里还要封装一下服务器给客户端发消息的格式,以便客户端连接成功后服务端给客户端回复消息
// 封装回复消息
type ServiceMessage struct {
Type string `json:"type"` // 类型
Content ServiceMessageContent `json:"content"`
type ServiceMessageContent struct {
Body string `json:"body"` // 主要数据
MetaData string `json:"meta_data"` // 扩展数据
func CreateReplyMsg(t string, content ServiceMessageContent) []byte {
replyMsg := ServiceMessage{
Type: t,
Content: content,
msg, _ := json.Marshal(replyMsg)
return msg
建立连接和断开连接
连接保持在Manager里的Clients,和Accounts。Clients用于保存每个与客户端通信的连接。而Account保持者连接id与连接分类(Category)的绑定关系。
// 注册注销
func register() {
for {
select {
case conn := <-RegisterChan: // 新注册,新连接
// 加入连接,进行管理
accountBind(conn)
// 回复消息
content := CreateReplyMsg(MessageTypeRegister, ServiceMessageContent{})
_ = conn.Socket.WriteMessage(websocket.TextMessage, content)
case conn := <-unregisterChan: // 注销,或者没有心跳
// 关闭连接
_ = conn.Socket.Close()
// 删除Client
unAccountBind(conn)
// 绑定账号
func accountBind(c *Client) {
Manager.mu.Lock()
defer Manager.mu.Unlock()
// 加入到连接
Manager.Clients[c.ID] = c
// 加入到绑定
if _, ok := Manager.Accounts[c.AccountId]; ok { // 该账号已经有绑定,就追加一个绑定
Manager.Accounts[c.AccountId] = append(Manager.Accounts[c.AccountId], c.ID)
} else { // 没有就新增一个账号的绑定切片
Manager.Accounts[c.AccountId] = []string{c.ID}
// 解绑账号
func unAccountBind(c *Client) {
Manager.mu.Lock()
defer Manager.mu.Unlock()
// 取消连接
delete(Manager.Clients, c.ID)
// 取消绑定
if len(Manager.Accounts[c.AccountId]) > 0 {
for k, clientId := range Manager.Accounts[c.AccountId] {
if clientId == c.ID { // 找到绑定客户端Id
Manager.Accounts[c.AccountId] = append(Manager.Accounts[c.AccountId][:k], Manager.Accounts[c.AccountId][k+1:]...)
维护连接(心跳检测)
每隔一段时间,就检测一次心跳,如果上次心跳时间超过了HeartbeatTime时间视为已经断开连接。
// 维持心跳
func heartbeat() {
for {
// 获取所有的Clients
Manager.mu.Lock()
clients := make([]*Client, len(Manager.Clients))
for _, c := range Manager.Clients {
clients = append(clients, c)
Manager.mu.Unlock()
for _, c := range clients {
if time.Now().Unix()-c.HeartbeatTime > HeartbeatTime {
unAccountBind(c)
time.Sleep(time.Second * HeartbeatCheckTime)
// 管理连接
func Start() {
// 检查心跳
go func() {
defer func() {
if r := recover(); r != nil {
log.Println(r)
heartbeat()
// 注册注销
go func() {
defer func() {
if r := recover(); r != nil {
log.Println(r)
register()
根据账号获取连接
// 根据账号获取连接
func GetClient (accountId string) []*Client{
clients := make([]*Client,0)
Manager.mu.Lock()
defer Manager.mu.Unlock()
if len(Manager.Accounts[accountId]) > 0 {
for _,clientId := range Manager.Accounts[accountId] {
if c,ok := Manager.Clients[clientId]; ok {
clients = append(clients,c)
return clients
读取客户端的消息
我们这是只是心跳用到了,所以只要判断客户端是心跳消息,然后回复即可。
// 读取信息,即收到消息
func (c *Client) Read() {
defer func() {
_ = c.Socket.Close()
for {
// 读取消息
_, body, err := c.Socket.ReadMessage()
if err != nil {
break
var msg struct {
Type string `json:"type"`
err = json.Unmarshal(body, &msg)
if err != nil {
log.Println(err)
continue
if msg.Type == MessageTypeHeartbeat { // 维持心跳消息
// 刷新连接时间
c.HeartbeatTime = time.Now().Unix()
// 回复心跳
replyMsg := CreateReplyMsg(MessageTypeHeartbeat, ServiceMessageContent{})
err = c.Socket.WriteMessage(websocket.TextMessage, replyMsg)
if err != nil {
log.Println(err)
continue
对客户端发送消息
只要找到连接,对连接发送消息即可。
// 发送消息
func Send(accounts []string,message ServiceMessage) error{
msg,err := json.Marshal(message)
if err != nil {
return err
for _,accountId := range accounts{
// 获取连接id
clients := GetClient(accountId)
// 发送消息
for _,c := range clients {
_ = c.Socket.WriteMessage(websocket.TextMessage, msg)
return nil
这里对http请求升级为websocket,然后单独建立一个goroutine去维持连接。下面类似这样调用,但是鉴权,日志等很多细节不完善,只是提供一个思路。
package wesocket
import (
websocket2 "demo/websocket"
"fmt"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/rs/xid"
"log"
"net/http"
"time"
type MessageNotifyRequest struct {
UserId string `form:"user_id"`
func MessageNotify(ctx *gin.Context) {
// 获取参数
var params MessageNotifyRequest
if err := ctx.ShouldBindQuery(¶ms); err != nil {
log.Println(err)
return
// TODO: 鉴权
// 将http升级为websocket
conn, err := (&websocket.Upgrader{
// 1. 解决跨域问题
CheckOrigin: func(r *http.Request) bool {
return true
}).Upgrade(ctx.Writer, ctx.Request, nil) // 升级
if err != nil {
log.Println(err)
http.NotFound(ctx.Writer, ctx.Request)
return
// 创建一个实例连接
ConnId := xid.New().String()
client := &websocket2.Client{
ID: ConnId, // 连接id
AccountId: fmt.Sprintf("%s", params.UserId),
HeartbeatTime: time.Now().Unix(),
Socket: conn,
// 用户注册到用户连接管理
websocket2.RegisterChan <- client
// 读取信息
go func() {
defer func() {
if r := recover(); r != nil {
log.Printf("MessageNotify read panic: %+v\n",r)
client.Read()
用websocket做消息通知,对于后端来说,主要是绑定连接和管理连接,绑定连接就是用户id和websocket连接建立一个绑定关系,而管理连接就是存储连接,删除连接,维护连接的健康(心跳检测),其次就是定义服务端接收和发送数据的格式。总体大概就是这样一个思路。