finish stat collection and slack alerts, daily update. async mutex

This commit is contained in:
mbecker20
2023-01-04 08:01:25 +00:00
parent 17a5d624d9
commit c90e91d78b
17 changed files with 566 additions and 144 deletions

4
Cargo.lock generated
View File

@@ -2169,9 +2169,9 @@ dependencies = [
[[package]]
name = "slack_client_rs"
version = "0.0.7"
version = "0.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "920a08baac5050ace95378c15984e2ee5552fdcd193302e0f80e294190b8c148"
checksum = "a9c9597f2b960b592f97701c89c9ceb2658c8f804f39f876e1ab25379080fe2e"
dependencies = [
"anyhow",
"reqwest",

View File

@@ -65,6 +65,8 @@ pub fn gen_core_config(sub_matches: &ArgMatches) {
host,
port,
jwt_valid_for,
monitoring_interval: Timelength::OneMinute,
daily_offset_hours: 0,
slack_url,
local_auth: true,
github_oauth: Default::default(),

View File

@@ -13,11 +13,18 @@ pub struct CoreConfig {
#[serde(default = "default_core_port")]
pub port: u16,
// daily utc offset in hours to run daily update. eg 8:00 eastern time is 13:00 UTC, so offset should be 13. default of 0 runs at UTC midnight.
#[serde(default)]
pub daily_offset_hours: u8,
// jwt config
pub jwt_secret: String,
#[serde(default = "default_jwt_valid_for")]
pub jwt_valid_for: Timelength,
// interval at which to collect server stats and alert for out of bounds
pub monitoring_interval: Timelength,
// used to verify validity from github webhooks
pub github_webhook_secret: String,

View File

@@ -1,14 +1,39 @@
# this should be the url used to access monitor in browser, potentially behind DNS, eg https://monitor.mogh.tech or http://12.34.56.78:9000
host = "http://localhost:9000"
# the port the core system will run on. if running core in docker container, leave as this port as 9000 and use port bind eg. -p 9001:9000
port = 9000
# daily utc offset in hours to run daily update. eg 8:00 eastern time is 13:00 UTC, so offset should be 13. default of 0 runs at UTC midnight.
daily_offset_hours = 13
# secret used to generate the jwt. should be some randomly generated hash.
jwt_secret = "your_jwt_secret"
# can be 1-hr, 12-hr, 1-day, 3-day, 1-wk, 2-wk, 30-day
jwt_valid_for = "1-wk"
# webhook url given by slack app
slack_url = "your_slack_app_webhook_url"
# token that has to be given to github during webhook config as the Secret
github_webhook_secret = "your_random_webhook_secret"
# can be 30-sec, 1-min, 2-min, 5-min
monitoring_interval = "1-min"
# allow or deny user login with username / password
local_auth = true
[github_oauth]
id = "your_client_id"
secret = "your_client_secret"
id = "your_github_client_id"
secret = "your_github_client_secret"
[google_oauth]
id = "your_google_client_id"
secret = "your_google_client_secret"
[mongo]
uri = "your_mongo_uri"
app_name = "monitor_core"
db_name = "monitor"
db_name = "monitor"

View File

@@ -18,7 +18,7 @@ axum = { version = "0.6", features = ["ws", "json"] }
axum-extra = { version = "0.4", features = ["spa"] }
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.3", features = ["cors"] }
slack = { package = "slack_client_rs", version = "0.0.7" }
slack = { package = "slack_client_rs", version = "0.0.8" }
mungos = "0.2.27"
serde = "1.0"
serde_derive = "1.0"

View File

@@ -32,8 +32,8 @@ impl State {
}
}
pub fn build_busy(&self, id: &str) -> bool {
match self.build_action_states.lock().unwrap().get(id) {
pub async fn build_busy(&self, id: &str) -> bool {
match self.build_action_states.lock().await.get(id) {
Some(a) => a.busy(),
None => false,
}
@@ -108,7 +108,7 @@ impl State {
}
pub async fn delete_build(&self, build_id: &str, user: &RequestUser) -> anyhow::Result<Build> {
if self.build_busy(build_id) {
if self.build_busy(build_id).await {
return Err(anyhow!("build busy"));
}
let build = self
@@ -147,18 +147,18 @@ impl State {
new_build: Build,
user: &RequestUser,
) -> anyhow::Result<Build> {
if self.build_busy(&new_build.id) {
if self.build_busy(&new_build.id).await {
return Err(anyhow!("build busy"));
}
let id = new_build.id.clone();
{
let mut lock = self.build_action_states.lock().unwrap();
let mut lock = self.build_action_states.lock().await;
let entry = lock.entry(id.clone()).or_default();
entry.updating = true;
}
let res = self.update_build_inner(new_build, user).await;
{
let mut lock = self.build_action_states.lock().unwrap();
let mut lock = self.build_action_states.lock().await;
let entry = lock.entry(id).or_default();
entry.updating = false;
}
@@ -230,17 +230,17 @@ impl State {
}
pub async fn build(&self, build_id: &str, user: &RequestUser) -> anyhow::Result<Update> {
if self.build_busy(build_id) {
if self.build_busy(build_id).await {
return Err(anyhow!("build busy"));
}
{
let mut lock = self.build_action_states.lock().unwrap();
let mut lock = self.build_action_states.lock().await;
let entry = lock.entry(build_id.to_string()).or_default();
entry.building = true;
}
let res = self.build_inner(build_id, user).await;
{
let mut lock = self.build_action_states.lock().unwrap();
let mut lock = self.build_action_states.lock().await;
let entry = lock.entry(build_id.to_string()).or_default();
entry.building = false;
}
@@ -309,17 +309,17 @@ impl State {
build_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.build_busy(build_id) {
if self.build_busy(build_id).await {
return Err(anyhow!("build busy"));
}
{
let mut lock = self.build_action_states.lock().unwrap();
let mut lock = self.build_action_states.lock().await;
let entry = lock.entry(build_id.to_string()).or_default();
entry.recloning = true;
}
let res = self.reclone_build_inner(build_id, user).await;
{
let mut lock = self.build_action_states.lock().unwrap();
let mut lock = self.build_action_states.lock().await;
let entry = lock.entry(build_id.to_string()).or_default();
entry.recloning = false;
}

View File

@@ -31,8 +31,8 @@ impl State {
}
}
pub fn deployment_busy(&self, id: &str) -> bool {
match self.deployment_action_states.lock().unwrap().get(id) {
pub async fn deployment_busy(&self, id: &str) -> bool {
match self.deployment_action_states.lock().await.get(id) {
Some(a) => a.busy(),
None => false,
}
@@ -111,7 +111,7 @@ impl State {
deployment_id: &str,
user: &RequestUser,
) -> anyhow::Result<Deployment> {
if self.deployment_busy(deployment_id) {
if self.deployment_busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
let deployment = self
@@ -158,18 +158,18 @@ impl State {
new_deployment: Deployment,
user: &RequestUser,
) -> anyhow::Result<Deployment> {
if self.deployment_busy(&new_deployment.id) {
if self.deployment_busy(&new_deployment.id).await {
return Err(anyhow!("deployment busy"));
}
let id = new_deployment.id.clone();
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(id.clone()).or_default();
entry.updating = true;
}
let res = self.update_deployment_inner(new_deployment, user).await;
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(id).or_default();
entry.updating = false;
}
@@ -248,17 +248,17 @@ impl State {
deployment_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.deployment_busy(deployment_id) {
if self.deployment_busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.recloning = true;
}
let res = self.reclone_deployment_inner(deployment_id, user).await;
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.recloning = false;
}
@@ -311,17 +311,17 @@ impl State {
deployment_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.deployment_busy(deployment_id) {
if self.deployment_busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.deploying = true;
}
let res = self.deploy_container_inner(deployment_id, user).await;
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.deploying = false;
}
@@ -384,17 +384,17 @@ impl State {
deployment_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.deployment_busy(deployment_id) {
if self.deployment_busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.starting = true;
}
let res = self.start_container_inner(deployment_id, user).await;
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.starting = false;
}
@@ -454,17 +454,17 @@ impl State {
deployment_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.deployment_busy(deployment_id) {
if self.deployment_busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.stopping = true;
}
let res = self.stop_container_inner(deployment_id, user).await;
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.stopping = false;
}
@@ -524,17 +524,17 @@ impl State {
deployment_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.deployment_busy(deployment_id) {
if self.deployment_busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.removing = true;
}
let res = self.remove_container_inner(deployment_id, user).await;
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.removing = false;
}
@@ -594,17 +594,17 @@ impl State {
deployment_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.deployment_busy(deployment_id) {
if self.deployment_busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.pulling = true;
}
let res = self.pull_deployment_repo_inner(deployment_id, user).await;
{
let mut lock = self.deployment_action_states.lock().unwrap();
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.pulling = false;
}

View File

@@ -27,8 +27,8 @@ impl State {
}
}
pub fn server_busy(&self, id: &str) -> bool {
match self.server_action_states.lock().unwrap().get(id) {
pub async fn server_busy(&self, id: &str) -> bool {
match self.server_action_states.lock().await.get(id) {
Some(a) => a.busy(),
None => false,
}
@@ -95,7 +95,7 @@ impl State {
server_id: &str,
user: &RequestUser,
) -> anyhow::Result<Server> {
if self.server_busy(server_id) {
if self.server_busy(server_id).await {
return Err(anyhow!("server busy"));
}
let server = self
@@ -125,7 +125,7 @@ impl State {
mut new_server: Server,
user: &RequestUser,
) -> anyhow::Result<Server> {
if self.server_busy(&new_server.id) {
if self.server_busy(&new_server.id).await {
return Err(anyhow!("server busy"));
}
let current_server = self
@@ -169,17 +169,17 @@ impl State {
server_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.server_busy(server_id) {
if self.server_busy(server_id).await {
return Err(anyhow!("server busy"));
}
{
let mut lock = self.server_action_states.lock().unwrap();
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_networks = true;
}
let res = self.prune_networks_inner(server_id, user).await;
{
let mut lock = self.server_action_states.lock().unwrap();
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_networks = false;
}
@@ -230,17 +230,17 @@ impl State {
server_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.server_busy(server_id) {
if self.server_busy(server_id).await {
return Err(anyhow!("server busy"));
}
{
let mut lock = self.server_action_states.lock().unwrap();
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_images = true;
}
let res = self.prune_images_inner(server_id, user).await;
{
let mut lock = self.server_action_states.lock().unwrap();
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_images = false;
}
@@ -292,17 +292,17 @@ impl State {
server_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.server_busy(server_id) {
if self.server_busy(server_id).await {
return Err(anyhow!("server busy"));
}
{
let mut lock = self.server_action_states.lock().unwrap();
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_containers = true;
}
let res = self.prune_containers_inner(server_id, user).await;
{
let mut lock = self.server_action_states.lock().unwrap();
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_containers = false;
}

View File

@@ -214,7 +214,7 @@ impl State {
let action_state = self
.build_action_states
.lock()
.unwrap()
.await
.entry(id)
.or_default()
.clone();

View File

@@ -358,7 +358,7 @@ impl State {
let action_state = self
.deployment_action_states
.lock()
.unwrap()
.await
.entry(id)
.or_default()
.clone();

View File

@@ -479,7 +479,7 @@ impl State {
let action_state = self
.server_action_states
.lock()
.unwrap()
.await
.entry(id)
.or_default()
.clone();

View File

@@ -11,6 +11,7 @@ mod api;
mod auth;
mod config;
mod helpers;
mod monitoring;
mod state;
mod ws;

449
core/src/monitoring.rs Normal file
View File

@@ -0,0 +1,449 @@
use async_timing_util::{unix_timestamp_ms, wait_until_timelength, Timelength, ONE_HOUR_MS};
use futures_util::future::join_all;
use mungos::doc;
use slack::types::Block;
use types::{Server, SystemStats, SystemStatsQuery, SystemStatsRecord};
use crate::state::State;
#[derive(Default)]
pub struct AlertStatus {
cpu_alert: bool,
mem_alert: bool,
disk_alert: bool,
component_alert: bool,
}
impl State {
pub async fn collect_server_stats(&self) {
loop {
let ts = wait_until_timelength(
self.config.monitoring_interval.to_string().parse().unwrap(),
0,
)
.await as i64;
let servers = self.get_enabled_servers_with_stats().await;
if let Err(e) = servers {
eprintln!("failed to get server list from db: {e:?}");
continue;
}
for (server, res) in servers.unwrap() {
if let Err(e) = res {
println!("server unreachable: {e:?}");
if let Some(slack) = &self.slack {
let (header, info) = generate_unreachable_message(&server);
let res = slack.send_message_with_header(&header, info.clone()).await;
if let Err(e) = res {
eprintln!("failed to send message to slack: {e} | header: {header} | info: {info:?}")
}
}
continue;
}
let stats = res.unwrap();
self.check_server_stats(&server, &stats).await;
let res = self
.db
.stats
.create_one(SystemStatsRecord::from_stats(server.id, ts, stats))
.await;
if let Err(e) = res {
eprintln!("failed to insert stats into mongo | {e}");
}
}
}
}
async fn get_enabled_servers_with_stats(
&self,
) -> anyhow::Result<Vec<(Server, anyhow::Result<SystemStats>)>> {
let servers = self
.db
.servers
.get_some(doc! { "enabled": true }, None)
.await?;
let futures = servers.into_iter().map(|server| async move {
let stats = self
.periphery
.get_system_stats(
&server,
&SystemStatsQuery {
networks: true,
components: true,
processes: false,
},
)
.await;
(server, stats)
});
Ok(join_all(futures).await)
}
async fn check_server_stats(&self, server: &Server, stats: &SystemStats) {
self.check_cpu(server, stats).await;
self.check_mem(server, stats).await;
self.check_disk(server, stats).await;
self.check_components(server, stats).await;
}
async fn check_cpu(&self, server: &Server, stats: &SystemStats) {
let lock = self.server_alert_status.lock().await;
if self.slack.is_none() || lock.get(&server.id).map(|s| s.cpu_alert).unwrap_or(false) {
return;
}
drop(lock);
if stats.cpu_perc > server.cpu_alert {
let region = if let Some(region) = &server.region {
format!(" ({region})")
} else {
String::new()
};
let mut blocks = vec![
Block::header("WARNING 🚨"),
Block::section(format!(
"*{}*{region} has high *CPU usage* 📈 🚨",
server.name
)),
Block::section(format!("cpu: *{:.1}%*", stats.cpu_perc)),
];
if let Some(to_notify) = generate_to_notify(server) {
blocks.push(Block::section(to_notify))
}
let res = self
.slack
.as_ref()
.unwrap()
.send_message(
format!(
"WARNING 🚨 | *{}*{region} has high *CPU usage* 📈 🚨",
server.name
),
blocks,
)
.await;
if let Err(e) = res {
eprintln!(
"failed to send message to slack | high cpu usage on {} | usage: {:.1}% | {e:?}",
server.name, stats.cpu_perc
)
} else {
let mut lock = self.server_alert_status.lock().await;
let entry = lock.entry(server.id.clone()).or_default();
entry.cpu_alert = true;
}
}
}
async fn check_mem(&self, server: &Server, stats: &SystemStats) {
let lock = self.server_alert_status.lock().await;
if self.slack.is_none() || lock.get(&server.id).map(|s| s.mem_alert).unwrap_or(false) {
return;
}
drop(lock);
if (stats.mem_used_gb / stats.mem_total_gb) * 100.0 > server.mem_alert {
let region = if let Some(region) = &server.region {
format!(" ({region})")
} else {
String::new()
};
let mut blocks = vec![
Block::header("WARNING 🚨"),
Block::section(format!(
"*{}*{region} has high *memory usage* 💾 🚨",
server.name
)),
Block::section(format!(
"memory: used *{:.2} GB* of *{:.2} GB* (*{:.1}%*)",
stats.mem_used_gb,
stats.mem_total_gb,
(stats.mem_used_gb / stats.mem_total_gb) * 100.0
)),
];
if let Some(to_notify) = generate_to_notify(server) {
blocks.push(Block::section(to_notify))
}
let res = self
.slack
.as_ref()
.unwrap()
.send_message(
format!(
"WARNING 🚨 | *{}*{region} has high *memory usage* 💾 🚨",
server.name
),
blocks,
)
.await;
if let Err(e) = res {
eprintln!(
"failed to send message to slack | high mem usage on {} | usage: {:.2}GB of {:.2}GB | {e:?}",
server.name, stats.mem_used_gb, stats.mem_total_gb,
)
} else {
let mut lock = self.server_alert_status.lock().await;
let entry = lock.entry(server.id.clone()).or_default();
entry.mem_alert = true;
}
}
}
async fn check_disk(&self, server: &Server, stats: &SystemStats) {
let lock = self.server_alert_status.lock().await;
if self.slack.is_none() || lock.get(&server.id).map(|s| s.disk_alert).unwrap_or(false) {
return;
}
drop(lock);
if (stats.disk.used_gb / stats.disk.total_gb) * 100.0 > server.disk_alert {
let region = if let Some(region) = &server.region {
format!(" ({region})")
} else {
String::new()
};
let mut blocks = vec![
Block::header("WARNING 🚨"),
Block::section(format!(
"*{}*{region} has high *disk usage* 💿 🚨",
server.name
)),
Block::section(format!(
"disk: used *{:.2} GB* of *{:.2} GB* (*{:.1}%*)",
stats.disk.used_gb,
stats.disk.total_gb,
(stats.disk.used_gb / stats.disk.total_gb) * 100.0
)),
];
if let Some(to_notify) = generate_to_notify(server) {
blocks.push(Block::section(to_notify))
}
let res = self
.slack
.as_ref()
.unwrap()
.send_message(
format!(
"WARNING 🚨 | *{}*{region} has high *disk usage* 💿 🚨",
server.name
),
blocks,
)
.await;
if let Err(e) = res {
eprintln!(
"failed to send message to slack | high disk usage on {} | usage: {:.2}GB of {:.2}GB | {e:?}",
server.name, stats.disk.used_gb, stats.disk.total_gb,
)
} else {
let mut lock = self.server_alert_status.lock().await;
let entry = lock.entry(server.id.clone()).or_default();
entry.disk_alert = true;
}
}
}
async fn check_components(&self, server: &Server, stats: &SystemStats) {
let lock = self.server_alert_status.lock().await;
if self.slack.is_none()
|| lock
.get(&server.id)
.map(|s| s.component_alert)
.unwrap_or(false)
{
return;
}
drop(lock);
let info = stats
.components
.iter()
.map(|c| {
if let Some(critical) = c.critical {
if c.temp / critical > 0.75 {
format!(
"{}: *{:.1}°* (*{:.1}%* to critical) 🌡️",
c.label,
c.temp,
(c.temp / critical) * 100.0
)
} else {
String::new()
}
} else {
String::new()
}
})
.filter(|s| !s.is_empty())
.collect::<Vec<_>>();
if info.len() > 0 {
let region = if let Some(region) = &server.region {
format!(" ({region})")
} else {
String::new()
};
let mut blocks = vec![
Block::header("WARNING 🚨"),
Block::section(format!(
"*{}*{region} has high *tempurature* 🌡️ 🚨",
server.name
)),
Block::section(info.join("\n")),
];
if let Some(to_notify) = generate_to_notify(server) {
blocks.push(Block::section(to_notify))
}
let res = self
.slack
.as_ref()
.unwrap()
.send_message(
format!(
"WARNING 🚨 | *{}*{region} has high *tempurature* 🌡️ 🚨",
server.name
),
blocks,
)
.await;
if let Err(e) = res {
eprintln!(
"failed to send message to slack | high tempurature on {} | {} | {e:?}",
server.name,
info.join(" | "),
)
} else {
let mut lock = self.server_alert_status.lock().await;
let entry = lock.entry(server.id.clone()).or_default();
entry.component_alert = true;
}
}
}
pub async fn daily_update(&self) {
let offset = self.config.daily_offset_hours as u128 * ONE_HOUR_MS;
loop {
wait_until_timelength(Timelength::OneDay, offset).await;
let servers = self.get_enabled_servers_with_stats().await;
if let Err(e) = &servers {
eprintln!(
"{} | failed to get servers with stats for daily update | {e:#?}",
unix_timestamp_ms()
);
continue;
}
let mut blocks = vec![Block::header("INFO | daily update"), Block::divider()];
for (server, stats) in servers.unwrap() {
let region = if let Some(region) = &server.region {
format!(" | {region}")
} else {
String::new()
};
if let Ok(stats) = stats {
let cpu_warning = if stats.cpu_perc > server.cpu_alert {
" 🚨"
} else {
""
};
let mem_warning =
if (stats.mem_used_gb / stats.mem_total_gb) * 100.0 > server.mem_alert {
" 🚨"
} else {
""
};
let disk_warning =
if (stats.disk.used_gb / stats.disk.total_gb) * 100.0 > server.disk_alert {
" 🚨"
} else {
""
};
let status = if !cpu_warning.is_empty()
|| !mem_warning.is_empty()
|| !disk_warning.is_empty()
{
"*WARNING* 🚨"
} else {
"*OK* ✅"
};
let name_line = format!("*{}*{region} | {status}", server.name);
let cpu_line = format!("CPU: *{:.1}%*{cpu_warning}", stats.cpu_perc);
let mem_line = format!(
"MEM: *{:.1}%* ({:.2} GB of {:.2} GB){mem_warning}",
(stats.mem_used_gb / stats.mem_total_gb) * 100.0,
stats.mem_used_gb,
stats.mem_total_gb,
);
let disk_line = format!(
"DISK: *{:.1}%* ({:.2} GB of {:.2} GB){disk_warning}",
(stats.disk.used_gb / stats.disk.total_gb) * 100.0,
stats.disk.used_gb,
stats.disk.total_gb,
);
blocks.push(Block::section(format!(
"{name_line}\n{cpu_line}\n{mem_line}\n{disk_line}",
)));
} else {
blocks.push(Block::section(format!(
"*{}*{region} | *UNREACHABLE* ❌",
server.name
)));
}
blocks.push(Block::divider())
}
let res = self
.slack
.as_ref()
.unwrap()
.send_message(format!("INFO | daily update"), blocks)
.await;
if let Err(e) = res {
eprintln!(
"{} | failed to send daily update message | {e:?}",
unix_timestamp_ms()
);
}
{
self.server_alert_status.lock().await.clear();
}
}
}
}
fn generate_unreachable_message(server: &Server) -> (String, Option<String>) {
let region = match &server.region {
Some(region) => format!(" ({region})"),
None => String::new(),
};
let header = format!("WARNING 🚨 | {}{region} is unreachable ❌", server.name);
let to_notify = server
.to_notify
.iter()
.map(|u| format!("<@{u}>"))
.collect::<Vec<_>>()
.join(" ");
let info = if to_notify.len() > 0 {
Some(to_notify)
} else {
None
};
(header, info)
}
fn generate_to_notify(server: &Server) -> Option<String> {
if server.to_notify.len() > 0 {
Some(
server
.to_notify
.iter()
.map(|u| format!("<@{u}>"))
.collect::<Vec<String>>()
.join(" "),
)
} else {
None
}
}

View File

@@ -1,20 +1,12 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use std::{collections::HashMap, sync::Arc};
use async_timing_util::{wait_until_timelength, Timelength};
use axum::Extension;
use db::DbClient;
use futures_util::future::join_all;
use mungos::doc;
use periphery::PeripheryClient;
use types::{
BuildActionState, CoreConfig, DeploymentActionState, Server, ServerActionState,
SystemStatsQuery, SystemStatsRecord,
};
use tokio::sync::Mutex;
use types::{BuildActionState, CoreConfig, DeploymentActionState, ServerActionState};
use crate::ws::update::UpdateWsChannel;
use crate::{monitoring::AlertStatus, ws::update::UpdateWsChannel};
pub type StateExtension = Extension<Arc<State>>;
@@ -29,6 +21,7 @@ pub struct State {
pub build_action_states: ActionStateMap<BuildActionState>,
pub deployment_action_states: ActionStateMap<DeploymentActionState>,
pub server_action_states: ActionStateMap<ServerActionState>,
pub server_alert_status: Mutex<HashMap<String, AlertStatus>>, // (server_id, AlertStatus)
}
impl State {
@@ -42,85 +35,19 @@ impl State {
build_action_states: Default::default(),
deployment_action_states: Default::default(),
server_action_states: Default::default(),
server_alert_status: Default::default(),
};
let state = Arc::new(state);
let state_clone = state.clone();
tokio::spawn(async move { state_clone.collect_server_stats().await });
if state.slack.is_some() {
let state_clone = state.clone();
tokio::spawn(async move { state_clone.daily_update().await });
}
state
}
pub async fn extension(config: CoreConfig) -> StateExtension {
Extension(State::new(config).await)
}
pub async fn collect_server_stats(&self) {
loop {
let ts = wait_until_timelength(Timelength::OneMinute, 0).await as i64;
let servers = self
.db
.servers
.get_some(doc! { "enabled": true }, None)
.await;
if let Err(e) = servers {
eprintln!("failed to get server list from db: {e:?}");
continue;
}
let futures = servers.unwrap().into_iter().map(|server| async move {
let stats = self
.periphery
.get_system_stats(
&server,
&SystemStatsQuery {
networks: true,
components: true,
processes: false,
},
)
.await;
(server, stats)
});
for (server, res) in join_all(futures).await {
if let Err(e) = res {
println!("server unreachable: {e:?}");
if let Some(slack) = &self.slack {
let (header, info) = generate_unreachable_message(&server);
let res = slack.send_message_with_header(&header, info.clone()).await;
if let Err(e) = res {
eprintln!("failed to send message to slack: {e} | header: {header} | info: {info:?}")
}
}
continue;
}
let stats = res.unwrap();
let res = self
.db
.stats
.create_one(SystemStatsRecord::from_stats(server.id, ts, stats))
.await;
if let Err(e) = res {
eprintln!("failed to insert stats into mongo | {e}");
}
}
}
}
}
fn generate_unreachable_message(server: &Server) -> (String, Option<String>) {
let region = match &server.region {
Some(region) => format!(" ({region})"),
None => String::new(),
};
let header = format!("WARNING 🚨 | {}{region} is unreachable ❌", server.name);
let to_notify = server
.to_notify
.iter()
.map(|u| format!("<@{u}>"))
.collect::<Vec<_>>()
.join(" ");
let info = if to_notify.len() > 0 {
Some(to_notify)
} else {
None
};
(header, info)
}

View File

@@ -28,6 +28,13 @@ pub struct CoreConfig {
#[serde(default = "default_jwt_valid_for")]
pub jwt_valid_for: Timelength,
// interval at which to collect server stats and alert for out of bounds
pub monitoring_interval: Timelength,
// daily utc offset in hours to run daily update. eg 8:00 eastern time is 13:00 UTC, so offset should be 13. default of 0 runs at UTC midnight.
#[serde(default)]
pub daily_offset_hours: u8,
// used to verify validity from github webhooks
pub github_webhook_secret: String,

View File

@@ -9,6 +9,10 @@ pub fn f64_diff_no_change(f64_diff: &f64) -> bool {
*f64_diff == 0.0
}
pub fn f32_diff_no_change(f32_diff: &f32) -> bool {
*f32_diff == 0.0
}
pub fn option_diff_no_change<T: Diff>(option_diff: &OptionDiff<T>) -> bool
where
<T as Diff>::Repr: PartialEq,

View File

@@ -42,8 +42,8 @@ pub struct Server {
pub to_notify: Vec<String>, // slack users to notify
#[serde(default = "default_cpu_alert")]
#[diff(attr(#[serde(skip_serializing_if = "f64_diff_no_change")]))]
pub cpu_alert: f64,
#[diff(attr(#[serde(skip_serializing_if = "f32_diff_no_change")]))]
pub cpu_alert: f32,
#[serde(default = "default_mem_alert")]
#[diff(attr(#[serde(skip_serializing_if = "f64_diff_no_change")]))]
@@ -96,7 +96,7 @@ impl Default for Server {
}
}
fn default_cpu_alert() -> f64 {
fn default_cpu_alert() -> f32 {
50.0
}