mirror of
https://github.com/moghtech/komodo.git
synced 2026-04-29 21:27:26 -05:00
Progress on alerting: split the alert module into per-resource files (server, deployment).
This commit is contained in:
@@ -123,7 +123,7 @@ pub trait StateResource<T: Send + Sync + Unpin + Serialize + DeserializeOwned +
|
||||
.await
|
||||
.into_iter()
|
||||
.collect::<anyhow::Result<Vec<_>>>()
|
||||
.context("failed to convert deployment list item")?;
|
||||
.context(format!("failed to convert {} list item", Self::name()))?;
|
||||
|
||||
Ok(list)
|
||||
}
|
||||
|
||||
@@ -1,35 +0,0 @@
|
||||
use monitor_types::entities::{deployment::DockerContainerState, server::ServerStatus};

use crate::state::State;

impl State {
    // called after cache update
    pub async fn check_alerts(&self) {
        // Run both alert sweeps concurrently; each returns () and handles
        // its own failures internally.
        tokio::join!(self.alert_servers(), self.alert_deployments());
    }

    // Sweep the cached server statuses. All match arms are currently empty:
    // this is a stub — the per-status alerting logic is not implemented yet.
    pub async fn alert_servers(&self) {
        let server_status = self.server_status_cache.get_list().await;

        for v in server_status {
            match v.status {
                ServerStatus::Ok => {}
                ServerStatus::NotOk => {}
                ServerStatus::Disabled => {}
            }
        }
    }

