fix: alert deletion & notification panic (#1930)
This commit is contained in:
parent
30fedf1926
commit
1bbece0385
|
@ -12,11 +12,11 @@ rustflags = ["-C", "target-feature=+neon"]
|
|||
linker = "aarch64-linux-gnu-gcc"
|
||||
rustflags = ["-C", "target-feature=+neon"]
|
||||
|
||||
[target.x86_64-apple-darwin]
|
||||
rustflags = ["-C", "target-feature=+sse2,+ssse3,+sse4.1,+sse4.2"]
|
||||
|
||||
[target.aarch64-apple-darwin]
|
||||
rustflags = ["-C", "target-feature=+neon"]
|
||||
|
||||
[target.x86_64-apple-darwin]
|
||||
rustflags = ["-C", "target-feature=+sse2,+ssse3,+sse4.1,+sse4.2"]
|
||||
|
||||
[target.x86_64-pc-windows-msvc]
|
||||
rustflags = ["-C", "target-feature=+sse2,+ssse3,+sse4.1,+sse4.2"]
|
||||
|
|
|
@ -159,6 +159,8 @@ pub struct Trigger {
|
|||
pub is_ingest_time: bool,
|
||||
#[serde(default)]
|
||||
pub stream_type: StreamType,
|
||||
#[serde(default)]
|
||||
pub parent_alert_deleted: bool,
|
||||
}
|
||||
|
||||
impl Default for Trigger {
|
||||
|
@ -173,6 +175,7 @@ impl Default for Trigger {
|
|||
count: 0,
|
||||
stream_type: StreamType::Logs,
|
||||
is_ingest_time: false,
|
||||
parent_alert_deleted: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -67,31 +67,37 @@ pub async fn send_notification(
|
|||
} else {
|
||||
reqwest::Client::new()
|
||||
};
|
||||
let url = url::Url::parse(&local_dest.url);
|
||||
let mut req = match local_dest.method {
|
||||
alert::AlertHTTPType::POST => client.post(url.unwrap()),
|
||||
alert::AlertHTTPType::PUT => client.put(url.unwrap()),
|
||||
alert::AlertHTTPType::GET => client.get(url.unwrap()),
|
||||
}
|
||||
.header("Content-type", "application/json");
|
||||
match url::Url::parse(&local_dest.url) {
|
||||
Ok(url) => {
|
||||
let mut req = match local_dest.method {
|
||||
alert::AlertHTTPType::POST => client.post(url),
|
||||
alert::AlertHTTPType::PUT => client.put(url),
|
||||
alert::AlertHTTPType::GET => client.get(url),
|
||||
}
|
||||
.header("Content-type", "application/json");
|
||||
|
||||
// Add additional headers if any from destination description
|
||||
if local_dest.headers.is_some() {
|
||||
for (key, value) in local_dest.headers.unwrap() {
|
||||
if !key.is_empty() && !value.is_empty() {
|
||||
req = req.header(key, value);
|
||||
// Add additional headers if any from destination description
|
||||
if local_dest.headers.is_some() {
|
||||
for (key, value) in local_dest.headers.unwrap() {
|
||||
if !key.is_empty() && !value.is_empty() {
|
||||
req = req.header(key, value);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let resp = req.json(&msg).send().await;
|
||||
match resp {
|
||||
Ok(resp) => {
|
||||
if !resp.status().is_success() {
|
||||
log::error!("Notification sent error: {:?}", resp.bytes().await);
|
||||
}
|
||||
}
|
||||
Err(err) => log::error!("Notification sending error {:?}", err),
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let resp = req.json(&msg).send().await;
|
||||
match resp {
|
||||
Ok(resp) => {
|
||||
if !resp.status().is_success() {
|
||||
log::error!("Notification sent error: {:?}", resp.bytes().await);
|
||||
}
|
||||
Err(err) => {
|
||||
log::error!("Notification sending error {:?}", err);
|
||||
}
|
||||
Err(err) => log::error!("Notification sending error {:?}", err),
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
@ -133,6 +139,7 @@ mod tests {
|
|||
last_sent_at: chrono::Utc::now().timestamp_micros(),
|
||||
count: 1,
|
||||
is_ingest_time: true,
|
||||
parent_alert_deleted: false,
|
||||
};
|
||||
let alert = Alert {
|
||||
name: "testAlert".to_string(),
|
||||
|
|
|
@ -296,7 +296,6 @@ async fn init_http_server() -> Result<(), anyhow::Error> {
|
|||
if CONFIG.common.feature_per_thread_lock {
|
||||
thread_id.fetch_add(1, Ordering::SeqCst);
|
||||
}
|
||||
|
||||
log::info!(
|
||||
"starting HTTP server at: {}, thread_id: {}",
|
||||
haddr,
|
||||
|
|
|
@ -26,7 +26,7 @@ use crate::service::triggers;
|
|||
|
||||
pub async fn run() -> Result<(), anyhow::Error> {
|
||||
for trigger in TRIGGERS.iter() {
|
||||
if !trigger.is_ingest_time {
|
||||
if !trigger.is_ingest_time && !trigger.parent_alert_deleted {
|
||||
let trigger = trigger.clone();
|
||||
tokio::task::spawn(async move { handle_triggers(trigger).await });
|
||||
}
|
||||
|
@ -43,7 +43,12 @@ pub async fn handle_triggers(trigger: Trigger) {
|
|||
)
|
||||
.await
|
||||
{
|
||||
Err(_) => log::error!("[ALERT MANAGER] Error fetching alert"),
|
||||
Err(_) => {
|
||||
let trigger_key = format!("{}/{}", &trigger.org, &trigger.alert_name);
|
||||
let mut local_trigger = trigger;
|
||||
local_trigger.parent_alert_deleted = true;
|
||||
let _ = crate::service::db::triggers::set(&trigger_key, &local_trigger).await;
|
||||
}
|
||||
Ok(result) => {
|
||||
let key = format!("{}/{}", &trigger.org, &trigger.alert_name);
|
||||
if let Some(alert) = result {
|
||||
|
@ -89,7 +94,7 @@ async fn handle_trigger(alert_key: &str, frequency: i64) {
|
|||
break;
|
||||
}
|
||||
};
|
||||
if TRIGGERS_IN_PROCESS.clone().contains_key(alert_key) {
|
||||
if !trigger.parent_alert_deleted && TRIGGERS_IN_PROCESS.clone().contains_key(alert_key) {
|
||||
let alert_resp = super::db::alerts::get(
|
||||
&trigger.org,
|
||||
&trigger.stream,
|
||||
|
|
|
@ -133,6 +133,7 @@ pub async fn save_alert(
|
|||
last_sent_at: 0,
|
||||
count: 0,
|
||||
is_ingest_time: false,
|
||||
parent_alert_deleted: false,
|
||||
};
|
||||
let _ = triggers::save_trigger(&trigger.alert_name, &trigger).await;
|
||||
}
|
||||
|
@ -223,6 +224,7 @@ pub async fn trigger_alert(
|
|||
count: 0,
|
||||
is_ingest_time: alert.is_real_time,
|
||||
stream_type,
|
||||
parent_alert_deleted: false,
|
||||
};
|
||||
let _ = send_notification(&alert, &trigger).await;
|
||||
Ok(HttpResponse::Ok().json(MetaHttpResponse::message(
|
||||
|
|
|
@ -145,6 +145,7 @@ pub async fn watch() -> Result<(), anyhow::Error> {
|
|||
let item_key = ev.key.strip_prefix(key).unwrap();
|
||||
let alert_key = item_key[0..item_key.rfind('/').unwrap()].to_string();
|
||||
let item_name = item_key[item_key.rfind('/').unwrap() + 1..].to_string();
|
||||
let org_name = item_key[0..item_key.find('/').unwrap()].to_string();
|
||||
if alert_key.contains('/') {
|
||||
let mut group = match STREAM_ALERTS.get(&alert_key) {
|
||||
Some(v) => v.clone(),
|
||||
|
@ -155,6 +156,12 @@ pub async fn watch() -> Result<(), anyhow::Error> {
|
|||
} else {
|
||||
STREAM_ALERTS.remove(item_key);
|
||||
}
|
||||
let trigger_key = format!("{org_name}/{item_name}");
|
||||
if let Ok(Some(mut trigger)) = crate::service::db::triggers::get(&trigger_key).await
|
||||
{
|
||||
trigger.parent_alert_deleted = true;
|
||||
let _ = crate::service::db::triggers::set(&trigger_key, &trigger).await;
|
||||
}
|
||||
}
|
||||
infra_db::Event::Empty => {}
|
||||
}
|
||||
|
|
|
@ -297,6 +297,7 @@ async fn add_valid_record(
|
|||
last_sent_at: 0,
|
||||
count: 0,
|
||||
is_ingest_time: true,
|
||||
parent_alert_deleted: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -254,6 +254,7 @@ pub async fn handle_grpc_request(
|
|||
last_sent_at: 0,
|
||||
count: 0,
|
||||
is_ingest_time: true,
|
||||
parent_alert_deleted: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
|
|
@ -357,6 +357,7 @@ pub async fn metrics_json_handler(
|
|||
last_sent_at: 0,
|
||||
count: 0,
|
||||
is_ingest_time: true,
|
||||
parent_alert_deleted: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
|
|
@ -324,6 +324,7 @@ pub async fn remote_write(
|
|||
last_sent_at: 0,
|
||||
count: 0,
|
||||
is_ingest_time: true,
|
||||
parent_alert_deleted: false,
|
||||
},
|
||||
);
|
||||
}
|
||||
|
|
|
@ -304,6 +304,7 @@ pub async fn handle_trace_request(
|
|||
last_sent_at: 0,
|
||||
count: 0,
|
||||
is_ingest_time: true,
|
||||
parent_alert_deleted: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -368,6 +368,7 @@ pub async fn traces_json(
|
|||
last_sent_at: 0,
|
||||
count: 0,
|
||||
is_ingest_time: true,
|
||||
parent_alert_deleted: false,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
@ -69,6 +69,7 @@ mod tests {
|
|||
stream_type: crate::common::meta::StreamType::Logs,
|
||||
count: 0,
|
||||
is_ingest_time: false,
|
||||
parent_alert_deleted: false,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
|
|
Loading…
Reference in New Issue