Merge branch 'next' of https://github.com/mbecker20/monitor into next

karamvir
2023-08-15 14:26:20 -07:00
11 changed files with 380 additions and 292 deletions

View File

@@ -1,5 +1,7 @@
 use anyhow::{anyhow, Context};
-use monitor_types::entities::{alert::Alert, alerter::*, server::stats::SystemProcess};
+use monitor_types::entities::{
+    alert::Alert, alerter::*, deployment::DockerContainerState, server::stats::SystemProcess,
+};
 use reqwest::StatusCode;
 use slack::types::Block;
@@ -115,16 +117,20 @@ pub async fn send_slack_alert(url: &str, alert: &Alert) -> anyhow::Result<()> {
             to,
             ..
         } => {
-            let text = format!("container *{name}*");
+            let to = fmt_docker_container_state(to);
+            let text = format!("📦 container *{name}* is now {to}");
             let blocks = vec![
-                Block::header(format!("container *{name}* state change 📦")),
-                Block::section(format!("server: {server}\nfrom: {from}\nto: {to}")),
+                Block::header(format!("📦 container *{name}* is now {to}")),
+                Block::section(format!("server: {server}\nprevious: {from}")),
             ];
             (text, blocks.into())
         }
+        Alert::None {} => Default::default(),
     };
-    let slack = slack::Client::new(url);
-    slack.send_message(text, blocks).await?;
+    if !text.is_empty() {
+        let slack = slack::Client::new(url);
+        slack.send_message(text, blocks).await?;
+    }
     Ok(())
 }
@@ -169,3 +175,13 @@ fn fmt_top_procs(top_procs: &[SystemProcess]) -> String {
         .collect::<Vec<_>>()
         .join("")
 }
+
+fn fmt_docker_container_state(state: &DockerContainerState) -> String {
+    match state {
+        DockerContainerState::Running => String::from("Running ▶️"),
+        DockerContainerState::Exited => String::from("Exited 🛑"),
+        DockerContainerState::Restarting => String::from("Restarting 🔄"),
+        DockerContainerState::NotDeployed => String::from("Not Deployed"),
+        _ => state.to_string(),
+    }
+}
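
The new Alert::None {} arm returns Default::default() — an empty String and no blocks — so the !text.is_empty() guard turns the whole call into a no-op and no Slack request is made. A minimal standalone sketch of that guard pattern (types here are illustrative, not the repo's own):

#[derive(Default)]
struct Message {
    text: String,
    blocks: Vec<String>,
}

fn send_if_nonempty(msg: Message) -> bool {
    // Mirrors the new guard: an empty text means nothing is sent.
    if !msg.text.is_empty() {
        println!("sending: {} ({} blocks)", msg.text, msg.blocks.len());
        return true;
    }
    false
}

fn main() {
    // The Alert::None case maps to Default::default(), so nothing is sent:
    assert!(!send_if_nonempty(Message::default()));
    assert!(send_if_nonempty(Message {
        text: "📦 container *web* is now Exited 🛑".into(),
        blocks: vec!["header".into()],
    }));
}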

View File

@@ -1,15 +1,15 @@
-use std::collections::HashMap;
+use std::{collections::HashMap, hash::Hash};
 
 use monitor_types::busy::Busy;
 use tokio::sync::RwLock;
 
 #[derive(Default)]
-pub struct Cache<T: Clone + Default> {
-    cache: RwLock<HashMap<String, T>>,
+pub struct Cache<K: PartialEq + Eq + Hash, T: Clone + Default> {
+    cache: RwLock<HashMap<K, T>>,
 }
 
-impl<T: Clone + Default> Cache<T> {
-    pub async fn get(&self, key: &str) -> Option<T> {
+impl<K: PartialEq + Eq + Hash, T: Clone + Default> Cache<K, T> {
+    pub async fn get(&self, key: &K) -> Option<T> {
         self.cache.read().await.get(key).cloned()
     }
@@ -34,11 +34,11 @@ impl<T: Clone + Default> Cache<T> {
         cache.iter().map(|(_, e)| e.clone()).collect()
     }
 
-    pub async fn insert(&self, key: impl Into<String>, val: T) {
+    pub async fn insert(&self, key: impl Into<K>, val: T) {
         self.cache.write().await.insert(key.into(), val);
     }
 
-    pub async fn update_entry(&self, key: impl Into<String>, handler: impl Fn(&mut T)) {
+    pub async fn update_entry(&self, key: impl Into<K>, handler: impl Fn(&mut T)) {
         let mut cache = self.cache.write().await;
         handler(cache.entry(key.into()).or_default());
     }
@@ -47,13 +47,13 @@ impl<T: Clone + Default> Cache<T> {
     // self.cache.write().await.clear();
     // }
 
-    pub async fn remove(&self, key: &str) {
+    pub async fn remove(&self, key: &K) {
         self.cache.write().await.remove(key);
     }
 }
 
-impl<T: Clone + Default + Busy> Cache<T> {
-    pub async fn busy(&self, id: &str) -> bool {
+impl<K: PartialEq + Eq + Hash, T: Clone + Default + Busy> Cache<K, T> {
+    pub async fn busy(&self, id: &K) -> bool {
         match self.get(id).await {
             Some(state) => state.busy(),
             None => false,
View File

@@ -202,10 +202,10 @@ impl StateResource<Deployment> for State {
             tags: deployment.tags,
             resource_type: ResourceTargetVariant::Deployment,
             info: DeploymentListItemInfo {
-                state: status.as_ref().map(|s| s.state).unwrap_or_default(),
+                state: status.as_ref().map(|s| s.curr.state).unwrap_or_default(),
                 status: status
                     .as_ref()
-                    .and_then(|s| s.container.as_ref().and_then(|c| c.status.to_owned())),
+                    .and_then(|s| s.curr.container.as_ref().and_then(|c| c.status.to_owned())),
                 image,
                 server_id: deployment.config.server_id,
                 build_id,

View File

@@ -0,0 +1,38 @@
+use std::collections::HashMap;
+
+use monitor_types::entities::{
+    alert::{Alert, AlertVariant},
+    deployment::DockerContainerState,
+    server::ServerStatus,
+};
+use tokio::sync::RwLock;
+
+use crate::state::State;
+
+impl State {
+    // called after cache update
+    pub async fn alert_servers(&self) {
+        let server_status = self.server_status_cache.get_list().await;
+        for v in server_status {
+            match v.status {
+                ServerStatus::Ok => {}
+                ServerStatus::NotOk => {}
+                ServerStatus::Disabled => {}
+            }
+        }
+    }
+
+    pub async fn alert_deployments(&self) {
+        let deployment_status = self.deployment_status_cache.get_list().await;
+        for v in deployment_status {
+            match v.curr.state {
+                DockerContainerState::Running => {}
+                DockerContainerState::Unknown => {}
+                DockerContainerState::Exited => {}
+                _ => {}
+            }
+        }
+    }
+}

View File

@@ -0,0 +1,145 @@
+use monitor_types::entities::{
+    deployment::{Deployment, DockerContainerState},
+    server::{
+        stats::{
+            AllSystemStats, BasicSystemStats, ServerHealth, SingleDiskUsage, StatsState,
+            SystemComponent,
+        },
+        Server, ServerConfig, ServerStatus,
+    },
+};
+
+use crate::state::State;
+
+use super::{CachedDeploymentStatus, CachedServerStatus, History};
+
+impl State {
+    pub async fn insert_deployments_status_unknown(&self, deployments: Vec<Deployment>) {
+        for deployment in deployments {
+            let prev = self
+                .deployment_status_cache
+                .get(&deployment.id)
+                .await
+                .map(|s| s.curr.state);
+            self.deployment_status_cache
+                .insert(
+                    deployment.id.clone(),
+                    History {
+                        curr: CachedDeploymentStatus {
+                            id: deployment.id,
+                            state: DockerContainerState::Unknown,
+                            container: None,
+                        },
+                        prev,
+                    }
+                    .into(),
+                )
+                .await;
+        }
+    }
+
+    pub async fn insert_server_status(
+        &self,
+        server: &Server,
+        status: ServerStatus,
+        version: String,
+        stats: Option<AllSystemStats>,
+    ) {
+        let health = stats.as_ref().map(|s| get_server_health(server, s));
+        self.server_status_cache
+            .insert(
+                server.id.clone(),
+                CachedServerStatus {
+                    id: server.id.clone(),
+                    status,
+                    version,
+                    stats,
+                    health,
+                }
+                .into(),
+            )
+            .await;
+    }
+}
+
+fn get_server_health(server: &Server, stats: &AllSystemStats) -> ServerHealth {
+    let BasicSystemStats {
+        cpu_perc,
+        mem_used_gb,
+        mem_total_gb,
+        disk_used_gb,
+        disk_total_gb,
+        ..
+    } = &stats.basic;
+    let ServerConfig {
+        cpu_warning,
+        cpu_critical,
+        mem_warning,
+        mem_critical,
+        disk_warning,
+        disk_critical,
+        ..
+    } = &server.config;
+    let mut health = ServerHealth::default();
+
+    if cpu_perc >= cpu_critical {
+        health.cpu = StatsState::Critical
+    } else if cpu_perc >= cpu_warning {
+        health.cpu = StatsState::Warning
+    }
+
+    let mem_perc = 100.0 * mem_used_gb / mem_total_gb;
+    if mem_perc >= *mem_critical {
+        health.mem = StatsState::Critical
+    } else if mem_perc >= *mem_warning {
+        health.mem = StatsState::Warning
+    }
+
+    let disk_perc = 100.0 * disk_used_gb / disk_total_gb;
+    if disk_perc >= *disk_critical {
+        health.disk = StatsState::Critical
+    } else if disk_perc >= *disk_warning {
+        health.disk = StatsState::Warning
+    }
+
+    for SingleDiskUsage {
+        mount,
+        used_gb,
+        total_gb,
+    } in &stats.disk.disks
+    {
+        let perc = 100.0 * used_gb / total_gb;
+        let stats_state = if perc >= *disk_critical {
+            StatsState::Critical
+        } else if perc >= *disk_warning {
+            StatsState::Warning
+        } else {
+            StatsState::Ok
+        };
+        health.disks.insert(mount.clone(), stats_state);
+    }
+
+    for SystemComponent {
+        label,
+        temp,
+        critical,
+        ..
+    } in &stats.components
+    {
+        let stats_state = if let Some(critical) = critical {
+            let perc = temp / critical;
+            if perc >= 0.95 {
+                StatsState::Critical
+            } else if perc >= 0.85 {
+                StatsState::Warning
+            } else {
+                StatsState::Ok
+            }
+        } else {
+            StatsState::Ok
+        };
+        health.temps.insert(label.clone(), stats_state);
+    }
+
+    health
+}
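
The health checks compare usage as a percentage of capacity against the per-server warning/critical thresholds, and component temperatures as a fraction of each component's own critical temp (warning at 85%, critical at 95%). A worked example with illustrative numbers:

fn main() {
    // 14 GB used of 16 GB -> 87.5%; warning at 80%, critical at 95%.
    let (mem_used_gb, mem_total_gb) = (14.0_f64, 16.0_f64);
    let (mem_warning, mem_critical) = (80.0_f64, 95.0_f64);
    let mem_perc = 100.0 * mem_used_gb / mem_total_gb;
    let state = if mem_perc >= mem_critical {
        "Critical"
    } else if mem_perc >= mem_warning {
        "Warning"
    } else {
        "Ok"
    };
    assert_eq!((mem_perc, state), (87.5, "Warning"));

    // Temperatures compare against the component's own critical temp:
    let (temp, critical) = (78.0_f64, 90.0_f64);
    assert!(temp / critical >= 0.85 && temp / critical < 0.95); // Warning band
}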

View File

@@ -1,15 +1,10 @@
 use anyhow::Context;
 use async_timing_util::{wait_until_timelength, Timelength};
 use futures::future::join_all;
 use monitor_types::entities::{
-    alerter::Alerter,
-    deployment::{ContainerSummary, Deployment, DockerContainerState},
+    deployment::{ContainerSummary, DockerContainerState},
     server::{
-        stats::{
-            AllSystemStats, BasicSystemStats, ServerHealth, SingleDiskUsage, StatsState,
-            SystemComponent, SystemStatsRecord,
-        },
-        Server, ServerConfig, ServerStatus,
+        stats::{AllSystemStats, ServerHealth},
+        Server, ServerStatus,
     },
 };
 use mungos::mongodb::bson::doc;
@@ -17,7 +12,17 @@ use periphery_client::requests;
 use crate::state::State;
 
+mod alert;
+mod helpers;
+mod record;
+
+#[derive(Default)]
+pub struct History<Curr: Default, Prev> {
+    pub curr: Curr,
+    pub prev: Option<Prev>,
+}
+
-#[derive(Default)]
+#[derive(Default, Clone)]
 pub struct CachedServerStatus {
     pub id: String,
     pub status: ServerStatus,
@@ -26,7 +31,7 @@ pub struct CachedServerStatus {
     pub health: Option<ServerHealth>,
 }
 
-#[derive(Default)]
+#[derive(Default, Clone)]
 pub struct CachedDeploymentStatus {
     pub id: String,
    pub state: DockerContainerState,
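
History pairs the freshly computed status with the previously cached state, so the monitor can detect transitions rather than just snapshots. A standalone sketch of the detection idea (illustrative types, not the repo's own):

struct History<Curr, Prev> {
    curr: Curr,
    prev: Option<Prev>,
}

fn main() {
    let h = History { curr: "Exited", prev: Some("Running") };
    // A state-change alert should fire only when a previous state exists
    // and differs from the current one.
    let changed = h.prev.map(|p| p != h.curr).unwrap_or(false);
    assert!(changed);
}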
@@ -72,19 +77,12 @@ impl State {
                 .await;
             return;
         }
-        let prev_server_status = self.server_status_cache.get(&server.id).await;
         let periphery = self.periphery_client(server);
         let version = periphery.request(requests::GetVersion {}).await;
         if version.is_err() {
             self.insert_deployments_status_unknown(deployments).await;
             self.insert_server_status(server, ServerStatus::NotOk, String::from("unknown"), None)
                 .await;
-            let alerters = self.db.alerters.get_some(None, None).await;
-            if let Err(e) = &alerters {
-                error!("failed to get alerters from db | {e:#?}");
-            }
-            let alerters = alerters.unwrap();
-            self.handle_server_unreachable(server, &alerters).await;
             return;
         }
         let stats = periphery.request(requests::GetAllSystemStats {}).await;
@@ -95,15 +93,13 @@ impl State {
             return;
         }
         let stats = stats.unwrap();
-        tokio::join!(
-            self.handle_server_stats(server, stats.clone()),
-            self.insert_server_status(
-                server,
-                ServerStatus::Ok,
-                version.unwrap().version,
-                stats.into(),
-            )
-        );
+        self.insert_server_status(
+            server,
+            ServerStatus::Ok,
+            version.unwrap().version,
+            stats.into(),
+        )
+        .await;
         let containers = periphery.request(requests::GetContainerList {}).await;
         if containers.is_err() {
             self.insert_deployments_status_unknown(deployments).await;
@@ -115,245 +111,29 @@ impl State {
                 .iter()
                 .find(|c| c.name == deployment.name)
                 .cloned();
-            let prev_state = self
+            let prev = self
                 .deployment_status_cache
                 .get(&deployment.id)
                 .await
-                .map(|s| s.state);
+                .map(|s| s.curr.state);
             let state = container
                 .as_ref()
                 .map(|c| c.state)
                 .unwrap_or(DockerContainerState::NotDeployed);
-            self.handle_deployment_state_change(&deployment, state, prev_state)
-                .await;
             self.deployment_status_cache
                 .insert(
                     deployment.id.clone(),
-                    CachedDeploymentStatus {
-                        id: deployment.id,
-                        state,
-                        container,
+                    History {
+                        curr: CachedDeploymentStatus {
+                            id: deployment.id,
+                            state,
+                            container,
+                        },
+                        prev,
                     }
                     .into(),
                 )
                 .await;
        }
    }
-
-    async fn record_server_stats(&self, ts: i64) {
-        let status = self.server_status_cache.get_list().await;
-        let records = status
-            .into_iter()
-            .filter(|status| status.stats.is_some())
-            .map(|status| {
-                let BasicSystemStats {
-                    system_load,
-                    cpu_perc,
-                    cpu_freq_mhz,
-                    mem_total_gb,
-                    mem_used_gb,
-                    disk_total_gb,
-                    disk_used_gb,
-                    ..
-                } = status.stats.as_ref().unwrap().basic;
-                SystemStatsRecord {
-                    ts,
-                    sid: status.id.clone(),
-                    system_load,
-                    cpu_perc,
-                    cpu_freq_mhz,
-                    mem_total_gb,
-                    mem_used_gb,
-                    disk_total_gb,
-                    disk_used_gb,
-                }
-            })
-            .collect::<Vec<_>>();
-        let res = self.db.stats.create_many(records).await;
-        if let Err(e) = res {
-            error!("failed to record server stats | {e:#?}");
-        }
-    }
-
-    async fn handle_server_unreachable(&self, server: &Server, alerters: &[Alerter]) {
-        let inner = || async { anyhow::Ok(()) };
-        let res = inner().await.context("failed to handle server unreachable");
-        if let Err(e) = res {
-            error!("{e:#?}");
-        }
-    }
-
-    async fn handle_server_rereachable(&self, server: &Server, alerters: &[Alerter]) {
-        let inner = || async { anyhow::Ok(()) };
-        let res = inner().await.context("failed to handle server rereachable");
-        if let Err(e) = res {
-            error!("{e:#?}");
-        }
-    }
-
-    async fn handle_server_stats(&self, server: &Server, stats: AllSystemStats) {
-        let inner = || async {
-            let health = get_server_health(server, &stats);
-            anyhow::Ok(())
-        };
-        let res = inner().await.context("failed to handle server stats");
-        if let Err(e) = res {
-            error!("{e:#?}");
-        }
-    }
-
-    async fn handle_deployment_state_change(
-        &self,
-        deployment: &Deployment,
-        state: DockerContainerState,
-        prev_state: Option<DockerContainerState>,
-    ) {
-        if prev_state.is_none() {
-            return;
-        }
-        let prev_state = prev_state.unwrap();
-        if state == prev_state {
-            return;
-        }
-        let inner = || async { anyhow::Ok(()) };
-        let res = inner()
-            .await
-            .context("failed to handle deployment state change");
-        if let Err(e) = res {
-            error!("{e:#?}");
-        }
-    }
-
-    async fn insert_deployments_status_unknown(&self, deployments: Vec<Deployment>) {
-        for deployment in deployments {
-            self.deployment_status_cache
-                .insert(
-                    deployment.id.clone(),
-                    CachedDeploymentStatus {
-                        id: deployment.id,
-                        state: DockerContainerState::Unknown,
-                        container: None,
-                    }
-                    .into(),
-                )
-                .await;
-        }
-    }
-
-    async fn insert_server_status(
-        &self,
-        server: &Server,
-        status: ServerStatus,
-        version: String,
-        stats: Option<AllSystemStats>,
-    ) {
-        let health = stats.as_ref().map(|s| get_server_health(server, s));
-        self.server_status_cache
-            .insert(
-                server.id.clone(),
-                CachedServerStatus {
-                    id: server.id.clone(),
-                    status,
-                    version,
-                    stats,
-                    health,
-                }
-                .into(),
-            )
-            .await;
-    }
 }
-
-fn get_server_health(server: &Server, stats: &AllSystemStats) -> ServerHealth {
-    let BasicSystemStats {
-        cpu_perc,
-        mem_used_gb,
-        mem_total_gb,
-        disk_used_gb,
-        disk_total_gb,
-        ..
-    } = &stats.basic;
-    let ServerConfig {
-        cpu_warning,
-        cpu_critical,
-        mem_warning,
-        mem_critical,
-        disk_warning,
-        disk_critical,
-        ..
-    } = &server.config;
-    let mut health = ServerHealth::default();
-
-    if cpu_perc >= cpu_critical {
-        health.cpu = StatsState::Critical
-    } else if cpu_perc >= cpu_warning {
-        health.cpu = StatsState::Warning
-    }
-
-    let mem_perc = 100.0 * mem_used_gb / mem_total_gb;
-    if mem_perc >= *mem_critical {
-        health.mem = StatsState::Critical
-    } else if mem_perc >= *mem_warning {
-        health.mem = StatsState::Warning
-    }
-
-    let disk_perc = 100.0 * disk_used_gb / disk_total_gb;
-    if disk_perc >= *disk_critical {
-        health.disk = StatsState::Critical
-    } else if disk_perc >= *disk_warning {
-        health.disk = StatsState::Warning
-    }
-
-    for SingleDiskUsage {
-        mount,
-        used_gb,
-        total_gb,
-    } in &stats.disk.disks
-    {
-        let perc = 100.0 * used_gb / total_gb;
-        let stats_state = if perc >= *disk_critical {
-            StatsState::Critical
-        } else if perc >= *disk_warning {
-            StatsState::Warning
-        } else {
-            StatsState::Ok
-        };
-        health.disks.insert(mount.clone(), stats_state);
-    }
-
-    for SystemComponent {
-        label,
-        temp,
-        critical,
-        ..
-    } in &stats.components
-    {
-        let stats_state = if let Some(critical) = critical {
-            let perc = temp / critical;
-            if perc >= 0.95 {
-                StatsState::Critical
-            } else if perc >= 0.85 {
-                StatsState::Warning
-            } else {
-                StatsState::Ok
-            }
-        } else {
-            StatsState::Ok
-        };
-        health.temps.insert(label.clone(), stats_state);
-    }
-
-    health
-}

View File

@@ -0,0 +1,40 @@
+use monitor_types::entities::server::stats::{BasicSystemStats, SystemStatsRecord};
+
+use crate::state::State;
+
+impl State {
+    pub async fn record_server_stats(&self, ts: i64) {
+        let status = self.server_status_cache.get_list().await;
+        let records = status
+            .into_iter()
+            .filter(|status| status.stats.is_some())
+            .map(|status| {
+                let BasicSystemStats {
+                    system_load,
+                    cpu_perc,
+                    cpu_freq_mhz,
+                    mem_total_gb,
+                    mem_used_gb,
+                    disk_total_gb,
+                    disk_used_gb,
+                    ..
+                } = status.stats.as_ref().unwrap().basic;
+                SystemStatsRecord {
+                    ts,
+                    sid: status.id.clone(),
+                    system_load,
+                    cpu_perc,
+                    cpu_freq_mhz,
+                    mem_total_gb,
+                    mem_used_gb,
+                    disk_total_gb,
+                    disk_used_gb,
+                }
+            })
+            .collect::<Vec<_>>();
+        let res = self.db.stats.create_many(records).await;
+        if let Err(e) = res {
+            error!("failed to record server stats | {e:#?}");
+        }
+    }
+}
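
The unwrap() inside map is safe only because the preceding filter keeps statuses whose stats is Some. The same shape can be written without unwrap via filter_map; a standalone sketch with illustrative types:

fn main() {
    let statuses: Vec<(String, Option<f64>)> = vec![
        ("a".into(), Some(0.5)),
        ("b".into(), None),
    ];
    // filter_map expresses "keep only statuses that have stats" in one step:
    let records: Vec<(String, f64)> = statuses
        .into_iter()
        .filter_map(|(id, stats)| stats.map(|s| (id, s)))
        .collect();
    assert_eq!(records, vec![("a".to_string(), 0.5)]);
}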

View File

@@ -59,8 +59,8 @@ impl Resolve<GetDeploymentStatus, RequestUser> for State {
             .await
             .unwrap_or_default();
         let response = GetDeploymentStatusResponse {
-            status: status.container.as_ref().and_then(|c| c.status.clone()),
-            state: status.state,
+            status: status.curr.container.as_ref().and_then(|c| c.status.clone()),
+            state: status.curr.state,
         };
         Ok(response)
     }
@@ -228,7 +228,7 @@ impl Resolve<GetDeploymentsSummary, RequestUser> for State {
             .get(&deployment.id)
             .await
             .unwrap_or_default();
-        match status.state {
+        match status.curr.state {
             DockerContainerState::Running => {
                 res.running += 1;
             }

View File

@@ -3,15 +3,19 @@ use std::{net::SocketAddr, str::FromStr, sync::Arc};
 use anyhow::Context;
 use axum::Extension;
 use monitor_types::entities::{
-    build::BuildActionState, deployment::DeploymentActionState, repo::RepoActionState,
-    server::ServerActionState, update::UpdateListItem,
+    alert::{Alert, AlertVariant},
+    build::BuildActionState,
+    deployment::{DeploymentActionState, DockerContainerState},
+    repo::RepoActionState,
+    server::ServerActionState,
+    update::UpdateListItem,
 };
 
 use crate::{
     auth::{GithubOauthClient, GoogleOauthClient, JwtClient},
     config::{CoreConfig, Env},
     helpers::{cache::Cache, channel::BroadcastChannel, db::DbClient},
-    monitor::{CachedDeploymentStatus, CachedServerStatus},
+    monitor::{CachedDeploymentStatus, CachedServerStatus, History},
 };
 
 pub type StateExtension = Extension<Arc<State>>;
@@ -28,8 +32,10 @@ pub struct State {
     // cache
     pub action_states: ActionStates,
-    pub deployment_status_cache: Cache<Arc<CachedDeploymentStatus>>,
-    pub server_status_cache: Cache<Arc<CachedServerStatus>>,
+    pub deployment_status_cache:
+        Cache<String, Arc<History<CachedDeploymentStatus, DockerContainerState>>>,
+    pub server_status_cache: Cache<String, Arc<CachedServerStatus>>,
+    pub alerts: Cache<(String, AlertVariant), Arc<Alert>>,
 
     // channels
     pub build_cancel: BroadcastChannel<String>, // build id to cancel
@@ -49,7 +55,7 @@ impl State {
         debug!("loading state");
-        let state: Arc<State> = State {
+        let state: Arc<_> = State {
             env,
             db: DbClient::new(&config).await?,
             jwt: JwtClient::new(&config),
@@ -58,6 +64,7 @@ impl State {
             action_states: Default::default(),
             deployment_status_cache: Default::default(),
             server_status_cache: Default::default(),
+            alerts: Default::default(),
             update: BroadcastChannel::new(100),
             build_cancel: BroadcastChannel::new(10),
             config,
@@ -78,9 +85,9 @@ impl State {
 #[derive(Default)]
 pub struct ActionStates {
-    pub build: Cache<BuildActionState>,
-    pub deployment: Cache<DeploymentActionState>,
-    pub server: Cache<ServerActionState>,
-    pub repo: Cache<RepoActionState>,
+    pub build: Cache<String, BuildActionState>,
+    pub deployment: Cache<String, DeploymentActionState>,
+    pub server: Cache<String, ServerActionState>,
+    pub repo: Cache<String, RepoActionState>,
     // pub command: Cache<CommandActionState>,
 }

View File

@@ -1,6 +1,8 @@
 #[macro_use]
 extern crate log;
 
+use std::time::Duration;
+
 use anyhow::{anyhow, Context};
 use reqwest::StatusCode;
 use resolver_api::HasResponse;
@@ -24,18 +26,35 @@ impl PeripheryClient {
     }
 
     pub async fn request<T: HasResponse>(&self, request: T) -> anyhow::Result<T::Response> {
         self.health_check().await?;
+        self.request_inner(request, None).await
+    }
+
+    pub async fn health_check(&self) -> anyhow::Result<()> {
+        self.request_inner(requests::GetHealth {}, Some(Duration::from_secs(1)))
+            .await?;
+        Ok(())
+    }
+
+    async fn request_inner<T: HasResponse>(
+        &self,
+        request: T,
+        timeout: Option<Duration>,
+    ) -> anyhow::Result<T::Response> {
         let req_type = T::req_type();
         trace!("sending request | type: {req_type} | body: {request:?}");
-        let res = self
+        let mut req = self
             .reqwest
             .post(&self.address)
             .json(&json!({
                 "type": req_type,
                 "params": request
             }))
-            .header("authorization", &self.passkey)
-            .send()
-            .await?;
+            .header("authorization", &self.passkey);
+        if let Some(timeout) = timeout {
+            req = req.timeout(timeout);
+        }
+        let res = req.send().await?;
         let status = res.status();
         debug!("got response | type: {req_type} | {status} | body: {res:?}",);
         if status == StatusCode::OK {
@@ -50,9 +69,4 @@
         Err(anyhow!("request failed | {status} | {text}"))
     }
 }
-
-    pub async fn health_check(&self) -> anyhow::Result<()> {
-        self.request(requests::GetHealth {}).await?;
-        Ok(())
-    }
 }
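
Per-request timeouts on reqwest's RequestBuilder are what give the health check its short deadline without constraining normal requests. A standalone sketch (assumes the reqwest and tokio crates; the endpoint and port are illustrative):

use std::time::Duration;

async fn health_check(client: &reqwest::Client, address: &str) -> reqwest::Result<()> {
    // Only this request gets the 1s deadline; other requests are unaffected.
    client
        .post(format!("{address}/"))
        .timeout(Duration::from_secs(1))
        .send()
        .await?
        .error_for_status()?;
    Ok(())
}

#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    if let Err(e) = health_check(&client, "http://localhost:8120").await {
        eprintln!("health check failed: {e}");
    }
}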

View File

@@ -1,15 +1,49 @@
 use derive_variants::EnumVariants;
+use mungos::{
+    derive::{MungosIndexed, StringObjectId},
+    mongodb::bson::{doc, serde_helpers::hex_string_as_object_id},
+};
 use serde::{Deserialize, Serialize};
 use typeshare::typeshare;
 
+use crate::{MongoId, I64};
+
 use super::{
     deployment::DockerContainerState,
     server::stats::{StatsState, SystemProcess},
+    update::ResourceTarget,
 };
 
 #[typeshare]
-#[derive(Serialize, Deserialize, Debug, Clone, EnumVariants)]
-#[variant_derive(Serialize, Deserialize, Debug, Clone, Copy)]
+#[derive(Serialize, Deserialize, Debug, Clone, Default, MungosIndexed, StringObjectId)]
+#[doc_index(doc! { "target.type": 1 })]
+#[sparse_doc_index(doc! { "target.id": 1 })]
+pub struct AlertRecord {
+    #[serde(
+        default,
+        rename = "_id",
+        skip_serializing_if = "String::is_empty",
+        with = "hex_string_as_object_id"
+    )]
+    pub id: MongoId,
+    #[index]
+    pub start_ts: I64,
+    #[index]
+    pub resolved: bool,
+    #[index]
+    pub alert_type: AlertVariant,
+    pub target: ResourceTarget,
+    pub alert: Alert,
+    pub resolved_ts: Option<I64>,
+}
+
+#[typeshare]
+#[derive(Serialize, Deserialize, Debug, Clone, EnumVariants, MungosIndexed)]
+#[variant_derive(Serialize, Deserialize, Debug, Clone, Copy, PartialEq, Eq, Hash)]
 #[serde(tag = "type", content = "data")]
 pub enum Alert {
     ServerUnreachable {
@@ -58,4 +92,18 @@ pub enum Alert {
         from: DockerContainerState,
         to: DockerContainerState,
     },
+    None {},
 }
+
+impl Default for Alert {
+    fn default() -> Self {
+        Alert::None {}
+    }
+}
+
+#[allow(clippy::derivable_impls)]
+impl Default for AlertVariant {
+    fn default() -> Self {
+        AlertVariant::None
+    }
+}
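
The serde attribute tag = "type", content = "data" is adjacent tagging, so every variant — including the new None {} — serializes with explicit type and data fields. A standalone sketch (assumes the serde and serde_json crates; the enum here is a trimmed stand-in):

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Debug, PartialEq)]
#[serde(tag = "type", content = "data")]
enum Alert {
    ContainerStateChange { name: String },
    None {},
}

fn main() {
    // Adjacent tagging keeps the variant name out of the payload fields:
    let json = serde_json::to_string(&Alert::None {}).unwrap();
    assert_eq!(json, r#"{"type":"None","data":{}}"#);
    let back: Alert = serde_json::from_str(&json).unwrap();
    assert_eq!(back, Alert::None {});
}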