sub and mute rule by severity (#1621)

* sub severity

* mute by severity
This commit is contained in:
Yening Qin 2023-07-13 11:16:32 +08:00 committed by GitHub
parent f6378b055c
commit be1a3c1d8b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 294 additions and 107 deletions

View File

@ -15,6 +15,7 @@ type AnomalyPoint struct {
Value float64 `json:"value"` Value float64 `json:"value"`
Severity int `json:"severity"` Severity int `json:"severity"`
Triggered bool `json:"triggered"` Triggered bool `json:"triggered"`
Query string `json:"query"`
} }
func NewAnomalyPoint(key string, labels map[string]string, ts int64, value float64, severity int) AnomalyPoint { func NewAnomalyPoint(key string, labels map[string]string, ts int64, value float64, severity int) AnomalyPoint {

View File

@ -28,9 +28,10 @@ type Dispatch struct {
alerting aconf.Alerting alerting aconf.Alerting
senders map[string]sender.Sender Senders map[string]sender.Sender
tpls map[string]*template.Template tpls map[string]*template.Template
ExtraSenders map[string]sender.Sender ExtraSenders map[string]sender.Sender
BeforeSenderHook func(*models.AlertCurEvent) bool
ctx *ctx.Context ctx *ctx.Context
@ -51,9 +52,10 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
alerting: alerting, alerting: alerting,
senders: make(map[string]sender.Sender), Senders: make(map[string]sender.Sender),
tpls: make(map[string]*template.Template), tpls: make(map[string]*template.Template),
ExtraSenders: make(map[string]sender.Sender), ExtraSenders: make(map[string]sender.Sender),
BeforeSenderHook: func(*models.AlertCurEvent) bool { return true },
ctx: ctx, ctx: ctx,
} }
@ -63,7 +65,7 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us
func (e *Dispatch) ReloadTpls() error { func (e *Dispatch) ReloadTpls() error {
err := e.relaodTpls() err := e.relaodTpls()
if err != nil { if err != nil {
logger.Error("failed to reload tpls: %v", err) logger.Errorf("failed to reload tpls: %v", err)
} }
duration := time.Duration(9000) * time.Millisecond duration := time.Duration(9000) * time.Millisecond
@ -100,7 +102,7 @@ func (e *Dispatch) relaodTpls() error {
e.RwLock.Lock() e.RwLock.Lock()
e.tpls = tmpTpls e.tpls = tmpTpls
e.senders = senders e.Senders = senders
e.RwLock.Unlock() e.RwLock.Unlock()
return nil return nil
} }
@ -141,7 +143,7 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo
} }
// 处理事件发送,这里用一个goroutine处理一个event的所有发送事件 // 处理事件发送,这里用一个goroutine处理一个event的所有发送事件
go e.Send(rule, event, notifyTarget, isSubscribe) go e.Send(rule, event, notifyTarget)
// 如果是不是订阅规则出现的event, 则需要处理订阅规则的event // 如果是不是订阅规则出现的event, 则需要处理订阅规则的event
if !isSubscribe { if !isSubscribe {
@ -177,26 +179,41 @@ func (e *Dispatch) handleSub(sub *models.AlertSubscribe, event models.AlertCurEv
if sub.ForDuration > (event.TriggerTime - event.FirstTriggerTime) { if sub.ForDuration > (event.TriggerTime - event.FirstTriggerTime) {
return return
} }
if len(sub.SeveritiesJson) != 0 {
match := false
for _, s := range sub.SeveritiesJson {
if s == event.Severity || s == 0 {
match = true
break
}
}
if !match {
return
}
}
sub.ModifyEvent(&event) sub.ModifyEvent(&event)
LogEvent(&event, "subscribe") LogEvent(&event, "subscribe")
event.SubRuleId = sub.Id
e.HandleEventNotify(&event, true) e.HandleEventNotify(&event, true)
} }
func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, notifyTarget *NotifyTarget, isSubscribe bool) { func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, notifyTarget *NotifyTarget) {
for channel, uids := range notifyTarget.ToChannelUserMap() { needSend := e.BeforeSenderHook(event)
ctx := sender.BuildMessageContext(rule, event, uids, e.userCache) if needSend {
e.RwLock.RLock() for channel, uids := range notifyTarget.ToChannelUserMap() {
s := e.senders[channel] ctx := sender.BuildMessageContext(rule, []*models.AlertCurEvent{event}, uids, e.userCache)
e.RwLock.RUnlock() e.RwLock.RLock()
if s == nil { s := e.Senders[channel]
logger.Debugf("no sender for channel: %s", channel) e.RwLock.RUnlock()
continue if s == nil {
logger.Debugf("no sender for channel: %s", channel)
continue
}
s.Send(ctx)
} }
logger.Debugf("send event: %s, channel: %s", event.Hash, channel)
for i := 0; i < len(ctx.Users); i++ {
logger.Debug("send event to user: ", ctx.Users[i])
}
s.Send(ctx)
} }
// handle event callbacks // handle event callbacks

View File

@ -165,6 +165,7 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom
points := common.ConvertAnomalyPoints(value) points := common.ConvertAnomalyPoints(value)
for i := 0; i < len(points); i++ { for i := 0; i < len(points); i++ {
points[i].Severity = query.Severity points[i].Severity = query.Severity
points[i].Query = promql
} }
lst = append(lst, points...) lst = append(lst, points...)
} }

