一个通过websocket通信的go语言实现的即时通讯demo


写在前面

今天学习一个小册子其中的一篇,并且跟着写demo。本文大部分内容是从小册子分布式IM原理与实战复制过来的,主要用于记录一下学习到的知识。如有侵权请联系我删除。感谢作者逆水

项目结构

├── client
│   ├── main.go // 客户端,监听,读取消息,发送消息
│   └── option.go // 客户端配置,ip+port,用户名
├── client_cmd
│   └── main.go // 启动客户端
├── go.mod
├── go.sum
├── main.go // 启动服务端
└── serv
    ├── main.go // 服务端,监听,读取消息,转发消息
    └── option.go // 服务端配置,ip+port,id

Server

监听、读取消息、转发消息

启用服务监听请求,因为websocket协议是在http基础上升级而来,因此我们只需要启用一个http服务就可以处理websocket连接请求

package serv

import (
   "context"
   "fmt"
   "github.com/pkg/errors"
   "net"
   "net/http"
   "sync"

   "github.com/gobwas/ws"
   "github.com/sirupsen/logrus"
)

// Server is a websocket Server
type Server struct {
   once    sync.Once
   id      string
   address string
   sync.Mutex
   // 会话列表
   users map[string]net.Conn
}

// NewServer NewServer
func NewServer(opts ...OptionFunc) *Server {
   return newServer(opts...)
}

func newServer(opts ...OptionFunc) *Server {
   option := &ServerStartOptions{}
   for i := range opts {
      opts[i](option)
   }

   return &Server{
      id:      option.id,
      address: option.listen,
      users:   make(map[string]net.Conn, 100),
   }
}

// Start server
func (s *Server) Start() error {
   mux := http.NewServeMux()
   log := logrus.WithFields(logrus.Fields{
      "module": "Server",
      "listen": s.address,
      "id":     s.id,
   })

   mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
      // step1. 升级
      conn, _, _, err := ws.UpgradeHTTP(r, w)
      if err != nil {
         conn.Close()
         return
      }
      //step2. 读取userId
      user := r.URL.Query().Get("user")
      if user == "" {
         conn.Close()
         return
      }
      //step3. 添加到会话管理中
      old, ok := s.addUser(user, conn)
      if ok {
         // 如果之前已经登录了,则断开旧的连接
         old.Close()
      }
      log.Infof("user %s in", user)

      go func(user string, conn net.Conn) {
         //step4. 读取消息
         err := s.readloop(user, conn)
         if err != nil {
            log.Error(err)
         }
         conn.Close()
         //step5. 连接断开,删除用户
         s.delUser(user)

         log.Infof("connection of %s closed", user)
      }(user, conn)

   })
   log.Infoln("started")
   // 监听
   return http.ListenAndServe(s.address, mux)
}

// 添加用户
func (s *Server) addUser(user string, conn net.Conn) (net.Conn, bool) {
   s.Lock()
   defer s.Unlock()
   old, ok := s.users[user] //返回旧的连接
   s.users[user] = conn     //缓存
   return old, ok
}

// 删除用户
func (s *Server) delUser(user string) {
   s.Lock()
   defer s.Unlock()
   delete(s.users, user)
}

// Shutdown Shutdown
func (s *Server) Shutdown() {
   s.once.Do(func() {
      s.Lock()
      defer s.Unlock()
      for _, conn := range s.users {
         conn.Close()
      }
   })
}

// 循环读取conn连接上发送过来的消息
func (s *Server) readloop(user string, conn net.Conn) error {
   for {
      frame, err := ws.ReadFrame(conn)
      if err != nil {
         return err
      }
      if frame.Header.OpCode == ws.OpClose {
         return errors.New("remote side close the conn")
      }

      if frame.Header.Masked {
         ws.Cipher(frame.Payload, frame.Header.Mask, 0)
      }
      // 接收文本帧内容
      if frame.Header.OpCode == ws.OpText {
         // 用goroutine将消息异步发送到每一个连接者
         go s.handle(user, string(frame.Payload))
      }
   }
}

// 广播消息
func (s *Server) handle(user string, message string) {
   logrus.Infof("recv message %s from %s", message, user)
   s.Lock()
   defer s.Unlock()
   broadcast := fmt.Sprintf("%s -- FROM %s", message, user)
   for u, conn := range s.users {
      if u == user { // 不发给自己
         continue
      }
      logrus.Infof("send to %s : %s", u, broadcast)
      err := s.writeText(conn, broadcast)
      if err != nil {
         logrus.Errorf("write to %s failed, error: %v", user, err)
      }
   }
}

