add types for system stats

This commit is contained in:
mbecker20
2023-06-08 16:17:48 +00:00
parent 20d496e617
commit 51ebee2910
10 changed files with 554 additions and 1 deletions

105
Cargo.lock generated
View File

@@ -361,6 +361,49 @@ dependencies = [
"libc",
]
[[package]]
name = "crossbeam-channel"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-deque"
version = "0.8.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef"
dependencies = [
"cfg-if",
"crossbeam-epoch",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-epoch"
version = "0.9.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46bd5f3f85273295a9d14aedfb86f6aadbff6d8f5295c4a9edb08e819dcf5695"
dependencies = [
"autocfg",
"cfg-if",
"crossbeam-utils",
"memoffset",
"scopeguard",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b"
dependencies = [
"cfg-if",
]
[[package]]
name = "crypto-common"
version = "0.1.6"
@@ -409,6 +452,12 @@ version = "0.15.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f"
[[package]]
name = "either"
version = "1.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91"
[[package]]
name = "encoding_rs"
version = "0.8.32"
@@ -910,6 +959,15 @@ version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "memoffset"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d61c719bcfbcf5d62b3a09efa6088de8c54bc0bfcd3ea7ae39fcc186108b8de1"
dependencies = [
"autocfg",
]
[[package]]
name = "merge_config_files"
version = "0.1.3"
@@ -987,6 +1045,7 @@ dependencies = [
"serde",
"serde_json",
"simple_logger",
"sysinfo",
"termination_signal",
"tokio",
"uuid",
@@ -1021,6 +1080,15 @@ dependencies = [
"tempfile",
]
[[package]]
name = "ntapi"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
dependencies = [
"winapi",
]
[[package]]
name = "num"
version = "0.4.0"
@@ -1304,6 +1372,28 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rayon"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b"
dependencies = [
"either",
"rayon-core",
]
[[package]]
name = "rayon-core"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d"
dependencies = [
"crossbeam-channel",
"crossbeam-deque",
"crossbeam-utils",
"num_cpus",
]
[[package]]
name = "redox_syscall"
version = "0.3.5"
@@ -1634,6 +1724,21 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160"
[[package]]
name = "sysinfo"
version = "0.29.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9557d0845b86eea8182f7b10dff120214fb6cd9fd937b6f4917714e546a38695"
dependencies = [
"cfg-if",
"core-foundation-sys",
"libc",
"ntapi",
"once_cell",
"rayon",
"winapi",
]
[[package]]
name = "tempfile"
version = "3.6.0"

View File

@@ -21,6 +21,7 @@ tower = { version = "0.4", features = ["timeout"] }
tower-http = { version = "0.4", features = ["fs", "cors"] }
reqwest = { version = "0.11", features = ["json"] }
clap = { version = "4.3", features = ["derive"] }
uuid = { version = "1.3", features = ["v4", "fast-rng", "serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
dotenv = "0.15"
@@ -34,7 +35,7 @@ diff-struct = "0.5"
typeshare = "1.0.1"
strum = "0.24"
strum_macros = "0.24"
uuid = { version = "1.3", features = ["v4", "fast-rng", "serde"] }
sysinfo = "0.29"
# mogh
run_command = { version = "0.0.6", features = ["async_tokio"] }
slack = { package = "slack_client_rs", version = "0.0.8" }

View File

@@ -0,0 +1,2 @@
pub mod server;

View File

@@ -0,0 +1,100 @@
use std::path::PathBuf;
use serde::{Serialize, Deserialize};
use typeshare::typeshare;
use crate::Timelength;
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SystemInformation {
pub name: Option<String>,
pub os: Option<String>,
pub kernel: Option<String>,
pub core_count: Option<u32>,
pub host_name: Option<String>,
pub cpu_brand: String,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct SystemStats {
#[serde(default)]
pub system_load: f64,
pub cpu_perc: f32,
pub cpu_freq_mhz: f64,
pub mem_used_gb: f64, // in GB
pub mem_total_gb: f64, // in GB
pub disk: DiskUsage,
#[serde(default)]
pub cpus: Vec<SingleCpuUsage>,
#[serde(default)]
pub networks: Vec<SystemNetwork>,
#[serde(default)]
pub components: Vec<SystemComponent>,
#[serde(default)]
pub processes: Vec<SystemProcess>,
pub polling_rate: Timelength,
pub refresh_ts: u128,
pub refresh_list_ts: u128,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SystemProcess {
pub pid: u32,
pub name: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub exe: String,
pub cmd: Vec<String>,
#[serde(default)]
pub start_time: f64,
pub cpu_perc: f32,
pub mem_mb: f64,
pub disk_read_kb: f64,
pub disk_write_kb: f64,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SystemNetwork {
pub name: String,
pub recieved_kb: f64, // in kB
pub transmitted_kb: f64, // in kB
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SystemComponent {
pub label: String,
pub temp: f32,
pub max: f32,
#[serde(skip_serializing_if = "Option::is_none")]
pub critical: Option<f32>,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
pub struct DiskUsage {
pub used_gb: f64, // in GB
pub total_gb: f64, // in GB
pub read_kb: f64, // in kB
pub write_kb: f64, // in kB
#[serde(default)]
pub disks: Vec<SingleDiskUsage>,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SingleDiskUsage {
pub mount: PathBuf,
pub used_gb: f64, // in GB
pub total_gb: f64, // in GB
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct SingleCpuUsage {
pub name: String,
pub usage: f32,
}

View File

@@ -27,6 +27,7 @@ clap.workspace = true
log.workspace = true
simple_logger.workspace = true
uuid.workspace = true
sysinfo.workspace = true
# mogh
async_timing_util.workspace = true
merge_config_files.workspace = true

View File

@@ -3,6 +3,8 @@ use monitor_types::periphery_api::{requests::GetVersionResponse, PeripheryReques
use crate::state::State;
mod system_info;
impl State {
pub async fn handle_request(&self, request: PeripheryRequest) -> anyhow::Result<String> {
match request {

View File

View File

@@ -0,0 +1,2 @@
pub mod stats;

View File

@@ -0,0 +1,339 @@
use std::{cmp::Ordering, sync::{Arc, RwLock}};
use async_timing_util::wait_until_timelength;
use monitor_types::{entities::server::{SystemInformation, SystemProcess, SystemComponent, SingleDiskUsage, DiskUsage, SystemNetwork, SingleCpuUsage, SystemStats}, Timelength};
use sysinfo::{SystemExt, CpuExt, PidExt, ProcessExt, ComponentExt, DiskExt, NetworkExt};
type StatsClient = Arc<RwLock<InnerStatsClient>>;
pub struct InnerStatsClient {
pub info: SystemInformation,
sys: sysinfo::System,
cache: SystemStats,
polling_rate: Timelength,
refresh_ts: u128,
refresh_list_ts: u128,
// receiver: Receiver<SystemStats>,
}
const BYTES_PER_GB: f64 = 1073741824.0;
const BYTES_PER_MB: f64 = 1048576.0;
const BYTES_PER_KB: f64 = 1024.0;
impl InnerStatsClient {
pub fn new(polling_rate: Timelength) -> StatsClient {
// let (sender, receiver) = broadcast::channel::<SystemStats>(10);
let sys = sysinfo::System::new_all();
let client = InnerStatsClient {
info: get_system_information(&sys),
sys,
cache: SystemStats::default(),
polling_rate,
refresh_ts: 0,
refresh_list_ts: 0,
// receiver,
};
let client = Arc::new(RwLock::new(client));
let clone = client.clone();
tokio::spawn(async move {
let polling_rate = polling_rate.to_string().parse().unwrap();
loop {
let ts = wait_until_timelength(polling_rate, 0).await;
{
let mut client = clone.write().unwrap();
client.refresh();
client.refresh_ts = ts;
client.cache = client.get_stats();
}
// sender
// .send(clone.read().unwrap().cache.clone())
// .expect("failed to broadcast new stats to reciever");
}
});
let clone = client.clone();
tokio::spawn(async move {
loop {
let ts = wait_until_timelength(async_timing_util::Timelength::FiveMinutes, 0).await;
let mut client = clone.write().unwrap();
client.refresh_lists();
client.refresh_list_ts = ts;
}
});
client
}
// fn ws_subscribe(
// &self,
// ws: WebSocketUpgrade,
// query: Arc<SystemStatsQuery>,
// ) -> impl IntoResponse {
// // println!("client subscribe");
// let mut reciever = self.get_receiver();
// ws.on_upgrade(|socket| async move {
// let (mut ws_sender, mut ws_reciever) = socket.split();
// let cancel = CancellationToken::new();
// let cancel_clone = cancel.clone();
// tokio::spawn(async move {
// loop {
// let mut stats = select! {
// _ = cancel_clone.cancelled() => break,
// stats = reciever.recv() => { stats.expect("failed to recv stats msg") }
// };
// if query.cpus {
// stats.cpus = vec![]
// }
// if !query.disks {
// stats.disk.disks = vec![]
// }
// if !query.components {
// stats.components = vec![]
// }
// if !query.networks {
// stats.networks = vec![]
// }
// if !query.processes {
// stats.processes = vec![]
// }
// let _ = ws_sender
// .send(Message::Text(serde_json::to_string(&stats).unwrap()))
// .await;
// }
// });
// while let Some(msg) = ws_reciever.next().await {
// match msg {
// Ok(msg) => match msg {
// Message::Close(_) => {
// // println!("client CLOSE");
// cancel.cancel();
// return;
// }
// _ => {}
// },
// Err(_) => {
// // println!("client CLOSE");
// cancel.cancel();
// return;
// }
// }
// }
// })
// }
// fn get_receiver(&self) -> Receiver<SystemStats> {
// self.receiver.resubscribe()
// }
fn refresh(&mut self) {
self.sys.refresh_cpu();
self.sys.refresh_memory();
self.sys.refresh_networks();
self.sys.refresh_disks();
self.sys.refresh_components();
self.sys.refresh_processes();
}
fn refresh_lists(&mut self) {
self.sys.refresh_networks_list();
self.sys.refresh_disks_list();
self.sys.refresh_components_list();
}
fn get_cached_stats(&self, query: SystemStatsQuery) -> SystemStats {
SystemStats {
system_load: self.cache.system_load,
cpu_perc: self.cache.cpu_perc,
cpu_freq_mhz: self.cache.cpu_freq_mhz,
mem_used_gb: self.cache.mem_used_gb,
mem_total_gb: self.cache.mem_total_gb,
disk: DiskUsage {
used_gb: self.cache.disk.used_gb,
total_gb: self.cache.disk.total_gb,
read_kb: self.cache.disk.read_kb,
write_kb: self.cache.disk.write_kb,
disks: if query.disks {
self.cache.disk.disks.clone()
} else {
vec![]
},
},
cpus: if query.cpus {
self.cache.cpus.clone()
} else {
vec![]
},
networks: if query.networks {
self.cache.networks.clone()
} else {
vec![]
},
components: if query.components {
self.cache.components.clone()
} else {
vec![]
},
processes: if query.processes {
self.cache.processes.clone()
} else {
vec![]
},
polling_rate: self.cache.polling_rate,
refresh_ts: self.cache.refresh_ts,
refresh_list_ts: self.cache.refresh_list_ts,
}
}
fn get_stats(&self) -> SystemStats {
let cpu = self.sys.global_cpu_info();
SystemStats {
system_load: self.sys.load_average().one,
cpu_perc: self.sys.global_cpu_info().cpu_usage(),
cpu_freq_mhz: cpu.frequency() as f64,
mem_used_gb: self.sys.used_memory() as f64 / BYTES_PER_GB,
mem_total_gb: self.sys.total_memory() as f64 / BYTES_PER_GB,
disk: self.get_disk_usage(),
cpus: self.get_cpus(),
networks: self.get_networks(),
components: self.get_components(),
processes: self.get_processes(),
polling_rate: self.polling_rate,
refresh_ts: self.refresh_ts,
refresh_list_ts: self.refresh_list_ts,
}
}
fn get_cpus(&self) -> Vec<SingleCpuUsage> {
self.sys
.cpus()
.iter()
.map(|cpu| SingleCpuUsage {
name: cpu.name().to_string(),
usage: cpu.cpu_usage(),
})
.collect()
}
fn get_networks(&self) -> Vec<SystemNetwork> {
self.sys
.networks()
.into_iter()
.map(|(name, n)| SystemNetwork {
name: name.clone(),
recieved_kb: n.received() as f64 / BYTES_PER_KB,
transmitted_kb: n.transmitted() as f64 / BYTES_PER_KB,
})
.filter(|n| n.recieved_kb > 0.0 || n.transmitted_kb > 0.0)
.collect()
}
fn get_disk_usage(&self) -> DiskUsage {
let mut free_gb = 0.0;
let mut total_gb = 0.0;
let disks = self
.sys
.disks()
.iter()
.map(|disk| {
let disk_total = disk.total_space() as f64 / BYTES_PER_GB;
let disk_free = disk.available_space() as f64 / BYTES_PER_GB;
total_gb += disk_total;
free_gb += disk_free;
SingleDiskUsage {
mount: disk.mount_point().to_owned(),
used_gb: disk_total - disk_free,
total_gb: disk_total,
}
})
.collect::<Vec<_>>();
let used_gb = total_gb - free_gb;
let mut read_bytes = 0;
let mut write_bytes = 0;
for process in self.sys.processes().values() {
let disk_usage = process.disk_usage();
read_bytes += disk_usage.read_bytes;
write_bytes += disk_usage.written_bytes;
}
DiskUsage {
used_gb,
total_gb,
read_kb: read_bytes as f64 / BYTES_PER_KB,
write_kb: write_bytes as f64 / BYTES_PER_KB,
disks,
}
}
fn get_components(&self) -> Vec<SystemComponent> {
let mut comps: Vec<_> = self
.sys
.components()
.iter()
.map(|c| SystemComponent {
label: c.label().to_string(),
temp: c.temperature(),
max: c.max(),
critical: c.critical(),
})
.collect();
comps.sort_by(|a, b| {
if a.critical.is_some() {
if b.critical.is_some() {
let a_perc = a.temp / *a.critical.as_ref().unwrap();
let b_perc = b.temp / *b.critical.as_ref().unwrap();
if a_perc > b_perc {
Ordering::Less
} else {
Ordering::Greater
}
} else {
Ordering::Less
}
} else if b.critical.is_some() {
Ordering::Greater
} else {
Ordering::Equal
}
});
comps
}
fn get_processes(&self) -> Vec<SystemProcess> {
let mut procs: Vec<_> = self
.sys
.processes()
.iter()
.map(|(pid, p)| {
let disk_usage = p.disk_usage();
SystemProcess {
pid: pid.as_u32(),
name: p.name().to_string(),
exe: p.exe().to_str().unwrap_or("").to_string(),
cmd: p.cmd().to_vec(),
start_time: (p.start_time() * 1000) as f64,
cpu_perc: p.cpu_usage(),
mem_mb: p.memory() as f64 / BYTES_PER_MB,
disk_read_kb: disk_usage.read_bytes as f64 / BYTES_PER_KB,
disk_write_kb: disk_usage.written_bytes as f64 / BYTES_PER_KB,
}
})
.collect();
procs.sort_by(|a, b| {
if a.cpu_perc > b.cpu_perc {
Ordering::Less
} else {
Ordering::Greater
}
});
procs
}
}
fn get_system_information(sys: &sysinfo::System) -> SystemInformation {
let cpu = sys.global_cpu_info();
SystemInformation {
name: sys.name(),
os: sys.long_os_version(),
kernel: sys.kernel_version(),
core_count: sys.physical_core_count().map(|c| c as u32),
host_name: sys.host_name(),
cpu_brand: cpu.brand().to_string(),
}
}

View File

@@ -16,6 +16,7 @@ mod api;
mod config;
mod guard;
mod state;
mod helpers;
async fn app() -> anyhow::Result<()> {
let state = State::load().await?;