work on monitoring

This commit is contained in:
mbecker20
2023-07-16 22:24:44 -04:00
parent e8828ed74b
commit e0f69a6420
8 changed files with 184 additions and 45 deletions

View File

@@ -1,6 +1,7 @@
use anyhow::{anyhow, Context};
use monitor_types::entities::{alert::Alert, alerter::*};
use monitor_types::entities::{alert::Alert, alerter::*, server::stats::SystemProcess};
use reqwest::StatusCode;
use slack::types::Block;
pub async fn send_alert(alerter: &Alerter, alert: &Alert) -> anyhow::Result<()> {
match &alerter.config {
@@ -10,9 +11,16 @@ pub async fn send_alert(alerter: &Alerter, alert: &Alert) -> anyhow::Result<()>
}
pub async fn send_slack_alert(url: &str, alert: &Alert) -> anyhow::Result<()> {
#[allow(unused)]
let (header, info) = match alert {
Alert::ServerUnreachable { id, name, region } => (String::new(), String::new()),
let (text, blocks) = match alert {
Alert::ServerUnreachable { id, name, region } => {
let region = fmt_region(region);
let text = format!("CRITICAL 🚨 | *{name}*{region} is *unreachable* ❌");
let blocks = vec![
Block::header("CRITICAL 🚨"),
Block::section(format!("*{name}*{region} is *unreachable* ❌")),
];
(text, blocks.into())
}
Alert::ServerCpu {
id,
name,
@@ -20,38 +28,134 @@ pub async fn send_slack_alert(url: &str, alert: &Alert) -> anyhow::Result<()> {
state,
percentage,
top_procs,
} => (String::new(), String::new()),
} => {
let region = fmt_region(region);
let text =
format!("{state} 🚨 | *{name}*{region} cpu usage at *{percentage:.1}%* 📈 🚨");
let blocks = vec![
Block::header(format!("{state} 🚨")),
Block::section(format!(
"*{name}*{region} cpu usage at *{percentage:.1}%* 📈 🚨"
)),
Block::section(format!("*top cpu processes*{}", fmt_top_procs(top_procs))),
];
(text, blocks.into())
}
Alert::ServerMem {
id,
name,
region,
state,
used,
total,
used_gb,
total_gb,
top_procs,
} => (String::new(), String::new()),
} => {
let region = fmt_region(region);
let percentage = 100.0 * used_gb / total_gb;
let text =
format!("{state} 🚨 | *{name}*{region} memory usage at *{percentage:.1}%* 💾 🚨");
let blocks = vec![
Block::header(format!("{state} 🚨")),
Block::section(format!(
"*{name}*{region} memory usage at *{percentage:.1}%* 💾 🚨"
)),
Block::section(format!("using *{used_gb:.1} GiB* / *{total_gb:.1} GiB*")),
Block::section(format!("*top cpu processes*{}", fmt_top_procs(top_procs))),
];
(text, blocks.into())
}
Alert::ServerDisk {
id,
name,
region,
state,
path,
used,
total,
} => (String::new(), String::new()),
used_gb,
total_gb,
} => {
let region = fmt_region(region);
let percentage = 100.0 * used_gb / total_gb;
let text =
format!("{state} 🚨 | *{name}*{region} disk usage at *{percentage:.1}%* | mount point: *{path}* 💿 🚨");
let blocks = vec![
Block::header(format!("{state} 🚨")),
Block::section(format!(
"*{name}*{region} disk usage at *{percentage:.1}%* 💿 🚨"
)),
Block::section(format!(
"mount point: {path} | using *{used_gb:.1} GiB* / *{total_gb:.1} GiB*"
)),
];
(text, blocks.into())
}
Alert::ServerTemp {
id,
name,
region,
state,
temp,
max,
} => {
let region = fmt_region(region);
let header = format!("");
let info = format!("");
(String::new(), None)
}
Alert::ContainerStateChange {
id,
name,
server,
from,
to,
} => (String::new(), String::new()),
} => {
let header = format!("");
let info = format!("");
(String::new(), None)
}
};
let slack = slack::Client::new(url);
slack.send_message_with_header(header, info).await?;
slack.send_message(text, blocks).await?;
Ok(())
}
pub async fn send_custom_alert(url: &str, alert: &Alert) -> anyhow::Result<()> {
let res = reqwest::Client::new()
.post(url)
.json(alert)
.send()
.await
.context("failed at post request to alerter")?;
let status = res.status();
if status != StatusCode::OK {
let text = res
.text()
.await
.context("failed to get response text on alerter response")?;
return Err(anyhow!("post to alerter failed | {status} | {text}"));
}
Ok(())
}
fn fmt_region(region: &Option<String>) -> String {
match region {
Some(region) => format!(" ({region})"),
None => String::new(),
}
}
fn fmt_top_procs(top_procs: &[SystemProcess]) -> String {
top_procs
.iter()
.enumerate()
.map(|(i, p)| {
format!(
"\n{}. *{}* | *{:.1}%* CPU | *{:.1} GiB* MEM",
i + 1,
p.name,
p.cpu_perc,
p.mem_mb / 1024.0,
)
})
.collect::<Vec<_>>()
.join("")
}