func (s *Server) writeText(conn net.Conn, message string) error {
   // 创建文本帧数据
   f := ws.NewTextFrame([]byte(message))
   return ws.WriteFrame(conn, f)
}

// 启动服务
func RunServerStart(ctx context.Context, version string, opts ...OptionFunc) error {
   server := NewServer(opts...)
   defer server.Shutdown()
   return server.Start()
}

上面的核心部分就是handleFunc中的握手升级过程,它实现了一个连接的生命周期管理,我们来看这个连接的状态图


服务端配置文件

package serv

type ServerStartOptions struct {
   id     string
   listen string
}

type OptionFunc func(s *ServerStartOptions)

func WithId(id string) OptionFunc {
   return func(s *ServerStartOptions) {
      s.id = id
   }
}

func WithListen(listen string) OptionFunc {
   return func(s *ServerStartOptions) {
      s.listen = listen
   }
}

启动服务端go run main.go

func main() {
   if err := serv.RunServerStart(context.Background(), "v1",
      serv.WithId("1"),
      serv.WithListen("0.0.0.0:6789")); err != nil {
      panic(err)
   }
   time.Sleep(time.Hour)
   fmt.Println("main end")
}

Client

连接、读取消息、发送消息

package client

import (
   "context"
   "fmt"
   "github.com/gobwas/ws"
   "github.com/gobwas/ws/wsutil"
   "github.com/pkg/errors"
   "github.com/sirupsen/logrus"
   "net"
   "net/url"
   "time"
)

type handler struct {
   conn  net.Conn
   close chan struct{}
   recv  chan []byte
}

func (h *handler) readloop(conn net.Conn) error {
   logrus.Info("readloop started")
   for {
      // 循环监听服务端发送的消息
      frame, err := ws.ReadFrame(conn)
      if err != nil {
         return err
      }
      if frame.Header.OpCode == ws.OpClose {
         return errors.New("remote side close the channel")
      }
      if frame.Header.OpCode == ws.OpText {
         h.recv <- frame.Payload
      }
   }
}

func connect(addr string) (*handler, error) {
   _, err := url.Parse(addr)
   if err != nil {
      return nil, err
   }

   conn, _, _, err := ws.Dial(context.Background(), addr)
   if err != nil {
      return nil, err
   }

   h := handler{
      conn:  conn,
      close: make(chan struct{}, 1),
      recv:  make(chan []byte, 10),
   }

   // 启动一个goroutine监听消息
   go func() {
      err := h.readloop(conn)
      if err != nil {
         logrus.Warn(err)
      }
      // 通知上层
      h.close <- struct{}{}
   }()

   return &h, nil
}

func Run(ctx context.Context, opts ...OptionFunc) error {
   option := &ClientOption{}
   for i := range opts {
      opts[i](option)
   }
   url := fmt.Sprintf("%s?user=%s", option.address, option.user)
   logrus.Info("connect to ", url)
   //连接到服务,并返回hander对象
   h, err := connect(url)
   if err != nil {
      return err
   }
   go func() {
      // 读取消息并显示
      for msg := range h.recv {
         logrus.Info("Receive message:", string(msg))
      }
   }()

   tk := time.NewTicker(time.Second * 6)
   for {
      select {
      case <-tk.C:
         //每6秒发送一个消息
         err := h.sendText("hello")
         if err != nil {
            logrus.Error(err)
         }
      case <-h.close:
         logrus.Printf("connection closed")
         return nil
      }
   }
}

func (h *handler) sendText(msg string) error {
   logrus.Info("send message :", msg)
   return wsutil.WriteClientText(h.conn, []byte(msg))
}

客户端配置

package client

type ClientOption struct {
   address string
   user    string
}

type OptionFunc func(s *ClientOption)

func WithAddress(address string) OptionFunc {
   return func(o *ClientOption) {
      o.address = address
   }
}

func WithUser(user string) OptionFunc {
   return func(o *ClientOption) {
      o.user = user
   }
}

启动客户端go run main.go

package main

import (
   "7d-im/client"
   "context"
)

func main() {
   if err := client.Run(context.Background(),
      client.WithAddress("ws://0.0.0.0:6789"),
      client.WithUser("张三")); err != nil {
      panic(err)
   }
}

运行结果

总结

本文实现了即时通讯中最基本的几个环节,包括服务端的监听、接收消息、广播消息,客户端的拨号、接收消息、发送消息。

小册子作者内容丰富,从最底层的ip,tcp协议讲起,以及项目架构、性能、部署等多方面介绍,值得一读。

参考

[1]从一个demo开始

[2][7d-im]feat/day1


文章作者: Alex
版权声明: 本博客所有文章除特別声明外,均采用 CC BY 4.0 许可协议。转载请注明来源 Alex !
  目录