我们在设计产品的时候通常都会遇到消息通知的时候,比如用户下单支付成功,比如用户有站内信来可以实时通知。而http是单向的,客户端请求,服务端返回,这次请求就已经结束。而websocket可以保持连接实现长连接,在遇到通知时往往使用websocket来达到服务端主动向客户端发送消息的目的。

我们的目标是实现服务端主动向某个用户发消息。所以要做到一下四步。

  • 建立连接(保持连接)
  • 断开连接(删除连接)
  • 维护连接(心跳检测)
  • 我们这里使用到 github.com/gorilla/websocket 包。

    首先是定义一个客户端连接的结构,先有了连接的结构才能保存连接,ID是一个客户端连接的id,而Socket是真正的客户端连接

    // 客户端连接信息
    type Client struct {
    	ID            string          // 连接ID
    	AccountId     string          // 账号id, 一个账号可能有多个连接
    	Socket        *websocket.Conn // 连接
    	HeartbeatTime int64           // 前一次心跳时间
    

    然后定义一个客户端管理,来管理所有的客户端连接,并且实例化为一个全局的变量。

    // 消息类型
    const (
    	MessageTypeHeartbeat = "heartbeat" // 心跳
    	MessageTypeRegister  = "register"  // 注册
    	HeartbeatCheckTime = 9  // 心跳检测几秒检测一次
    	HeartbeatTime      = 20 // 心跳距离上一次的最大时间
    	ChanBufferRegister = 100 // 注册chan缓冲
    	ChanBufferUnregister = 100 // 注销chan大小
    // 客户端管理
    type ClientManager struct {
    	Clients  map[string]*Client  // 保存连接
    	Accounts map[string][]string // 账号和连接关系,map的key是账号id即:AccountId,这里主要考虑到一个账号多个连接
    	mu       *sync.Mutex
    // 定义一个管理Manager
    var Manager = ClientManager{
    	Clients:  make(map[string]*Client),  // 参与连接的用户,出于性能的考虑,需要设置最大连接数
    	Accounts: make(map[string][]string), // 账号和连接关系
    	mu:       new(sync.Mutex),
    var (
    	RegisterChan   = make(chan *Client, ChanBufferRegister) // 注册
    	unregisterChan = make(chan *Client, ChanBufferUnregister) // 注销
    

    这里还要封装一下服务器给客户端发消息的格式,以便客户端连接成功后服务端给客户端回复消息

    // 封装回复消息
    type ServiceMessage struct {
        Type    string                `json:"type"` // 类型
        Content ServiceMessageContent `json:"content"`
    type ServiceMessageContent struct {
        Body     string `json:"body"`      // 主要数据
        MetaData string `json:"meta_data"` // 扩展数据
    func CreateReplyMsg(t string, content ServiceMessageContent) []byte {
        replyMsg := ServiceMessage{
            Type:    t,
            Content: content,
        msg, _ := json.Marshal(replyMsg)
        return msg
    

    建立连接和断开连接

    连接保持在Manager里的Clients,和Accounts。Clients用于保存每个与客户端通信的连接。而Account保持者连接id与连接分类(Category)的绑定关系。

    // 注册注销
    func register() {
    	for {
    		select {
    		case conn := <-RegisterChan: // 新注册,新连接
    			// 加入连接,进行管理
    			accountBind(conn)
    			// 回复消息
    			content := CreateReplyMsg(MessageTypeRegister, ServiceMessageContent{})
    			_ = conn.Socket.WriteMessage(websocket.TextMessage, content)
    		case conn := <-unregisterChan: // 注销,或者没有心跳
    			// 关闭连接
    			_ = conn.Socket.Close()
    			// 删除Client
    			unAccountBind(conn)
    // 绑定账号
    func accountBind(c *Client) {
    	Manager.mu.Lock()
    	defer Manager.mu.Unlock()
    	// 加入到连接
    	Manager.Clients[c.ID] = c
    	// 加入到绑定
    	if _, ok := Manager.Accounts[c.AccountId]; ok { // 该账号已经有绑定,就追加一个绑定
    		Manager.Accounts[c.AccountId] = append(Manager.Accounts[c.AccountId], c.ID)
    	} else { // 没有就新增一个账号的绑定切片
    		Manager.Accounts[c.AccountId] = []string{c.ID}
    // 解绑账号
    func unAccountBind(c *Client) {
    	Manager.mu.Lock()
    	defer Manager.mu.Unlock()
    	// 取消连接
    	delete(Manager.Clients, c.ID)
    	// 取消绑定
    	if len(Manager.Accounts[c.AccountId]) > 0 {
    		for k, clientId := range Manager.Accounts[c.AccountId] {
    			if clientId == c.ID { // 找到绑定客户端Id
    				Manager.Accounts[c.AccountId] = append(Manager.Accounts[c.AccountId][:k], Manager.Accounts[c.AccountId][k+1:]...)
    

    维护连接(心跳检测)

    每隔一段时间,就检测一次心跳,如果上次心跳时间超过了HeartbeatTime时间视为已经断开连接。

    // 维持心跳
    func heartbeat() {
    	for {
    		// 获取所有的Clients
    		Manager.mu.Lock()
    		clients := make([]*Client, len(Manager.Clients))
    		for _, c := range Manager.Clients {
    			clients = append(clients, c)
    		Manager.mu.Unlock()
    		for _, c := range clients {
    			if time.Now().Unix()-c.HeartbeatTime > HeartbeatTime {
    				unAccountBind(c)
    		time.Sleep(time.Second * HeartbeatCheckTime)
    
    // 管理连接
    func Start() {
    	// 检查心跳
    	go func() {
    		defer func() {
    			if r := recover(); r != nil {
    				log.Println(r)
    		heartbeat()
    	// 注册注销
    	go func() {
    		defer func() {
    			if r := recover(); r != nil {
    				log.Println(r)
    		register()
    

    根据账号获取连接

    // 根据账号获取连接
    func GetClient (accountId string) []*Client{
    	clients := make([]*Client,0)
    	Manager.mu.Lock()
    	defer Manager.mu.Unlock()
    	if len(Manager.Accounts[accountId]) > 0 {
    		for _,clientId := range Manager.Accounts[accountId] {
    			if c,ok := Manager.Clients[clientId]; ok {
    				clients = append(clients,c)
    	return clients
    

    读取客户端的消息

    我们这是只是心跳用到了,所以只要判断客户端是心跳消息,然后回复即可。

    // 读取信息,即收到消息
    func (c *Client) Read() {
        defer func() {
            _ = c.Socket.Close()
       for {
           // 读取消息
           _, body, err := c.Socket.ReadMessage()
           if err != nil {
               break
           var msg struct {
               Type string `json:"type"`
           err = json.Unmarshal(body, &msg)
           if err != nil {
               log.Println(err)
               continue
           if msg.Type == MessageTypeHeartbeat { // 维持心跳消息
               // 刷新连接时间
               c.HeartbeatTime = time.Now().Unix()
               // 回复心跳
               replyMsg := CreateReplyMsg(MessageTypeHeartbeat, ServiceMessageContent{})
               err = c.Socket.WriteMessage(websocket.TextMessage, replyMsg)
               if err != nil {
                   log.Println(err)
               continue
    

    对客户端发送消息

    只要找到连接,对连接发送消息即可。

    // 发送消息
    func Send(accounts []string,message ServiceMessage) error{
        msg,err := json.Marshal(message)
        if err != nil {
            return err
        for _,accountId := range accounts{
            // 获取连接id
            clients := GetClient(accountId)
            // 发送消息
            for _,c := range clients {
                _ = c.Socket.WriteMessage(websocket.TextMessage, msg)
        return nil
    

    这里对http请求升级为websocket,然后单独建立一个goroutine去维持连接。下面类似这样调用,但是鉴权,日志等很多细节不完善,只是提供一个思路。

    package wesocket
    import (
        websocket2 "demo/websocket"
        "fmt"
        "github.com/gin-gonic/gin"
        "github.com/gorilla/websocket"
        "github.com/rs/xid"
        "log"
        "net/http"
        "time"
    type MessageNotifyRequest struct {
        UserId    string `form:"user_id"`
    func MessageNotify(ctx *gin.Context) {
        // 获取参数
        var params MessageNotifyRequest
        if err := ctx.ShouldBindQuery(&params); err != nil {
            log.Println(err)
            return
        // TODO: 鉴权
        // 将http升级为websocket
        conn, err := (&websocket.Upgrader{
            // 1. 解决跨域问题
            CheckOrigin: func(r *http.Request) bool {
                return true
        }).Upgrade(ctx.Writer, ctx.Request, nil) // 升级
        if err != nil {
            log.Println(err)
            http.NotFound(ctx.Writer, ctx.Request)
            return
        // 创建一个实例连接
        ConnId := xid.New().String()
        client := &websocket2.Client{
            ID:            ConnId, // 连接id
            AccountId:      fmt.Sprintf("%s", params.UserId),
            HeartbeatTime: time.Now().Unix(),
            Socket:        conn,
        // 用户注册到用户连接管理
        websocket2.RegisterChan <- client
        // 读取信息
        go func() {
            defer func() {
                if r := recover(); r != nil {
                    log.Printf("MessageNotify read panic: %+v\n",r)
            client.Read()
    

    用websocket做消息通知,对于后端来说,主要是绑定连接管理连接,绑定连接就是用户id和websocket连接建立一个绑定关系,而管理连接就是存储连接,删除连接,维护连接的健康(心跳检测),其次就是定义服务端接收和发送数据的格式。总体大概就是这样一个思路。