View File

@@ -1,6 +1,6 @@
use monitor_types::entities::{
alerter::Alerter, build::Build, builder::Builder, deployment::Deployment, repo::Repo,
server::Server, tag::CustomTag, update::Update, user::User,
server::{Server, stats::SystemStatsRecord}, tag::CustomTag, update::Update, user::User,
};
use mungos::{Collection, Indexed, Mungos};
@@ -9,6 +9,7 @@ use crate::config::{CoreConfig, MongoConfig};
pub struct DbClient {
pub users: Collection<User>,
pub servers: Collection<Server>,
pub stats: Collection<SystemStatsRecord>,
pub deployments: Collection<Deployment>,
pub builds: Collection<Build>,
pub builders: Collection<Builder>,
@@ -59,6 +60,7 @@ impl DbClient {
let client = DbClient {
users: User::collection(&mungos, db_name, true).await?,
servers: Server::collection(&mungos, db_name, true).await?,
stats: SystemStatsRecord::collection(&mungos, db_name, true).await?,
deployments: Deployment::collection(&mungos, db_name, true).await?,
builds: Build::collection(&mungos, db_name, true).await?,
builders: Builder::collection(&mungos, db_name, true).await?,

View File

@@ -35,7 +35,7 @@ pub struct CachedDeploymentStatus {
impl State {
pub async fn monitor(&self) {
loop {
wait_until_timelength(Timelength::FiveSeconds, 500).await;
let ts = (wait_until_timelength(Timelength::FiveSeconds, 500).await - 500) as i64;
let servers = self.db.servers.get_some(None, None).await;
if let Err(e) = &servers {
error!("failed to get server list (manage status cache) | {e:#?}")
@@ -43,12 +43,12 @@ impl State {
let servers = servers.unwrap();
let futures = servers
.into_iter()
.map(|server| async move { self.update_cache_for_server(&server).await });
.map(|server| async move { self.update_cache_for_server(&server, ts).await });
join_all(futures).await;
}
}
pub async fn update_cache_for_server(&self, server: &Server) {
pub async fn update_cache_for_server(&self, server: &Server, ts: i64) {
let deployments = self
.db
.deployments
@@ -88,14 +88,15 @@ impl State {
return;
}
let stats = stats.unwrap();
self.handle_server_stats(server, &stats).await;
self.insert_server_status(
server,
ServerStatus::Ok,
version.unwrap().version,
stats.into(),
)
.await;
tokio::join!(
self.handle_server_stats(server, stats.clone()),
self.insert_server_status(
server,
ServerStatus::Ok,
version.unwrap().version,
stats.into(),
)
);
let containers = periphery.request(requests::GetContainerList {}).await;
if containers.is_err() {
self.insert_deployments_status_unknown(deployments).await;
@@ -154,9 +155,11 @@ impl State {
}
}
async fn handle_server_stats(&self, server: &Server, stats: &AllSystemStats) {
async fn handle_server_stats(&self, server: &Server, stats: AllSystemStats) {
let inner = || async {
let health = get_server_health(server, stats);
let health = get_server_health(server, &stats);
anyhow::Ok(())
};

View File

@@ -99,7 +99,7 @@ impl Resolve<Deploy, RequestUser> for State {
update.logs.push(log);
update.finalize();
self.update_cache_for_server(&server).await;
self.update_cache_for_server(&server, 0).await;
self.update_update(update.clone()).await?;
Ok(update)
@@ -182,7 +182,7 @@ impl Resolve<StartContainer, RequestUser> for State {
update.logs.push(log);
update.finalize();
self.update_cache_for_server(&server).await;
self.update_cache_for_server(&server, 0).await;
self.update_update(update.clone()).await?;
Ok(update)
@@ -273,7 +273,7 @@ impl Resolve<StopContainer, RequestUser> for State {
update.logs.push(log);
update.finalize();
self.update_cache_for_server(&server).await;
self.update_cache_for_server(&server, 0).await;
self.update_update(update.clone()).await?;
Ok(update)
@@ -364,7 +364,7 @@ impl Resolve<RemoveContainer, RequestUser> for State {
update.logs.push(log);
update.finalize();
self.update_cache_for_server(&server).await;
self.update_cache_for_server(&server, 0).await;
self.update_update(update.clone()).await?;
Ok(update)

View File

@@ -58,7 +58,7 @@ impl Resolve<CreateServer, RequestUser> for State {
self.add_update(update).await?;
self.update_cache_for_server(&server).await;
self.update_cache_for_server(&server, 0).await;
Ok(server)
}
@@ -173,7 +173,7 @@ impl Resolve<UpdateServer, RequestUser> for State {
let new_server = self.get_server(&id).await?;
self.update_cache_for_server(&new_server).await;
self.update_cache_for_server(&new_server, 0).await;
self.add_update(update).await?;

View File

@@ -67,8 +67,8 @@ impl State {
}
.into();
let state_clone = state.clone();
tokio::spawn(async move { state_clone.monitor().await });
// let state_clone = state.clone();
// tokio::spawn(async move { state_clone.monitor().await });
Ok(state)
}