work on monitoring

This commit is contained in:
mbecker20
2023-07-30 13:52:31 -04:00
parent 97180c2f04
commit 2ffbb9d9fa
5 changed files with 91 additions and 37 deletions

View File

@@ -11,7 +11,7 @@ pub async fn send_alert(alerter: &Alerter, alert: &Alert) -> anyhow::Result<()>
}
pub async fn send_slack_alert(url: &str, alert: &Alert) -> anyhow::Result<()> {
let (text, blocks) = match alert {
let (text, blocks): (_, Option<_>) = match alert {
Alert::ServerUnreachable { name, region, .. } => {
let region = fmt_region(region);
let text = format!("CRITICAL 🚨 | *{name}*{region} is *unreachable* ❌");
@@ -89,28 +89,38 @@ pub async fn send_slack_alert(url: &str, alert: &Alert) -> anyhow::Result<()> {
(text, blocks.into())
}
Alert::ServerTemp {
id,
name,
region,
state,
temp,
max,
..
} => {
let region = fmt_region(region);
let header = format!("");
let info = format!("");
(String::new(), None)
let text = format!(
"{state} 🚨 | *{name}*{region} temp at {temp:.0} °C (max: {max:.0} °C) 🌡️ 🚨"
);
let blocks = vec![
Block::header(format!("{state} 🚨")),
Block::section(format!(
"*{name}*{region} temp at {temp:.0} °C (max: {max:.0} °C) 🌡️ 🚨"
)),
];
(text, blocks.into())
}
Alert::ContainerStateChange {
id,
name,
server,
from,
to,
..
} => {
let header = format!("");
let info = format!("");
(String::new(), None)
let text = format!("container *{name}*");
let blocks = vec![
Block::header(format!("container *{name}* state change 📦")),
Block::section(format!("server: {server}\nfrom: {from}\nto: {to}")),
];
(text, blocks.into())
}
};
let slack = slack::Client::new(url);

View File

@@ -18,17 +18,21 @@ impl<T: Clone + Default> Cache<T> {
// cache.entry(key).or_default().clone()
// }
// pub async fn get_list(&self, filter: Option<impl Fn(&String, &T) -> bool>) -> Vec<T> {
// let cache = self.cache.read().await;
// match filter {
// Some(filter) => cache
// .iter()
// .filter(|(k, v)| filter(k, v))
// .map(|(_, e)| e.clone())
// .collect(),
// None => cache.iter().map(|(_, e)| e.clone()).collect(),
// }
// }
pub async fn get_list(
&self,
// filter: Option<impl Fn(&String, &T) -> bool>
) -> Vec<T> {
let cache = self.cache.read().await;
// match filter {
// Some(filter) => cache
// .iter()
// .filter(|(k, v)| filter(k, v))
// .map(|(_, e)| e.clone())
// .collect(),
// None => cache.iter().map(|(_, e)| e.clone()).collect(),
// }
cache.iter().map(|(_, e)| e.clone()).collect()
}
pub async fn insert(&self, key: impl Into<String>, val: T) {
self.cache.write().await.insert(key.into(), val);

View File

@@ -2,11 +2,12 @@ use anyhow::Context;
use async_timing_util::{wait_until_timelength, Timelength};
use futures::future::join_all;
use monitor_types::entities::{
alerter::Alerter,
deployment::{ContainerSummary, Deployment, DockerContainerState},
server::{
stats::{
AllSystemStats, BasicSystemStats, ServerHealth, SingleDiskUsage, StatsState,
SystemComponent,
SystemComponent, SystemStatsRecord,
},
Server, ServerConfig, ServerStatus,
},
@@ -41,14 +42,15 @@ impl State {
error!("failed to get server list (manage status cache) | {e:#?}")
}
let servers = servers.unwrap();
let futures = servers
.into_iter()
.map(|server| async move { self.update_cache_for_server(&server, ts).await });
let futures = servers.into_iter().map(|server| async move {
self.update_cache_for_server(&server).await;
});
join_all(futures).await;
self.record_server_stats(ts).await;
}
}
pub async fn update_cache_for_server(&self, server: &Server, ts: i64) {
pub async fn update_cache_for_server(&self, server: &Server) {
let deployments = self
.db
.deployments
@@ -70,14 +72,19 @@ impl State {
.await;
return;
}
let prev_state = self.server_status_cache.get(&server.id).await;
let prev_server_status = self.server_status_cache.get(&server.id).await;
let periphery = self.periphery_client(server);
let version = periphery.request(requests::GetVersion {}).await;
if version.is_err() {
self.insert_deployments_status_unknown(deployments).await;
self.insert_server_status(server, ServerStatus::NotOk, String::from("unknown"), None)
.await;
self.handle_server_unreachable(server).await;
let alerters = self.db.alerters.get_some(None, None).await;
if let Err(e) = &alerters {
error!("failed to get alerters from db | {e:#?}");
}
let alerters = alerters.unwrap();
self.handle_server_unreachable(server, &alerters).await;
return;
}
let stats = periphery.request(requests::GetAllSystemStats {}).await;
@@ -117,10 +124,8 @@ impl State {
.as_ref()
.map(|c| c.state)
.unwrap_or(DockerContainerState::NotDeployed);
self.handle_deployment_state_change(&deployment, state, prev_state)
.await;
self.deployment_status_cache
.insert(
deployment.id.clone(),
@@ -135,7 +140,42 @@ impl State {
}
}
async fn handle_server_unreachable(&self, server: &Server) {
async fn record_server_stats(&self, ts: i64) {
let status = self.server_status_cache.get_list().await;
let records = status
.into_iter()
.filter(|status| status.stats.is_some())
.map(|status| {
let BasicSystemStats {
system_load,
cpu_perc,
cpu_freq_mhz,
mem_total_gb,
mem_used_gb,
disk_total_gb,
disk_used_gb,
..
} = status.stats.as_ref().unwrap().basic;
SystemStatsRecord {
ts,
sid: status.id.clone(),
system_load,
cpu_perc,
cpu_freq_mhz,
mem_total_gb,
mem_used_gb,
disk_total_gb,
disk_used_gb,
}
})
.collect::<Vec<_>>();
let res = self.db.stats.create_many(records).await;
if let Err(e) = res {
error!("failed to record server stats | {e:#?}");
}
}
async fn handle_server_unreachable(&self, server: &Server, alerters: &[Alerter]) {
let inner = || async { anyhow::Ok(()) };
let res = inner().await.context("failed to handle server unreachable");
@@ -145,7 +185,7 @@ impl State {
}
}
async fn handle_server_rereachable(&self, server: &Server) {
async fn handle_server_rereachable(&self, server: &Server, alerters: &[Alerter]) {
let inner = || async { anyhow::Ok(()) };
let res = inner().await.context("failed to handle server rereachable");

View File

@@ -100,7 +100,7 @@ impl Resolve<Deploy, RequestUser> for State {
update.logs.push(log);
update.finalize();
self.update_cache_for_server(&server, 0).await;
self.update_cache_for_server(&server).await;
self.update_update(update.clone()).await?;
Ok(update)
@@ -183,7 +183,7 @@ impl Resolve<StartContainer, RequestUser> for State {
update.logs.push(log);
update.finalize();
self.update_cache_for_server(&server, 0).await;
self.update_cache_for_server(&server).await;
self.update_update(update.clone()).await?;
Ok(update)
@@ -274,7 +274,7 @@ impl Resolve<StopContainer, RequestUser> for State {
update.logs.push(log);
update.finalize();
self.update_cache_for_server(&server, 0).await;
self.update_cache_for_server(&server).await;
self.update_update(update.clone()).await?;
Ok(update)
@@ -365,7 +365,7 @@ impl Resolve<RemoveContainer, RequestUser> for State {
update.logs.push(log);
update.finalize();
self.update_cache_for_server(&server, 0).await;
self.update_cache_for_server(&server).await;
self.update_update(update.clone()).await?;
Ok(update)

View File

@@ -58,7 +58,7 @@ impl Resolve<CreateServer, RequestUser> for State {
self.add_update(update).await?;
self.update_cache_for_server(&server, 0).await;
self.update_cache_for_server(&server).await;
Ok(server)
}
@@ -174,7 +174,7 @@ impl Resolve<UpdateServer, RequestUser> for State {
let new_server: Server = self.get_resource(&id).await?;
self.update_cache_for_server(&new_server, 0).await;
self.update_cache_for_server(&new_server).await;
self.add_update(update).await?;