From 7000755e1530dfdd543f1d0cec625b2c790538f5 Mon Sep 17 00:00:00 2001 From: pluto <2631223275@qq.com> Date: Thu, 17 Aug 2023 14:20:46 +0800 Subject: [PATCH] Fix bug #862 (#879) * test * Solve the problem that the storage of multiple terminals in the online state is overwritten * go.sum * debug * debug * Judging whether to be kicked out * Judging whether to be kicked out * Finish debug --- go.mod | 2 +- go.sum | 4 +- go.work.sum | 4 +- internal/api/route.go | 2 +- internal/rpc/user/user.go | 2 +- pkg/common/db/cache/user.go | 84 +++++++++++++++++++++++++++++++++---- pkg/rpcclient/user.go | 2 +- 7 files changed, 84 insertions(+), 16 deletions(-) diff --git a/go.mod b/go.mod index c2f8093e7..87cb62369 100644 --- a/go.mod +++ b/go.mod @@ -37,7 +37,7 @@ require ( require github.com/google/uuid v1.3.0 require ( - github.com/OpenIMSDK/protocol v0.0.10 + github.com/OpenIMSDK/protocol v0.0.11 github.com/OpenIMSDK/tools v0.0.13 github.com/aliyun/aliyun-oss-go-sdk v2.2.8+incompatible github.com/go-redis/redis v6.15.9+incompatible diff --git a/go.sum b/go.sum index 1b9d43ec9..bf86b45e1 100644 --- a/go.sum +++ b/go.sum @@ -17,8 +17,8 @@ cloud.google.com/go/storage v1.30.1/go.mod h1:NfxhC0UJE1aXSx7CIIbCf7y9HKT7Biccwk firebase.google.com/go v3.13.0+incompatible h1:3TdYC3DDi6aHn20qoRkxwGqNgdjtblwVAyRLQwGn/+4= firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIwjt8toICdV5Wh9ptHs= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= -github.com/OpenIMSDK/protocol v0.0.10 h1:OiJR2BAAJjuKKK8KPxYZdJCwOSzMMxwF5fnJdOmLPdQ= -github.com/OpenIMSDK/protocol v0.0.10/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.11 h1:3sWujfQhO1I1Da1b8UScaZcc0DNunSbNiH3DagZB/AA= +github.com/OpenIMSDK/protocol v0.0.11/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.13 h1:rcw4HS8S2DPZR9UOBxD8/ol9UBMzXBypzOVEytDRIMo= github.com/OpenIMSDK/tools v0.0.13/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= diff --git a/go.work.sum b/go.work.sum index 8ae1c5cc7..76e060d37 100644 --- a/go.work.sum +++ b/go.work.sum @@ -221,8 +221,8 @@ cloud.google.com/go/websecurityscanner v1.6.1/go.mod h1:Njgaw3rttgRHXzwCB8kgCYqv cloud.google.com/go/workflows v1.11.1 h1:2akeQ/PgtRhrNuD/n1WvJd5zb7YyuDZrlOanBj2ihPg= cloud.google.com/go/workflows v1.11.1/go.mod h1:Z+t10G1wF7h8LgdY/EmRcQY8ptBD/nvofaL6FqlET6g= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= -github.com/OpenIMSDK/protocol v0.0.10 h1:OiJR2BAAJjuKKK8KPxYZdJCwOSzMMxwF5fnJdOmLPdQ= -github.com/OpenIMSDK/protocol v0.0.10/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.11 h1:3sWujfQhO1I1Da1b8UScaZcc0DNunSbNiH3DagZB/AA= +github.com/OpenIMSDK/protocol v0.0.11/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409 h1:DTQ/38ao/CfXsrK0cSAL+h4R/u0VVvfWLZEOlLwEROI= github.com/alecthomas/kingpin/v2 v2.3.1 h1:ANLJcKmQm4nIaog7xdr/id6FM6zm5hHnfZrvtKPxqGg= github.com/alecthomas/kingpin/v2 v2.3.1/go.mod h1:oYL5vtsvEHZGHxU7DMp32Dvx+qL+ptGn6lWaot2vCNE= diff --git a/internal/api/route.go b/internal/api/route.go index f961489a3..8cf65b03d 100644 --- a/internal/api/route.go +++ b/internal/api/route.go @@ -82,7 +82,7 @@ func NewGinRouter(discov discoveryregistry.SvcDiscoveryRegistry, rdb redis.Unive userRouterGroup.POST("/get_users", ParseToken, u.GetUsers) userRouterGroup.POST("/get_users_online_status", ParseToken, u.GetUsersOnlineStatus) userRouterGroup.POST("/get_users_online_token_detail", ParseToken, u.GetUsersOnlineTokenDetail) - userRouterGroup.POST("/subscribe_users_status", ParseToken, u.UnSubscriberStatus) + userRouterGroup.POST("/subscribe_users_status", ParseToken, u.SubscriberStatus) userRouterGroup.POST("/unsubscribe_users_status", ParseToken, u.UnSubscriberStatus) userRouterGroup.POST("/get_users_status", ParseToken, u.GetUserStatus) userRouterGroup.POST("/get_subscribe_users_status", ParseToken, u.GetSubscribeUsersStatus) diff --git a/internal/rpc/user/user.go b/internal/rpc/user/user.go index a16a16bb2..a1cb0db09 100644 --- a/internal/rpc/user/user.go +++ b/internal/rpc/user/user.go @@ -299,7 +299,7 @@ func (s *userServer) SetUserStatus(ctx context.Context, req *pbuser.SetUserStatu FromUserID: value.UserID, ToUserID: userID, Status: value.Status, - PlatformID: value.PlatformID, + PlatformID: value.PlatformIDs[0], } s.userNotificationSender.UserStatusChangeNotification(ctx, tips) } diff --git a/pkg/common/db/cache/user.go b/pkg/common/db/cache/user.go index 05374cc5f..fee907575 100644 --- a/pkg/common/db/cache/user.go +++ b/pkg/common/db/cache/user.go @@ -17,6 +17,7 @@ package cache import ( "context" "encoding/json" + "github.com/OpenIMSDK/protocol/constant" "hash/crc32" "strconv" "time" @@ -37,6 +38,7 @@ const ( olineStatusKey = "ONLINE_STATUS:" userOlineStatusExpireTime = time.Second * 60 * 60 * 24 statusMod = 501 + platformID = "_PlatformIDSuffix" ) type UserCache interface { @@ -92,6 +94,10 @@ func (u *UserCacheRedis) getUserGlobalRecvMsgOptKey(userID string) string { return userGlobalRecvMsgOptKey + userID } +func (u *UserCacheRedis) getUserStatusHashKey(userID string, Id int32) string { + return userID + "_" + string(Id) + platformID +} + func (u *UserCacheRedis) GetUserInfo(ctx context.Context, userID string) (userInfo *relationTb.UserModel, err error) { return getCache( ctx, @@ -177,9 +183,9 @@ func (u *UserCacheRedis) GetUserStatus(ctx context.Context, userIDs []string) ([ if err == redis.Nil { // key or field does not exist res = append(res, &user.OnlineStatus{ - UserID: userID, - Status: 0, - PlatformID: -1, + UserID: userID, + Status: constant.Offline, + PlatformIDs: nil, }) continue } else { @@ -211,12 +217,74 @@ func (u *UserCacheRedis) SetUserStatus(ctx context.Context, list []*user.OnlineS if err != nil { return errs.Wrap(err) } - _, err = u.rdb.HSet(ctx, key, status.UserID, string(jsonData)).Result() - if err != nil { - return errs.Wrap(err) - } - if isNewKey > 0 { + if isNewKey == 0 { + _, err = u.rdb.HSet(ctx, key, status.UserID, string(jsonData)).Result() + if err != nil { + return errs.Wrap(err) + } u.rdb.Expire(ctx, key, userOlineStatusExpireTime) + } else { + result, err := u.rdb.HGet(ctx, key, status.UserID).Result() + if err != nil { + return errs.Wrap(err) + } + var onlineStatus user.OnlineStatus + err = json.Unmarshal([]byte(result), &onlineStatus) + if err != nil { + return errs.Wrap(err) + } + onlineStatus.UserID = status.UserID + if status.Status == constant.Offline { + var newPlatformIDs []int32 + for _, val := range onlineStatus.PlatformIDs { + if val != status.PlatformIDs[0] { + newPlatformIDs = append(newPlatformIDs, val) + } + } + if newPlatformIDs == nil { + onlineStatus.Status = constant.Offline + onlineStatus.PlatformIDs = nil + newjsonData, err := json.Marshal(&onlineStatus) + if err != nil { + return errs.Wrap(err) + } + _, err = u.rdb.HSet(ctx, key, status.UserID, string(newjsonData)).Result() + if err != nil { + return errs.Wrap(err) + } + } else { + onlineStatus.PlatformIDs = newPlatformIDs + newjsonData, err := json.Marshal(&onlineStatus) + if err != nil { + return errs.Wrap(err) + } + _, err = u.rdb.HSet(ctx, key, status.UserID, string(newjsonData)).Result() + if err != nil { + return errs.Wrap(err) + } + } + } else { + onlineStatus.Status = constant.Online + // Judging whether to be kicked out. + flag := false + for _, val := range onlineStatus.PlatformIDs { + if val == status.PlatformIDs[0] { + flag = true + break + } + } + if !flag { + onlineStatus.PlatformIDs = append(onlineStatus.PlatformIDs, status.PlatformIDs[0]) + } + newjsonData, err := json.Marshal(&onlineStatus) + if err != nil { + return errs.Wrap(err) + } + _, err = u.rdb.HSet(ctx, key, status.UserID, string(newjsonData)).Result() + if err != nil { + return errs.Wrap(err) + } + } } } return nil diff --git a/pkg/rpcclient/user.go b/pkg/rpcclient/user.go index 3c24efbbd..2fdeb134f 100644 --- a/pkg/rpcclient/user.go +++ b/pkg/rpcclient/user.go @@ -158,6 +158,6 @@ func (u *UserRpcClient) GetAllUserIDs(ctx context.Context, pageNumber, showNumbe } func (u *UserRpcClient) SetUserStatus(ctx context.Context, userID string, status int32, platformID int) error { - _, err := u.Client.SetUserStatus(ctx, &user.SetUserStatusReq{StatusList: []*user.OnlineStatus{{UserID: userID, Status: status, PlatformID: int32(platformID)}}}) + _, err := u.Client.SetUserStatus(ctx, &user.SetUserStatusReq{StatusList: []*user.OnlineStatus{{UserID: userID, Status: status, PlatformIDs: []int32{int32(platformID)}}}}) return err }