mirror of
https://github.com/moghtech/komodo.git
synced 2026-04-28 11:49:39 -05:00
work on the state change monitoring
This commit is contained in:
@@ -1,8 +1,12 @@
|
||||
use anyhow::Context;
|
||||
use async_timing_util::{wait_until_timelength, Timelength};
|
||||
use futures::future::join_all;
|
||||
use monitor_types::entities::{
|
||||
deployment::{BasicContainerInfo, Deployment, DockerContainerState},
|
||||
server::{stats::AllSystemStats, Server, ServerStatus},
|
||||
server::{
|
||||
stats::{AllSystemStats, BasicSystemStats, CpuUsage, ServerHealth, StatsState},
|
||||
Server, ServerConfig, ServerStatus,
|
||||
},
|
||||
};
|
||||
use mungos::mongodb::bson::doc;
|
||||
use periphery_client::requests;
|
||||
@@ -15,6 +19,7 @@ pub struct CachedServerStatus {
|
||||
pub status: ServerStatus,
|
||||
pub version: String,
|
||||
pub stats: Option<AllSystemStats>,
|
||||
pub health: Option<ServerHealth>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
@@ -62,12 +67,14 @@ impl State {
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
let prev_state = self.server_status_cache.get(&server.id).await;
|
||||
let periphery = self.periphery_client(server);
|
||||
let version = periphery.request(requests::GetVersion {}).await;
|
||||
if version.is_err() {
|
||||
self.insert_deployments_status_unknown(deployments).await;
|
||||
self.insert_server_status(server, ServerStatus::NotOk, String::from("unknown"), None)
|
||||
.await;
|
||||
self.handle_server_unreachable(server).await;
|
||||
return;
|
||||
}
|
||||
let stats = periphery.request(requests::GetAllSystemStats {}).await;
|
||||
@@ -77,11 +84,13 @@ impl State {
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
let stats = stats.unwrap();
|
||||
self.handle_server_stats(server, &stats).await;
|
||||
self.insert_server_status(
|
||||
server,
|
||||
ServerStatus::Ok,
|
||||
version.unwrap().version,
|
||||
stats.unwrap().into(),
|
||||
stats.into(),
|
||||
)
|
||||
.await;
|
||||
let containers = periphery.request(requests::GetContainerList {}).await;
|
||||
@@ -95,15 +104,25 @@ impl State {
|
||||
.iter()
|
||||
.find(|c| c.name == deployment.name)
|
||||
.cloned();
|
||||
let prev_state = self
|
||||
.deployment_status_cache
|
||||
.get(&deployment.id)
|
||||
.await
|
||||
.map(|s| s.state);
|
||||
let state = container
|
||||
.as_ref()
|
||||
.map(|c| c.state)
|
||||
.unwrap_or(DockerContainerState::NotDeployed);
|
||||
|
||||
self.handle_deployment_state_change(&deployment, state, prev_state)
|
||||
.await;
|
||||
|
||||
self.deployment_status_cache
|
||||
.insert(
|
||||
deployment.id.clone(),
|
||||
CachedDeploymentStatus {
|
||||
id: deployment.id,
|
||||
state: container
|
||||
.as_ref()
|
||||
.map(|c| c.state)
|
||||
.unwrap_or(DockerContainerState::NotDeployed),
|
||||
state,
|
||||
container,
|
||||
}
|
||||
.into(),
|
||||
@@ -112,6 +131,67 @@ impl State {
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_server_unreachable(&self, server: &Server) {
|
||||
let inner = || async { anyhow::Ok(()) };
|
||||
|
||||
let res = inner().await.context("failed to handle server unreachable");
|
||||
|
||||
if let Err(e) = res {
|
||||
error!("{e:#?}");
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_server_rereachable(&self, server: &Server) {
|
||||
let inner = || async { anyhow::Ok(()) };
|
||||
|
||||
let res = inner().await.context("failed to handle server rereachable");
|
||||
|
||||
if let Err(e) = res {
|
||||
error!("{e:#?}");
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_server_stats(&self, server: &Server, stats: &AllSystemStats) {
|
||||
let inner = || async {
|
||||
let health = get_server_health(server, stats);
|
||||
|
||||
anyhow::Ok(())
|
||||
};
|
||||
|
||||
let res = inner().await.context("failed to handle server stats");
|
||||
|
||||
if let Err(e) = res {
|
||||
error!("{e:#?}");
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_deployment_state_change(
|
||||
&self,
|
||||
deployment: &Deployment,
|
||||
state: DockerContainerState,
|
||||
prev_state: Option<DockerContainerState>,
|
||||
) {
|
||||
if prev_state.is_none() {
|
||||
return;
|
||||
}
|
||||
|
||||
let prev_state = prev_state.unwrap();
|
||||
|
||||
if state == prev_state {
|
||||
return;
|
||||
}
|
||||
|
||||
let inner = || async { anyhow::Ok(()) };
|
||||
|
||||
let res = inner()
|
||||
.await
|
||||
.context("failed to handle deployment state change");
|
||||
|
||||
if let Err(e) = res {
|
||||
error!("{e:#?}");
|
||||
}
|
||||
}
|
||||
|
||||
async fn insert_deployments_status_unknown(&self, deployments: Vec<Deployment>) {
|
||||
for deployment in deployments {
|
||||
self.deployment_status_cache
|
||||
@@ -135,6 +215,7 @@ impl State {
|
||||
version: String,
|
||||
stats: Option<AllSystemStats>,
|
||||
) {
|
||||
let health = stats.as_ref().map(|s| get_server_health(server, s));
|
||||
self.server_status_cache
|
||||
.insert(
|
||||
server.id.clone(),
|
||||
@@ -143,9 +224,66 @@ impl State {
|
||||
status,
|
||||
version,
|
||||
stats,
|
||||
health,
|
||||
}
|
||||
.into(),
|
||||
)
|
||||
.await;
|
||||
}
|
||||
}
|
||||
|
||||
fn get_server_health(server: &Server, stats: &AllSystemStats) -> ServerHealth {
|
||||
let BasicSystemStats {
|
||||
cpu_perc,
|
||||
mem_used_gb,
|
||||
mem_total_gb,
|
||||
disk_used_gb,
|
||||
disk_total_gb,
|
||||
..
|
||||
} = &stats.basic;
|
||||
let ServerConfig {
|
||||
cpu_warning,
|
||||
cpu_critical,
|
||||
mem_warning,
|
||||
mem_critical,
|
||||
disk_warning,
|
||||
disk_critical,
|
||||
..
|
||||
} = &server.config;
|
||||
let mut health = ServerHealth::default();
|
||||
|
||||
if cpu_perc >= cpu_critical {
|
||||
health.cpu = StatsState::Critical
|
||||
} else if cpu_perc >= cpu_warning {
|
||||
health.cpu = StatsState::Warning
|
||||
}
|
||||
|
||||
let mem_perc = 100.0 * mem_used_gb / mem_total_gb;
|
||||
if mem_perc >= *mem_critical {
|
||||
health.mem = StatsState::Critical
|
||||
} else if mem_perc >= *mem_warning {
|
||||
health.mem = StatsState::Warning
|
||||
}
|
||||
|
||||
let disk_perc = 100.0 * disk_used_gb / disk_total_gb;
|
||||
if disk_perc >= *disk_critical {
|
||||
health.disk = StatsState::Critical
|
||||
} else if disk_perc >= *disk_warning {
|
||||
health.disk = StatsState::Warning
|
||||
}
|
||||
|
||||
for disk in &stats.disk.disks {
|
||||
let perc = 100.0 * disk.used_gb / disk.total_gb;
|
||||
if perc >= *disk_critical {
|
||||
health
|
||||
.disks
|
||||
.insert(disk.mount.clone(), StatsState::Critical);
|
||||
} else if perc >= *disk_warning {
|
||||
health.disks.insert(disk.mount.clone(), StatsState::Warning);
|
||||
} else {
|
||||
health.disks.insert(disk.mount.clone(), StatsState::Ok);
|
||||
}
|
||||
}
|
||||
|
||||
health
|
||||
}
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::path::PathBuf;
|
||||
use std::{collections::HashMap, path::PathBuf};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
use typeshare::typeshare;
|
||||
@@ -120,3 +120,21 @@ pub struct SystemComponent {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub critical: Option<f32>,
|
||||
}
|
||||
|
||||
#[typeshare]
|
||||
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
|
||||
pub enum StatsState {
|
||||
#[default]
|
||||
Ok,
|
||||
Warning,
|
||||
Critical,
|
||||
}
|
||||
|
||||
#[typeshare]
|
||||
#[derive(Serialize, Deserialize, Default, Debug, Clone)]
|
||||
pub struct ServerHealth {
|
||||
pub cpu: StatsState,
|
||||
pub mem: StatsState,
|
||||
pub disk: StatsState,
|
||||
pub disks: HashMap<PathBuf, StatsState>,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user