View File

@ -193,5 +193,21 @@ func matchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int
return false return false
} }
var matchSeverity bool
if len(mute.SeveritiesJson) > 0 {
for _, s := range mute.SeveritiesJson {
if event.Severity == s || s == 0 {
matchSeverity = true
break
}
}
} else {
matchSeverity = true
}
if !matchSeverity {
return false
}
return common.MatchTags(event.TagsMap, mute.ITags) return common.MatchTags(event.TagsMap, mute.ITags)
} }

View File

@ -167,3 +167,13 @@ func (n *Naming) ActiveServers(datasourceId int64) ([]string, error) {
// 30秒内有心跳就认为是活的 // 30秒内有心跳就认为是活的
return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30) return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
} }
// ActiveServersByEngineName lists the alerting-engine instances registered
// under this node's configured engine name (n.heartbeatConfig.EngineName).
//
// On the center node the list is read through models.AlertingEngineGetsInstances,
// keeping only instances whose heartbeat clock is newer than 30 seconds ago.
// On any other node the lookup is forwarded to the
// /v1/n9e/servers-active endpoint via poster.GetByUrls.
func (n *Naming) ActiveServersByEngineName() ([]string, error) {
	if n.ctx.IsCenter {
		// An instance counts as alive if it sent a heartbeat within the last 30 seconds.
		return models.AlertingEngineGetsInstances(n.ctx, "engine_cluster = ? and clock > ?", n.heartbeatConfig.EngineName, time.Now().Unix()-30)
	}

	return poster.GetByUrls[[]string](n.ctx, "/v1/n9e/servers-active?engine_name="+n.heartbeatConfig.EngineName)
}

View File

