Feature middleware (#1476)
* fix:fix error values&logs * modify: add logs * feature:add redis io retry logic * feature:add redis error alert rule * test:for test alert * fix:fix prometheus rules * del:del test code --------- Co-authored-by: lin.huang <lin.huang@apulis.com>
This commit is contained in:
parent
02142c55b2
commit
ceb669dfb8
|
@ -8,4 +8,15 @@ groups:
|
|||
severity: critical
|
||||
annotations:
|
||||
summary: "Instance {{ $labels.instance }} down"
|
||||
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 1 minutes."
|
||||
description: "{{ $labels.instance }} of job {{ $labels.job }} has been down for more than 1 minutes."
|
||||
|
||||
- name: database_insert_failure_alerts
|
||||
rules:
|
||||
- alert: DatabaseInsertFailed
|
||||
expr: (increase(msg_insert_redis_failed_total[5m]) > 0) or (increase(msg_insert_mongo_failed_total[5m]) > 0)
|
||||
for: 1m
|
||||
labels:
|
||||
severity: critical
|
||||
annotations:
|
||||
summary: "Increase in MsgInsertRedisFailedCounter or MsgInsertMongoFailedCounter detected"
|
||||
description: "Either MsgInsertRedisFailedCounter or MsgInsertMongoFailedCounter has increased in the last 5 minutes, indicating failures in message insert operations to Redis or MongoDB,maybe the redis or mongodb is crash."
|
||||
|
|
|
@ -17,16 +17,15 @@ package msgtransfer
|
|||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/OpenIMSDK/tools/mw"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/prometheus/client_golang/prometheus/collectors"
|
||||
"github.com/prometheus/client_golang/prometheus/promhttp"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/credentials/insecure"
|
||||
"log"
|
||||
"net/http"
|
||||
"sync"
|
||||
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/config"
|
||||
"github.com/openimsdk/open-im-server/v3/pkg/common/db/cache"
|
||||
|
|
|
@ -252,7 +252,10 @@ func (och *OnlineHistoryRedisConsumerHandler) handleNotification(
|
|||
return
|
||||
}
|
||||
log.ZDebug(ctx, "success to next topic", "conversationID", conversationID)
|
||||
och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
|
||||
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "MsgToMongoMQ error", err)
|
||||
}
|
||||
och.toPushTopic(ctx, key, conversationID, storageList)
|
||||
}
|
||||
}
|
||||
|
@ -277,9 +280,6 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(
|
|||
lastSeq, isNewConversation, err := och.msgDatabase.BatchInsertChat2Cache(ctx, conversationID, storageList)
|
||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||
log.ZError(ctx, "batch data insert to redis err", err, "storageMsgList", storageList)
|
||||
och.singleMsgFailedCountMutex.Lock()
|
||||
och.singleMsgFailedCount += uint64(len(storageList))
|
||||
och.singleMsgFailedCountMutex.Unlock()
|
||||
return
|
||||
}
|
||||
if isNewConversation {
|
||||
|
@ -311,10 +311,10 @@ func (och *OnlineHistoryRedisConsumerHandler) handleMsg(
|
|||
}
|
||||
|
||||
log.ZDebug(ctx, "success incr to next topic")
|
||||
och.singleMsgSuccessCountMutex.Lock()
|
||||
och.singleMsgSuccessCount += uint64(len(storageList))
|
||||
och.singleMsgSuccessCountMutex.Unlock()
|
||||
och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
|
||||
err = och.msgDatabase.MsgToMongoMQ(ctx, key, conversationID, storageList, lastSeq)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "MsgToMongoMQ error", err)
|
||||
}
|
||||
och.toPushTopic(ctx, key, conversationID, storageList)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -42,15 +42,8 @@ func (m *msgServer) PullMessageBySeqs(
|
|||
log.ZError(ctx, "GetConversation error", err, "conversationID", seq.ConversationID)
|
||||
continue
|
||||
}
|
||||
minSeq, maxSeq, msgs, err := m.MsgDatabase.GetMsgBySeqsRange(
|
||||
ctx,
|
||||
req.UserID,
|
||||
seq.ConversationID,
|
||||
seq.Begin,
|
||||
seq.End,
|
||||
seq.Num,
|
||||
conversation.MaxSeq,
|
||||
)
|
||||
minSeq, maxSeq, msgs, err := m.MsgDatabase.GetMsgBySeqsRange(ctx, req.UserID, seq.ConversationID,
|
||||
seq.Begin, seq.End, seq.Num, conversation.MaxSeq)
|
||||
if err != nil {
|
||||
log.ZWarn(ctx, "GetMsgBySeqsRange error", err, "conversationID", seq.ConversationID, "seq", seq)
|
||||
continue
|
||||
|
@ -64,7 +57,6 @@ func (m *msgServer) PullMessageBySeqs(
|
|||
}
|
||||
if len(msgs) == 0 {
|
||||
log.ZWarn(ctx, "not have msgs", nil, "conversationID", seq.ConversationID, "seq", seq)
|
||||
|
||||
continue
|
||||
}
|
||||
resp.Msgs[seq.ConversationID] = &sdkws.PullMsgs{Msgs: msgs, IsEnd: isEnd}
|
||||
|
|
|
@ -173,7 +173,20 @@ func (c *msgCache) getSeqs(ctx context.Context, items []string, getkey func(s st
|
|||
}
|
||||
|
||||
func (c *msgCache) SetMaxSeq(ctx context.Context, conversationID string, maxSeq int64) error {
|
||||
return c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
|
||||
var retErr error
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return errs.Wrap(retErr, "SetMaxSeq redis retry too many amount")
|
||||
default:
|
||||
retErr = c.setSeq(ctx, conversationID, maxSeq, c.getMaxSeqKey)
|
||||
if retErr != nil {
|
||||
time.Sleep(time.Second * 2)
|
||||
continue
|
||||
}
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m map[string]int64, err error) {
|
||||
|
@ -181,7 +194,21 @@ func (c *msgCache) GetMaxSeqs(ctx context.Context, conversationIDs []string) (m
|
|||
}
|
||||
|
||||
func (c *msgCache) GetMaxSeq(ctx context.Context, conversationID string) (int64, error) {
|
||||
return c.getSeq(ctx, conversationID, c.getMaxSeqKey)
|
||||
var retErr error
|
||||
var retData int64
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return -1, errs.Wrap(retErr, "GetMaxSeq redis retry too many amount")
|
||||
default:
|
||||
retData, retErr = c.getSeq(ctx, conversationID, c.getMaxSeqKey)
|
||||
if retErr != nil && errs.Unwrap(retErr) != redis.Nil {
|
||||
time.Sleep(time.Second * 2)
|
||||
continue
|
||||
}
|
||||
return retData, retErr
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *msgCache) SetMinSeq(ctx context.Context, conversationID string, minSeq int64) error {
|
||||
|
|
|
@ -357,7 +357,9 @@ func (db *commonMsgDatabase) DelUserDeleteMsgsList(ctx context.Context, conversa
|
|||
}
|
||||
|
||||
func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversationID string, msgs []*sdkws.MsgData) (seq int64, isNew bool, err error) {
|
||||
currentMaxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
|
||||
cancelCtx, cancel := context.WithTimeout(ctx, 1*time.Minute)
|
||||
defer cancel()
|
||||
currentMaxSeq, err := db.cache.GetMaxSeq(cancelCtx, conversationID)
|
||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||
log.ZError(ctx, "db.cache.GetMaxSeq", err)
|
||||
return 0, false, err
|
||||
|
@ -384,19 +386,21 @@ func (db *commonMsgDatabase) BatchInsertChat2Cache(ctx context.Context, conversa
|
|||
prommetrics.MsgInsertRedisFailedCounter.Add(float64(failedNum))
|
||||
log.ZError(ctx, "setMessageToCache error", err, "len", len(msgs), "conversationID", conversationID)
|
||||
} else {
|
||||
prommetrics.MsgInsertRedisSuccessCounter.Inc()
|
||||
prommetrics.MsgInsertRedisSuccessCounter.Add(float64(len(msgs)))
|
||||
}
|
||||
err = db.cache.SetMaxSeq(ctx, conversationID, currentMaxSeq)
|
||||
cancelCtx, cancel = context.WithTimeout(ctx, 1*time.Minute)
|
||||
defer cancel()
|
||||
err = db.cache.SetMaxSeq(cancelCtx, conversationID, currentMaxSeq)
|
||||
if err != nil {
|
||||
log.ZError(ctx, "db.cache.SetMaxSeq error", err, "conversationID", conversationID)
|
||||
prommetrics.SeqSetFailedCounter.Inc()
|
||||
}
|
||||
err2 := db.cache.SetHasReadSeqs(ctx, conversationID, userSeqMap)
|
||||
if err != nil {
|
||||
if err2 != nil {
|
||||
log.ZError(ctx, "SetHasReadSeqs error", err2, "userSeqMap", userSeqMap, "conversationID", conversationID)
|
||||
prommetrics.SeqSetFailedCounter.Inc()
|
||||
}
|
||||
return lastMaxSeq, isNew, utils.Wrap(err, "")
|
||||
return lastMaxSeq, isNew, errs.Wrap(err, "redis SetMaxSeq error")
|
||||
}
|
||||
|
||||
func (db *commonMsgDatabase) getMsgBySeqs(ctx context.Context, userID, conversationID string, seqs []int64) (totalMsgs []*sdkws.MsgData, err error) {
|
||||
|
@ -654,16 +658,26 @@ func (db *commonMsgDatabase) GetMsgBySeqsRange(ctx context.Context, userID strin
|
|||
|
||||
func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, conversationID string, seqs []int64) (int64, int64, []*sdkws.MsgData, error) {
|
||||
userMinSeq, err := db.cache.GetConversationUserMinSeq(ctx, conversationID, userID)
|
||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||
return 0, 0, nil, err
|
||||
if err != nil {
|
||||
log.ZError(ctx, "cache.GetConversationUserMinSeq error", err)
|
||||
if errs.Unwrap(err) != redis.Nil {
|
||||
return 0, 0, nil, err
|
||||
}
|
||||
}
|
||||
minSeq, err := db.cache.GetMinSeq(ctx, conversationID)
|
||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||
return 0, 0, nil, err
|
||||
if err != nil {
|
||||
log.ZError(ctx, "cache.GetMinSeq error", err)
|
||||
if errs.Unwrap(err) != redis.Nil {
|
||||
return 0, 0, nil, err
|
||||
}
|
||||
}
|
||||
maxSeq, err := db.cache.GetMaxSeq(ctx, conversationID)
|
||||
if err != nil && errs.Unwrap(err) != redis.Nil {
|
||||
return 0, 0, nil, err
|
||||
if err != nil {
|
||||
log.ZError(ctx, "cache.GetMaxSeq error", err)
|
||||
if errs.Unwrap(err) != redis.Nil {
|
||||
return 0, 0, nil, err
|
||||
}
|
||||
|
||||
}
|
||||
if userMinSeq < minSeq {
|
||||
minSeq = userMinSeq
|
||||
|
@ -676,34 +690,16 @@ func (db *commonMsgDatabase) GetMsgBySeqs(ctx context.Context, userID string, co
|
|||
}
|
||||
successMsgs, failedSeqs, err := db.cache.GetMessagesBySeq(ctx, conversationID, newSeqs)
|
||||
if err != nil {
|
||||
if err != redis.Nil {
|
||||
log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID)
|
||||
}
|
||||
log.ZError(ctx, "get message from redis exception", err, "failedSeqs", failedSeqs, "conversationID", conversationID)
|
||||
}
|
||||
log.ZInfo(
|
||||
ctx,
|
||||
"db.cache.GetMessagesBySeq",
|
||||
"userID",
|
||||
userID,
|
||||
"conversationID",
|
||||
conversationID,
|
||||
"seqs",
|
||||
seqs,
|
||||
"successMsgs",
|
||||
len(successMsgs),
|
||||
"failedSeqs",
|
||||
failedSeqs,
|
||||
"conversationID",
|
||||
conversationID,
|
||||
)
|
||||
log.ZInfo(ctx, "db.cache.GetMessagesBySeq", "userID", userID, "conversationID", conversationID, "seqs", seqs, "successMsgs",
|
||||
len(successMsgs), "failedSeqs", failedSeqs, "conversationID", conversationID)
|
||||
|
||||
if len(failedSeqs) > 0 {
|
||||
mongoMsgs, err := db.getMsgBySeqs(ctx, userID, conversationID, failedSeqs)
|
||||
if err != nil {
|
||||
|
||||
return 0, 0, nil, err
|
||||
}
|
||||
|
||||
successMsgs = append(successMsgs, mongoMsgs...)
|
||||
}
|
||||
return minSeq, maxSeq, successMsgs, nil
|
||||
|
|
Loading…
Reference in New Issue