    // Sweep the cached deployment container states. Also a stub — no arm
    // does anything yet, and the catch-all swallows the remaining
    // DockerContainerState variants.
    pub async fn alert_deployments(&self) {
        let deployment_status = self.deployment_status_cache.get_list().await;

        for v in deployment_status {
            match v.curr.state {
                DockerContainerState::Running => {}
                DockerContainerState::Unknown => {}
                DockerContainerState::Exited => {}
                _ => {}
            }
        }
    }
}
|
||||
30
bin/core/src/monitor/alert/deployment.rs
Normal file
30
bin/core/src/monitor/alert/deployment.rs
Normal file
@@ -0,0 +1,30 @@
|
||||
use monitor_types::entities::{alert::AlertData, deployment::Deployment};
|
||||
|
||||
use crate::{helpers::resource::StateResource, state::State};
|
||||
|
||||
impl State {
|
||||
pub async fn alert_deployments(&self) {
|
||||
for v in self.deployment_status_cache.get_list().await {
|
||||
if v.prev.is_none() {
|
||||
continue;
|
||||
}
|
||||
let prev = v.prev.as_ref().unwrap().to_owned();
|
||||
if v.curr.state != prev {
|
||||
// send alert
|
||||
let d = <State as StateResource<Deployment>>::get_resource(self, &v.curr.id).await;
|
||||
if let Err(e) = d {
|
||||
error!("failed to get deployment from db | {e:#?}");
|
||||
continue;
|
||||
}
|
||||
let d = d.unwrap();
|
||||
let data = AlertData::ContainerStateChange {
|
||||
id: v.curr.id.clone(),
|
||||
name: d.name,
|
||||
server: d.config.server_id,
|
||||
from: prev,
|
||||
to: v.curr.state,
|
||||
};
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
11
bin/core/src/monitor/alert/mod.rs
Normal file
11
bin/core/src/monitor/alert/mod.rs
Normal file
@@ -0,0 +1,11 @@
|
||||
mod deployment;
mod server;

use crate::state::State;

impl State {
    // called after cache update
    pub async fn check_alerts(&self) {
        // Run the server and deployment sweeps concurrently; both return
        // () and log their own errors internally, so there is nothing to
        // propagate here.
        tokio::join!(self.alert_servers(), self.alert_deployments());
    }
}
|
||||
141
bin/core/src/monitor/alert/server.rs
Normal file
141
bin/core/src/monitor/alert/server.rs
Normal file
@@ -0,0 +1,141 @@
|
||||
use std::{collections::HashMap, str::FromStr};
|
||||
|
||||
use anyhow::Context;
|
||||
use monitor_types::{
|
||||
entities::{
|
||||
alert::{Alert, AlertData, AlertDataVariant},
|
||||
server::{stats::SeverityLevel, Server, ServerListItem, ServerStatus},
|
||||
update::ResourceTarget,
|
||||
},
|
||||
monitor_timestamp, optional_string,
|
||||
};
|
||||
use mungos::mongodb::bson::{doc, oid::ObjectId};
|
||||
|
||||
use crate::{auth::InnerRequestUser, helpers::resource::StateResource, state::State};
|
||||
|
||||
type OpenAlertMap = HashMap<ResourceTarget, HashMap<AlertDataVariant, Alert>>;
|
||||
|
||||
impl State {
|
||||
pub async fn alert_servers(&self) {
|
||||
let server_status = self.server_status_cache.get_list().await;
|
||||
let servers = self.get_all_servers_map().await;
|
||||
|
||||
if let Err(e) = servers {
|
||||
error!("{e:#?}");
|
||||
return;
|
||||
}
|
||||
|
||||
let mut servers = servers.unwrap();
|
||||
|
||||
let alerts = self.get_open_alerts().await;
|
||||
|
||||
if let Err(e) = alerts {
|
||||
error!("{e:#?}");
|
||||
return;
|
||||
}
|
||||
|
||||
let alerts = alerts.unwrap();
|
||||
|
||||
let mut alerts_to_open = Vec::<Alert>::new();
|
||||
let mut alert_ids_to_close = Vec::<String>::new();
|
||||
|
||||
for v in server_status {
|
||||
let server = servers.remove(&v.id);
|
||||
if server.is_none() {
|
||||
continue;
|
||||
}
|
||||
let server = server.unwrap();
|
||||
let server_alerts = alerts.get(&ResourceTarget::Server(v.id.clone()));
|
||||
let health_alert = server_alerts
|
||||
.as_ref()
|
||||
.and_then(|alerts| alerts.get(&AlertDataVariant::ServerUnreachable));
|
||||
match (v.status, health_alert) {
|
||||
(ServerStatus::Ok | ServerStatus::Disabled, Some(health_alert)) => {
|
||||
// resolve unreachable alert
|
||||
alert_ids_to_close.push(health_alert.id.clone());
|
||||
}
|
||||
(ServerStatus::NotOk, None) => {
|
||||
// open unreachable alert
|
||||
let alert = Alert {
|
||||
id: Default::default(),
|
||||
ts: monitor_timestamp(),
|
||||
resolved: false,
|
||||
resolved_ts: None,
|
||||
level: SeverityLevel::Critical,
|
||||
target: ResourceTarget::Server(v.id.clone()),
|
||||
variant: AlertDataVariant::ServerUnreachable,
|
||||
data: AlertData::ServerUnreachable {
|
||||
id: v.id.clone(),
|
||||
name: server.name,
|
||||
region: optional_string(&server.info.region),
|
||||
},
|
||||
};
|
||||
alerts_to_open.push(alert);
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
||||
let res = self.resolve_alerts(&alert_ids_to_close).await;
|
||||
}
|
||||
|
||||
async fn get_all_servers_map(&self) -> anyhow::Result<HashMap<String, ServerListItem>> {
|
||||
let servers = <State as StateResource<Server>>::list_resources_for_user(
|
||||
self,
|
||||
None,
|
||||
&InnerRequestUser {
|
||||
is_admin: true,
|
||||
..Default::default()
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await
|
||||
.context("failed to get servers from db (in alert_servers)")?;
|
||||
|
||||
let servers = servers
|
||||
.into_iter()
|
||||
.map(|server| (server.id.clone(), server))
|
||||
.collect::<HashMap<_, _>>();
|
||||
|
||||
Ok(servers)
|
||||
}
|
||||
|
||||
async fn resolve_alerts(&self, alert_ids: &[String]) -> anyhow::Result<()> {
|
||||
let alert_ids = alert_ids
|
||||
.iter()
|
||||
.map(|id| ObjectId::from_str(id).context("failed to convert alert id to ObjectId"))
|
||||
.collect::<anyhow::Result<Vec<_>>>()?;
|
||||
self.db
|
||||
.alerts
|
||||
.update_many(
|
||||
doc! { "_id": { "$in": alert_ids } },
|
||||
doc! {
|
||||
"$set": {
|
||||
"resolved": "true",
|
||||
"resolved_ts": monitor_timestamp()
|
||||
}
|
||||
},
|
||||
)
|
||||
.await
|
||||
.context("failed to resolve alerts on db")?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn get_open_alerts(&self) -> anyhow::Result<OpenAlertMap> {
|
||||
let alerts = self
|
||||
.db
|
||||
.alerts
|
||||
.get_some(doc! { "resolved": false }, None)
|
||||
.await
|
||||
.context("failed to get open alerts from db")?;
|
||||
|
||||
let mut map = OpenAlertMap::new();
|
||||
|
||||
for alert in alerts {
|
||||
let inner = map.entry(alert.target.clone()).or_default();
|
||||
inner.insert(alert.variant, alert);
|
||||
}
|
||||
|
||||
Ok(map)
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,10 @@
|
||||
use anyhow::Context;
|
||||
use async_trait::async_trait;
|
||||
use monitor_types::{
|
||||
entities::{alerter::{Alerter, AlerterListItem}, PermissionLevel},
|
||||
entities::{
|
||||
alerter::{Alerter, AlerterListItem},
|
||||
PermissionLevel,
|
||||
},
|
||||
requests::read::*,
|
||||
};
|
||||
use mungos::mongodb::bson::doc;
|
||||
|
||||
@@ -6,7 +6,7 @@ use monitor_types::{
|
||||
entities::{
|
||||
deployment::{
|
||||
Deployment, DeploymentActionState, DeploymentConfig, DeploymentImage,
|
||||
DockerContainerState, DockerContainerStats, DeploymentListItem,
|
||||
DeploymentListItem, DockerContainerState, DockerContainerStats,
|
||||
},
|
||||
server::Server,
|
||||
update::{Log, UpdateStatus},
|
||||
@@ -59,7 +59,11 @@ impl Resolve<GetDeploymentStatus, RequestUser> for State {
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
let response = GetDeploymentStatusResponse {
|
||||
status: status.curr.container.as_ref().and_then(|c| c.status.clone()),
|
||||
status: status
|
||||
.curr
|
||||
.container
|
||||
.as_ref()
|
||||
.and_then(|c| c.status.clone()),
|
||||
state: status.curr.state,
|
||||
};
|
||||
Ok(response)
|
||||
|
||||
@@ -1 +1 @@
|
||||
pub mod v0;
|
||||
pub mod v0;
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use anyhow::{anyhow, Context};
|
||||
use chrono::{DateTime, Utc, SecondsFormat, LocalResult, TimeZone};
|
||||
use chrono::{DateTime, LocalResult, SecondsFormat, TimeZone, Utc};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
pub mod traits;
|
||||
@@ -206,7 +206,6 @@ pub enum Timelength {
|
||||
ThirtyDays,
|
||||
}
|
||||
|
||||
|
||||
pub fn unix_from_monitor_ts(ts: &str) -> anyhow::Result<i64> {
|
||||
Ok(DateTime::parse_from_rfc3339(ts)
|
||||
.context("failed to parse rfc3339 timestamp")?
|
||||
|
||||
@@ -39,17 +39,7 @@ pub struct ProcedureStage {
|
||||
pub target_id: String,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Serialize,
|
||||
Deserialize,
|
||||
Debug,
|
||||
PartialEq,
|
||||
Hash,
|
||||
Eq,
|
||||
Clone,
|
||||
Copy,
|
||||
Default,
|
||||
)]
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Hash, Eq, Clone, Copy, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ProcedureOperation {
|
||||
// do nothing
|
||||
|
||||
@@ -103,9 +103,7 @@ pub struct ServerActionState {
|
||||
pub pruning_images: bool,
|
||||
}
|
||||
|
||||
#[derive(
|
||||
Serialize, Deserialize, Debug, PartialEq, Hash, Eq, Clone, Copy, Default,
|
||||
)]
|
||||
#[derive(Serialize, Deserialize, Debug, PartialEq, Hash, Eq, Clone, Copy, Default)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
pub enum ServerStatus {
|
||||
Ok,
|
||||
|
||||
Reference in New Issue
Block a user