From 31f28a54abb0c7915ef7bb4943ddcf26e5bea1f2 Mon Sep 17 00:00:00 2001 From: withchao <993506633@qq.com> Date: Sun, 28 Jan 2024 17:29:46 +0800 Subject: [PATCH] feat: conversation FindRecvMsgNotNotifyUserIDs --- go.mod | 2 +- go.sum | 6 +++-- internal/push/push_to_client.go | 0 internal/rpc/conversation/conversaion.go | 26 +++++++++++++------- pkg/common/db/cache/config.go | 2 +- pkg/common/db/mgo/conversation.go | 10 ++++++-- pkg/common/db/table/relation/conversation.go | 2 +- pkg/rpccache/conversation.go | 22 +++++++++++++++++ pkg/rpcclient/conversation.go | 8 ++++++ 9 files changed, 62 insertions(+), 16 deletions(-) delete mode 100644 internal/push/push_to_client.go diff --git a/go.mod b/go.mod index 739733d67..d496aac12 100644 --- a/go.mod +++ b/go.mod @@ -4,7 +4,7 @@ go 1.19 require ( firebase.google.com/go v3.13.0+incompatible - github.com/OpenIMSDK/protocol v0.0.49 + github.com/OpenIMSDK/protocol v0.0.53 github.com/OpenIMSDK/tools v0.0.29 github.com/bwmarrin/snowflake v0.3.0 // indirect github.com/dtm-labs/rockscache v0.1.1 diff --git a/go.sum b/go.sum index b7b40632f..0ce3d0320 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ firebase.google.com/go v3.13.0+incompatible/go.mod h1:xlah6XbEyW6tbfSklcfe5FHJIw github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c= github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ= -github.com/OpenIMSDK/protocol v0.0.48 h1:8MIMjyzJRsruYhVv2ZKArFiOveroaofDOb3dlAdgjsw= -github.com/OpenIMSDK/protocol v0.0.48/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= +github.com/OpenIMSDK/protocol v0.0.49 h1:wcqJOMBis7f153zNI7V82Fc4WyqA1GanMgXUQgL618k= +github.com/OpenIMSDK/protocol v0.0.49/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y= github.com/OpenIMSDK/tools v0.0.29 h1:NS4PEwYl9sX3SWsMjDOLVxMo3LcTWREMr+2cjzWjcqc= github.com/OpenIMSDK/tools v0.0.29/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI= github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM= @@ -171,6 +171,8 @@ github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9 github.com/hashicorp/go-uuid v1.0.2/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= github.com/hashicorp/go-uuid v1.0.3 h1:2gKiV6YVmrJ1i2CKKa9obLvRieoRGviZFL26PcT/Co8= github.com/hashicorp/go-uuid v1.0.3/go.mod h1:6SBZvOh/SIDV7/2o3Jml5SYk/TvGqwFJ/bN7x4byOro= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= diff --git a/internal/push/push_to_client.go b/internal/push/push_to_client.go deleted file mode 100644 index e69de29bb..000000000 diff --git a/internal/rpc/conversation/conversaion.go b/internal/rpc/conversation/conversaion.go index 40803089c..2288113c5 100644 --- a/internal/rpc/conversation/conversaion.go +++ b/internal/rpc/conversation/conversaion.go @@ -90,11 +90,11 @@ func (c *conversationServer) GetConversation(ctx context.Context, req *pbconvers return resp, nil } -func (m *conversationServer) GetSortedConversationList(ctx context.Context, req *pbconversation.GetSortedConversationListReq) (resp *pbconversation.GetSortedConversationListResp, err error) { +func (c *conversationServer) GetSortedConversationList(ctx context.Context, req *pbconversation.GetSortedConversationListReq) (resp *pbconversation.GetSortedConversationListResp, err error) { log.ZDebug(ctx, "GetSortedConversationList", "seqs", req, "userID", req.UserID) var conversationIDs []string if len(req.ConversationIDs) == 0 { - conversationIDs, err = m.conversationDatabase.GetConversationIDs(ctx, req.UserID) + conversationIDs, err = c.conversationDatabase.GetConversationIDs(ctx, req.UserID) if err != nil { return nil, err } @@ -102,7 +102,7 @@ func (m *conversationServer) GetSortedConversationList(ctx context.Context, req conversationIDs = req.ConversationIDs } - conversations, err := m.conversationDatabase.FindConversations(ctx, req.UserID, conversationIDs) + conversations, err := c.conversationDatabase.FindConversations(ctx, req.UserID, conversationIDs) if err != nil { return nil, err } @@ -110,22 +110,22 @@ func (m *conversationServer) GetSortedConversationList(ctx context.Context, req return nil, errs.ErrRecordNotFound.Wrap() } - maxSeqs, err := m.msgRpcClient.GetMaxSeqs(ctx, conversationIDs) + maxSeqs, err := c.msgRpcClient.GetMaxSeqs(ctx, conversationIDs) if err != nil { return nil, err } - chatLogs, err := m.msgRpcClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs) + chatLogs, err := c.msgRpcClient.GetMsgByConversationIDs(ctx, conversationIDs, maxSeqs) if err != nil { return nil, err } - conversationMsg, err := m.getConversationInfo(ctx, chatLogs, req.UserID) + conversationMsg, err := c.getConversationInfo(ctx, chatLogs, req.UserID) if err != nil { return nil, err } - hasReadSeqs, err := m.msgRpcClient.GetHasReadSeqs(ctx, req.UserID, conversationIDs) + hasReadSeqs, err := c.msgRpcClient.GetHasReadSeqs(ctx, req.UserID, conversationIDs) if err != nil { return nil, err } @@ -157,8 +157,8 @@ func (m *conversationServer) GetSortedConversationList(ctx context.Context, req UnreadTotal: unreadTotal, } - m.conversationSort(conversation_isPinTime, resp, conversation_unreadCount, conversationMsg) - m.conversationSort(conversation_notPinTime, resp, conversation_unreadCount, conversationMsg) + c.conversationSort(conversation_isPinTime, resp, conversation_unreadCount, conversationMsg) + c.conversationSort(conversation_notPinTime, resp, conversation_unreadCount, conversationMsg) resp.ConversationElems = utils.Paginate(resp.ConversationElems, int(req.Pagination.GetPageNumber()), int(req.Pagination.GetShowNumber())) return resp, nil @@ -529,3 +529,11 @@ func (c *conversationServer) getConversationInfo( } return conversationMsg, nil } + +func (c *conversationServer) GetConversationNotReceiveMessageUserIDs(ctx context.Context, req *pbconversation.GetConversationNotReceiveMessageUserIDsReq) (*pbconversation.GetConversationNotReceiveMessageUserIDsResp, error) { + userIDs, err := c.conversationDatabase.GetConversationNotReceiveMessageUserIDs(ctx, req.ConversationID) + if err != nil { + return nil, err + } + return &pbconversation.GetConversationNotReceiveMessageUserIDsResp{UserIDs: userIDs}, nil +} diff --git a/pkg/common/db/cache/config.go b/pkg/common/db/cache/config.go index 52ece95f7..7599d8a11 100644 --- a/pkg/common/db/cache/config.go +++ b/pkg/common/db/cache/config.go @@ -35,7 +35,7 @@ func getPublishKey(topic string, key []string) []string { }, { Local: config.Config.LocalCache.Conversation, - Keys: []string{cachekey.ConversationIDsKey, cachekey.ConversationKey}, + Keys: []string{cachekey.ConversationKey, cachekey.ConversationIDsKey, cachekey.ConversationNotReceiveMessageUserIDsKey}, }, } subscribe = make(map[string][]string) diff --git a/pkg/common/db/mgo/conversation.go b/pkg/common/db/mgo/conversation.go index 849a78b3d..640c7a3d5 100644 --- a/pkg/common/db/mgo/conversation.go +++ b/pkg/common/db/mgo/conversation.go @@ -96,8 +96,14 @@ func (c *ConversationMgo) FindUserIDAllConversations(ctx context.Context, userID return mgoutil.Find[*relation.ConversationModel](ctx, c.coll, bson.M{"owner_user_id": userID}) } -func (c *ConversationMgo) FindRecvMsgNotNotifyUserIDs(ctx context.Context, conversationID string) ([]string, error) { - return mgoutil.Find[string](ctx, c.coll, bson.M{"conversation_id": conversationID, "recv_msg_opt": constant.ReceiveNotNotifyMessage}, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1})) +func (c *ConversationMgo) FindRecvMsgUserIDs(ctx context.Context, conversationID string, recvOpts []int) ([]string, error) { + var filter any + if len(recvOpts) == 0 { + filter = bson.M{"conversation_id": conversationID} + } else { + filter = bson.M{"conversation_id": conversationID, "recv_msg_opt": bson.M{"$in": recvOpts}} + } + return mgoutil.Find[string](ctx, c.coll, filter, options.Find().SetProjection(bson.M{"_id": 0, "owner_user_id": 1})) } func (c *ConversationMgo) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) { diff --git a/pkg/common/db/table/relation/conversation.go b/pkg/common/db/table/relation/conversation.go index 0fba736f9..583e41c0f 100644 --- a/pkg/common/db/table/relation/conversation.go +++ b/pkg/common/db/table/relation/conversation.go @@ -53,7 +53,7 @@ type ConversationModelInterface interface { Take(ctx context.Context, userID, conversationID string) (conversation *ConversationModel, err error) FindConversationID(ctx context.Context, userID string, conversationIDs []string) (existConversationID []string, err error) FindUserIDAllConversations(ctx context.Context, userID string) (conversations []*ConversationModel, err error) - FindRecvMsgNotNotifyUserIDs(ctx context.Context, conversationID string) ([]string, error) + FindRecvMsgUserIDs(ctx context.Context, conversationID string, recvOpts []int) ([]string, error) GetUserRecvMsgOpt(ctx context.Context, ownerUserID, conversationID string) (opt int, err error) GetAllConversationIDs(ctx context.Context) ([]string, error) GetAllConversationIDsNumber(ctx context.Context) (int64, error) diff --git a/pkg/rpccache/conversation.go b/pkg/rpccache/conversation.go index 8eadad9d4..ae77b29b7 100644 --- a/pkg/rpccache/conversation.go +++ b/pkg/rpccache/conversation.go @@ -88,3 +88,25 @@ func (c *ConversationLocalCache) GetConversations(ctx context.Context, ownerUser } return conversations, nil } + +func (c *ConversationLocalCache) getConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) (*listMap[string], error) { + return localcache.AnyValue[*listMap[string]](c.local.Get(ctx, cachekey.GetConversationNotReceiveMessageUserIDsKey(conversationID), func(ctx context.Context) (any, error) { + return newListMap(c.client.GetConversationNotReceiveMessageUserIDs(ctx, conversationID)) + })) +} + +func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { + res, err := c.getConversationNotReceiveMessageUserIDs(ctx, conversationID) + if err != nil { + return nil, err + } + return res.List, nil +} + +func (c *ConversationLocalCache) GetConversationNotReceiveMessageUserIDMap(ctx context.Context, conversationID string) (map[string]struct{}, error) { + res, err := c.getConversationNotReceiveMessageUserIDs(ctx, conversationID) + if err != nil { + return nil, err + } + return res.Map, nil +} diff --git a/pkg/rpcclient/conversation.go b/pkg/rpcclient/conversation.go index 53332beac..80053e870 100644 --- a/pkg/rpcclient/conversation.go +++ b/pkg/rpcclient/conversation.go @@ -139,3 +139,11 @@ func (c *ConversationRpcClient) GetConversations( } return resp.Conversations, nil } + +func (c *ConversationRpcClient) GetConversationNotReceiveMessageUserIDs(ctx context.Context, conversationID string) ([]string, error) { + resp, err := c.Client.GetConversationNotReceiveMessageUserIDs(ctx, &pbconversation.GetConversationNotReceiveMessageUserIDsReq{ConversationID: conversationID}) + if err != nil { + return nil, err + } + return resp.UserIDs, nil +}