From 28898f5b793195a59f0024b5ed3362ed3c57e5c5 Mon Sep 17 00:00:00 2001 From: Monet Lee Date: Wed, 10 Jul 2024 15:48:12 +0800 Subject: [PATCH] feat: implement server-initiated heartbeat in msgGateway module (#2404) * feat: implement send ping msg when platform is web in gateway. * add context life cycle control. * feat: implement heartbeat logic in msggateway. * update heartbeat logic. * update to correct method name and comment. * update initiate heartbeat logic. * rename ws_server * update writePingMsg logic * update log level to warn. --- internal/msggateway/client.go | 53 +++++++++++++++++++ internal/msggateway/constant.go | 3 ++ internal/msggateway/long_conn.go | 3 +- .../{n_ws_server.go => ws_server.go} | 0 4 files changed, 58 insertions(+), 1 deletion(-) rename internal/msggateway/{n_ws_server.go => ws_server.go} (100%) diff --git a/internal/msggateway/client.go b/internal/msggateway/client.go index 0581a025b..1270eb978 100644 --- a/internal/msggateway/client.go +++ b/internal/msggateway/client.go @@ -20,6 +20,7 @@ import ( "runtime/debug" "sync" "sync/atomic" + "time" "github.com/openimsdk/open-im-server/v3/pkg/msgprocessor" "github.com/openimsdk/protocol/constant" @@ -72,6 +73,8 @@ type Client struct { closed atomic.Bool closedErr error token string + hbCtx context.Context + hbCancel context.CancelFunc } // ResetClient updates the client's state with new connection and context information. @@ -88,6 +91,7 @@ func (c *Client) ResetClient(ctx *UserConnContext, conn LongConn, longConnServer c.closed.Store(false) c.closedErr = nil c.token = ctx.GetToken() + c.hbCtx, c.hbCancel = context.WithCancel(c.ctx) } func (c *Client) pingHandler(_ string) error { @@ -98,6 +102,13 @@ func (c *Client) pingHandler(_ string) error { return c.writePongMsg() } +func (c *Client) pongHandler(_ string) error { + if err := c.conn.SetReadDeadline(pongWait); err != nil { + return err + } + return nil +} + // readMessage continuously reads messages from the connection. func (c *Client) readMessage() { defer func() { @@ -110,7 +121,9 @@ func (c *Client) readMessage() { c.conn.SetReadLimit(maxMessageSize) _ = c.conn.SetReadDeadline(pongWait) + c.conn.SetPongHandler(c.pongHandler) c.conn.SetPingHandler(c.pingHandler) + c.activeHeartbeat(c.hbCtx) for { log.ZDebug(c.ctx, "readMessage") @@ -147,6 +160,7 @@ func (c *Client) readMessage() { case CloseMessage: c.closedErr = ErrClientClosed return + default: } } @@ -235,6 +249,7 @@ func (c *Client) close() { c.closed.Store(true) c.conn.Close() + c.hbCancel() // Close server-initiated heartbeat. c.longConnServer.UnRegister(c) } @@ -321,6 +336,44 @@ func (c *Client) writeBinaryMsg(resp Resp) error { return c.conn.WriteMessage(MessageBinary, encodedBuf) } +// Actively initiate Heartbeat when platform in Web. +func (c *Client) activeHeartbeat(ctx context.Context) { + if c.PlatformID == constant.WebPlatformID { + go func() { + log.ZDebug(ctx, "server initiative send heartbeat start.") + ticker := time.NewTicker(pingPeriod) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + if err := c.writePingMsg(); err != nil { + log.ZWarn(c.ctx, "send Ping Message error.", err) + return + } + case <-c.hbCtx.Done(): + return + } + } + }() + } +} +func (c *Client) writePingMsg() error { + if c.closed.Load() { + return nil + } + + c.w.Lock() + defer c.w.Unlock() + + err := c.conn.SetWriteDeadline(writeWait) + if err != nil { + return err + } + + return c.conn.WriteMessage(PingMessage, nil) +} + func (c *Client) writePongMsg() error { if c.closed.Load() { return nil diff --git a/internal/msggateway/constant.go b/internal/msggateway/constant.go index 64664ac0a..125be1635 100644 --- a/internal/msggateway/constant.go +++ b/internal/msggateway/constant.go @@ -53,6 +53,9 @@ const ( // Time allowed to read the next pong message from the peer. pongWait = 30 * time.Second + // Send pings to peer with this period. Must be less than pongWait. + pingPeriod = (pongWait * 9) / 10 + // Maximum message size allowed from peer. maxMessageSize = 51200 ) diff --git a/internal/msggateway/long_conn.go b/internal/msggateway/long_conn.go index 7d5bef4c3..c1b3e27c9 100644 --- a/internal/msggateway/long_conn.go +++ b/internal/msggateway/long_conn.go @@ -16,10 +16,11 @@ package msggateway import ( "encoding/json" - "github.com/openimsdk/tools/apiresp" "net/http" "time" + "github.com/openimsdk/tools/apiresp" + "github.com/gorilla/websocket" "github.com/openimsdk/tools/errs" ) diff --git a/internal/msggateway/n_ws_server.go b/internal/msggateway/ws_server.go similarity index 100% rename from internal/msggateway/n_ws_server.go rename to internal/msggateway/ws_server.go