feat: optimize tools up35 (#1552)
* upgrade package and rtc convert * upgrade package and rtc convert * upgrade package and rtc convert
This commit is contained in:
parent
bb6462647a
commit
6b55cfd0b8
4
go.mod
4
go.mod
|
@ -4,6 +4,8 @@ go 1.19
|
|||
|
||||
require (
|
||||
firebase.google.com/go v3.13.0+incompatible
|
||||
github.com/OpenIMSDK/protocol v0.0.31
|
||||
github.com/OpenIMSDK/tools v0.0.20
|
||||
github.com/bwmarrin/snowflake v0.3.0 // indirect
|
||||
github.com/dtm-labs/rockscache v0.1.1
|
||||
github.com/gin-gonic/gin v1.9.1
|
||||
|
@ -33,8 +35,6 @@ require github.com/google/uuid v1.3.1
|
|||
|
||||
require (
|
||||
github.com/IBM/sarama v1.41.3
|
||||
github.com/OpenIMSDK/protocol v0.0.31
|
||||
github.com/OpenIMSDK/tools v0.0.18
|
||||
github.com/aliyun/aliyun-oss-go-sdk v2.2.9+incompatible
|
||||
github.com/go-redis/redis v6.15.9+incompatible
|
||||
github.com/redis/go-redis/v9 v9.2.1
|
||||
|
|
4
go.sum
4
go.sum
|
@ -20,8 +20,8 @@ github.com/IBM/sarama v1.41.3 h1:MWBEJ12vHC8coMjdEXFq/6ftO6DUZnQlFYcxtOJFa7c=
|
|||
github.com/IBM/sarama v1.41.3/go.mod h1:Xxho9HkHd4K/MDUo/T/sOqwtX/17D33++E9Wib6hUdQ=
|
||||
github.com/OpenIMSDK/protocol v0.0.31 h1:ax43x9aqA6EKNXNukS5MT5BSTqkUmwO4uTvbJLtzCgE=
|
||||
github.com/OpenIMSDK/protocol v0.0.31/go.mod h1:F25dFrwrIx3lkNoiuf6FkCfxuwf8L4Z8UIsdTHP/r0Y=
|
||||
github.com/OpenIMSDK/tools v0.0.18 h1:h3CvKB90DNd2aIJcOQ99cqgeW6C0na0PzR1TNsfxwL0=
|
||||
github.com/OpenIMSDK/tools v0.0.18/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
|
||||
github.com/OpenIMSDK/tools v0.0.20 h1:zBTjQZRJ5lR1FIzP9mtWyAvh5dKsmJXQugi4p8X/97k=
|
||||
github.com/OpenIMSDK/tools v0.0.20/go.mod h1:eg+q4A34Qmu73xkY0mt37FHGMCMfC6CtmOnm0kFEGFI=
|
||||
github.com/QcloudApi/qcloud_sign_golang v0.0.0-20141224014652-e4130a326409/go.mod h1:1pk82RBxDY/JZnPQrtqHlUFfCctgdorsd9M06fMynOM=
|
||||
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7 h1:uSoVVbwJiQipAclBbw+8quDsfcvFjOpI5iCf4p/cqCs=
|
||||
github.com/alcortesm/tgz v0.0.0-20161220082320-9c5fe88206d7/go.mod h1:6zEj6s6u/ghQa61ZWa/C2Aw3RkjiTBOix7dkqa1VLIs=
|
||||
|
|
|
@ -16,7 +16,6 @@ package friend
|
|||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/OpenIMSDK/tools/tx"
|
||||
|
||||
"github.com/OpenIMSDK/protocol/sdkws"
|
||||
|
|
|
@ -0,0 +1,227 @@
|
|||
package pkg
|
||||
|
||||
import (
|
||||
mongoModel "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
||||
mysqlModel "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/v3"
|
||||
mongoModelRtc "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
mysqlModelRtc "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mysql"
|
||||
"time"
|
||||
)
|
||||
|
||||
type convert struct{}
|
||||
|
||||
func (convert) User(v mysqlModel.UserModel) mongoModel.UserModel {
|
||||
return mongoModel.UserModel{
|
||||
UserID: v.UserID,
|
||||
Nickname: v.Nickname,
|
||||
FaceURL: v.FaceURL,
|
||||
Ex: v.Ex,
|
||||
AppMangerLevel: v.AppMangerLevel,
|
||||
GlobalRecvMsgOpt: v.GlobalRecvMsgOpt,
|
||||
CreateTime: v.CreateTime,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Friend(v mysqlModel.FriendModel) mongoModel.FriendModel {
|
||||
return mongoModel.FriendModel{
|
||||
OwnerUserID: v.OwnerUserID,
|
||||
FriendUserID: v.FriendUserID,
|
||||
Remark: v.Remark,
|
||||
CreateTime: v.CreateTime,
|
||||
AddSource: v.AddSource,
|
||||
OperatorUserID: v.OperatorUserID,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) FriendRequest(v mysqlModel.FriendRequestModel) mongoModel.FriendRequestModel {
|
||||
return mongoModel.FriendRequestModel{
|
||||
FromUserID: v.FromUserID,
|
||||
ToUserID: v.ToUserID,
|
||||
HandleResult: v.HandleResult,
|
||||
ReqMsg: v.ReqMsg,
|
||||
CreateTime: v.CreateTime,
|
||||
HandlerUserID: v.HandlerUserID,
|
||||
HandleMsg: v.HandleMsg,
|
||||
HandleTime: v.HandleTime,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Black(v mysqlModel.BlackModel) mongoModel.BlackModel {
|
||||
return mongoModel.BlackModel{
|
||||
OwnerUserID: v.OwnerUserID,
|
||||
BlockUserID: v.BlockUserID,
|
||||
CreateTime: v.CreateTime,
|
||||
AddSource: v.AddSource,
|
||||
OperatorUserID: v.OperatorUserID,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Group(v mysqlModel.GroupModel) mongoModel.GroupModel {
|
||||
return mongoModel.GroupModel{
|
||||
GroupID: v.GroupID,
|
||||
GroupName: v.GroupName,
|
||||
Notification: v.Notification,
|
||||
Introduction: v.Introduction,
|
||||
FaceURL: v.FaceURL,
|
||||
CreateTime: v.CreateTime,
|
||||
Ex: v.Ex,
|
||||
Status: v.Status,
|
||||
CreatorUserID: v.CreatorUserID,
|
||||
GroupType: v.GroupType,
|
||||
NeedVerification: v.NeedVerification,
|
||||
LookMemberInfo: v.LookMemberInfo,
|
||||
ApplyMemberFriend: v.ApplyMemberFriend,
|
||||
NotificationUpdateTime: v.NotificationUpdateTime,
|
||||
NotificationUserID: v.NotificationUserID,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) GroupMember(v mysqlModel.GroupMemberModel) mongoModel.GroupMemberModel {
|
||||
return mongoModel.GroupMemberModel{
|
||||
GroupID: v.GroupID,
|
||||
UserID: v.UserID,
|
||||
Nickname: v.Nickname,
|
||||
FaceURL: v.FaceURL,
|
||||
RoleLevel: v.RoleLevel,
|
||||
JoinTime: v.JoinTime,
|
||||
JoinSource: v.JoinSource,
|
||||
InviterUserID: v.InviterUserID,
|
||||
OperatorUserID: v.OperatorUserID,
|
||||
MuteEndTime: v.MuteEndTime,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) GroupRequest(v mysqlModel.GroupRequestModel) mongoModel.GroupRequestModel {
|
||||
return mongoModel.GroupRequestModel{
|
||||
UserID: v.UserID,
|
||||
GroupID: v.GroupID,
|
||||
HandleResult: v.HandleResult,
|
||||
ReqMsg: v.ReqMsg,
|
||||
HandledMsg: v.HandledMsg,
|
||||
ReqTime: v.ReqTime,
|
||||
HandleUserID: v.HandleUserID,
|
||||
HandledTime: v.HandledTime,
|
||||
JoinSource: v.JoinSource,
|
||||
InviterUserID: v.InviterUserID,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Conversation(v mysqlModel.ConversationModel) mongoModel.ConversationModel {
|
||||
return mongoModel.ConversationModel{
|
||||
OwnerUserID: v.OwnerUserID,
|
||||
ConversationID: v.ConversationID,
|
||||
ConversationType: v.ConversationType,
|
||||
UserID: v.UserID,
|
||||
GroupID: v.GroupID,
|
||||
RecvMsgOpt: v.RecvMsgOpt,
|
||||
IsPinned: v.IsPinned,
|
||||
IsPrivateChat: v.IsPrivateChat,
|
||||
BurnDuration: v.BurnDuration,
|
||||
GroupAtType: v.GroupAtType,
|
||||
AttachedInfo: v.AttachedInfo,
|
||||
Ex: v.Ex,
|
||||
MaxSeq: v.MaxSeq,
|
||||
MinSeq: v.MinSeq,
|
||||
CreateTime: v.CreateTime,
|
||||
IsMsgDestruct: v.IsMsgDestruct,
|
||||
MsgDestructTime: v.MsgDestructTime,
|
||||
LatestMsgDestructTime: v.LatestMsgDestructTime,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Object(engine string) func(v mysqlModel.ObjectModel) mongoModel.ObjectModel {
|
||||
return func(v mysqlModel.ObjectModel) mongoModel.ObjectModel {
|
||||
return mongoModel.ObjectModel{
|
||||
Name: v.Name,
|
||||
UserID: v.UserID,
|
||||
Hash: v.Hash,
|
||||
Engine: engine,
|
||||
Key: v.Key,
|
||||
Size: v.Size,
|
||||
ContentType: v.ContentType,
|
||||
Group: v.Cause,
|
||||
CreateTime: v.CreateTime,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Log(v mysqlModel.Log) mongoModel.LogModel {
|
||||
return mongoModel.LogModel{
|
||||
LogID: v.LogID,
|
||||
Platform: v.Platform,
|
||||
UserID: v.UserID,
|
||||
CreateTime: v.CreateTime,
|
||||
Url: v.Url,
|
||||
FileName: v.FileName,
|
||||
SystemType: v.SystemType,
|
||||
Version: v.Version,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) SignalModel(v mysqlModelRtc.SignalModel) mongoModelRtc.SignalModel {
|
||||
return mongoModelRtc.SignalModel{
|
||||
SID: v.SID,
|
||||
InviterUserID: v.InviterUserID,
|
||||
CustomData: v.CustomData,
|
||||
GroupID: v.GroupID,
|
||||
RoomID: v.RoomID,
|
||||
Timeout: v.Timeout,
|
||||
MediaType: v.MediaType,
|
||||
PlatformID: v.PlatformID,
|
||||
SessionType: v.SessionType,
|
||||
InitiateTime: v.InitiateTime,
|
||||
EndTime: v.EndTime,
|
||||
FileURL: v.FileURL,
|
||||
Title: v.Title,
|
||||
Desc: v.Desc,
|
||||
Ex: v.Ex,
|
||||
IOSPushSound: v.IOSPushSound,
|
||||
IOSBadgeCount: v.IOSBadgeCount,
|
||||
SignalInfo: v.SignalInfo,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) SignalInvitationModel(v mysqlModelRtc.SignalInvitationModel) mongoModelRtc.SignalInvitationModel {
|
||||
return mongoModelRtc.SignalInvitationModel{
|
||||
SID: v.SID,
|
||||
UserID: v.UserID,
|
||||
Status: v.Status,
|
||||
InitiateTime: v.InitiateTime,
|
||||
HandleTime: v.HandleTime,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Meeting(v mysqlModelRtc.MeetingInfo) mongoModelRtc.MeetingInfo {
|
||||
return mongoModelRtc.MeetingInfo{
|
||||
RoomID: v.RoomID,
|
||||
MeetingName: v.MeetingName,
|
||||
HostUserID: v.HostUserID,
|
||||
Status: v.Status,
|
||||
StartTime: time.Unix(v.StartTime, 0),
|
||||
EndTime: time.Unix(v.EndTime, 0),
|
||||
CreateTime: v.CreateTime,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) MeetingInvitationInfo(v mysqlModelRtc.MeetingInvitationInfo) mongoModelRtc.MeetingInvitationInfo {
|
||||
return mongoModelRtc.MeetingInvitationInfo{
|
||||
RoomID: v.RoomID,
|
||||
UserID: v.UserID,
|
||||
CreateTime: v.CreateTime,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) MeetingVideoRecord(v mysqlModelRtc.MeetingVideoRecord) mongoModelRtc.MeetingVideoRecord {
|
||||
return mongoModelRtc.MeetingVideoRecord{
|
||||
RoomID: v.RoomID,
|
||||
FileURL: v.FileURL,
|
||||
CreateTime: v.CreateTime,
|
||||
}
|
||||
}
|
|
@ -0,0 +1,86 @@
|
|||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/mgoutil"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewMeeting(db *mongo.Database) (table.MeetingInterface, error) {
|
||||
coll := db.Collection("meeting")
|
||||
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "room_id", Value: 1},
|
||||
},
|
||||
Options: options.Index().SetUnique(true),
|
||||
},
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "host_user_id", Value: 1},
|
||||
},
|
||||
},
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "create_time", Value: -1},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &meeting{coll: coll}, nil
|
||||
}
|
||||
|
||||
type meeting struct {
|
||||
coll *mongo.Collection
|
||||
}
|
||||
|
||||
func (x *meeting) Find(ctx context.Context, roomIDs []string) ([]*table.MeetingInfo, error) {
|
||||
return mgoutil.Find[*table.MeetingInfo](ctx, x.coll, bson.M{"room_id": bson.M{"$in": roomIDs}})
|
||||
}
|
||||
|
||||
func (x *meeting) CreateMeetingInfo(ctx context.Context, meetingInfo *table.MeetingInfo) error {
|
||||
return mgoutil.InsertMany(ctx, x.coll, []*table.MeetingInfo{meetingInfo})
|
||||
}
|
||||
|
||||
func (x *meeting) UpdateMeetingInfo(ctx context.Context, roomID string, update map[string]any) error {
|
||||
if len(update) == 0 {
|
||||
return nil
|
||||
}
|
||||
return mgoutil.UpdateOne(ctx, x.coll, bson.M{"room_id": roomID}, bson.M{"$set": update}, false)
|
||||
}
|
||||
|
||||
func (x *meeting) GetUnCompleteMeetingIDList(ctx context.Context, roomIDs []string) ([]string, error) {
|
||||
if len(roomIDs) == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
return mgoutil.Find[string](ctx, x.coll, bson.M{"room_id": bson.M{"$in": roomIDs}, "status": 0}, options.Find().SetProjection(bson.M{"_id": 0, "room_id": 1}))
|
||||
}
|
||||
|
||||
func (x *meeting) Delete(ctx context.Context, roomIDs []string) error {
|
||||
return mgoutil.DeleteMany(ctx, x.coll, bson.M{"room_id": bson.M{"$in": roomIDs}})
|
||||
}
|
||||
|
||||
func (x *meeting) GetMeetingRecords(ctx context.Context, hostUserID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []*table.MeetingInfo, error) {
|
||||
var and []bson.M
|
||||
if hostUserID != "" {
|
||||
and = append(and, bson.M{"host_user_id": hostUserID})
|
||||
}
|
||||
if !startTime.IsZero() {
|
||||
and = append(and, bson.M{"create_time": bson.M{"$gte": startTime}})
|
||||
}
|
||||
if !endTime.IsZero() {
|
||||
and = append(and, bson.M{"create_time": bson.M{"$lte": endTime}})
|
||||
}
|
||||
filter := bson.M{}
|
||||
if len(and) > 0 {
|
||||
filter["$and"] = and
|
||||
}
|
||||
return mgoutil.FindPage[*table.MeetingInfo](ctx, x.coll, filter, pagination, options.Find().SetSort(bson.M{"create_time": -1}))
|
||||
}
|
|
@ -0,0 +1,76 @@
|
|||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/mgoutil"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewMeetingInvitation(db *mongo.Database) (table.MeetingInvitationInterface, error) {
|
||||
coll := db.Collection("meeting_invitation")
|
||||
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "room_id", Value: 1},
|
||||
{Key: "user_id", Value: 1},
|
||||
},
|
||||
Options: options.Index().SetUnique(true),
|
||||
},
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "create_time", Value: -1},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &meetingInvitation{coll: coll}, nil
|
||||
}
|
||||
|
||||
type meetingInvitation struct {
|
||||
coll *mongo.Collection
|
||||
}
|
||||
|
||||
func (x *meetingInvitation) FindUserIDs(ctx context.Context, roomID string) ([]string, error) {
|
||||
return mgoutil.Find[string](ctx, x.coll, bson.M{"room_id": roomID}, options.Find().SetProjection(bson.M{"_id": 0, "user_id": 1}))
|
||||
}
|
||||
|
||||
func (x *meetingInvitation) CreateMeetingInvitationInfo(ctx context.Context, roomID string, inviteeUserIDs []string) error {
|
||||
now := time.Now()
|
||||
return mgoutil.InsertMany(ctx, x.coll, utils.Slice(inviteeUserIDs, func(userID string) *table.MeetingInvitationInfo {
|
||||
return &table.MeetingInvitationInfo{
|
||||
RoomID: roomID,
|
||||
UserID: userID,
|
||||
CreateTime: now,
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
func (x *meetingInvitation) GetUserInvitedMeetingIDs(ctx context.Context, userID string) (meetingIDs []string, err error) {
|
||||
fiveDaysAgo := time.Now().AddDate(0, 0, -5)
|
||||
return mgoutil.Find[string](ctx, x.coll, bson.M{"user_id": userID, "create_time": bson.M{"$gte": fiveDaysAgo}}, options.Find().SetSort(bson.M{"create_time": -1}).SetProjection(bson.M{"_id": 0, "room_id": 1}))
|
||||
}
|
||||
|
||||
func (x *meetingInvitation) Delete(ctx context.Context, roomIDs []string) error {
|
||||
return mgoutil.DeleteMany(ctx, x.coll, bson.M{"room_id": bson.M{"$in": roomIDs}})
|
||||
}
|
||||
|
||||
func (x *meetingInvitation) GetMeetingRecords(ctx context.Context, joinedUserID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []string, error) {
|
||||
var and []bson.M
|
||||
and = append(and, bson.M{"user_id": joinedUserID})
|
||||
if !startTime.IsZero() {
|
||||
and = append(and, bson.M{"create_time": bson.M{"$gte": startTime}})
|
||||
}
|
||||
if !endTime.IsZero() {
|
||||
and = append(and, bson.M{"create_time": bson.M{"$lte": endTime}})
|
||||
}
|
||||
opt := options.Find().SetSort(bson.M{"create_time": -1}).SetProjection(bson.M{"_id": 0, "room_id": 1})
|
||||
return mgoutil.FindPage[string](ctx, x.coll, bson.M{"$and": and}, pagination, opt)
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/mgoutil"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
)
|
||||
|
||||
func NewMeetingRecord(db *mongo.Database) (table.MeetingRecordInterface, error) {
|
||||
coll := db.Collection("meeting_record")
|
||||
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "room_id", Value: 1},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &meetingRecord{coll: coll}, nil
|
||||
}
|
||||
|
||||
type meetingRecord struct {
|
||||
coll *mongo.Collection
|
||||
}
|
||||
|
||||
func (x *meetingRecord) CreateMeetingVideoRecord(ctx context.Context, meetingVideoRecord *table.MeetingVideoRecord) error {
|
||||
return mgoutil.InsertMany(ctx, x.coll, []*table.MeetingVideoRecord{meetingVideoRecord})
|
||||
}
|
|
@ -0,0 +1,89 @@
|
|||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/mgoutil"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewSignal(db *mongo.Database) (table.SignalInterface, error) {
|
||||
coll := db.Collection("signal")
|
||||
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "sid", Value: 1},
|
||||
},
|
||||
Options: options.Index().SetUnique(true),
|
||||
},
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "inviter_user_id", Value: 1},
|
||||
},
|
||||
},
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "initiate_time", Value: -1},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &signal{coll: coll}, nil
|
||||
}
|
||||
|
||||
type signal struct {
|
||||
coll *mongo.Collection
|
||||
}
|
||||
|
||||
func (x *signal) Find(ctx context.Context, sids []string) ([]*table.SignalModel, error) {
|
||||
return mgoutil.Find[*table.SignalModel](ctx, x.coll, bson.M{"sid": bson.M{"$in": sids}})
|
||||
}
|
||||
|
||||
func (x *signal) CreateSignal(ctx context.Context, signalModel *table.SignalModel) error {
|
||||
return mgoutil.InsertMany(ctx, x.coll, []*table.SignalModel{signalModel})
|
||||
}
|
||||
|
||||
func (x *signal) Update(ctx context.Context, sid string, update map[string]any) error {
|
||||
if len(update) == 0 {
|
||||
return nil
|
||||
}
|
||||
return mgoutil.UpdateOne(ctx, x.coll, bson.M{"sid": sid}, bson.M{"$set": update}, false)
|
||||
}
|
||||
|
||||
func (x *signal) UpdateSignalFileURL(ctx context.Context, sID, fileURL string) error {
|
||||
return x.Update(ctx, sID, map[string]any{"file_url": fileURL})
|
||||
}
|
||||
|
||||
func (x *signal) UpdateSignalEndTime(ctx context.Context, sID string, endTime time.Time) error {
|
||||
return x.Update(ctx, sID, map[string]any{"end_time": endTime})
|
||||
}
|
||||
|
||||
func (x *signal) Delete(ctx context.Context, sids []string) error {
|
||||
if len(sids) == 0 {
|
||||
return nil
|
||||
}
|
||||
return mgoutil.DeleteMany(ctx, x.coll, bson.M{"sid": bson.M{"$in": sids}})
|
||||
}
|
||||
|
||||
func (x *signal) PageSignal(ctx context.Context, sesstionType int32, sendID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []*table.SignalModel, error) {
|
||||
var and []bson.M
|
||||
if !startTime.IsZero() {
|
||||
and = append(and, bson.M{"initiate_time": bson.M{"$gte": startTime}})
|
||||
}
|
||||
if !endTime.IsZero() {
|
||||
and = append(and, bson.M{"initiate_time": bson.M{"$lte": endTime}})
|
||||
}
|
||||
if sesstionType != 0 {
|
||||
and = append(and, bson.M{"sesstion_type": sesstionType})
|
||||
}
|
||||
if sendID != "" {
|
||||
and = append(and, bson.M{"inviter_user_id": sendID})
|
||||
}
|
||||
return mgoutil.FindPage[*table.SignalModel](ctx, x.coll, bson.M{"$and": and}, pagination, options.Find().SetSort(bson.M{"initiate_time": -1}))
|
||||
}
|
|
@ -0,0 +1,78 @@
|
|||
package mgo
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/mgoutil"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"github.com/OpenIMSDK/tools/utils"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/table"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"time"
|
||||
)
|
||||
|
||||
func NewSignalInvitation(db *mongo.Database) (table.SignalInvitationInterface, error) {
|
||||
coll := db.Collection("signal_invitation")
|
||||
_, err := coll.Indexes().CreateMany(context.Background(), []mongo.IndexModel{
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "sid", Value: 1},
|
||||
{Key: "user_id", Value: 1},
|
||||
},
|
||||
Options: options.Index().SetUnique(true),
|
||||
},
|
||||
{
|
||||
Keys: bson.D{
|
||||
{Key: "initiate_time", Value: -1},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &signalInvitation{coll: coll}, nil
|
||||
}
|
||||
|
||||
type signalInvitation struct {
|
||||
coll *mongo.Collection
|
||||
}
|
||||
|
||||
func (x *signalInvitation) Find(ctx context.Context, sid string) ([]*table.SignalInvitationModel, error) {
|
||||
return mgoutil.Find[*table.SignalInvitationModel](ctx, x.coll, bson.M{"sid": sid})
|
||||
}
|
||||
|
||||
func (x *signalInvitation) CreateSignalInvitation(ctx context.Context, sid string, inviteeUserIDs []string) error {
|
||||
now := time.Now()
|
||||
return mgoutil.InsertMany(ctx, x.coll, utils.Slice(inviteeUserIDs, func(userID string) *table.SignalInvitationModel {
|
||||
return &table.SignalInvitationModel{
|
||||
UserID: userID,
|
||||
SID: sid,
|
||||
InitiateTime: now,
|
||||
HandleTime: time.Unix(0, 0),
|
||||
}
|
||||
}))
|
||||
}
|
||||
|
||||
func (x *signalInvitation) HandleSignalInvitation(ctx context.Context, sID, InviteeUserID string, status int32) error {
|
||||
return mgoutil.UpdateOne(ctx, x.coll, bson.M{"sid": sID, "user_id": InviteeUserID}, bson.M{"$set": bson.M{"status": status, "handle_time": time.Now()}}, true)
|
||||
}
|
||||
|
||||
func (x *signalInvitation) PageSID(ctx context.Context, recvID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []string, error) {
|
||||
var and []bson.M
|
||||
and = append(and, bson.M{"user_id": recvID})
|
||||
if !startTime.IsZero() {
|
||||
and = append(and, bson.M{"initiate_time": bson.M{"$gte": startTime}})
|
||||
}
|
||||
if !endTime.IsZero() {
|
||||
and = append(and, bson.M{"initiate_time": bson.M{"$lte": endTime}})
|
||||
}
|
||||
return mgoutil.FindPage[string](ctx, x.coll, bson.M{"$and": and}, pagination, options.Find().SetProjection(bson.M{"_id": 0, "sid": 1}).SetSort(bson.M{"initiate_time": -1}))
|
||||
}
|
||||
|
||||
func (x *signalInvitation) Delete(ctx context.Context, sids []string) error {
|
||||
if len(sids) == 0 {
|
||||
return nil
|
||||
}
|
||||
return mgoutil.DeleteMany(ctx, x.coll, bson.M{"sid": bson.M{"$in": sids}})
|
||||
}
|
|
@ -0,0 +1,51 @@
|
|||
package table
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"time"
|
||||
)
|
||||
|
||||
type MeetingInfo struct {
|
||||
RoomID string `bson:"room_id"`
|
||||
MeetingName string `bson:"meeting_name"`
|
||||
HostUserID string `bson:"host_user_id"`
|
||||
Status int64 `bson:"status"`
|
||||
StartTime time.Time `bson:"start_time"`
|
||||
EndTime time.Time `bson:"end_time"`
|
||||
CreateTime time.Time `bson:"create_time"`
|
||||
Ex string `bson:"ex"`
|
||||
}
|
||||
|
||||
type MeetingInterface interface {
|
||||
Find(ctx context.Context, roomIDs []string) ([]*MeetingInfo, error)
|
||||
CreateMeetingInfo(ctx context.Context, meetingInfo *MeetingInfo) error
|
||||
UpdateMeetingInfo(ctx context.Context, roomID string, update map[string]any) error
|
||||
GetUnCompleteMeetingIDList(ctx context.Context, roomIDs []string) ([]string, error)
|
||||
Delete(ctx context.Context, roomIDs []string) error
|
||||
GetMeetingRecords(ctx context.Context, hostUserID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []*MeetingInfo, error)
|
||||
}
|
||||
|
||||
type MeetingInvitationInfo struct {
|
||||
RoomID string `bson:"room_id"`
|
||||
UserID string `bson:"user_id"`
|
||||
CreateTime time.Time `bson:"create_time"`
|
||||
}
|
||||
|
||||
type MeetingInvitationInterface interface {
|
||||
FindUserIDs(ctx context.Context, roomID string) ([]string, error)
|
||||
CreateMeetingInvitationInfo(ctx context.Context, roomID string, inviteeUserIDs []string) error
|
||||
GetUserInvitedMeetingIDs(ctx context.Context, userID string) (meetingIDs []string, err error)
|
||||
Delete(ctx context.Context, roomIDs []string) error
|
||||
GetMeetingRecords(ctx context.Context, joinedUserID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []string, error)
|
||||
}
|
||||
|
||||
type MeetingVideoRecord struct {
|
||||
RoomID string `bson:"room_id"`
|
||||
FileURL string `bson:"file_url"`
|
||||
CreateTime time.Time `bson:"create_time"`
|
||||
}
|
||||
|
||||
type MeetingRecordInterface interface {
|
||||
CreateMeetingVideoRecord(ctx context.Context, meetingVideoRecord *MeetingVideoRecord) error
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
package table
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/OpenIMSDK/tools/errs"
|
||||
"github.com/OpenIMSDK/tools/pagination"
|
||||
"github.com/redis/go-redis/v9"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"time"
|
||||
)
|
||||
|
||||
type SignalModel struct {
|
||||
SID string `bson:"sid"`
|
||||
InviterUserID string `bson:"inviter_user_id"`
|
||||
CustomData string `bson:"custom_data"`
|
||||
GroupID string `bson:"group_id"`
|
||||
RoomID string `bson:"room_id"`
|
||||
Timeout int32 `bson:"timeout"`
|
||||
MediaType string `bson:"media_type"`
|
||||
PlatformID int32 `bson:"platform_id"`
|
||||
SessionType int32 `bson:"session_type"`
|
||||
InitiateTime time.Time `bson:"initiate_time"`
|
||||
EndTime time.Time `bson:"end_time"`
|
||||
FileURL string `bson:"file_url"`
|
||||
|
||||
Title string `bson:"title"`
|
||||
Desc string `bson:"desc"`
|
||||
Ex string `bson:"ex"`
|
||||
IOSPushSound string `bson:"ios_push_sound"`
|
||||
IOSBadgeCount bool `bson:"ios_badge_count"`
|
||||
SignalInfo string `bson:"signal_info"`
|
||||
}
|
||||
|
||||
type SignalInterface interface {
|
||||
Find(ctx context.Context, sids []string) ([]*SignalModel, error)
|
||||
CreateSignal(ctx context.Context, signalModel *SignalModel) error
|
||||
Update(ctx context.Context, sid string, update map[string]any) error
|
||||
UpdateSignalFileURL(ctx context.Context, sID, fileURL string) error
|
||||
UpdateSignalEndTime(ctx context.Context, sID string, endTime time.Time) error
|
||||
Delete(ctx context.Context, sids []string) error
|
||||
PageSignal(ctx context.Context, sesstionType int32, sendID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []*SignalModel, error)
|
||||
}
|
||||
|
||||
type SignalInvitationModel struct {
|
||||
SID string `bson:"sid"`
|
||||
UserID string `bson:"user_id"`
|
||||
Status int32 `bson:"status"`
|
||||
InitiateTime time.Time `bson:"initiate_time"`
|
||||
HandleTime time.Time `bson:"handle_time"`
|
||||
}
|
||||
|
||||
type SignalInvitationInterface interface {
|
||||
Find(ctx context.Context, sid string) ([]*SignalInvitationModel, error)
|
||||
CreateSignalInvitation(ctx context.Context, sid string, inviteeUserIDs []string) error
|
||||
HandleSignalInvitation(ctx context.Context, sID, InviteeUserID string, status int32) error
|
||||
PageSID(ctx context.Context, recvID string, startTime, endTime time.Time, pagination pagination.Pagination) (int64, []string, error)
|
||||
Delete(ctx context.Context, sids []string) error
|
||||
}
|
||||
|
||||
func IsNotFound(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
err = errs.Unwrap(err)
|
||||
return err == mongo.ErrNoDocuments || err == redis.Nil
|
||||
}
|
||||
|
||||
func IsDuplicate(err error) bool {
|
||||
if err == nil {
|
||||
return false
|
||||
}
|
||||
return mongo.IsDuplicateKeyError(errs.Unwrap(err))
|
||||
}
|
|
@ -0,0 +1,40 @@
|
|||
package relation
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type MeetingInfo struct {
|
||||
RoomID string `gorm:"column:room_id;primary_key;size:128;index:room_id;index:status,priority:1"`
|
||||
MeetingName string `gorm:"column:meeting_name;size:64"`
|
||||
HostUserID string `gorm:"column:host_user_id;size:64;index:host_user_id"`
|
||||
Status int64 `gorm:"column:status;index:status,priority:2"`
|
||||
StartTime int64 `gorm:"column:start_time"`
|
||||
EndTime int64 `gorm:"column:end_time"`
|
||||
CreateTime time.Time `gorm:"column:create_time"`
|
||||
Ex string `gorm:"column:ex;size:1024"`
|
||||
}
|
||||
|
||||
func (MeetingInfo) TableName() string {
|
||||
return "meeting"
|
||||
}
|
||||
|
||||
type MeetingInvitationInfo struct {
|
||||
RoomID string `gorm:"column:room_id;primary_key;size:128"`
|
||||
UserID string `gorm:"column:user_id;primary_key;size:64;index:user_id"`
|
||||
CreateTime time.Time `gorm:"column:create_time"`
|
||||
}
|
||||
|
||||
func (MeetingInvitationInfo) TableName() string {
|
||||
return "meeting_invitation"
|
||||
}
|
||||
|
||||
type MeetingVideoRecord struct {
|
||||
RoomID string `gorm:"column:room_id;size:128"`
|
||||
FileURL string `gorm:"column:file_url"`
|
||||
CreateTime time.Time `gorm:"column:create_time"`
|
||||
}
|
||||
|
||||
func (MeetingVideoRecord) TableName() string {
|
||||
return "meeting_video_record"
|
||||
}
|
|
@ -0,0 +1,43 @@
|
|||
package relation
|
||||
|
||||
import (
|
||||
"time"
|
||||
)
|
||||
|
||||
type SignalModel struct {
|
||||
SID string `gorm:"column:sid;type:char(128);primary_key"`
|
||||
InviterUserID string `gorm:"column:inviter_user_id;type:char(64);index:inviter_user_id_index"`
|
||||
CustomData string `gorm:"column:custom_data;type:text"`
|
||||
GroupID string `gorm:"column:group_id;type:char(64)"`
|
||||
RoomID string `gorm:"column:room_id;primary_key;type:char(128)"`
|
||||
Timeout int32 `gorm:"column:timeout"`
|
||||
MediaType string `gorm:"column:media_type;type:char(64)"`
|
||||
PlatformID int32 `gorm:"column:platform_id"`
|
||||
SessionType int32 `gorm:"column:sesstion_type"`
|
||||
InitiateTime time.Time `gorm:"column:initiate_time"`
|
||||
EndTime time.Time `gorm:"column:end_time"`
|
||||
FileURL string `gorm:"column:file_url" json:"-"`
|
||||
|
||||
Title string `gorm:"column:title;size:128"`
|
||||
Desc string `gorm:"column:desc;size:1024"`
|
||||
Ex string `gorm:"column:ex;size:1024"`
|
||||
IOSPushSound string `gorm:"column:ios_push_sound"`
|
||||
IOSBadgeCount bool `gorm:"column:ios_badge_count"`
|
||||
SignalInfo string `gorm:"column:signal_info;size:1024"`
|
||||
}
|
||||
|
||||
func (SignalModel) TableName() string {
|
||||
return "signal"
|
||||
}
|
||||
|
||||
type SignalInvitationModel struct {
|
||||
UserID string `gorm:"column:user_id;primary_key"`
|
||||
SID string `gorm:"column:sid;type:char(128);primary_key"`
|
||||
Status int32 `gorm:"column:status"`
|
||||
InitiateTime time.Time `gorm:"column:initiate_time;primary_key"`
|
||||
HandleTime time.Time `gorm:"column:handle_time"`
|
||||
}
|
||||
|
||||
func (SignalInvitationModel) TableName() string {
|
||||
return "signal_invitation"
|
||||
}
|
|
@ -0,0 +1,206 @@
|
|||
package pkg
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"gopkg.in/yaml.v3"
|
||||
"log"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
gormMysql "gorm.io/driver/mysql"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/logger"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
|
||||
rtcMgo "github.com/openimsdk/open-im-server/v3/tools/up35/pkg/internal/rtc/mongo/mgo"
|
||||
)
|
||||
|
||||
const (
|
||||
versionTable = "dataver"
|
||||
versionKey = "data_version"
|
||||
versionValue = 35
|
||||
)
|
||||
|
||||
func InitConfig(path string) error {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return yaml.Unmarshal(data, &config.Config)
|
||||
}
|
||||
|
||||
func GetMysql() (*gorm.DB, error) {
|
||||
conf := config.Config.Mysql
|
||||
mysqlDSN := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", conf.Username, conf.Password, conf.Address[0], conf.Database)
|
||||
return gorm.Open(gormMysql.Open(mysqlDSN), &gorm.Config{Logger: logger.Discard})
|
||||
}
|
||||
|
||||
func GetMongo() (*mongo.Database, error) {
|
||||
mgo, err := unrelation.NewMongo()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return mgo.GetDatabase(), nil
|
||||
}
|
||||
|
||||
func Main(path string) error {
|
||||
if err := InitConfig(path); err != nil {
|
||||
return err
|
||||
}
|
||||
if config.Config.Mysql == nil {
|
||||
return nil
|
||||
}
|
||||
mongoDB, err := GetMongo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var version struct {
|
||||
Key string `bson:"key"`
|
||||
Value string `bson:"value"`
|
||||
}
|
||||
switch mongoDB.Collection(versionTable).FindOne(context.Background(), bson.M{"key": versionKey}).Decode(&version) {
|
||||
case nil:
|
||||
if ver, _ := strconv.Atoi(version.Value); ver >= versionValue {
|
||||
return nil
|
||||
}
|
||||
case mongo.ErrNoDocuments:
|
||||
default:
|
||||
return err
|
||||
}
|
||||
mysqlDB, err := GetMysql()
|
||||
if err != nil {
|
||||
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1049 {
|
||||
if err := SetMongoDataVersion(mongoDB, version.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil // database not exist
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
var c convert
|
||||
var tasks []func() error
|
||||
tasks = append(tasks,
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewUserMongo, c.User) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewFriendMongo, c.Friend) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewFriendRequestMongo, c.FriendRequest) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewBlackMongo, c.Black) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMongo, c.Group) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMember, c.GroupMember) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupRequestMgo, c.GroupRequest) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewConversationMongo, c.Conversation) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(config.Config.Object.Enable)) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewLogMongo, c.Log) },
|
||||
|
||||
func() error { return NewTask(mysqlDB, mongoDB, rtcMgo.NewSignal, c.SignalModel) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, rtcMgo.NewSignalInvitation, c.SignalInvitationModel) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, rtcMgo.NewMeeting, c.Meeting) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, rtcMgo.NewMeetingInvitation, c.MeetingInvitationInfo) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, rtcMgo.NewMeetingRecord, c.MeetingVideoRecord) },
|
||||
)
|
||||
|
||||
for _, task := range tasks {
|
||||
if err := task(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := SetMongoDataVersion(mongoDB, version.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetMongoDataVersion(db *mongo.Database, curver string) error {
|
||||
filter := bson.M{"key": versionKey, "value": curver}
|
||||
update := bson.M{"$set": bson.M{"key": versionKey, "value": strconv.Itoa(versionValue)}}
|
||||
_, err := db.Collection(versionTable).UpdateOne(context.Background(), filter, update, options.Update().SetUpsert(true))
|
||||
return err
|
||||
}
|
||||
|
||||
// NewTask A mysql table B mongodb model C mongodb table
|
||||
func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, mongoDB *mongo.Database, mongoDBInit func(db *mongo.Database) (B, error), convert func(v A) C) error {
|
||||
obj, err := mongoDBInit(mongoDB)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var zero A
|
||||
tableName := zero.TableName()
|
||||
coll, err := getColl(obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get mongo collection %s failed, err: %w", tableName, err)
|
||||
}
|
||||
var count int
|
||||
defer func() {
|
||||
log.Printf("completed convert %s total %d\n", tableName, count)
|
||||
}()
|
||||
const batch = 100
|
||||
for page := 0; ; page++ {
|
||||
res := make([]A, 0, batch)
|
||||
if err := gormDB.Limit(batch).Offset(page * batch).Find(&res).Error; err != nil {
|
||||
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1146 {
|
||||
return nil // table not exist
|
||||
}
|
||||
return fmt.Errorf("find mysql table %s failed, err: %w", tableName, err)
|
||||
}
|
||||
if len(res) == 0 {
|
||||
return nil
|
||||
}
|
||||
temp := make([]any, len(res))
|
||||
for i := range res {
|
||||
temp[i] = convert(res[i])
|
||||
}
|
||||
if err := insertMany(coll, temp); err != nil {
|
||||
return fmt.Errorf("insert mongo table %s failed, err: %w", tableName, err)
|
||||
}
|
||||
count += len(res)
|
||||
if len(res) < batch {
|
||||
return nil
|
||||
}
|
||||
log.Printf("current convert %s completed %d\n", tableName, count)
|
||||
}
|
||||
}
|
||||
|
||||
func insertMany(coll *mongo.Collection, objs []any) error {
|
||||
if _, err := coll.InsertMany(context.Background(), objs); err != nil {
|
||||
if !mongo.IsDuplicateKeyError(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for i := range objs {
|
||||
_, err := coll.InsertOne(context.Background(), objs[i])
|
||||
switch {
|
||||
case err == nil:
|
||||
case mongo.IsDuplicateKeyError(err):
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getColl(obj any) (_ *mongo.Collection, err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("not found %+v", e)
|
||||
}
|
||||
}()
|
||||
stu := reflect.ValueOf(obj).Elem()
|
||||
typ := reflect.TypeOf(&mongo.Collection{}).String()
|
||||
for i := 0; i < stu.NumField(); i++ {
|
||||
field := stu.Field(i)
|
||||
if field.Type().String() == typ {
|
||||
return (*mongo.Collection)(field.UnsafePointer()), nil
|
||||
}
|
||||
}
|
||||
return nil, errors.New("not found")
|
||||
}
|
|
@ -1,367 +1,19 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"github.com/go-sql-driver/mysql"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/mgo"
|
||||
mongoModel "github.com/openimsdk/open-im-server/v3/pkg/common/db/table/relation"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/unrelation"
|
||||
mysqlModel "github.com/openimsdk/open-im-server/v3/tools/data-conversion/openim/mysql/v3"
|
||||
"go.mongodb.org/mongo-driver/bson"
|
||||
"go.mongodb.org/mongo-driver/mongo"
|
||||
"go.mongodb.org/mongo-driver/mongo/options"
|
||||
"gopkg.in/yaml.v3"
|
||||
gormMysql "gorm.io/driver/mysql"
|
||||
"gorm.io/gorm"
|
||||
"gorm.io/gorm/logger"
|
||||
"github.com/openimsdk/open-im-server/v3/tools/up35/pkg"
|
||||
"log"
|
||||
"os"
|
||||
"reflect"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
const (
|
||||
versionTable = "dataver"
|
||||
versionKey = "data_version"
|
||||
versionValue = 35
|
||||
)
|
||||
|
||||
func main() {
|
||||
var path string
|
||||
flag.StringVar(&path, "c", "", "path config file")
|
||||
flag.Parse()
|
||||
if err := Main(path); err != nil {
|
||||
if err := pkg.Main(path); err != nil {
|
||||
log.Fatal(err)
|
||||
return
|
||||
}
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
func InitConfig(path string) error {
|
||||
data, err := os.ReadFile(path)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return yaml.Unmarshal(data, &config.Config)
|
||||
}
|
||||
|
||||
func GetMysql() (*gorm.DB, error) {
|
||||
conf := config.Config.Mysql
|
||||
mysqlDSN := fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=True&loc=Local", conf.Username, conf.Password, conf.Address[0], conf.Database)
|
||||
return gorm.Open(gormMysql.Open(mysqlDSN), &gorm.Config{Logger: logger.Discard})
|
||||
}
|
||||
|
||||
func GetMongo() (*mongo.Database, error) {
|
||||
mgo, err := unrelation.NewMongo()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return mgo.GetDatabase(), nil
|
||||
}
|
||||
|
||||
func Main(path string) error {
|
||||
if err := InitConfig(path); err != nil {
|
||||
return err
|
||||
}
|
||||
if config.Config.Mysql == nil {
|
||||
return nil
|
||||
}
|
||||
mongoDB, err := GetMongo()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var version struct {
|
||||
Key string `bson:"key"`
|
||||
Value string `bson:"value"`
|
||||
}
|
||||
switch mongoDB.Collection(versionTable).FindOne(context.Background(), bson.M{"key": versionKey}).Decode(&version) {
|
||||
case nil:
|
||||
if ver, _ := strconv.Atoi(version.Value); ver >= versionValue {
|
||||
return nil
|
||||
}
|
||||
case mongo.ErrNoDocuments:
|
||||
default:
|
||||
return err
|
||||
}
|
||||
mysqlDB, err := GetMysql()
|
||||
if err != nil {
|
||||
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1049 {
|
||||
if err := SetMongoDataVersion(mongoDB, version.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil // database not exist
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
var c convert
|
||||
var tasks []func() error
|
||||
tasks = append(tasks,
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewUserMongo, c.User) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewFriendMongo, c.Friend) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewFriendRequestMongo, c.FriendRequest) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewBlackMongo, c.Black) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMongo, c.Group) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupMember, c.GroupMember) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewGroupRequestMgo, c.GroupRequest) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewConversationMongo, c.Conversation) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewS3Mongo, c.Object(config.Config.Object.Enable)) },
|
||||
func() error { return NewTask(mysqlDB, mongoDB, mgo.NewLogMongo, c.Log) },
|
||||
)
|
||||
|
||||
for _, task := range tasks {
|
||||
if err := task(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if err := SetMongoDataVersion(mongoDB, version.Value); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func SetMongoDataVersion(db *mongo.Database, curver string) error {
|
||||
filter := bson.M{"key": versionKey, "value": curver}
|
||||
update := bson.M{"$set": bson.M{"key": versionKey, "value": strconv.Itoa(versionValue)}}
|
||||
_, err := db.Collection(versionTable).UpdateOne(context.Background(), filter, update, options.Update().SetUpsert(true))
|
||||
return err
|
||||
}
|
||||
|
||||
// NewTask A mysql table B mongodb model C mongodb table
|
||||
func NewTask[A interface{ TableName() string }, B any, C any](gormDB *gorm.DB, mongoDB *mongo.Database, mongoDBInit func(db *mongo.Database) (B, error), convert func(v A) C) error {
|
||||
obj, err := mongoDBInit(mongoDB)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
var zero A
|
||||
tableName := zero.TableName()
|
||||
coll, err := getColl(obj)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get mongo collection %s failed, err: %w", tableName, err)
|
||||
}
|
||||
var count int
|
||||
defer func() {
|
||||
log.Printf("completed convert %s total %d\n", tableName, count)
|
||||
}()
|
||||
const batch = 100
|
||||
for page := 0; ; page++ {
|
||||
res := make([]A, 0, batch)
|
||||
if err := gormDB.Limit(batch).Offset(page * batch).Find(&res).Error; err != nil {
|
||||
if mysqlErr, ok := err.(*mysql.MySQLError); ok && mysqlErr.Number == 1146 {
|
||||
return nil // table not exist
|
||||
}
|
||||
return fmt.Errorf("find mysql table %s failed, err: %w", tableName, err)
|
||||
}
|
||||
if len(res) == 0 {
|
||||
return nil
|
||||
}
|
||||
temp := make([]any, len(res))
|
||||
for i := range res {
|
||||
temp[i] = convert(res[i])
|
||||
}
|
||||
if err := insertMany(coll, temp); err != nil {
|
||||
return fmt.Errorf("insert mongo table %s failed, err: %w", tableName, err)
|
||||
}
|
||||
count += len(res)
|
||||
if len(res) < batch {
|
||||
return nil
|
||||
}
|
||||
log.Printf("current convert %s completed %d\n", tableName, count)
|
||||
}
|
||||
}
|
||||
|
||||
func insertMany(coll *mongo.Collection, objs []any) error {
|
||||
if _, err := coll.InsertMany(context.Background(), objs); err != nil {
|
||||
if !mongo.IsDuplicateKeyError(err) {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for i := range objs {
|
||||
_, err := coll.InsertOne(context.Background(), objs[i])
|
||||
switch {
|
||||
case err == nil:
|
||||
case mongo.IsDuplicateKeyError(err):
|
||||
default:
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func getColl(obj any) (_ *mongo.Collection, err error) {
|
||||
defer func() {
|
||||
if e := recover(); e != nil {
|
||||
err = fmt.Errorf("not found %+v", e)
|
||||
}
|
||||
}()
|
||||
stu := reflect.ValueOf(obj).Elem()
|
||||
typ := reflect.TypeOf(&mongo.Collection{}).String()
|
||||
for i := 0; i < stu.NumField(); i++ {
|
||||
field := stu.Field(i)
|
||||
if field.Type().String() == typ {
|
||||
return (*mongo.Collection)(field.UnsafePointer()), nil
|
||||
}
|
||||
}
|
||||
return nil, errors.New("not found")
|
||||
}
|
||||
|
||||
type convert struct{}
|
||||
|
||||
func (convert) User(v mysqlModel.UserModel) mongoModel.UserModel {
|
||||
return mongoModel.UserModel{
|
||||
UserID: v.UserID,
|
||||
Nickname: v.Nickname,
|
||||
FaceURL: v.FaceURL,
|
||||
Ex: v.Ex,
|
||||
AppMangerLevel: v.AppMangerLevel,
|
||||
GlobalRecvMsgOpt: v.GlobalRecvMsgOpt,
|
||||
CreateTime: v.CreateTime,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Friend(v mysqlModel.FriendModel) mongoModel.FriendModel {
|
||||
return mongoModel.FriendModel{
|
||||
OwnerUserID: v.OwnerUserID,
|
||||
FriendUserID: v.FriendUserID,
|
||||
Remark: v.Remark,
|
||||
CreateTime: v.CreateTime,
|
||||
AddSource: v.AddSource,
|
||||
OperatorUserID: v.OperatorUserID,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) FriendRequest(v mysqlModel.FriendRequestModel) mongoModel.FriendRequestModel {
|
||||
return mongoModel.FriendRequestModel{
|
||||
FromUserID: v.FromUserID,
|
||||
ToUserID: v.ToUserID,
|
||||
HandleResult: v.HandleResult,
|
||||
ReqMsg: v.ReqMsg,
|
||||
CreateTime: v.CreateTime,
|
||||
HandlerUserID: v.HandlerUserID,
|
||||
HandleMsg: v.HandleMsg,
|
||||
HandleTime: v.HandleTime,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Black(v mysqlModel.BlackModel) mongoModel.BlackModel {
|
||||
return mongoModel.BlackModel{
|
||||
OwnerUserID: v.OwnerUserID,
|
||||
BlockUserID: v.BlockUserID,
|
||||
CreateTime: v.CreateTime,
|
||||
AddSource: v.AddSource,
|
||||
OperatorUserID: v.OperatorUserID,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Group(v mysqlModel.GroupModel) mongoModel.GroupModel {
|
||||
return mongoModel.GroupModel{
|
||||
GroupID: v.GroupID,
|
||||
GroupName: v.GroupName,
|
||||
Notification: v.Notification,
|
||||
Introduction: v.Introduction,
|
||||
FaceURL: v.FaceURL,
|
||||
CreateTime: v.CreateTime,
|
||||
Ex: v.Ex,
|
||||
Status: v.Status,
|
||||
CreatorUserID: v.CreatorUserID,
|
||||
GroupType: v.GroupType,
|
||||
NeedVerification: v.NeedVerification,
|
||||
LookMemberInfo: v.LookMemberInfo,
|
||||
ApplyMemberFriend: v.ApplyMemberFriend,
|
||||
NotificationUpdateTime: v.NotificationUpdateTime,
|
||||
NotificationUserID: v.NotificationUserID,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) GroupMember(v mysqlModel.GroupMemberModel) mongoModel.GroupMemberModel {
|
||||
return mongoModel.GroupMemberModel{
|
||||
GroupID: v.GroupID,
|
||||
UserID: v.UserID,
|
||||
Nickname: v.Nickname,
|
||||
FaceURL: v.FaceURL,
|
||||
RoleLevel: v.RoleLevel,
|
||||
JoinTime: v.JoinTime,
|
||||
JoinSource: v.JoinSource,
|
||||
InviterUserID: v.InviterUserID,
|
||||
OperatorUserID: v.OperatorUserID,
|
||||
MuteEndTime: v.MuteEndTime,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) GroupRequest(v mysqlModel.GroupRequestModel) mongoModel.GroupRequestModel {
|
||||
return mongoModel.GroupRequestModel{
|
||||
UserID: v.UserID,
|
||||
GroupID: v.GroupID,
|
||||
HandleResult: v.HandleResult,
|
||||
ReqMsg: v.ReqMsg,
|
||||
HandledMsg: v.HandledMsg,
|
||||
ReqTime: v.ReqTime,
|
||||
HandleUserID: v.HandleUserID,
|
||||
HandledTime: v.HandledTime,
|
||||
JoinSource: v.JoinSource,
|
||||
InviterUserID: v.InviterUserID,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Conversation(v mysqlModel.ConversationModel) mongoModel.ConversationModel {
|
||||
return mongoModel.ConversationModel{
|
||||
OwnerUserID: v.OwnerUserID,
|
||||
ConversationID: v.ConversationID,
|
||||
ConversationType: v.ConversationType,
|
||||
UserID: v.UserID,
|
||||
GroupID: v.GroupID,
|
||||
RecvMsgOpt: v.RecvMsgOpt,
|
||||
IsPinned: v.IsPinned,
|
||||
IsPrivateChat: v.IsPrivateChat,
|
||||
BurnDuration: v.BurnDuration,
|
||||
GroupAtType: v.GroupAtType,
|
||||
AttachedInfo: v.AttachedInfo,
|
||||
Ex: v.Ex,
|
||||
MaxSeq: v.MaxSeq,
|
||||
MinSeq: v.MinSeq,
|
||||
CreateTime: v.CreateTime,
|
||||
IsMsgDestruct: v.IsMsgDestruct,
|
||||
MsgDestructTime: v.MsgDestructTime,
|
||||
LatestMsgDestructTime: v.LatestMsgDestructTime,
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Object(engine string) func(v mysqlModel.ObjectModel) mongoModel.ObjectModel {
|
||||
return func(v mysqlModel.ObjectModel) mongoModel.ObjectModel {
|
||||
return mongoModel.ObjectModel{
|
||||
Name: v.Name,
|
||||
UserID: v.UserID,
|
||||
Hash: v.Hash,
|
||||
Engine: engine,
|
||||
Key: v.Key,
|
||||
Size: v.Size,
|
||||
ContentType: v.ContentType,
|
||||
Group: v.Cause,
|
||||
CreateTime: v.CreateTime,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (convert) Log(v mysqlModel.Log) mongoModel.LogModel {
|
||||
return mongoModel.LogModel{
|
||||
LogID: v.LogID,
|
||||
Platform: v.Platform,
|
||||
UserID: v.UserID,
|
||||
CreateTime: v.CreateTime,
|
||||
Url: v.Url,
|
||||
FileName: v.FileName,
|
||||
SystemType: v.SystemType,
|
||||
Version: v.Version,
|
||||
Ex: v.Ex,
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue