diff --git a/alert/common/conv.go b/alert/common/conv.go index 6efac8ef..a87efc94 100644 --- a/alert/common/conv.go +++ b/alert/common/conv.go @@ -15,6 +15,7 @@ type AnomalyPoint struct { Value float64 `json:"value"` Severity int `json:"severity"` Triggered bool `json:"triggered"` + Query string `json:"query"` } func NewAnomalyPoint(key string, labels map[string]string, ts int64, value float64, severity int) AnomalyPoint { diff --git a/alert/dispatch/dispatch.go b/alert/dispatch/dispatch.go index aca6712a..dae95447 100644 --- a/alert/dispatch/dispatch.go +++ b/alert/dispatch/dispatch.go @@ -28,9 +28,10 @@ type Dispatch struct { alerting aconf.Alerting - senders map[string]sender.Sender - tpls map[string]*template.Template - ExtraSenders map[string]sender.Sender + Senders map[string]sender.Sender + tpls map[string]*template.Template + ExtraSenders map[string]sender.Sender + BeforeSenderHook func(*models.AlertCurEvent) bool ctx *ctx.Context @@ -51,9 +52,10 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us alerting: alerting, - senders: make(map[string]sender.Sender), - tpls: make(map[string]*template.Template), - ExtraSenders: make(map[string]sender.Sender), + Senders: make(map[string]sender.Sender), + tpls: make(map[string]*template.Template), + ExtraSenders: make(map[string]sender.Sender), + BeforeSenderHook: func(*models.AlertCurEvent) bool { return true }, ctx: ctx, } @@ -63,7 +65,7 @@ func NewDispatch(alertRuleCache *memsto.AlertRuleCacheType, userCache *memsto.Us func (e *Dispatch) ReloadTpls() error { err := e.relaodTpls() if err != nil { - logger.Error("failed to reload tpls: %v", err) + logger.Errorf("failed to reload tpls: %v", err) } duration := time.Duration(9000) * time.Millisecond @@ -100,7 +102,7 @@ func (e *Dispatch) relaodTpls() error { e.RwLock.Lock() e.tpls = tmpTpls - e.senders = senders + e.Senders = senders e.RwLock.Unlock() return nil } @@ -141,7 +143,7 @@ func (e *Dispatch) HandleEventNotify(event *models.AlertCurEvent, isSubscribe bo } // 处理事件发送,这里用一个goroutine处理一个event的所有发送事件 - go e.Send(rule, event, notifyTarget, isSubscribe) + go e.Send(rule, event, notifyTarget) // 如果是不是订阅规则出现的event, 则需要处理订阅规则的event if !isSubscribe { @@ -177,26 +179,41 @@ func (e *Dispatch) handleSub(sub *models.AlertSubscribe, event models.AlertCurEv if sub.ForDuration > (event.TriggerTime - event.FirstTriggerTime) { return } + + if len(sub.SeveritiesJson) != 0 { + match := false + for _, s := range sub.SeveritiesJson { + if s == event.Severity || s == 0 { + match = true + break + } + } + if !match { + return + } + } + sub.ModifyEvent(&event) LogEvent(&event, "subscribe") + + event.SubRuleId = sub.Id e.HandleEventNotify(&event, true) } -func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, notifyTarget *NotifyTarget, isSubscribe bool) { - for channel, uids := range notifyTarget.ToChannelUserMap() { - ctx := sender.BuildMessageContext(rule, event, uids, e.userCache) - e.RwLock.RLock() - s := e.senders[channel] - e.RwLock.RUnlock() - if s == nil { - logger.Debugf("no sender for channel: %s", channel) - continue +func (e *Dispatch) Send(rule *models.AlertRule, event *models.AlertCurEvent, notifyTarget *NotifyTarget) { + needSend := e.BeforeSenderHook(event) + if needSend { + for channel, uids := range notifyTarget.ToChannelUserMap() { + ctx := sender.BuildMessageContext(rule, []*models.AlertCurEvent{event}, uids, e.userCache) + e.RwLock.RLock() + s := e.Senders[channel] + e.RwLock.RUnlock() + if s == nil { + logger.Debugf("no sender for channel: %s", channel) + continue + } + s.Send(ctx) } - logger.Debugf("send event: %s, channel: %s", event.Hash, channel) - for i := 0; i < len(ctx.Users); i++ { - logger.Debug("send event to user: ", ctx.Users[i]) - } - s.Send(ctx) } // handle event callbacks diff --git a/alert/eval/eval.go b/alert/eval/eval.go index eb85991e..0372f6a3 100644 --- a/alert/eval/eval.go +++ b/alert/eval/eval.go @@ -165,6 +165,7 @@ func (arw *AlertRuleWorker) GetPromAnomalyPoint(ruleConfig string) []common.Anom points := common.ConvertAnomalyPoints(value) for i := 0; i < len(points); i++ { points[i].Severity = query.Severity + points[i].Query = promql } lst = append(lst, points...) } diff --git a/alert/mute/mute.go b/alert/mute/mute.go index e2181069..00132bd4 100644 --- a/alert/mute/mute.go +++ b/alert/mute/mute.go @@ -193,5 +193,21 @@ func matchMute(event *models.AlertCurEvent, mute *models.AlertMute, clock ...int return false } + var matchSeverity bool + if len(mute.SeveritiesJson) > 0 { + for _, s := range mute.SeveritiesJson { + if event.Severity == s || s == 0 { + matchSeverity = true + break + } + } + } else { + matchSeverity = true + } + + if !matchSeverity { + return false + } + return common.MatchTags(event.TagsMap, mute.ITags) } diff --git a/alert/naming/heartbeat.go b/alert/naming/heartbeat.go index a2a16ca3..07874061 100644 --- a/alert/naming/heartbeat.go +++ b/alert/naming/heartbeat.go @@ -167,3 +167,13 @@ func (n *Naming) ActiveServers(datasourceId int64) ([]string, error) { // 30秒内有心跳,就认为是活的 return models.AlertingEngineGetsInstances(n.ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30) } + +func (n *Naming) ActiveServersByEngineName() ([]string, error) { + if !n.ctx.IsCenter { + lst, err := poster.GetByUrls[[]string](n.ctx, "/v1/n9e/servers-active?engine_name="+n.heartbeatConfig.EngineName) + return lst, err + } + + // 30秒内有心跳,就认为是活的 + return models.AlertingEngineGetsInstances(n.ctx, "engine_cluster = ? and clock > ?", n.heartbeatConfig.EngineName, time.Now().Unix()-30) +} diff --git a/alert/process/process.go b/alert/process/process.go index 49b9e18f..71987a9d 100644 --- a/alert/process/process.go +++ b/alert/process/process.go @@ -45,6 +45,8 @@ func (e *ExternalProcessorsType) GetExternalAlertRule(datasourceId, id int64) (* return processor, has } +type HandleEventFunc func(event *models.AlertCurEvent) + type Processor struct { datasourceId int64 @@ -69,7 +71,9 @@ type Processor struct { ctx *ctx.Context stats *astats.Stats - EventMuteHook EventMuteHookFunc + HandleFireEventHook HandleEventFunc + HandleRecoverEventHook HandleEventFunc + EventMuteHook EventMuteHookFunc } func (p *Processor) Key() string { @@ -103,10 +107,13 @@ func NewProcessor(rule *models.AlertRule, datasourceId int64, atertRuleCache *me atertRuleCache: atertRuleCache, datasourceCache: datasourceCache, - promClients: promClients, - ctx: ctx, - stats: stats, - EventMuteHook: func(event *models.AlertCurEvent) bool { return false }, + promClients: promClients, + ctx: ctx, + stats: stats, + + HandleFireEventHook: func(event *models.AlertCurEvent) {}, + HandleRecoverEventHook: func(event *models.AlertCurEvent) {}, + EventMuteHook: func(event *models.AlertCurEvent) bool { return false }, } p.mayHandleGroup() @@ -184,6 +191,8 @@ func (p *Processor) BuildEvent(anomalyPoint common.AnomalyPoint, from string, no event.RuleConfig = p.rule.RuleConfig event.RuleConfigJson = p.rule.RuleConfigJson event.Severity = anomalyPoint.Severity + event.ExtraConfig = p.rule.ExtraConfigJSON + event.PromQl = anomalyPoint.Query if from == "inner" { event.LastEvalTime = now @@ -237,6 +246,8 @@ func (p *Processor) RecoverSingle(hash string, now int64, value *string) { cachedRule.UpdateEvent(event) event.IsRecovered = true event.LastEvalTime = now + + p.HandleRecoverEventHook(event) p.pushEventToQueue(event) } @@ -294,9 +305,12 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) { if cachedRule == nil { return } + logger.Debugf("rule_eval:%s event:%+v fire", p.Key(), event) if fired, has := p.fires.Get(event.Hash); has { p.fires.UpdateLastEvalTime(event.Hash, event.LastEvalTime) + event.FirstTriggerTime = fired.FirstTriggerTime + p.HandleFireEventHook(event) if cachedRule.NotifyRepeatStep == 0 { logger.Debugf("rule_eval:%s event:%+v repeat is zero nothing to do", p.Key(), event) @@ -310,7 +324,6 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) { if cachedRule.NotifyMaxNumber == 0 { // 最大可以发送次数如果是0,表示不想限制最大发送次数,一直发即可 event.NotifyCurNumber = fired.NotifyCurNumber + 1 - event.FirstTriggerTime = fired.FirstTriggerTime p.pushEventToQueue(event) } else { // 有最大发送次数的限制,就要看已经发了几次了,是否达到了最大发送次数 @@ -319,7 +332,6 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) { return } else { event.NotifyCurNumber = fired.NotifyCurNumber + 1 - event.FirstTriggerTime = fired.FirstTriggerTime p.pushEventToQueue(event) } } @@ -327,6 +339,7 @@ func (p *Processor) fireEvent(event *models.AlertCurEvent) { } else { event.NotifyCurNumber = 1 event.FirstTriggerTime = event.TriggerTime + p.HandleFireEventHook(event) p.pushEventToQueue(event) } } @@ -442,7 +455,7 @@ func labelMapToArr(m map[string]string) []string { } func Hash(ruleId, datasourceId int64, vector common.AnomalyPoint) string { - return str.MD5(fmt.Sprintf("%d_%s_%d_%d", ruleId, vector.Labels.String(), datasourceId, vector.Severity)) + return str.MD5(fmt.Sprintf("%d_%s_%d_%d_%s", ruleId, vector.Labels.String(), datasourceId, vector.Severity, vector.Query)) } func TagHash(vector common.AnomalyPoint) string { diff --git a/alert/sender/dingtalk.go b/alert/sender/dingtalk.go index b2eb4b78..87985136 100644 --- a/alert/sender/dingtalk.go +++ b/alert/sender/dingtalk.go @@ -32,7 +32,7 @@ type DingtalkSender struct { } func (ds *DingtalkSender) Send(ctx MessageContext) { - if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { + if len(ctx.Users) == 0 || len(ctx.Events) == 0 { return } @@ -40,7 +40,7 @@ func (ds *DingtalkSender) Send(ctx MessageContext) { if len(urls) == 0 { return } - message := BuildTplMessage(ds.tpl, ctx.Event) + message := BuildTplMessage(ds.tpl, ctx.Events) for _, url := range urls { var body dingtalk @@ -49,7 +49,7 @@ func (ds *DingtalkSender) Send(ctx MessageContext) { body = dingtalk{ Msgtype: "markdown", Markdown: dingtalkMarkdown{ - Title: ctx.Event.RuleName, + Title: ctx.Events[0].RuleName, Text: message, }, } @@ -57,7 +57,7 @@ func (ds *DingtalkSender) Send(ctx MessageContext) { body = dingtalk{ Msgtype: "markdown", Markdown: dingtalkMarkdown{ - Title: ctx.Event.RuleName, + Title: ctx.Events[0].RuleName, Text: message + "\n" + strings.Join(ats, " "), }, At: dingtalkAt{ diff --git a/alert/sender/email.go b/alert/sender/email.go index f23b971c..1b75a156 100644 --- a/alert/sender/email.go +++ b/alert/sender/email.go @@ -22,18 +22,18 @@ type EmailSender struct { } func (es *EmailSender) Send(ctx MessageContext) { - if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { + if len(ctx.Users) == 0 || len(ctx.Events) == 0 { return } tos := extract(ctx.Users) var subject string if es.subjectTpl != nil { - subject = BuildTplMessage(es.subjectTpl, ctx.Event) + subject = BuildTplMessage(es.subjectTpl, []*models.AlertCurEvent{ctx.Events[0]}) } else { - subject = ctx.Event.RuleName + subject = ctx.Events[0].RuleName } - content := BuildTplMessage(es.contentTpl, ctx.Event) + content := BuildTplMessage(es.contentTpl, ctx.Events) es.WriteEmail(subject, content, tos) } diff --git a/alert/sender/feishu.go b/alert/sender/feishu.go index bf03176d..44060710 100644 --- a/alert/sender/feishu.go +++ b/alert/sender/feishu.go @@ -31,11 +31,11 @@ type FeishuSender struct { } func (fs *FeishuSender) Send(ctx MessageContext) { - if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { + if len(ctx.Users) == 0 || len(ctx.Events) == 0 { return } urls, ats := fs.extract(ctx.Users) - message := BuildTplMessage(fs.tpl, ctx.Event) + message := BuildTplMessage(fs.tpl, ctx.Events) for _, url := range urls { body := feishu{ Msgtype: "text", diff --git a/alert/sender/feishucard.go b/alert/sender/feishucard.go index 9d4fc28f..462cf629 100644 --- a/alert/sender/feishucard.go +++ b/alert/sender/feishucard.go @@ -96,11 +96,11 @@ var ( ) func (fs *FeishuCardSender) Send(ctx MessageContext) { - if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { + if len(ctx.Users) == 0 || len(ctx.Events) == 0 { return } urls, _ := fs.extract(ctx.Users) - message := BuildTplMessage(fs.tpl, ctx.Event) + message := BuildTplMessage(fs.tpl, ctx.Events) color := "red" lowerUnicode := strings.ToLower(message) if strings.Count(lowerUnicode, Recovered) > 0 && strings.Count(lowerUnicode, Triggered) > 0 { @@ -109,7 +109,7 @@ func (fs *FeishuCardSender) Send(ctx MessageContext) { color = "green" } - SendTitle := fmt.Sprintf("🔔 %s", ctx.Event.RuleName) + SendTitle := fmt.Sprintf("🔔 %s", ctx.Events[0].RuleName) body.Card.Header.Title.Content = SendTitle body.Card.Header.Template = color body.Card.Elements[0].Text.Content = message diff --git a/alert/sender/mm.go b/alert/sender/mm.go index c4787eb2..29462989 100644 --- a/alert/sender/mm.go +++ b/alert/sender/mm.go @@ -28,7 +28,7 @@ type MmSender struct { } func (ms *MmSender) Send(ctx MessageContext) { - if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { + if len(ctx.Users) == 0 || len(ctx.Events) == 0 { return } @@ -36,7 +36,7 @@ func (ms *MmSender) Send(ctx MessageContext) { if len(urls) == 0 { return } - message := BuildTplMessage(ms.tpl, ctx.Event) + message := BuildTplMessage(ms.tpl, ctx.Events) SendMM(MatterMostMessage{ Text: message, diff --git a/alert/sender/sender.go b/alert/sender/sender.go index ccbcf679..da85ba25 100644 --- a/alert/sender/sender.go +++ b/alert/sender/sender.go @@ -17,9 +17,9 @@ type ( // MessageContext 一个event所生成的告警通知的上下文 MessageContext struct { - Users []*models.User - Rule *models.AlertRule - Event *models.AlertCurEvent + Users []*models.User + Rule *models.AlertRule + Events []*models.AlertCurEvent } ) @@ -43,23 +43,32 @@ func NewSender(key string, tpls map[string]*template.Template, smtp aconf.SMTPCo return nil } -func BuildMessageContext(rule *models.AlertRule, event *models.AlertCurEvent, uids []int64, userCache *memsto.UserCacheType) MessageContext { +func BuildMessageContext(rule *models.AlertRule, events []*models.AlertCurEvent, uids []int64, userCache *memsto.UserCacheType) MessageContext { users := userCache.GetByUserIds(uids) return MessageContext{ - Rule: rule, - Event: event, - Users: users, + Rule: rule, + Events: events, + Users: users, } } -func BuildTplMessage(tpl *template.Template, event *models.AlertCurEvent) string { +type BuildTplMessageFunc func(tpl *template.Template, events []*models.AlertCurEvent) string + +var BuildTplMessage BuildTplMessageFunc = buildTplMessage + +func buildTplMessage(tpl *template.Template, events []*models.AlertCurEvent) string { if tpl == nil { return "tpl for current sender not found, please check configuration" } - var body bytes.Buffer - if err := tpl.Execute(&body, event); err != nil { - return err.Error() + var content string + for _, event := range events { + var body bytes.Buffer + if err := tpl.Execute(&body, event); err != nil { + return err.Error() + } + content += body.String() + "\n\n" } - return body.String() + + return content } diff --git a/alert/sender/telegram.go b/alert/sender/telegram.go index 75ded37a..22c1f7d4 100644 --- a/alert/sender/telegram.go +++ b/alert/sender/telegram.go @@ -26,11 +26,11 @@ type TelegramSender struct { } func (ts *TelegramSender) Send(ctx MessageContext) { - if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { + if len(ctx.Users) == 0 || len(ctx.Events) == 0 { return } tokens := ts.extract(ctx.Users) - message := BuildTplMessage(ts.tpl, ctx.Event) + message := BuildTplMessage(ts.tpl, ctx.Events) SendTelegram(TelegramMessage{ Text: message, diff --git a/alert/sender/wecom.go b/alert/sender/wecom.go index 6044eff8..2e48b742 100644 --- a/alert/sender/wecom.go +++ b/alert/sender/wecom.go @@ -25,11 +25,11 @@ type WecomSender struct { } func (ws *WecomSender) Send(ctx MessageContext) { - if len(ctx.Users) == 0 || ctx.Rule == nil || ctx.Event == nil { + if len(ctx.Users) == 0 || len(ctx.Events) == 0 { return } urls := ws.extract(ctx.Users) - message := BuildTplMessage(ws.tpl, ctx.Event) + message := BuildTplMessage(ws.tpl, ctx.Events) for _, url := range urls { body := wecom{ Msgtype: "markdown", diff --git a/center/router/router_alert_subscribe.go b/center/router/router_alert_subscribe.go index 7e49d7ca..b8ab6d46 100644 --- a/center/router/router_alert_subscribe.go +++ b/center/router/router_alert_subscribe.go @@ -99,6 +99,8 @@ func (rt *Router) alertSubscribePut(c *gin.Context) { "webhooks", "for_duration", "redefine_webhooks", + "severities", + "extra_config", )) } diff --git a/center/router/router_server.go b/center/router/router_server.go index 0ab4228d..84e57db9 100644 --- a/center/router/router_server.go +++ b/center/router/router_server.go @@ -28,6 +28,12 @@ func (rt *Router) serverHeartbeat(c *gin.Context) { func (rt *Router) serversActive(c *gin.Context) { datasourceId := ginx.QueryInt64(c, "dsid") + engineName := ginx.QueryStr(c, "engine_name", "") + if engineName != "" { + servers, err := models.AlertingEngineGetsInstances(rt.Ctx, "engine_cluster = ? and clock > ?", engineName, time.Now().Unix()-30) + ginx.NewRender(c).Data(servers, err) + return + } servers, err := models.AlertingEngineGetsInstances(rt.Ctx, "datasource_id = ? and clock > ?", datasourceId, time.Now().Unix()-30) ginx.NewRender(c).Data(servers, err) diff --git a/docker/initsql/a-n9e.sql b/docker/initsql/a-n9e.sql index 254feefa..2c7e60e4 100644 --- a/docker/initsql/a-n9e.sql +++ b/docker/initsql/a-n9e.sql @@ -281,6 +281,7 @@ CREATE TABLE `alert_rule` ( `runbook_url` varchar(255), `append_tags` varchar(255) not null default '' comment 'split by space: service=n9e mod=api', `annotations` text not null comment 'annotations', + `extra_config` text not null comment 'extra_config', `create_at` bigint not null default 0, `create_by` varchar(64) not null default '', `update_at` bigint not null default 0, @@ -305,6 +306,7 @@ CREATE TABLE `alert_mute` ( `disabled` tinyint(1) not null default 0 comment '0:enabled 1:disabled', `mute_time_type` tinyint(1) not null default 0, `periodic_mutes` varchar(4096) not null default '', + `severities` varchar(32) not null default '', `create_at` bigint not null default 0, `create_by` varchar(64) not null default '', `update_at` bigint not null default 0, @@ -324,6 +326,7 @@ CREATE TABLE `alert_subscribe` ( `datasource_ids` varchar(255) not null default '' comment 'datasource ids', `cluster` varchar(128) not null, `rule_id` bigint not null default 0, + `severities` varchar(32) not null default '', `tags` varchar(4096) not null default '' comment 'json,map,tagkey->regexp|value', `redefine_severity` tinyint(1) default 0 comment 'is redefine severity?', `new_severity` tinyint(1) not null comment '0:Emergency 1:Warning 2:Notice', @@ -331,6 +334,7 @@ CREATE TABLE `alert_subscribe` ( `new_channels` varchar(255) not null default '' comment 'split by space: sms voice email dingtalk wecom', `user_group_ids` varchar(250) not null comment 'split by space 1 34 5, notify cc to user_group_ids', `webhooks` text not null, + `extra_config` text not null comment 'extra_config', `redefine_webhooks` tinyint(1) default 0, `for_duration` bigint not null default 0, `create_at` bigint not null default 0, diff --git a/memsto/alert_subscribe_cache.go b/memsto/alert_subscribe_cache.go index 65258318..05dc820c 100644 --- a/memsto/alert_subscribe_cache.go +++ b/memsto/alert_subscribe_cache.go @@ -69,6 +69,16 @@ func (c *AlertSubscribeCacheType) Get(ruleId int64) ([]*models.AlertSubscribe, b return lst, has } +func (c *AlertSubscribeCacheType) GetAll() []*models.AlertSubscribe { + c.RLock() + defer c.RUnlock() + var ret []*models.AlertSubscribe + for _, v := range c.subs { + ret = append(ret, v...) + } + return ret +} + func (c *AlertSubscribeCacheType) GetStructs(ruleId int64) []models.AlertSubscribe { c.RLock() defer c.RUnlock() diff --git a/models/alert_cur_event.go b/models/alert_cur_event.go index f7bbb334..70b19c49 100644 --- a/models/alert_cur_event.go +++ b/models/alert_cur_event.go @@ -16,48 +16,53 @@ import ( ) type AlertCurEvent struct { - Id int64 `json:"id" gorm:"primaryKey"` - Cate string `json:"cate"` - Cluster string `json:"cluster"` - DatasourceId int64 `json:"datasource_id"` - GroupId int64 `json:"group_id"` // busi group id - GroupName string `json:"group_name"` // busi group name - Hash string `json:"hash"` // rule_id + vector_key - RuleId int64 `json:"rule_id"` - RuleName string `json:"rule_name"` - RuleNote string `json:"rule_note"` - RuleProd string `json:"rule_prod"` - RuleAlgo string `json:"rule_algo"` - Severity int `json:"severity"` - PromForDuration int `json:"prom_for_duration"` - PromQl string `json:"prom_ql"` - RuleConfig string `json:"-" gorm:"rule_config"` // rule config - RuleConfigJson interface{} `json:"rule_config" gorm:"-"` // rule config for fe - PromEvalInterval int `json:"prom_eval_interval"` - Callbacks string `json:"-"` // for db - CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe - RunbookUrl string `json:"runbook_url"` - NotifyRecovered int `json:"notify_recovered"` - NotifyChannels string `json:"-"` // for db - NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe - NotifyGroups string `json:"-"` // for db - NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe - NotifyGroupsObj []*UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe - TargetIdent string `json:"target_ident"` - TargetNote string `json:"target_note"` - TriggerTime int64 `json:"trigger_time"` - TriggerValue string `json:"trigger_value"` - Tags string `json:"-"` // for db - TagsJSON []string `json:"tags" gorm:"-"` // for fe - TagsMap map[string]string `json:"-" gorm:"-"` // for internal usage - Annotations string `json:"-"` // - AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe - IsRecovered bool `json:"is_recovered" gorm:"-"` // for notify.py - NotifyUsersObj []*User `json:"notify_users_obj" gorm:"-"` // for notify.py - LastEvalTime int64 `json:"last_eval_time" gorm:"-"` // for notify.py 上次计算的时间 - LastSentTime int64 `json:"last_sent_time" gorm:"-"` // 上次发送时间 - NotifyCurNumber int `json:"notify_cur_number"` // notify: current number - FirstTriggerTime int64 `json:"first_trigger_time"` // 连续告警的首次告警时间 + Id int64 `json:"id" gorm:"primaryKey"` + Cate string `json:"cate"` + Cluster string `json:"cluster"` + DatasourceId int64 `json:"datasource_id"` + GroupId int64 `json:"group_id"` // busi group id + GroupName string `json:"group_name"` // busi group name + Hash string `json:"hash"` // rule_id + vector_key + RuleId int64 `json:"rule_id"` + RuleName string `json:"rule_name"` + RuleNote string `json:"rule_note"` + RuleProd string `json:"rule_prod"` + RuleAlgo string `json:"rule_algo"` + Severity int `json:"severity"` + PromForDuration int `json:"prom_for_duration"` + PromQl string `json:"prom_ql"` + RuleConfig string `json:"-" gorm:"rule_config"` // rule config + RuleConfigJson interface{} `json:"rule_config" gorm:"-"` // rule config for fe + PromEvalInterval int `json:"prom_eval_interval"` + Callbacks string `json:"-"` // for db + CallbacksJSON []string `json:"callbacks" gorm:"-"` // for fe + RunbookUrl string `json:"runbook_url"` + NotifyRecovered int `json:"notify_recovered"` + NotifyChannels string `json:"-"` // for db + NotifyChannelsJSON []string `json:"notify_channels" gorm:"-"` // for fe + NotifyGroups string `json:"-"` // for db + NotifyGroupsJSON []string `json:"notify_groups" gorm:"-"` // for fe + NotifyGroupsObj []*UserGroup `json:"notify_groups_obj" gorm:"-"` // for fe + TargetIdent string `json:"target_ident"` + TargetNote string `json:"target_note"` + TriggerTime int64 `json:"trigger_time"` + TriggerValue string `json:"trigger_value"` + Tags string `json:"-"` // for db + TagsJSON []string `json:"tags" gorm:"-"` // for fe + TagsMap map[string]string `json:"-" gorm:"-"` // for internal usage + Annotations string `json:"-"` // + AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe + IsRecovered bool `json:"is_recovered" gorm:"-"` // for notify.py + NotifyUsersObj []*User `json:"notify_users_obj" gorm:"-"` // for notify.py + LastEvalTime int64 `json:"last_eval_time" gorm:"-"` // for notify.py 上次计算的时间 + LastEscalationNotifyTime int64 `json:"last_escalation_notify_time" gorm:"-"` + LastSentTime int64 `json:"last_sent_time" gorm:"-"` // 上次发送时间 + NotifyCurNumber int `json:"notify_cur_number"` // notify: current number + FirstTriggerTime int64 `json:"first_trigger_time"` // 连续告警的首次告警时间 + ExtraConfig interface{} `json:"extra_config" gorm:"-"` + Status int `json:"status" gorm:"-"` + Claimant string `json:"claimant" gorm:"-"` + SubRuleId int64 `json:"sub_rule_id" gorm:"-"` } func (e *AlertCurEvent) TableName() string { diff --git a/models/alert_his_event.go b/models/alert_his_event.go index b9b9ddf5..5210b422 100644 --- a/models/alert_his_event.go +++ b/models/alert_his_event.go @@ -52,6 +52,7 @@ type AlertHisEvent struct { AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe NotifyCurNumber int `json:"notify_cur_number"` // notify: current number FirstTriggerTime int64 `json:"first_trigger_time"` // 连续告警的首次告警时间 + ExtraConfig interface{} `json:"extra_config" gorm:"-"` } func (e *AlertHisEvent) TableName() string { diff --git a/models/alert_mute.go b/models/alert_mute.go index 8cdceb3d..576c4b98 100644 --- a/models/alert_mute.go +++ b/models/alert_mute.go @@ -48,6 +48,8 @@ type AlertMute struct { MuteTimeType int `json:"mute_time_type"` // 0: mute by time range, 1: mute by periodic time PeriodicMutes string `json:"-" gorm:"periodic_mutes"` PeriodicMutesJson []PeriodicMute `json:"periodic_mutes" gorm:"-"` + Severities string `json:"-" gorm:"severities"` + SeveritiesJson []int `json:"severities" gorm:"-"` } type PeriodicMute struct { @@ -208,12 +210,31 @@ func (m *AlertMute) FE2DB() error { } m.PeriodicMutes = string(periodicMutesBytes) + if len(m.SeveritiesJson) > 0 { + severtiesBytes, err := json.Marshal(m.SeveritiesJson) + if err != nil { + return err + } + m.Severities = string(severtiesBytes) + } + return nil } func (m *AlertMute) DB2FE() error { json.Unmarshal([]byte(m.DatasourceIds), &m.DatasourceIdsJson) err := json.Unmarshal([]byte(m.PeriodicMutes), &m.PeriodicMutesJson) + if err != nil { + return err + } + + if m.Severities != "" { + err = json.Unmarshal([]byte(m.Severities), &m.SeveritiesJson) + if err != nil { + return err + } + } + return err } diff --git a/models/alert_rule.go b/models/alert_rule.go index 69550ca3..d4581baf 100644 --- a/models/alert_rule.go +++ b/models/alert_rule.go @@ -70,6 +70,8 @@ type AlertRule struct { AppendTagsJSON []string `json:"append_tags" gorm:"-"` // for fe Annotations string `json:"-"` // AnnotationsJSON map[string]string `json:"annotations" gorm:"-"` // for fe + ExtraConfig string `json:"-" gorm:"extra_config"` // extra config + ExtraConfigJSON interface{} `json:"extra_config" gorm:"-"` // for fe CreateAt int64 `json:"create_at"` CreateBy string `json:"create_by"` UpdateAt int64 `json:"update_at"` @@ -560,6 +562,14 @@ func (ar *AlertRule) FE2DB() error { ar.Annotations = string(b) } + if ar.ExtraConfigJSON != nil { + b, err := json.Marshal(ar.ExtraConfigJSON) + if err != nil { + return fmt.Errorf("marshal extra_config err:%v", err) + } + ar.ExtraConfig = string(b) + } + return nil } @@ -586,6 +596,7 @@ func (ar *AlertRule) DB2FE() error { json.Unmarshal([]byte(ar.AlgoParams), &ar.AlgoParamsJson) json.Unmarshal([]byte(ar.RuleConfig), &ar.RuleConfigJson) json.Unmarshal([]byte(ar.Annotations), &ar.AnnotationsJSON) + json.Unmarshal([]byte(ar.ExtraConfig), &ar.ExtraConfigJSON) err := ar.FillDatasourceIds() return err diff --git a/models/alert_subscribe.go b/models/alert_subscribe.go index 0e40a972..0c6d6145 100644 --- a/models/alert_subscribe.go +++ b/models/alert_subscribe.go @@ -25,8 +25,10 @@ type AlertSubscribe struct { DatasourceIdsJson []int64 `json:"datasource_ids" gorm:"-"` // for fe Cluster string `json:"cluster"` // take effect by clusters, seperated by space RuleId int64 `json:"rule_id"` - ForDuration int64 `json:"for_duration"` // for duration, unit: second - RuleName string `json:"rule_name" gorm:"-"` // for fe + Severities string `json:"-" gorm:"severities"` // sub severity + SeveritiesJson []int `json:"severities" gorm:"-"` // for fe + ForDuration int64 `json:"for_duration"` // for duration, unit: second + RuleName string `json:"rule_name" gorm:"-"` // for fe Tags ormx.JSONArr `json:"tags"` RedefineSeverity int `json:"redefine_severity"` NewSeverity int `json:"new_severity"` @@ -37,6 +39,8 @@ type AlertSubscribe struct { RedefineWebhooks int `json:"redefine_webhooks"` Webhooks string `json:"-" gorm:"webhooks"` WebhooksJson []string `json:"webhooks" gorm:"-"` + ExtraConfig string `json:"-" grom:"extra_config"` + ExtraConfigJson interface{} `json:"extra_config" gorm:"-"` // for fe CreateBy string `json:"create_by"` CreateAt int64 `json:"create_at"` UpdateBy string `json:"update_by"` @@ -118,6 +122,14 @@ func (s *AlertSubscribe) FE2DB() error { s.Webhooks = string(b) } + b, _ := json.Marshal(s.ExtraConfigJson) + s.ExtraConfig = string(b) + + if len(s.SeveritiesJson) > 0 { + b, _ := json.Marshal(s.SeveritiesJson) + s.Severities = string(b) + } + return nil } @@ -133,6 +145,19 @@ func (s *AlertSubscribe) DB2FE() error { return err } } + + if s.ExtraConfig != "" { + if err := json.Unmarshal([]byte(s.ExtraConfig), &s.ExtraConfigJson); err != nil { + return err + } + } + + if s.Severities != "" { + if err := json.Unmarshal([]byte(s.Severities), &s.SeveritiesJson); err != nil { + return err + } + } + return nil } diff --git a/models/migrate/migrate.go b/models/migrate/migrate.go index a6b518dc..49a42cc3 100644 --- a/models/migrate/migrate.go +++ b/models/migrate/migrate.go @@ -19,5 +19,36 @@ func MigrateRecordingTable(db *gorm.DB) error { logger.Errorf("failed to migrate recording rule table: %v", err) return err } + + err = db.AutoMigrate(&AlertRule{}) + if err != nil { + logger.Errorf("failed to migrate recording rule table: %v", err) + return err + } + + err = db.AutoMigrate(&AlertSubscribe{}) + if err != nil { + logger.Errorf("failed to migrate recording rule table: %v", err) + return err + } + + err = db.AutoMigrate(&AlertMute{}) + if err != nil { + logger.Errorf("failed to migrate recording rule table: %v", err) + return err + } return nil } + +type AlertRule struct { + ExtraConfig string `gorm:"type:text;not null;column:extra_config"` // extra config +} + +type AlertSubscribe struct { + ExtraConfig string `gorm:"type:text;not null;column:extra_config"` // extra config + Severities string `gorm:"column:severities;type:varchar(32);not null;default:''"` +} + +type AlertMute struct { + Severities string `gorm:"column:severities;type:varchar(32);not null;default:''"` +} diff --git a/pushgw/router/router_remotewrite.go b/pushgw/router/router_remotewrite.go index 38d3f5c1..ace63c1a 100644 --- a/pushgw/router/router_remotewrite.go +++ b/pushgw/router/router_remotewrite.go @@ -106,7 +106,7 @@ func (rt *Router) remoteWrite(c *gin.Context) { } rt.EnrichLabels(req.Timeseries[i]) - rt.debugSample(c.Request.RemoteAddr, req.Timeseries[i]) + rt.debugSample(c.ClientIP(), req.Timeseries[i]) if len(ident) > 0 { // use ident as hash key, cause "out of bounds" problem diff --git a/storage/redis.go b/storage/redis.go index e939f263..6ac0f989 100644 --- a/storage/redis.go +++ b/storage/redis.go @@ -34,6 +34,10 @@ type Redis interface { HGetAll(ctx context.Context, key string) *redis.MapStringStringCmd HSet(ctx context.Context, key string, values ...interface{}) *redis.IntCmd HDel(ctx context.Context, key string, fields ...string) *redis.IntCmd + LPush(ctx context.Context, key string, values ...interface{}) *redis.IntCmd + RPop(ctx context.Context, key string) *redis.StringCmd + RPopCount(ctx context.Context, key string, count int) *redis.StringSliceCmd + LLen(ctx context.Context, key string) *redis.IntCmd Close() error Ping(ctx context.Context) *redis.StatusCmd Publish(ctx context.Context, channel string, message interface{}) *redis.IntCmd