写在前面
今天学习一个小册子其中的一篇,并且跟着写demo。本文大部分内容是从小册子分布式IM原理与实战复制过来的,主要用于记录一下学习到的知识。如有侵权请联系我删除。感谢作者逆水。
项目结构
├── client
│ ├── main.go // 客户端,监听,读取消息,发送消息
│ └── option.go // 客户端配置,ip+port,用户名
├── client_cmd
│ └── main.go // 启动客户端
├── go.mod
├── go.sum
├── main.go // 启动服务端
└── serv
├── main.go // 服务端,监听,读取消息,转发消息
└── option.go // 服务端配置,ip+port,id
Server
监听、读取消息、转发消息
启用服务监听请求,因为websocket协议是在http基础上升级而来,因此我们只需要启用一个http服务就可以处理websocket连接请求
package serv
import (
"context"
"fmt"
"github.com/pkg/errors"
"net"
"net/http"
"sync"
"github.com/gobwas/ws"
"github.com/sirupsen/logrus"
)
// Server is a websocket Server
type Server struct {
once sync.Once
id string
address string
sync.Mutex
// 会话列表
users map[string]net.Conn
}
// NewServer NewServer
func NewServer(opts ...OptionFunc) *Server {
return newServer(opts...)
}
func newServer(opts ...OptionFunc) *Server {
option := &ServerStartOptions{}
for i := range opts {
opts[i](option)
}
return &Server{
id: option.id,
address: option.listen,
users: make(map[string]net.Conn, 100),
}
}
// Start server
func (s *Server) Start() error {
mux := http.NewServeMux()
log := logrus.WithFields(logrus.Fields{
"module": "Server",
"listen": s.address,
"id": s.id,
})
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
// step1. 升级
conn, _, _, err := ws.UpgradeHTTP(r, w)
if err != nil {
conn.Close()
return
}
//step2. 读取userId
user := r.URL.Query().Get("user")
if user == "" {
conn.Close()
return
}
//step3. 添加到会话管理中
old, ok := s.addUser(user, conn)
if ok {
// 如果之前已经登录了,则断开旧的连接
old.Close()
}
log.Infof("user %s in", user)
go func(user string, conn net.Conn) {
//step4. 读取消息
err := s.readloop(user, conn)
if err != nil {
log.Error(err)
}
conn.Close()
//step5. 连接断开,删除用户
s.delUser(user)
log.Infof("connection of %s closed", user)
}(user, conn)
})
log.Infoln("started")
// 监听
return http.ListenAndServe(s.address, mux)
}
// 添加用户
func (s *Server) addUser(user string, conn net.Conn) (net.Conn, bool) {
s.Lock()
defer s.Unlock()
old, ok := s.users[user] //返回旧的连接
s.users[user] = conn //缓存
return old, ok
}
// 删除用户
func (s *Server) delUser(user string) {
s.Lock()
defer s.Unlock()
delete(s.users, user)
}
// Shutdown Shutdown
func (s *Server) Shutdown() {
s.once.Do(func() {
s.Lock()
defer s.Unlock()
for _, conn := range s.users {
conn.Close()
}
})
}
// 循环读取conn连接上发送过来的消息
func (s *Server) readloop(user string, conn net.Conn) error {
for {
frame, err := ws.ReadFrame(conn)
if err != nil {
return err
}
if frame.Header.OpCode == ws.OpClose {
return errors.New("remote side close the conn")
}
if frame.Header.Masked {
ws.Cipher(frame.Payload, frame.Header.Mask, 0)
}
// 接收文本帧内容
if frame.Header.OpCode == ws.OpText {
// 用goroutine将消息异步发送到每一个连接者
go s.handle(user, string(frame.Payload))
}
}
}
// 广播消息
func (s *Server) handle(user string, message string) {
logrus.Infof("recv message %s from %s", message, user)
s.Lock()
defer s.Unlock()
broadcast := fmt.Sprintf("%s -- FROM %s", message, user)
for u, conn := range s.users {
if u == user { // 不发给自己
continue
}
logrus.Infof("send to %s : %s", u, broadcast)
err := s.writeText(conn, broadcast)
if err != nil {
logrus.Errorf("write to %s failed, error: %v", user, err)
}
}
}
func (s *Server) writeText(conn net.Conn, message string) error {
// 创建文本帧数据
f := ws.NewTextFrame([]byte(message))
return ws.WriteFrame(conn, f)
}
// 启动服务
func RunServerStart(ctx context.Context, version string, opts ...OptionFunc) error {
server := NewServer(opts...)
defer server.Shutdown()
return server.Start()
}
上面的核心部分就是handleFunc中的握手升级过程,它实现了一个连接的生命周期管理,我们来看这个连接的状态图
服务端配置文件
package serv
type ServerStartOptions struct {
id string
listen string
}
type OptionFunc func(s *ServerStartOptions)
func WithId(id string) OptionFunc {
return func(s *ServerStartOptions) {
s.id = id
}
}
func WithListen(listen string) OptionFunc {
return func(s *ServerStartOptions) {
s.listen = listen
}
}
启动服务端go run main.go
func main() {
if err := serv.RunServerStart(context.Background(), "v1",
serv.WithId("1"),
serv.WithListen("0.0.0.0:6789")); err != nil {
panic(err)
}
time.Sleep(time.Hour)
fmt.Println("main end")
}
Client
连接、读取消息、发送消息
package client
import (
"context"
"fmt"
"github.com/gobwas/ws"
"github.com/gobwas/ws/wsutil"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
"net"
"net/url"
"time"
)
type handler struct {
conn net.Conn
close chan struct{}
recv chan []byte
}
func (h *handler) readloop(conn net.Conn) error {
logrus.Info("readloop started")
for {
// 循环监听服务端发送的消息
frame, err := ws.ReadFrame(conn)
if err != nil {
return err
}
if frame.Header.OpCode == ws.OpClose {
return errors.New("remote side close the channel")
}
if frame.Header.OpCode == ws.OpText {
h.recv <- frame.Payload
}
}
}
func connect(addr string) (*handler, error) {
_, err := url.Parse(addr)
if err != nil {
return nil, err
}
conn, _, _, err := ws.Dial(context.Background(), addr)
if err != nil {
return nil, err
}
h := handler{
conn: conn,
close: make(chan struct{}, 1),
recv: make(chan []byte, 10),
}
// 启动一个goroutine监听消息
go func() {
err := h.readloop(conn)
if err != nil {
logrus.Warn(err)
}
// 通知上层
h.close <- struct{}{}
}()
return &h, nil
}
func Run(ctx context.Context, opts ...OptionFunc) error {
option := &ClientOption{}
for i := range opts {
opts[i](option)
}
url := fmt.Sprintf("%s?user=%s", option.address, option.user)
logrus.Info("connect to ", url)
//连接到服务,并返回hander对象
h, err := connect(url)
if err != nil {
return err
}
go func() {
// 读取消息并显示
for msg := range h.recv {
logrus.Info("Receive message:", string(msg))
}
}()
tk := time.NewTicker(time.Second * 6)
for {
select {
case <-tk.C:
//每6秒发送一个消息
err := h.sendText("hello")
if err != nil {
logrus.Error(err)
}
case <-h.close:
logrus.Printf("connection closed")
return nil
}
}
}
func (h *handler) sendText(msg string) error {
logrus.Info("send message :", msg)
return wsutil.WriteClientText(h.conn, []byte(msg))
}
客户端配置
package client
type ClientOption struct {
address string
user string
}
type OptionFunc func(s *ClientOption)
func WithAddress(address string) OptionFunc {
return func(o *ClientOption) {
o.address = address
}
}
func WithUser(user string) OptionFunc {
return func(o *ClientOption) {
o.user = user
}
}
启动客户端go run main.go
package main
import (
"7d-im/client"
"context"
)
func main() {
if err := client.Run(context.Background(),
client.WithAddress("ws://0.0.0.0:6789"),
client.WithUser("张三")); err != nil {
panic(err)
}
}
运行结果
总结
本文实现了即时通讯中最基本的几个环节,包括服务端的监听、接收消息、广播消息,客户端的拨号、接收消息、发送消息。
小册子作者内容丰富,从最底层的ip,tcp协议讲起,以及项目架构、性能、部署等多方面介绍,值得一读。