diff --git a/Cargo.lock b/Cargo.lock index a8aec2947..d8126d688 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2169,9 +2169,9 @@ dependencies = [ [[package]] name = "slack_client_rs" -version = "0.0.7" +version = "0.0.8" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "920a08baac5050ace95378c15984e2ee5552fdcd193302e0f80e294190b8c148" +checksum = "a9c9597f2b960b592f97701c89c9ceb2658c8f804f39f876e1ab25379080fe2e" dependencies = [ "anyhow", "reqwest", diff --git a/cli/src/helpers.rs b/cli/src/helpers.rs index b10d9c3c2..7393f856b 100644 --- a/cli/src/helpers.rs +++ b/cli/src/helpers.rs @@ -65,6 +65,8 @@ pub fn gen_core_config(sub_matches: &ArgMatches) { host, port, jwt_valid_for, + monitoring_interval: Timelength::OneMinute, + daily_offset_hours: 0, slack_url, local_auth: true, github_oauth: Default::default(), diff --git a/cli/src/types.rs b/cli/src/types.rs index e521f4401..bbc1d5f63 100644 --- a/cli/src/types.rs +++ b/cli/src/types.rs @@ -13,11 +13,18 @@ pub struct CoreConfig { #[serde(default = "default_core_port")] pub port: u16, + // daily utc offset in hours to run daily update. eg 8:00 eastern time is 13:00 UTC, so offset should be 13. default of 0 runs at UTC midnight. 
+ #[serde(default)] + pub daily_offset_hours: u8, + // jwt config pub jwt_secret: String, #[serde(default = "default_jwt_valid_for")] pub jwt_valid_for: Timelength, + // interval at which to collect server stats and alert for out of bounds + pub monitoring_interval: Timelength, + // used to verify validity from github webhooks pub github_webhook_secret: String, diff --git a/config_example/core.config.example.toml b/config_example/core.config.example.toml index 6e6b1edb5..877f58077 100644 --- a/config_example/core.config.example.toml +++ b/config_example/core.config.example.toml @@ -1,14 +1,39 @@ +# this should be the url used to access monitor in browser, potentially behind DNS, eg https://monitor.mogh.tech or http://12.34.56.78:9000 +host = "http://localhost:9000" + +# the port the core system will run on. if running core in docker container, leave this port as 9000 and use port bind eg. -p 9001:9000 port = 9000 + +# daily utc offset in hours to run daily update. eg 8:00 eastern time is 13:00 UTC, so offset should be 13. default of 0 runs at UTC midnight. +daily_offset_hours = 13 + +# secret used to generate the jwt. should be some randomly generated hash. 
jwt_secret = "your_jwt_secret" + +# can be 1-hr, 12-hr, 1-day, 3-day, 1-wk, 2-wk, 30-day jwt_valid_for = "1-wk" + +# webhook url given by slack app slack_url = "your_slack_app_webhook_url" + +# token that has to be given to github during webhook config as the Secret github_webhook_secret = "your_random_webhook_secret" +# can be 30-sec, 1-min, 2-min, 5-min +monitoring_interval = "1-min" + +# allow or deny user login with username / password +local_auth = true + [github_oauth] -id = "your_client_id" -secret = "your_client_secret" +id = "your_github_client_id" +secret = "your_github_client_secret" + +[google_oauth] +id = "your_google_client_id" +secret = "your_google_client_secret" [mongo] uri = "your_mongo_uri" app_name = "monitor_core" -db_name = "monitor" \ No newline at end of file +db_name = "monitor" diff --git a/core/Cargo.toml b/core/Cargo.toml index 1737648c7..57e97ece5 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -18,7 +18,7 @@ axum = { version = "0.6", features = ["ws", "json"] } axum-extra = { version = "0.4", features = ["spa"] } tower = { version = "0.4", features = ["full"] } tower-http = { version = "0.3", features = ["cors"] } -slack = { package = "slack_client_rs", version = "0.0.7" } +slack = { package = "slack_client_rs", version = "0.0.8" } mungos = "0.2.27" serde = "1.0" serde_derive = "1.0" diff --git a/core/src/actions/build.rs b/core/src/actions/build.rs index 447f343be..5a41bf61b 100644 --- a/core/src/actions/build.rs +++ b/core/src/actions/build.rs @@ -32,8 +32,8 @@ impl State { } } - pub fn build_busy(&self, id: &str) -> bool { - match self.build_action_states.lock().unwrap().get(id) { + pub async fn build_busy(&self, id: &str) -> bool { + match self.build_action_states.lock().await.get(id) { Some(a) => a.busy(), None => false, } @@ -108,7 +108,7 @@ impl State { } pub async fn delete_build(&self, build_id: &str, user: &RequestUser) -> anyhow::Result { - if self.build_busy(build_id) { + if self.build_busy(build_id).await { return 
Err(anyhow!("build busy")); } let build = self @@ -147,18 +147,18 @@ impl State { new_build: Build, user: &RequestUser, ) -> anyhow::Result { - if self.build_busy(&new_build.id) { + if self.build_busy(&new_build.id).await { return Err(anyhow!("build busy")); } let id = new_build.id.clone(); { - let mut lock = self.build_action_states.lock().unwrap(); + let mut lock = self.build_action_states.lock().await; let entry = lock.entry(id.clone()).or_default(); entry.updating = true; } let res = self.update_build_inner(new_build, user).await; { - let mut lock = self.build_action_states.lock().unwrap(); + let mut lock = self.build_action_states.lock().await; let entry = lock.entry(id).or_default(); entry.updating = false; } @@ -230,17 +230,17 @@ impl State { } pub async fn build(&self, build_id: &str, user: &RequestUser) -> anyhow::Result { - if self.build_busy(build_id) { + if self.build_busy(build_id).await { return Err(anyhow!("build busy")); } { - let mut lock = self.build_action_states.lock().unwrap(); + let mut lock = self.build_action_states.lock().await; let entry = lock.entry(build_id.to_string()).or_default(); entry.building = true; } let res = self.build_inner(build_id, user).await; { - let mut lock = self.build_action_states.lock().unwrap(); + let mut lock = self.build_action_states.lock().await; let entry = lock.entry(build_id.to_string()).or_default(); entry.building = false; } @@ -309,17 +309,17 @@ impl State { build_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.build_busy(build_id) { + if self.build_busy(build_id).await { return Err(anyhow!("build busy")); } { - let mut lock = self.build_action_states.lock().unwrap(); + let mut lock = self.build_action_states.lock().await; let entry = lock.entry(build_id.to_string()).or_default(); entry.recloning = true; } let res = self.reclone_build_inner(build_id, user).await; { - let mut lock = self.build_action_states.lock().unwrap(); + let mut lock = self.build_action_states.lock().await; let entry = 
lock.entry(build_id.to_string()).or_default(); entry.recloning = false; } diff --git a/core/src/actions/deployment.rs b/core/src/actions/deployment.rs index 7a76a02f2..e16607641 100644 --- a/core/src/actions/deployment.rs +++ b/core/src/actions/deployment.rs @@ -31,8 +31,8 @@ impl State { } } - pub fn deployment_busy(&self, id: &str) -> bool { - match self.deployment_action_states.lock().unwrap().get(id) { + pub async fn deployment_busy(&self, id: &str) -> bool { + match self.deployment_action_states.lock().await.get(id) { Some(a) => a.busy(), None => false, } @@ -111,7 +111,7 @@ impl State { deployment_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.deployment_busy(deployment_id) { + if self.deployment_busy(deployment_id).await { return Err(anyhow!("deployment busy")); } let deployment = self @@ -158,18 +158,18 @@ impl State { new_deployment: Deployment, user: &RequestUser, ) -> anyhow::Result { - if self.deployment_busy(&new_deployment.id) { + if self.deployment_busy(&new_deployment.id).await { return Err(anyhow!("deployment busy")); } let id = new_deployment.id.clone(); { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(id.clone()).or_default(); entry.updating = true; } let res = self.update_deployment_inner(new_deployment, user).await; { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(id).or_default(); entry.updating = false; } @@ -248,17 +248,17 @@ impl State { deployment_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.deployment_busy(deployment_id) { + if self.deployment_busy(deployment_id).await { return Err(anyhow!("deployment busy")); } { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); 
entry.recloning = true; } let res = self.reclone_deployment_inner(deployment_id, user).await; { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); entry.recloning = false; } @@ -311,17 +311,17 @@ impl State { deployment_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.deployment_busy(deployment_id) { + if self.deployment_busy(deployment_id).await { return Err(anyhow!("deployment busy")); } { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); entry.deploying = true; } let res = self.deploy_container_inner(deployment_id, user).await; { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); entry.deploying = false; } @@ -384,17 +384,17 @@ impl State { deployment_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.deployment_busy(deployment_id) { + if self.deployment_busy(deployment_id).await { return Err(anyhow!("deployment busy")); } { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); entry.starting = true; } let res = self.start_container_inner(deployment_id, user).await; { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); entry.starting = false; } @@ -454,17 +454,17 @@ impl State { deployment_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.deployment_busy(deployment_id) { + if self.deployment_busy(deployment_id).await { return Err(anyhow!("deployment 
busy")); } { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); entry.stopping = true; } let res = self.stop_container_inner(deployment_id, user).await; { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); entry.stopping = false; } @@ -524,17 +524,17 @@ impl State { deployment_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.deployment_busy(deployment_id) { + if self.deployment_busy(deployment_id).await { return Err(anyhow!("deployment busy")); } { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); entry.removing = true; } let res = self.remove_container_inner(deployment_id, user).await; { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); entry.removing = false; } @@ -594,17 +594,17 @@ impl State { deployment_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.deployment_busy(deployment_id) { + if self.deployment_busy(deployment_id).await { return Err(anyhow!("deployment busy")); } { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); entry.pulling = true; } let res = self.pull_deployment_repo_inner(deployment_id, user).await; { - let mut lock = self.deployment_action_states.lock().unwrap(); + let mut lock = self.deployment_action_states.lock().await; let entry = lock.entry(deployment_id.to_string()).or_default(); entry.pulling = false; } diff --git 
a/core/src/actions/server.rs b/core/src/actions/server.rs index 0f21f8dec..6c8c1bffe 100644 --- a/core/src/actions/server.rs +++ b/core/src/actions/server.rs @@ -27,8 +27,8 @@ impl State { } } - pub fn server_busy(&self, id: &str) -> bool { - match self.server_action_states.lock().unwrap().get(id) { + pub async fn server_busy(&self, id: &str) -> bool { + match self.server_action_states.lock().await.get(id) { Some(a) => a.busy(), None => false, } @@ -95,7 +95,7 @@ impl State { server_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.server_busy(server_id) { + if self.server_busy(server_id).await { return Err(anyhow!("server busy")); } let server = self @@ -125,7 +125,7 @@ impl State { mut new_server: Server, user: &RequestUser, ) -> anyhow::Result { - if self.server_busy(&new_server.id) { + if self.server_busy(&new_server.id).await { return Err(anyhow!("server busy")); } let current_server = self @@ -169,17 +169,17 @@ impl State { server_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.server_busy(server_id) { + if self.server_busy(server_id).await { return Err(anyhow!("server busy")); } { - let mut lock = self.server_action_states.lock().unwrap(); + let mut lock = self.server_action_states.lock().await; let entry = lock.entry(server_id.to_string()).or_default(); entry.pruning_networks = true; } let res = self.prune_networks_inner(server_id, user).await; { - let mut lock = self.server_action_states.lock().unwrap(); + let mut lock = self.server_action_states.lock().await; let entry = lock.entry(server_id.to_string()).or_default(); entry.pruning_networks = false; } @@ -230,17 +230,17 @@ impl State { server_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.server_busy(server_id) { + if self.server_busy(server_id).await { return Err(anyhow!("server busy")); } { - let mut lock = self.server_action_states.lock().unwrap(); + let mut lock = self.server_action_states.lock().await; let entry = 
lock.entry(server_id.to_string()).or_default(); entry.pruning_images = true; } let res = self.prune_images_inner(server_id, user).await; { - let mut lock = self.server_action_states.lock().unwrap(); + let mut lock = self.server_action_states.lock().await; let entry = lock.entry(server_id.to_string()).or_default(); entry.pruning_images = false; } @@ -292,17 +292,17 @@ impl State { server_id: &str, user: &RequestUser, ) -> anyhow::Result { - if self.server_busy(server_id) { + if self.server_busy(server_id).await { return Err(anyhow!("server busy")); } { - let mut lock = self.server_action_states.lock().unwrap(); + let mut lock = self.server_action_states.lock().await; let entry = lock.entry(server_id.to_string()).or_default(); entry.pruning_containers = true; } let res = self.prune_containers_inner(server_id, user).await; { - let mut lock = self.server_action_states.lock().unwrap(); + let mut lock = self.server_action_states.lock().await; let entry = lock.entry(server_id.to_string()).or_default(); entry.pruning_containers = false; } diff --git a/core/src/api/build.rs b/core/src/api/build.rs index e1ff5f39b..11e6a469c 100644 --- a/core/src/api/build.rs +++ b/core/src/api/build.rs @@ -214,7 +214,7 @@ impl State { let action_state = self .build_action_states .lock() - .unwrap() + .await .entry(id) .or_default() .clone(); diff --git a/core/src/api/deployment.rs b/core/src/api/deployment.rs index 6be0d07a1..ff426c3dc 100644 --- a/core/src/api/deployment.rs +++ b/core/src/api/deployment.rs @@ -358,7 +358,7 @@ impl State { let action_state = self .deployment_action_states .lock() - .unwrap() + .await .entry(id) .or_default() .clone(); diff --git a/core/src/api/server.rs b/core/src/api/server.rs index 0b34267b2..00a3a8e8c 100644 --- a/core/src/api/server.rs +++ b/core/src/api/server.rs @@ -479,7 +479,7 @@ impl State { let action_state = self .server_action_states .lock() - .unwrap() + .await .entry(id) .or_default() .clone(); diff --git a/core/src/main.rs b/core/src/main.rs 
index 0734155a9..51a8f1154 100644 --- a/core/src/main.rs +++ b/core/src/main.rs @@ -11,6 +11,7 @@ mod api; mod auth; mod config; mod helpers; +mod monitoring; mod state; mod ws; diff --git a/core/src/monitoring.rs b/core/src/monitoring.rs new file mode 100644 index 000000000..30238ce33 --- /dev/null +++ b/core/src/monitoring.rs @@ -0,0 +1,449 @@ +use async_timing_util::{unix_timestamp_ms, wait_until_timelength, Timelength, ONE_HOUR_MS}; +use futures_util::future::join_all; +use mungos::doc; +use slack::types::Block; +use types::{Server, SystemStats, SystemStatsQuery, SystemStatsRecord}; + +use crate::state::State; + +#[derive(Default)] +pub struct AlertStatus { + cpu_alert: bool, + mem_alert: bool, + disk_alert: bool, + component_alert: bool, +} + +impl State { + pub async fn collect_server_stats(&self) { + loop { + let ts = wait_until_timelength( + self.config.monitoring_interval.to_string().parse().unwrap(), + 0, + ) + .await as i64; + let servers = self.get_enabled_servers_with_stats().await; + if let Err(e) = servers { + eprintln!("failed to get server list from db: {e:?}"); + continue; + } + for (server, res) in servers.unwrap() { + if let Err(e) = res { + println!("server unreachable: {e:?}"); + if let Some(slack) = &self.slack { + let (header, info) = generate_unreachable_message(&server); + let res = slack.send_message_with_header(&header, info.clone()).await; + if let Err(e) = res { + eprintln!("failed to send message to slack: {e} | header: {header} | info: {info:?}") + } + } + continue; + } + let stats = res.unwrap(); + self.check_server_stats(&server, &stats).await; + let res = self + .db + .stats + .create_one(SystemStatsRecord::from_stats(server.id, ts, stats)) + .await; + if let Err(e) = res { + eprintln!("failed to insert stats into mongo | {e}"); + } + } + } + } + + async fn get_enabled_servers_with_stats( + &self, + ) -> anyhow::Result)>> { + let servers = self + .db + .servers + .get_some(doc! 
{ "enabled": true }, None) + .await?; + + let futures = servers.into_iter().map(|server| async move { + let stats = self + .periphery + .get_system_stats( + &server, + &SystemStatsQuery { + networks: true, + components: true, + processes: false, + }, + ) + .await; + (server, stats) + }); + + Ok(join_all(futures).await) + } + + async fn check_server_stats(&self, server: &Server, stats: &SystemStats) { + self.check_cpu(server, stats).await; + self.check_mem(server, stats).await; + self.check_disk(server, stats).await; + self.check_components(server, stats).await; + } + + async fn check_cpu(&self, server: &Server, stats: &SystemStats) { + let lock = self.server_alert_status.lock().await; + if self.slack.is_none() || lock.get(&server.id).map(|s| s.cpu_alert).unwrap_or(false) { + return; + } + drop(lock); + if stats.cpu_perc > server.cpu_alert { + let region = if let Some(region) = &server.region { + format!(" ({region})") + } else { + String::new() + }; + let mut blocks = vec![ + Block::header("WARNING 🚨"), + Block::section(format!( + "*{}*{region} has high *CPU usage* 📈 🚨", + server.name + )), + Block::section(format!("cpu: *{:.1}%*", stats.cpu_perc)), + ]; + + if let Some(to_notify) = generate_to_notify(server) { + blocks.push(Block::section(to_notify)) + } + + let res = self + .slack + .as_ref() + .unwrap() + .send_message( + format!( + "WARNING 🚨 | *{}*{region} has high *CPU usage* 📈 🚨", + server.name + ), + blocks, + ) + .await; + if let Err(e) = res { + eprintln!( + "failed to send message to slack | high cpu usage on {} | usage: {:.1}% | {e:?}", + server.name, stats.cpu_perc + ) + } else { + let mut lock = self.server_alert_status.lock().await; + let entry = lock.entry(server.id.clone()).or_default(); + entry.cpu_alert = true; + } + } + } + + async fn check_mem(&self, server: &Server, stats: &SystemStats) { + let lock = self.server_alert_status.lock().await; + if self.slack.is_none() || lock.get(&server.id).map(|s| s.mem_alert).unwrap_or(false) { + return; + } + 
drop(lock); + if (stats.mem_used_gb / stats.mem_total_gb) * 100.0 > server.mem_alert { + let region = if let Some(region) = &server.region { + format!(" ({region})") + } else { + String::new() + }; + let mut blocks = vec![ + Block::header("WARNING 🚨"), + Block::section(format!( + "*{}*{region} has high *memory usage* 💾 🚨", + server.name + )), + Block::section(format!( + "memory: used *{:.2} GB* of *{:.2} GB* (*{:.1}%*)", + stats.mem_used_gb, + stats.mem_total_gb, + (stats.mem_used_gb / stats.mem_total_gb) * 100.0 + )), + ]; + + if let Some(to_notify) = generate_to_notify(server) { + blocks.push(Block::section(to_notify)) + } + + let res = self + .slack + .as_ref() + .unwrap() + .send_message( + format!( + "WARNING 🚨 | *{}*{region} has high *memory usage* 💾 🚨", + server.name + ), + blocks, + ) + .await; + if let Err(e) = res { + eprintln!( + "failed to send message to slack | high mem usage on {} | usage: {:.2}GB of {:.2}GB | {e:?}", + server.name, stats.mem_used_gb, stats.mem_total_gb, + ) + } else { + let mut lock = self.server_alert_status.lock().await; + let entry = lock.entry(server.id.clone()).or_default(); + entry.mem_alert = true; + } + } + } + + async fn check_disk(&self, server: &Server, stats: &SystemStats) { + let lock = self.server_alert_status.lock().await; + if self.slack.is_none() || lock.get(&server.id).map(|s| s.disk_alert).unwrap_or(false) { + return; + } + drop(lock); + if (stats.disk.used_gb / stats.disk.total_gb) * 100.0 > server.disk_alert { + let region = if let Some(region) = &server.region { + format!(" ({region})") + } else { + String::new() + }; + let mut blocks = vec![ + Block::header("WARNING 🚨"), + Block::section(format!( + "*{}*{region} has high *disk usage* 💿 🚨", + server.name + )), + Block::section(format!( + "disk: used *{:.2} GB* of *{:.2} GB* (*{:.1}%*)", + stats.disk.used_gb, + stats.disk.total_gb, + (stats.disk.used_gb / stats.disk.total_gb) * 100.0 + )), + ]; + + if let Some(to_notify) = generate_to_notify(server) { + 
blocks.push(Block::section(to_notify)) + } + + let res = self + .slack + .as_ref() + .unwrap() + .send_message( + format!( + "WARNING 🚨 | *{}*{region} has high *disk usage* 💿 🚨", + server.name + ), + blocks, + ) + .await; + if let Err(e) = res { + eprintln!( + "failed to send message to slack | high disk usage on {} | usage: {:.2}GB of {:.2}GB | {e:?}", + server.name, stats.disk.used_gb, stats.disk.total_gb, + ) + } else { + let mut lock = self.server_alert_status.lock().await; + let entry = lock.entry(server.id.clone()).or_default(); + entry.disk_alert = true; + } + } + } + + async fn check_components(&self, server: &Server, stats: &SystemStats) { + let lock = self.server_alert_status.lock().await; + if self.slack.is_none() + || lock + .get(&server.id) + .map(|s| s.component_alert) + .unwrap_or(false) + { + return; + } + drop(lock); + let info = stats + .components + .iter() + .map(|c| { + if let Some(critical) = c.critical { + if c.temp / critical > 0.75 { + format!( + "{}: *{:.1}°* (*{:.1}%* to critical) 🌡️", + c.label, + c.temp, + (c.temp / critical) * 100.0 + ) + } else { + String::new() + } + } else { + String::new() + } + }) + .filter(|s| !s.is_empty()) + .collect::>(); + if info.len() > 0 { + let region = if let Some(region) = &server.region { + format!(" ({region})") + } else { + String::new() + }; + let mut blocks = vec![ + Block::header("WARNING 🚨"), + Block::section(format!( + "*{}*{region} has high *tempurature* 🌡️ 🚨", + server.name + )), + Block::section(info.join("\n")), + ]; + + if let Some(to_notify) = generate_to_notify(server) { + blocks.push(Block::section(to_notify)) + } + + let res = self + .slack + .as_ref() + .unwrap() + .send_message( + format!( + "WARNING 🚨 | *{}*{region} has high *tempurature* 🌡️ 🚨", + server.name + ), + blocks, + ) + .await; + if let Err(e) = res { + eprintln!( + "failed to send message to slack | high tempurature on {} | {} | {e:?}", + server.name, + info.join(" | "), + ) + } else { + let mut lock = 
self.server_alert_status.lock().await; + let entry = lock.entry(server.id.clone()).or_default(); + entry.component_alert = true; + } + } + } + + pub async fn daily_update(&self) { + let offset = self.config.daily_offset_hours as u128 * ONE_HOUR_MS; + loop { + wait_until_timelength(Timelength::OneDay, offset).await; + let servers = self.get_enabled_servers_with_stats().await; + if let Err(e) = &servers { + eprintln!( + "{} | failed to get servers with stats for daily update | {e:#?}", + unix_timestamp_ms() + ); + continue; + } + let mut blocks = vec![Block::header("INFO | daily update"), Block::divider()]; + for (server, stats) in servers.unwrap() { + let region = if let Some(region) = &server.region { + format!(" | {region}") + } else { + String::new() + }; + if let Ok(stats) = stats { + let cpu_warning = if stats.cpu_perc > server.cpu_alert { + " 🚨" + } else { + "" + }; + let mem_warning = + if (stats.mem_used_gb / stats.mem_total_gb) * 100.0 > server.mem_alert { + " 🚨" + } else { + "" + }; + let disk_warning = + if (stats.disk.used_gb / stats.disk.total_gb) * 100.0 > server.disk_alert { + " 🚨" + } else { + "" + }; + let status = if !cpu_warning.is_empty() + || !mem_warning.is_empty() + || !disk_warning.is_empty() + { + "*WARNING* 🚨" + } else { + "*OK* ✅" + }; + let name_line = format!("*{}*{region} | {status}", server.name); + let cpu_line = format!("CPU: *{:.1}%*{cpu_warning}", stats.cpu_perc); + let mem_line = format!( + "MEM: *{:.1}%* ({:.2} GB of {:.2} GB){mem_warning}", + (stats.mem_used_gb / stats.mem_total_gb) * 100.0, + stats.mem_used_gb, + stats.mem_total_gb, + ); + let disk_line = format!( + "DISK: *{:.1}%* ({:.2} GB of {:.2} GB){disk_warning}", + (stats.disk.used_gb / stats.disk.total_gb) * 100.0, + stats.disk.used_gb, + stats.disk.total_gb, + ); + blocks.push(Block::section(format!( + "{name_line}\n{cpu_line}\n{mem_line}\n{disk_line}", + ))); + } else { + blocks.push(Block::section(format!( + "*{}*{region} | *UNREACHABLE* ❌", + server.name + ))); + } 
+ blocks.push(Block::divider()) + } + let res = self + .slack + .as_ref() + .unwrap() + .send_message(format!("INFO | daily update"), blocks) + .await; + if let Err(e) = res { + eprintln!( + "{} | failed to send daily update message | {e:?}", + unix_timestamp_ms() + ); + } + { + self.server_alert_status.lock().await.clear(); + } + } + } +} + +fn generate_unreachable_message(server: &Server) -> (String, Option) { + let region = match &server.region { + Some(region) => format!(" ({region})"), + None => String::new(), + }; + let header = format!("WARNING 🚨 | {}{region} is unreachable ❌", server.name); + let to_notify = server + .to_notify + .iter() + .map(|u| format!("<@{u}>")) + .collect::>() + .join(" "); + let info = if to_notify.len() > 0 { + Some(to_notify) + } else { + None + }; + (header, info) +} + +fn generate_to_notify(server: &Server) -> Option { + if server.to_notify.len() > 0 { + Some( + server + .to_notify + .iter() + .map(|u| format!("<@{u}>")) + .collect::>() + .join(" "), + ) + } else { + None + } +} diff --git a/core/src/state.rs b/core/src/state.rs index 4f1754f69..deb2a8a96 100644 --- a/core/src/state.rs +++ b/core/src/state.rs @@ -1,20 +1,12 @@ -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; +use std::{collections::HashMap, sync::Arc}; -use async_timing_util::{wait_until_timelength, Timelength}; use axum::Extension; use db::DbClient; -use futures_util::future::join_all; -use mungos::doc; use periphery::PeripheryClient; -use types::{ - BuildActionState, CoreConfig, DeploymentActionState, Server, ServerActionState, - SystemStatsQuery, SystemStatsRecord, -}; +use tokio::sync::Mutex; +use types::{BuildActionState, CoreConfig, DeploymentActionState, ServerActionState}; -use crate::ws::update::UpdateWsChannel; +use crate::{monitoring::AlertStatus, ws::update::UpdateWsChannel}; pub type StateExtension = Extension>; @@ -29,6 +21,7 @@ pub struct State { pub build_action_states: ActionStateMap, pub deployment_action_states: ActionStateMap, pub 
server_action_states: ActionStateMap, + pub server_alert_status: Mutex>, // (server_id, AlertStatus) } impl State { @@ -42,85 +35,19 @@ impl State { build_action_states: Default::default(), deployment_action_states: Default::default(), server_action_states: Default::default(), + server_alert_status: Default::default(), }; let state = Arc::new(state); let state_clone = state.clone(); tokio::spawn(async move { state_clone.collect_server_stats().await }); + if state.slack.is_some() { + let state_clone = state.clone(); + tokio::spawn(async move { state_clone.daily_update().await }); + } state } pub async fn extension(config: CoreConfig) -> StateExtension { Extension(State::new(config).await) } - - pub async fn collect_server_stats(&self) { - loop { - let ts = wait_until_timelength(Timelength::OneMinute, 0).await as i64; - let servers = self - .db - .servers - .get_some(doc! { "enabled": true }, None) - .await; - if let Err(e) = servers { - eprintln!("failed to get server list from db: {e:?}"); - continue; - } - let futures = servers.unwrap().into_iter().map(|server| async move { - let stats = self - .periphery - .get_system_stats( - &server, - &SystemStatsQuery { - networks: true, - components: true, - processes: false, - }, - ) - .await; - (server, stats) - }); - for (server, res) in join_all(futures).await { - if let Err(e) = res { - println!("server unreachable: {e:?}"); - if let Some(slack) = &self.slack { - let (header, info) = generate_unreachable_message(&server); - let res = slack.send_message_with_header(&header, info.clone()).await; - if let Err(e) = res { - eprintln!("failed to send message to slack: {e} | header: {header} | info: {info:?}") - } - } - continue; - } - let stats = res.unwrap(); - let res = self - .db - .stats - .create_one(SystemStatsRecord::from_stats(server.id, ts, stats)) - .await; - if let Err(e) = res { - eprintln!("failed to insert stats into mongo | {e}"); - } - } - } - } -} - -fn generate_unreachable_message(server: &Server) -> 
(String, Option) { - let region = match &server.region { - Some(region) => format!(" ({region})"), - None => String::new(), - }; - let header = format!("WARNING 🚨 | {}{region} is unreachable ❌", server.name); - let to_notify = server - .to_notify - .iter() - .map(|u| format!("<@{u}>")) - .collect::>() - .join(" "); - let info = if to_notify.len() > 0 { - Some(to_notify) - } else { - None - }; - (header, info) } diff --git a/lib/types/src/config.rs b/lib/types/src/config.rs index 8d4c7a065..d49e8a471 100644 --- a/lib/types/src/config.rs +++ b/lib/types/src/config.rs @@ -28,6 +28,13 @@ pub struct CoreConfig { #[serde(default = "default_jwt_valid_for")] pub jwt_valid_for: Timelength, + // interval at which to collect server stats and alert for out of bounds + pub monitoring_interval: Timelength, + + // daily utc offset in hours to run daily update. eg 8:00 eastern time is 13:00 UTC, so offset should be 13. default of 0 runs at UTC midnight. + #[serde(default)] + pub daily_offset_hours: u8, + // used to verify validity from github webhooks pub github_webhook_secret: String, diff --git a/lib/types/src/diff.rs b/lib/types/src/diff.rs index 57a1ecc49..1ab081c95 100644 --- a/lib/types/src/diff.rs +++ b/lib/types/src/diff.rs @@ -9,6 +9,10 @@ pub fn f64_diff_no_change(f64_diff: &f64) -> bool { *f64_diff == 0.0 } +pub fn f32_diff_no_change(f32_diff: &f32) -> bool { + *f32_diff == 0.0 +} + pub fn option_diff_no_change(option_diff: &OptionDiff) -> bool where ::Repr: PartialEq, diff --git a/lib/types/src/server.rs b/lib/types/src/server.rs index 8c0a6c618..034da6f8b 100644 --- a/lib/types/src/server.rs +++ b/lib/types/src/server.rs @@ -42,8 +42,8 @@ pub struct Server { pub to_notify: Vec, // slack users to notify #[serde(default = "default_cpu_alert")] - #[diff(attr(#[serde(skip_serializing_if = "f64_diff_no_change")]))] - pub cpu_alert: f64, + #[diff(attr(#[serde(skip_serializing_if = "f32_diff_no_change")]))] + pub cpu_alert: f32, #[serde(default = "default_mem_alert")] 
#[diff(attr(#[serde(skip_serializing_if = "f64_diff_no_change")]))] @@ -96,7 +96,7 @@ impl Default for Server { } } -fn default_cpu_alert() -> f64 { +fn default_cpu_alert() -> f32 { 50.0 }