@ -45,6 +45,8 @@ func (e *ExternalProcessorsType) GetExternalAlertRule(datasourceId, id int64) (*
return processor, has return processor, has
} }
// HandleEventFunc is the signature of a callback that receives a single alert
// event and returns nothing. Processor's HandleFireEventHook and
// HandleRecoverEventHook fields are of this type.
type HandleEventFunc func(event *models.AlertCurEvent)
type Processor struct { type Processor struct {
datasourceId int64 datasourceId int64
@ -69,7 +71,9 @@ type Processor struct {
ctx *ctx.Context ctx *ctx.Context
stats *astats.Stats stats *astats.Stats
EventMuteHook EventMuteHookFunc HandleFireEventHook HandleEventFunc
HandleRecoverEventHook HandleEventFunc
EventMuteHook EventMuteHookFunc
} }
func (p *Processor) Key() string { func (p *Processor) Key() string {
@ -103,10 +107,13 @@ func NewProcessor(rule *models.AlertRule, datasourceId int64, atertRuleCache *me
atertRuleCache: atertRuleCache, atertRuleCache: atertRuleCache,
datasourceCache: datasourceCache, datasourceCache: datasourceCache,
promClients: promClients, promClients: promClients,
ctx: ctx, ctx: ctx,
stats: stats, stats: stats,
EventMuteHook: func(event *models.AlertCurEvent) bool { return false },
HandleFireEventHook: func(event *models.AlertCurEvent) {},
HandleRecoverEventHook: func(event *models.AlertCurEvent) {},
EventMuteHook: func(event *models.AlertCurEvent) bool { return false },
} }
p.mayHandleGroup() p.mayHandleGroup()
@ -184,6 +191,8 @@ func (p *Processor) BuildEvent(anomalyPoint common.AnomalyPoint, from string, no
event.RuleConfig = p.rule.RuleConfig event.RuleConfig = p.rule.RuleConfig
event.RuleConfigJson = p.rule.RuleConfigJson event.RuleConfigJson = p.rule.RuleConfigJson
event.Severity = anomalyPoint.Severity event.Severity = anomalyPoint.Severity
event.ExtraConfig = p.rule.ExtraConfigJSON
event.PromQl = anomalyPoint.Query
if from == "inner" { if from == "inner" {
event.LastEvalTime = now event.LastEvalTime = now
@ -237,6 +246,8 @@ func (p *Processor) RecoverSingle(hash string, now int64, value *string) {
cachedRule.UpdateEvent(event) cachedRule.UpdateEvent(event)
event.IsRecovered = true event.IsRecovered = true
event.LastEvalTime = now event.LastEvalTime = now
p.HandleRecoverEventHook(event)
p.pushEventToQueue(event) p.pushEventToQueue(event)
} }
@ -294,9 +305,12 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) {
if cachedRule == nil { if cachedRule == nil {
return return
} }
logger.Debugf("rule_eval:%s event:%+v fire", p.Key(), event) logger.Debugf("rule_eval:%s event:%+v fire", p.Key(), event)
if fired, has := p.fires.Get(event.Hash); has { if fired, has := p.fires.Get(event.Hash); has {
p.fires.UpdateLastEvalTime(event.Hash, event.LastEvalTime) p.fires.UpdateLastEvalTime(event.Hash, event.LastEvalTime)
event.FirstTriggerTime = fired.FirstTriggerTime
p.HandleFireEventHook(event)
if cachedRule.NotifyRepeatStep == 0 { if cachedRule.NotifyRepeatStep == 0 {
logger.Debugf("rule_eval:%s event:%+v repeat is zero nothing to do", p.Key(), event) logger.Debugf("rule_eval:%s event:%+v repeat is zero nothing to do", p.Key(), event)
@ -310,7 +324,6 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) {
if cachedRule.NotifyMaxNumber == 0 { if cachedRule.NotifyMaxNumber == 0 {
// 最大可以发送次数如果是0表示不想限制最大发送次数一直发即可 // 最大可以发送次数如果是0表示不想限制最大发送次数一直发即可
event.NotifyCurNumber = fired.NotifyCurNumber + 1 event.NotifyCurNumber = fired.NotifyCurNumber + 1
event.FirstTriggerTime = fired.FirstTriggerTime
p.pushEventToQueue(event) p.pushEventToQueue(event)
} else { } else {
// 有最大发送次数的限制,就要看已经发了几次了,是否达到了最大发送次数 // 有最大发送次数的限制,就要看已经发了几次了,是否达到了最大发送次数
@ -319,7 +332,6 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) {
return return
} else { } else {
event.NotifyCurNumber = fired.NotifyCurNumber + 1 event.NotifyCurNumber = fired.NotifyCurNumber + 1
event.FirstTriggerTime = fired.FirstTriggerTime
p.pushEventToQueue(event) p.pushEventToQueue(event)
} }
} }
@ -327,6 +339,7 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) {
} else { } else {
event.NotifyCurNumber = 1 event.NotifyCurNumber = 1
event.FirstTriggerTime = event.TriggerTime event.FirstTriggerTime = event.TriggerTime
p.HandleFireEventHook(event)
p.pushEventToQueue(event) p.pushEventToQueue(event)
} }
} }
@ -442,7 +455,7 @@ func labelMapToArr(m map[string]string) []string {
} }
func Hash(ruleId, datasourceId int64, vector common.AnomalyPoint) string { func Hash(ruleId, datasourceId int64, vector common.AnomalyPoint) string {
return str.MD5(fmt.Sprintf("%d_%s_%d_%d", ruleId, vector.Labels.String(), datasourceId, vector.Severity)) return str.MD5(fmt.Sprintf("%d_%s_%d_%d_%s", ruleId, vector.Labels.String(), datasourceId, vector.Severity, vector.Query))
} }
func TagHash(vector common.AnomalyPoint) string { func TagHash(vector common.AnomalyPoint) string {

View File

@ -32,7 +32,7 @@ type DingtalkSender struct {
} }
func (ds *DingtalkSender) Send(ctx MessageContext) { func (ds *DingtalkSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return return
} }
@ -40,7 +40,7 @@ func (ds *DingtalkSender) Send(ctx MessageContext) {
if len(urls) == 0 { if len(urls) == 0 {
return return
} }
message := BuildTplMessage(ds.tpl, ctx.Event) message := BuildTplMessage(ds.tpl, ctx.Events)
for _, url := range urls { for _, url := range urls {
var body dingtalk var body dingtalk
@ -49,7 +49,7 @@ func (ds *DingtalkSender) Send(ctx MessageContext) {
body = dingtalk{ body = dingtalk{
Msgtype: "markdown", Msgtype: "markdown",
Markdown: dingtalkMarkdown{ Markdown: dingtalkMarkdown{
Title: ctx.Event.RuleName, Title: ctx.Events[0].RuleName,
Text: message, Text: message,
}, },
} }
@ -57,7 +57,7 @@ func (ds *DingtalkSender) Send(ctx MessageContext) {
body = dingtalk{ body = dingtalk{
Msgtype: "markdown", Msgtype: "markdown",
Markdown: dingtalkMarkdown{ Markdown: dingtalkMarkdown{
Title: ctx.Event.RuleName, Title: ctx.Events[0].RuleName,
Text: message + "\n" + strings.Join(ats, " "), Text: message + "\n" + strings.Join(ats, " "),
}, },
At: dingtalkAt{ At: dingtalkAt{

View File

@ -22,18 +22,18 @@ type EmailSender struct {
} }
func (es *EmailSender) Send(ctx MessageContext) { func (es *EmailSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return return
} }
tos := extract(ctx.Users) tos := extract(ctx.Users)
var subject string var subject string
if es.subjectTpl != nil { if es.subjectTpl != nil {
subject = BuildTplMessage(es.subjectTpl, ctx.Event) subject = BuildTplMessage(es.subjectTpl, []*models.AlertCurEvent{ctx.Events[0]})
} else { } else {
subject = ctx.Event.RuleName subject = ctx.Events[0].RuleName
} }
content := BuildTplMessage(es.contentTpl, ctx.Event) content := BuildTplMessage(es.contentTpl, ctx.Events)
es.WriteEmail(subject, content, tos) es.WriteEmail(subject, content, tos)
} }

View File

@ -31,11 +31,11 @@ type FeishuSender struct {
} }
func (fs *FeishuSender) Send(ctx MessageContext) { func (fs *FeishuSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return return
} }
urls, ats := fs.extract(ctx.Users) urls, ats := fs.extract(ctx.Users)
message := BuildTplMessage(fs.tpl, ctx.Event) message := BuildTplMessage(fs.tpl, ctx.Events)
for _, url := range urls { for _, url := range urls {
body := feishu{ body := feishu{
Msgtype: "text", Msgtype: "text",

View File

@ -96,11 +96,11 @@ var (
) )
func (fs *FeishuCardSender) Send(ctx MessageContext) { func (fs *FeishuCardSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return return
} }
urls, _ := fs.extract(ctx.Users) urls, _ := fs.extract(ctx.Users)
message := BuildTplMessage(fs.tpl, ctx.Event) message := BuildTplMessage(fs.tpl, ctx.Events)
color := "red" color := "red"
lowerUnicode := strings.ToLower(message) lowerUnicode := strings.ToLower(message)
if strings.Count(lowerUnicode, Recovered) > 0 && strings.Count(lowerUnicode, Triggered) > 0 { if strings.Count(lowerUnicode, Recovered) > 0 && strings.Count(lowerUnicode, Triggered) > 0 {
@ -109,7 +109,7 @@ func (fs *FeishuCardSender) Send(ctx MessageContext) {
color = "green" color = "green"
} }
SendTitle := fmt.Sprintf("🔔 %s", ctx.Event.RuleName) SendTitle := fmt.Sprintf("🔔 %s", ctx.Events[0].RuleName)
body.Card.Header.Title.Content = SendTitle body.Card.Header.Title.Content = SendTitle
body.Card.Header.Template = color body.Card.Header.Template = color
body.Card.Elements[0].Text.Content = message body.Card.Elements[0].Text.Content = message

View File

@ -28,7 +28,7 @@ type MmSender struct {
} }
func (ms *MmSender) Send(ctx MessageContext) { func (ms *MmSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return return
} }
@ -36,7 +36,7 @@ func (ms *MmSender) Send(ctx MessageContext) {
if len(urls) == 0 { if len(urls) == 0 {
return return
} }
message := BuildTplMessage(ms.tpl, ctx.Event) message := BuildTplMessage(ms.tpl, ctx.Events)
SendMM(MatterMostMessage{ SendMM(MatterMostMessage{
Text: message, Text: message,

View File

@ -17,9 +17,9 @@ type (
// MessageContext 一个event所生成的告警通知的上下文 // MessageContext 一个event所生成的告警通知的上下文
MessageContext struct { MessageContext struct {
Users []*models.User Users []*models.User
Rule *models.AlertRule Rule *models.AlertRule
Event *models.AlertCurEvent Events []*models.AlertCurEvent
} }
) )
@ -43,23 +43,32 @@ func NewSender(key string, tpls map[string]*template.Template, smtp aconf.SMTPCo
return nil return nil
} }
func BuildMessageContext(rule *models.AlertRule, event *models.AlertCurEvent, uids []int64, userCache *memsto.UserCacheType) MessageContext { func BuildMessageContext(rule *models.AlertRule, events []*models.AlertCurEvent, uids []int64, userCache *memsto.UserCacheType) MessageContext {
users := userCache.GetByUserIds(uids) users := userCache.GetByUserIds(uids)
return MessageContext{ return MessageContext{
Rule: rule, Rule: rule,
Event: event, Events: events,
Users: users, Users: users,
} }
} }
func BuildTplMessage(tpl *template.Template, event *models.AlertCurEvent) string { type BuildTplMessageFunc func(tpl *template.Template, events []*models.AlertCurEvent) string
var BuildTplMessage BuildTplMessageFunc = buildTplMessage
func buildTplMessage(tpl *template.Template, events []*models.AlertCurEvent) string {
if tpl == nil { if tpl == nil {
return "tpl for current sender not found, please check configuration" return "tpl for current sender not found, please check configuration"
} }
var body bytes.Buffer var content string
if err := tpl.Execute(&body, event); err != nil { for _, event := range events {
return err.Error() var body bytes.Buffer
if err := tpl.Execute(&body, event); err != nil {
return err.Error()
}
content += body.String() + "\n\n"
} }
return body.String()
return content
} }

View File

@ -26,11 +26,11 @@ type TelegramSender struct {
} }
func (ts *TelegramSender) Send(ctx MessageContext) { func (ts *TelegramSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return return
} }
tokens := ts.extract(ctx.Users) tokens := ts.extract(ctx.Users)
message := BuildTplMessage(ts.tpl, ctx.Event) message := BuildTplMessage(ts.tpl, ctx.Events)
SendTelegram(TelegramMessage{ SendTelegram(TelegramMessage{
Text: message, Text: message,

View File

@ -25,11 +25,11 @@ type WecomSender struct {
} }
func (ws *WecomSender) Send(ctx MessageContext) { func (ws *WecomSender) Send(ctx MessageContext) {
if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { if len(ctx.Users) == 0 || len(ctx.Events) == 0 {
return return
} }
urls := ws.extract(ctx.Users) urls := ws.extract(ctx.Users)
message := BuildTplMessage(ws.tpl, ctx.Event) message := BuildTplMessage(ws.tpl, ctx.Events)
for _, url := range urls { for _, url := range urls {
body := wecom{ body := wecom{
Msgtype: "markdown", Msgtype: "markdown",

View File

@ -99,6 +99,8 @@ func (rt *Router) alertSubscribePut(c *gin.Context) {
"webhooks", "webhooks",
"for_duration", "for_duration",
"redefine_webhooks", "redefine_webhooks",
"severities",
"extra_config",
)) ))
} }

View File

@ -28,6 +28,12 @@ func (rt *Router) serverHeartbeat(c *gin.Context) {
func (rt *Router) serversActive(c *gin.Context) { func (rt *Router) serversActive(c *gin.Context) {
datasourceId := ginx.QueryInt64(c, "dsid") datasourceId := ginx.QueryInt64(c, "dsid")
engineName := ginx.QueryStr(c, "engine_name", "")
if engineName != "" {
servers, err := models.AlertingEngineGetsInstances(rt.Ctx, "engine_cluster = ? and clock > ?", engineName, time.Now().Unix()-30)
ginx.NewRender(c).Data(servers, err)
return
}
servers, err := models.AlertingEngineGetsInstances(rt.Ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30) servers, err := models.AlertingEngineGetsInstances(rt.Ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30)
ginx.NewRender(c).Data(servers, err) ginx.NewRender(c).Data(servers, err)

View File

@ -281,6 +281,7 @@ CREATE TABLE `alert_rule` (
`runbook_url` varchar(255), `runbook_url` varchar(255),
`append_tags` varchar(255) not null default '' comment 'split by space: service=n9e mod=api', `append_tags` varchar(255) not null default '' comment 'split by space: service=n9e mod=api',
`annotations` text not null comment 'annotations', `annotations` text not null comment 'annotations',
`extra_config` text not null comment 'extra_config',
`create_at` bigint not null default 0, `create_at` bigint not null default 0,
`create_by` varchar(64) not null default '', `create_by` varchar(64) not null default '',
`update_at` bigint not null default 0, `update_at` bigint not null default 0,
@ -305,6 +306,7 @@ CREATE TABLE `alert_mute` (
`disabled` tinyint(1) not null default 0 comment '0:enabled 1:disabled', `disabled` tinyint(1) not null default 0 comment '0:enabled 1:disabled',
`mute_time_type` tinyint(1) not null default 0, `mute_time_type` tinyint(1) not null default 0,
`periodic_mutes` varchar(4096) not null default '', `periodic_mutes` varchar(4096) not null default '',
`severities` varchar(32) not null default '',
`create_at` bigint not null default 0, `create_at` bigint not null default 0,
`create_by` varchar(64) not null default '', `create_by` varchar(64) not null default '',
`update_at` bigint not null default 0, `update_at` bigint not null default 0,
@ -324,6 +326,7 @@ CREATE TABLE `alert_subscribe` (
`datasource_ids` varchar(255) not null default '' comment 'datasource ids', `datasource_ids` varchar(255) not null default '' comment 'datasource ids',
`cluster` varchar(128) not null, `cluster` varchar(128) not null,
`rule_id` bigint not null default 0, `rule_id` bigint not null default 0,
`severities` varchar(32) not null default '',
`tags` varchar(4096) not null default '' comment 'json,map,tagkey->regexp|value', `tags` varchar(4096) not null default '' comment 'json,map,tagkey->regexp|value',
`redefine_severity` tinyint(1) default 0 comment 'is redefine severity?', `redefine_severity` tinyint(1) default 0 comment 'is redefine severity?',
`new_severity` tinyint(1) not null comment '0:Emergency 1:Warning 2:Notice', `new_severity` tinyint(1) not null comment '0:Emergency 1:Warning 2:Notice',
@ -331,6 +334,7 @@ CREATE TABLE `alert_subscribe` (
`new_channels` varchar(255) not null default '' comment 'split by space: sms voice email dingtalk wecom', `new_channels` varchar(255) not null default '' comment 'split by space: sms voice email dingtalk wecom',
`user_group_ids` varchar(250) not null comment 'split by space 1 34 5, notify cc to user_group_ids', `user_group_ids` varchar(250) not null comment 'split by space 1 34 5, notify cc to user_group_ids',
`webhooks` text not null, `webhooks` text not null,
`extra_config` text not null comment 'extra_config',
`redefine_webhooks` tinyint(1) default 0, `redefine_webhooks` tinyint(1) default 0,
`for_duration` bigint not null default 0, `for_duration` bigint not null default 0,
`create_at` bigint not null default 0, `create_at` bigint not null default 0,

View File

@ -69,6 +69,16 @@ func (c *AlertSubscribeCacheType) Get(ruleId int64) ([]*models.AlertSubscribe, b
return lst, has return lst, has
} }
// GetAll returns every subscription currently held in the cache, flattening
// all groups stored in c.subs into a single slice. The read lock is held for
// the duration of the copy. The result is nil when the cache holds nothing.
func (c *AlertSubscribeCacheType) GetAll() []*models.AlertSubscribe {
	c.RLock()
	defer c.RUnlock()

	var all []*models.AlertSubscribe
	for _, group := range c.subs {
		all = append(all, group...)
	}

	return all
}
func (c *AlertSubscribeCacheType) GetStructs(ruleId int64) []models.AlertSubscribe { func (c *AlertSubscribeCacheType) GetStructs(ruleId int64) []models.AlertSubscribe {
c.RLock() c.RLock()
defer c.RUnlock() defer c.RUnlock()

View File

@ -16,48 +16,53 @@ import (
) )
type AlertCurEvent struct { type AlertCurEvent struct {
Id int64 `json:"id" gorm:"primaryKey"` Id int64 `json:"id" gorm:"primaryKey"`
Cate string `json:"cate"` Cate string `json:"cate"`
Cluster string `json:"cluster"` Cluster string `json:"cluster"`
DatasourceId int64 `json:"datasource_id"` DatasourceId int64 `json:"datasource_id"`
GroupId int64 `json:"group_id"` // busi group id GroupId int64 `json:"group_id"` // busi group id
GroupName string `json:"group_name"` // busi group name GroupName string `json:"group_name"` // busi group name
Hash string `json:"hash"` // rule_id + vector_key Hash string `json:"hash"` // rule_id + vector_key
RuleId int64 `json:"rule_id"` RuleId int64 `json:"rule_id"`
RuleName string `json:"rule_name"` RuleName string `json:"rule_name"`
RuleNote string `json:"rule_note"` RuleNote string `json:"rule_note"`
RuleProd string `json:"rule_prod"` RuleProd string `json:"rule_prod"`
RuleAlgo string `json:"rule_algo"` RuleAlgo string `json:"rule_algo"`
Severity int `json:"severity"` Severity int `json:"severity"`
PromForDuration int `json:"prom_for_duration"` PromForDuration int `json:"prom_for_duration"`
PromQl string `json:"prom_ql"` PromQl string `json:"prom_ql"`
RuleConfig string `json:"-" gorm:"rule_config"` // rule config RuleConfig string `json:"-" gorm:"rule_config"` // rule config
RuleConfigJson interface{} `json:"rule_config" gorm:"-"` // rule config for fe RuleConfigJson interface{} `json:"rule_config" gorm:"-"` // rule config for fe
PromEvalInterval int `json:"prom_eval_interval"` PromEvalInterval int `json:"prom_eval_interval"`
Callbacks string `json:"-"` // for db Callbacks string `json:"-"` // for db
CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe
RunbookUrl string `json:"runbook_url"` RunbookUrl string `json:"runbook_url"`
NotifyRecovered int `json:"notify_recovered"` NotifyRecovered int `json:"notify_recovered"`
NotifyChannels string `json:"-"` // for db NotifyChannels string `json:"-"` // for db
NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe
NotifyGroups string `json:"-"` // for db NotifyGroups string `json:"-"` // for db
NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe
NotifyGroupsObj []*UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe NotifyGroupsObj []*UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe
TargetIdent string `json:"target_ident"` TargetIdent string `json:"target_ident"`
TargetNote string `json:"target_note"` TargetNote string `json:"target_note"`
TriggerTime int64 `json:"trigger_time"` TriggerTime int64 `json:"trigger_time"`
TriggerValue string `json:"trigger_value"` TriggerValue string `json:"trigger_value"`
Tags string `json:"-"` // for db Tags string `json:"-"` // for db
TagsJSON []string `json:"tags" gorm:"-"` // for fe TagsJSON []string `json:"tags" gorm:"-"` // for fe
TagsMap map[string]string `json:"-" gorm:"-"` // for internal usage TagsMap map[string]string `json:"-" gorm:"-"` // for internal usage
Annotations string `json:"-"` // Annotations string `json:"-"` //
AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe
IsRecovered bool `json:"is_recovered" gorm:"-"` // for notify.py IsRecovered bool `json:"is_recovered" gorm:"-"` // for notify.py
NotifyUsersObj []*User `json:"notify_users_obj" gorm:"-"` // for notify.py NotifyUsersObj []*User `json:"notify_users_obj" gorm:"-"` // for notify.py
LastEvalTime int64 `json:"last_eval_time" gorm:"-"` // for notify.py 上次计算的时间 LastEvalTime int64 `json:"last_eval_time" gorm:"-"` // for notify.py 上次计算的时间
LastSentTime int64 `json:"last_sent_time" gorm:"-"` // 上次发送时间 LastEscalationNotifyTime int64 `json:"last_escalation_notify_time" gorm:"-"`
NotifyCurNumber int `json:"notify_cur_number"` // notify: current number LastSentTime int64 `json:"last_sent_time" gorm:"-"` // 上次发送时间
FirstTriggerTime int64 `json:"first_trigger_time"` // 连续告警的首次告警时间 NotifyCurNumber int `json:"notify_cur_number"` // notify: current number
FirstTriggerTime int64 `json:"first_trigger_time"` // 连续告警的首次告警时间
ExtraConfig interface{} `json:"extra_config" gorm:"-"`
Status int `json:"status" gorm:"-"`
Claimant string `json:"claimant" gorm:"-"`
SubRuleId int64 `json:"sub_rule_id" gorm:"-"`
} }
func (e *AlertCurEvent) TableName() string { func (e *AlertCurEvent) TableName() string {

View File

@ -52,6 +52,7 @@ type AlertHisEvent struct {
AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe
NotifyCurNumber int `json:"notify_cur_number"` // notify: current number NotifyCurNumber int `json:"notify_cur_number"` // notify: current number
FirstTriggerTime int64 `json:"first_trigger_time"` // 连续告警的首次告警时间 FirstTriggerTime int64 `json:"first_trigger_time"` // 连续告警的首次告警时间
ExtraConfig interface{} `json:"extra_config" gorm:"-"`
} }
func (e *AlertHisEvent) TableName() string { func (e *AlertHisEvent) TableName() string {

View File

@ -48,6 +48,8 @@ type AlertMute struct {
MuteTimeType int `json:"mute_time_type"` // 0: mute by time range, 1: mute by periodic time MuteTimeType int `json:"mute_time_type"` // 0: mute by time range, 1: mute by periodic time
PeriodicMutes string `json:"-" gorm:"periodic_mutes"` PeriodicMutes string `json:"-" gorm:"periodic_mutes"`
PeriodicMutesJson []PeriodicMute `json:"periodic_mutes" gorm:"-"` PeriodicMutesJson []PeriodicMute `json:"periodic_mutes" gorm:"-"`
Severities string `json:"-" gorm:"severities"`
SeveritiesJson []int `json:"severities" gorm:"-"`
} }
type PeriodicMute struct { type PeriodicMute struct {
@ -208,12 +210,31 @@ func (m *AlertMute) FE2DB() error {
} }
m.PeriodicMutes = string(periodicMutesBytes) m.PeriodicMutes = string(periodicMutesBytes)
if len(m.SeveritiesJson) > 0 {
severtiesBytes, err := json.Marshal(m.SeveritiesJson)
if err != nil {
return err
}
m.Severities = string(severtiesBytes)
}
return nil return nil
} }
func (m *AlertMute) DB2FE() error { func (m *AlertMute) DB2FE() error {
json.Unmarshal([]byte(m.DatasourceIds), &m.DatasourceIdsJson) json.Unmarshal([]byte(m.DatasourceIds), &m.DatasourceIdsJson)
err := json.Unmarshal([]byte(m.PeriodicMutes), &m.PeriodicMutesJson) err := json.Unmarshal([]byte(m.PeriodicMutes), &m.PeriodicMutesJson)
if err != nil {
return err
}
if m.Severities != "" {
err = json.Unmarshal([]byte(m.Severities), &m.SeveritiesJson)
if err != nil {
return err
}
}
return err return err
} }

View File

@ -70,6 +70,8 @@ type AlertRule struct {
AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe
Annotations string `json:"-"` // Annotations string `json:"-"` //
AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe
ExtraConfig string `json:"-" gorm:"extra_config"` // extra config
ExtraConfigJSON interface{} `json:"extra_config" gorm:"-"` // for fe
CreateAt int64 `json:"create_at"` CreateAt int64 `json:"create_at"`
CreateBy string `json:"create_by"` CreateBy string `json:"create_by"`
UpdateAt int64 `json:"update_at"` UpdateAt int64 `json:"update_at"`
@ -560,6 +562,14 @@ func (ar *AlertRule) FE2DB() error {
ar.Annotations = string(b) ar.Annotations = string(b)
} }
if ar.ExtraConfigJSON != nil {
b, err := json.Marshal(ar.ExtraConfigJSON)
if err != nil {
return fmt.Errorf("marshal extra_config err:%v", err)
}
ar.ExtraConfig = string(b)
}
return nil return nil
} }
@ -586,6 +596,7 @@ func (ar *AlertRule) DB2FE() error {
json.Unmarshal([]byte(ar.AlgoParams), &ar.AlgoParamsJson) json.Unmarshal([]byte(ar.AlgoParams), &ar.AlgoParamsJson)
json.Unmarshal([]byte(ar.RuleConfig), &ar.RuleConfigJson) json.Unmarshal([]byte(ar.RuleConfig), &ar.RuleConfigJson)
json.Unmarshal([]byte(ar.Annotations), &ar.AnnotationsJSON) json.Unmarshal([]byte(ar.Annotations), &ar.AnnotationsJSON)
json.Unmarshal([]byte(ar.ExtraConfig), &ar.ExtraConfigJSON)
err := ar.FillDatasourceIds() err := ar.FillDatasourceIds()
return err return err

View File

@ -25,8 +25,10 @@ type AlertSubscribe struct {
DatasourceIdsJson []int64 `json:"datasource_ids" gorm:"-"` // for fe DatasourceIdsJson []int64 `json:"datasource_ids" gorm:"-"` // for fe
Cluster string `json:"cluster"` // take effect by clusters, seperated by space Cluster string `json:"cluster"` // take effect by clusters, seperated by space
RuleId int64 `json:"rule_id"` RuleId int64 `json:"rule_id"`
ForDuration int64 `json:"for_duration"` // for duration, unit: second Severities string `json:"-" gorm:"severities"` // sub severity
RuleName string `json:"rule_name" gorm:"-"` // for fe SeveritiesJson []int `json:"severities" gorm:"-"` // for fe
ForDuration int64 `json:"for_duration"` // for duration, unit: second
RuleName string `json:"rule_name" gorm:"-"` // for fe
Tags ormx.JSONArr `json:"tags"` Tags ormx.JSONArr `json:"tags"`
RedefineSeverity int `json:"redefine_severity"` RedefineSeverity int `json:"redefine_severity"`
NewSeverity int `json:"new_severity"` NewSeverity int `json:"new_severity"`
@ -37,6 +39,8 @@ type AlertSubscribe struct {
RedefineWebhooks int `json:"redefine_webhooks"` RedefineWebhooks int `json:"redefine_webhooks"`
Webhooks string `json:"-" gorm:"webhooks"` Webhooks string `json:"-" gorm:"webhooks"`
WebhooksJson []string `json:"webhooks" gorm:"-"` WebhooksJson []string `json:"webhooks" gorm:"-"`
ExtraConfig string `json:"-" gorm:"column:extra_config"`
ExtraConfigJson interface{} `json:"extra_config" gorm:"-"` // for fe
CreateBy string `json:"create_by"` CreateBy string `json:"create_by"`
CreateAt int64 `json:"create_at"` CreateAt int64 `json:"create_at"`
UpdateBy string `json:"update_by"` UpdateBy string `json:"update_by"`
@ -118,6 +122,14 @@ func (s *AlertSubscribe) FE2DB() error {
s.Webhooks = string(b) s.Webhooks = string(b)
} }
b, _ := json.Marshal(s.ExtraConfigJson)
s.ExtraConfig = string(b)
if len(s.SeveritiesJson) > 0 {
b, _ := json.Marshal(s.SeveritiesJson)
s.Severities = string(b)
}
return nil return nil
} }
@ -133,6 +145,19 @@ func (s *AlertSubscribe) DB2FE() error {
return err return err
} }
} }
if s.ExtraConfig != "" {
if err := json.Unmarshal([]byte(s.ExtraConfig), &s.ExtraConfigJson); err != nil {
return err
}
}
if s.Severities != "" {
if err := json.Unmarshal([]byte(s.Severities), &s.SeveritiesJson); err != nil {
return err
}
}
return nil return nil
} }

View File

@ -19,5 +19,36 @@ func MigrateRecordingTable(db *gorm.DB) error {
logger.Errorf("failed to migrate recording rule table: %v", err) logger.Errorf("failed to migrate recording rule table: %v", err)
return err return err
} }
err = db.AutoMigrate(&AlertRule{})
if err != nil {
logger.Errorf("failed to migrate alert rule table: %v", err)
return err
}
err = db.AutoMigrate(&AlertSubscribe{})
if err != nil {
logger.Errorf("failed to migrate alert subscribe table: %v", err)
return err
}
err = db.AutoMigrate(&AlertMute{})
if err != nil {
logger.Errorf("failed to migrate alert mute table: %v", err)
return err
}
return nil return nil
} }
// AlertRule is a migration-only view of the alert rule model: it declares just
// the extra_config column and is handed to db.AutoMigrate in
// MigrateRecordingTable.
// NOTE(review): presumably resolves to the same table as the full alert rule
// model — the table-name mapping is not visible here; confirm.
type AlertRule struct {
// NOTE(review): text column declared NOT NULL with no default — confirm that
// AutoMigrate can add it to a table that already holds rows on every
// supported database.
ExtraConfig string `gorm:"type:text;not null;column:extra_config"` // extra config
}
// AlertSubscribe is a migration-only view of the alert subscribe model: it
// declares the extra_config and severities columns and is handed to
// db.AutoMigrate in MigrateRecordingTable.
// NOTE(review): presumably resolves to the same table as the full subscribe
// model — the table-name mapping is not visible here; confirm.
type AlertSubscribe struct {
// NOTE(review): text column declared NOT NULL with no default — confirm that
// AutoMigrate can add it to a table that already holds rows on every
// supported database.
ExtraConfig string `gorm:"type:text;not null;column:extra_config"` // extra config
// Severities is a varchar(32) column, NOT NULL, defaulting to the empty string.
Severities string `gorm:"column:severities;type:varchar(32);not null;default:''"`
}
// AlertMute is a migration-only view of the alert mute model: it declares just
// the severities column and is handed to db.AutoMigrate in
// MigrateRecordingTable.
// NOTE(review): presumably resolves to the same table as the full mute model —
// the table-name mapping is not visible here; confirm.
type AlertMute struct {
// Severities is a varchar(32) column, NOT NULL, defaulting to the empty string.
Severities string `gorm:"column:severities;type:varchar(32);not null;default:''"`
}

View File

@ -106,7 +106,7 @@ func (rt *Router) remoteWrite(c *gin.Context) {
} }
rt.EnrichLabels(req.Timeseries[i]) rt.EnrichLabels(req.Timeseries[i])
rt.debugSample(c.Request.RemoteAddr, req.Timeseries[i]) rt.debugSample(c.ClientIP(), req.Timeseries[i])
if len(ident) > 0 { if len(ident) > 0 {
// use ident as hash key, cause "out of bounds" problem // use ident as hash key, cause "out of bounds" problem

View File

@ -34,6 +34,10 @@ type Redis interface {
HGetAll(ctx context.Context, key string) *redis.MapStringStringCmd HGetAll(ctx context.Context, key string) *redis.MapStringStringCmd
HSet(ctx context.Context, key string, values ...interface{}) *redis.IntCmd HSet(ctx context.Context, key string, values ...interface{}) *redis.IntCmd
HDel(ctx context.Context, key string, fields ...string) *redis.IntCmd HDel(ctx context.Context, key string, fields ...string) *redis.IntCmd
LPush(ctx context.Context, key string, values ...interface{}) *redis.IntCmd
RPop(ctx context.Context, key string) *redis.StringCmd
RPopCount(ctx context.Context, key string, count int) *redis.StringSliceCmd
LLen(ctx context.Context, key string) *redis.IntCmd
Close() error Close() error
Ping(ctx context.Context) *redis.StatusCmd Ping(ctx context.Context) *redis.StatusCmd
Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd