diff --git a/Cargo.lock b/Cargo.lock index a7fb78264..0c2aa13f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -361,6 +361,49 @@ dependencies = [ "libc", ] +[[package]] +name = "crossbeam-channel" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce6fd6f855243022dcecf8702fef0c297d4338e226845fe067f6341ad9fa0cef" +dependencies = [ + "cfg-if", + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "46bd5f3f85273295a9d14aedfb86f6aadbff6d8f5295c4a9edb08e819dcf5695" +dependencies = [ + "autocfg", + "cfg-if", + "crossbeam-utils", + "memoffset", + "scopeguard", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c063cd8cc95f5c377ed0d4b49a4b21f632396ff690e8470c29b3359b346984b" +dependencies = [ + "cfg-if", +] + [[package]] name = "crypto-common" version = "0.1.6" @@ -409,6 +452,12 @@ version = "0.15.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "77c90badedccf4105eca100756a0b1289e191f6fcbdadd3cee1d2f614f97da8f" +[[package]] +name = "either" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fcaabb2fef8c910e7f4c7ce9f67a1283a1715879a7c230ca9d6d1ae31f16d91" + [[package]] name = "encoding_rs" version = "0.8.32" @@ -910,6 +959,15 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" +[[package]] +name = "memoffset" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d61c719bcfbcf5d62b3a09efa6088de8c54bc0bfcd3ea7ae39fcc186108b8de1" +dependencies = [ + "autocfg", +] + [[package]] name = "merge_config_files" version = "0.1.3" @@ -987,6 +1045,7 @@ dependencies = [ "serde", "serde_json", "simple_logger", + "sysinfo", "termination_signal", "tokio", "uuid", @@ -1021,6 +1080,15 @@ dependencies = [ "tempfile", ] +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "num" version = "0.4.0" @@ -1304,6 +1372,28 @@ dependencies = [ "getrandom", ] +[[package]] +name = "rayon" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d2df5196e37bcc87abebc0053e20787d73847bb33134a69841207dd0a47f03b" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b8f95bd6966f5c87776639160a66bd8ab9895d9d4ab01ddba9fc60661aebe8d" +dependencies = [ + "crossbeam-channel", + "crossbeam-deque", + "crossbeam-utils", + "num_cpus", +] + [[package]] name = "redox_syscall" version = "0.3.5" @@ -1634,6 +1724,21 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sysinfo" +version = "0.29.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9557d0845b86eea8182f7b10dff120214fb6cd9fd937b6f4917714e546a38695" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "rayon", + "winapi", +] + [[package]] name = "tempfile" version = "3.6.0" diff --git a/Cargo.toml b/Cargo.toml index e3d601ff9..5665b9627 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -21,6 +21,7 @@ tower = { version = "0.4", features = ["timeout"] } tower-http = { version = "0.4", features = ["fs", "cors"] } reqwest = { version = "0.11", features = ["json"] } clap = { version = "4.3", features = ["derive"] } +uuid = { version = "1.3", features = ["v4", "fast-rng", "serde"] } serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" dotenv = "0.15" @@ -34,7 +35,7 @@ diff-struct = "0.5" typeshare = "1.0.1" strum = "0.24" strum_macros = "0.24" -uuid = { version = "1.3", features = ["v4", "fast-rng", "serde"] } +sysinfo = "0.29" # mogh run_command = { version = "0.0.6", features = ["async_tokio"] } slack = { package = "slack_client_rs", version = "0.0.8" } diff --git a/lib/types/src/entities/mod.rs b/lib/types/src/entities/mod.rs index e69de29bb..3420c2185 100644 --- a/lib/types/src/entities/mod.rs +++ b/lib/types/src/entities/mod.rs @@ -0,0 +1,2 @@ +pub mod server; + diff --git a/lib/types/src/entities/server.rs b/lib/types/src/entities/server.rs new file mode 100644 index 000000000..6551f4c52 --- /dev/null +++ b/lib/types/src/entities/server.rs @@ -0,0 +1,100 @@ +use std::path::PathBuf; + +use serde::{Serialize, Deserialize}; +use typeshare::typeshare; + +use crate::Timelength; + +#[typeshare] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SystemInformation { + pub name: Option, + pub os: Option, + pub kernel: Option, + pub core_count: Option, + pub host_name: Option, + pub cpu_brand: String, +} + +#[typeshare] +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct SystemStats { + #[serde(default)] + pub system_load: f64, + pub cpu_perc: f32, + pub cpu_freq_mhz: f64, + pub mem_used_gb: f64, // in GB + pub mem_total_gb: f64, // in GB + pub disk: DiskUsage, + #[serde(default)] + pub cpus: Vec, + #[serde(default)] + pub networks: Vec, + #[serde(default)] + pub components: Vec, + #[serde(default)] + pub processes: Vec, + pub polling_rate: Timelength, + pub refresh_ts: u128, + pub refresh_list_ts: u128, +} + +#[typeshare] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SystemProcess { + pub pid: u32, + pub name: String, + #[serde(default, skip_serializing_if = "String::is_empty")] + pub exe: String, + pub cmd: Vec, + #[serde(default)] + pub start_time: f64, + pub cpu_perc: f32, + pub mem_mb: f64, + pub disk_read_kb: f64, + pub disk_write_kb: f64, +} + +#[typeshare] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SystemNetwork { + pub name: String, + pub recieved_kb: f64, // in kB + pub transmitted_kb: f64, // in kB +} + +#[typeshare] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SystemComponent { + pub label: String, + pub temp: f32, + pub max: f32, + #[serde(skip_serializing_if = "Option::is_none")] + pub critical: Option, +} + +#[typeshare] +#[derive(Serialize, Deserialize, Debug, Default, Clone)] +pub struct DiskUsage { + pub used_gb: f64, // in GB + pub total_gb: f64, // in GB + pub read_kb: f64, // in kB + pub write_kb: f64, // in kB + #[serde(default)] + pub disks: Vec, +} + +#[typeshare] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SingleDiskUsage { + pub mount: PathBuf, + pub used_gb: f64, // in GB + pub total_gb: f64, // in GB +} + +#[typeshare] +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct SingleCpuUsage { + pub name: String, + pub usage: f32, +} \ No newline at end of file diff --git a/periphery/Cargo.toml b/periphery/Cargo.toml index 1b6d84954..28be0d4e3 100644 --- a/periphery/Cargo.toml +++ b/periphery/Cargo.toml @@ -27,6 +27,7 @@ clap.workspace = true log.workspace = true simple_logger.workspace = true uuid.workspace = true +sysinfo.workspace = true # mogh async_timing_util.workspace = true merge_config_files.workspace = true diff --git a/periphery/src/api/mod.rs b/periphery/src/api/mod.rs index de1ca2662..ae44900d4 100644 --- a/periphery/src/api/mod.rs +++ b/periphery/src/api/mod.rs @@ -3,6 +3,8 @@ use monitor_types::periphery_api::{requests::GetVersionResponse, PeripheryReques use crate::state::State; +mod system_info; + impl State { pub async fn handle_request(&self, request: PeripheryRequest) -> anyhow::Result { match request { diff --git a/periphery/src/api/system_info.rs b/periphery/src/api/system_info.rs new file mode 100644 index 000000000..e69de29bb diff --git a/periphery/src/helpers/mod.rs b/periphery/src/helpers/mod.rs new file mode 100644 index 000000000..a8c7d2d14 --- /dev/null +++ b/periphery/src/helpers/mod.rs @@ -0,0 +1,2 @@ +pub mod stats; + diff --git a/periphery/src/helpers/stats.rs b/periphery/src/helpers/stats.rs new file mode 100644 index 000000000..f10c98178 --- /dev/null +++ b/periphery/src/helpers/stats.rs @@ -0,0 +1,339 @@ +use std::{cmp::Ordering, sync::{Arc, RwLock}}; + +use async_timing_util::wait_until_timelength; +use monitor_types::{entities::server::{SystemInformation, SystemProcess, SystemComponent, SingleDiskUsage, DiskUsage, SystemNetwork, SingleCpuUsage, SystemStats}, Timelength}; +use sysinfo::{SystemExt, CpuExt, PidExt, ProcessExt, ComponentExt, DiskExt, NetworkExt}; + +type StatsClient = Arc>; + +pub struct InnerStatsClient { + pub info: SystemInformation, + sys: sysinfo::System, + cache: SystemStats, + polling_rate: Timelength, + refresh_ts: u128, + refresh_list_ts: u128, + // receiver: Receiver, +} + +const BYTES_PER_GB: f64 = 1073741824.0; +const BYTES_PER_MB: f64 = 1048576.0; +const BYTES_PER_KB: f64 = 1024.0; + +impl InnerStatsClient { + pub fn new(polling_rate: Timelength) -> StatsClient { + // let (sender, receiver) = broadcast::channel::(10); + let sys = sysinfo::System::new_all(); + let client = InnerStatsClient { + info: get_system_information(&sys), + sys, + cache: SystemStats::default(), + polling_rate, + refresh_ts: 0, + refresh_list_ts: 0, + // receiver, + }; + let client = Arc::new(RwLock::new(client)); + let clone = client.clone(); + tokio::spawn(async move { + let polling_rate = polling_rate.to_string().parse().unwrap(); + loop { + let ts = wait_until_timelength(polling_rate, 0).await; + { + let mut client = clone.write().unwrap(); + client.refresh(); + client.refresh_ts = ts; + client.cache = client.get_stats(); + } + // sender + // .send(clone.read().unwrap().cache.clone()) + // .expect("failed to broadcast new stats to reciever"); + } + }); + let clone = client.clone(); + tokio::spawn(async move { + loop { + let ts = wait_until_timelength(async_timing_util::Timelength::FiveMinutes, 0).await; + let mut client = clone.write().unwrap(); + client.refresh_lists(); + client.refresh_list_ts = ts; + } + }); + client + } + + // fn ws_subscribe( + // &self, + // ws: WebSocketUpgrade, + // query: Arc, + // ) -> impl IntoResponse { + // // println!("client subscribe"); + // let mut reciever = self.get_receiver(); + // ws.on_upgrade(|socket| async move { + // let (mut ws_sender, mut ws_reciever) = socket.split(); + // let cancel = CancellationToken::new(); + // let cancel_clone = cancel.clone(); + // tokio::spawn(async move { + // loop { + // let mut stats = select! { + // _ = cancel_clone.cancelled() => break, + // stats = reciever.recv() => { stats.expect("failed to recv stats msg") } + // }; + // if query.cpus { + // stats.cpus = vec![] + // } + // if !query.disks { + // stats.disk.disks = vec![] + // } + // if !query.components { + // stats.components = vec![] + // } + // if !query.networks { + // stats.networks = vec![] + // } + // if !query.processes { + // stats.processes = vec![] + // } + // let _ = ws_sender + // .send(Message::Text(serde_json::to_string(&stats).unwrap())) + // .await; + // } + // }); + // while let Some(msg) = ws_reciever.next().await { + // match msg { + // Ok(msg) => match msg { + // Message::Close(_) => { + // // println!("client CLOSE"); + // cancel.cancel(); + // return; + // } + // _ => {} + // }, + // Err(_) => { + // // println!("client CLOSE"); + // cancel.cancel(); + // return; + // } + // } + // } + // }) + // } + + // fn get_receiver(&self) -> Receiver { + // self.receiver.resubscribe() + // } + + fn refresh(&mut self) { + self.sys.refresh_cpu(); + self.sys.refresh_memory(); + self.sys.refresh_networks(); + self.sys.refresh_disks(); + self.sys.refresh_components(); + self.sys.refresh_processes(); + } + + fn refresh_lists(&mut self) { + self.sys.refresh_networks_list(); + self.sys.refresh_disks_list(); + self.sys.refresh_components_list(); + } + + fn get_cached_stats(&self, query: SystemStatsQuery) -> SystemStats { + SystemStats { + system_load: self.cache.system_load, + cpu_perc: self.cache.cpu_perc, + cpu_freq_mhz: self.cache.cpu_freq_mhz, + mem_used_gb: self.cache.mem_used_gb, + mem_total_gb: self.cache.mem_total_gb, + disk: DiskUsage { + used_gb: self.cache.disk.used_gb, + total_gb: self.cache.disk.total_gb, + read_kb: self.cache.disk.read_kb, + write_kb: self.cache.disk.write_kb, + disks: if query.disks { + self.cache.disk.disks.clone() + } else { + vec![] + }, + }, + cpus: if query.cpus { + self.cache.cpus.clone() + } else { + vec![] + }, + networks: if query.networks { + self.cache.networks.clone() + } else { + vec![] + }, + components: if query.components { + self.cache.components.clone() + } else { + vec![] + }, + processes: if query.processes { + self.cache.processes.clone() + } else { + vec![] + }, + polling_rate: self.cache.polling_rate, + refresh_ts: self.cache.refresh_ts, + refresh_list_ts: self.cache.refresh_list_ts, + } + } + + fn get_stats(&self) -> SystemStats { + let cpu = self.sys.global_cpu_info(); + SystemStats { + system_load: self.sys.load_average().one, + cpu_perc: self.sys.global_cpu_info().cpu_usage(), + cpu_freq_mhz: cpu.frequency() as f64, + mem_used_gb: self.sys.used_memory() as f64 / BYTES_PER_GB, + mem_total_gb: self.sys.total_memory() as f64 / BYTES_PER_GB, + disk: self.get_disk_usage(), + cpus: self.get_cpus(), + networks: self.get_networks(), + components: self.get_components(), + processes: self.get_processes(), + polling_rate: self.polling_rate, + refresh_ts: self.refresh_ts, + refresh_list_ts: self.refresh_list_ts, + } + } + + fn get_cpus(&self) -> Vec { + self.sys + .cpus() + .iter() + .map(|cpu| SingleCpuUsage { + name: cpu.name().to_string(), + usage: cpu.cpu_usage(), + }) + .collect() + } + + fn get_networks(&self) -> Vec { + self.sys + .networks() + .into_iter() + .map(|(name, n)| SystemNetwork { + name: name.clone(), + recieved_kb: n.received() as f64 / BYTES_PER_KB, + transmitted_kb: n.transmitted() as f64 / BYTES_PER_KB, + }) + .filter(|n| n.recieved_kb > 0.0 || n.transmitted_kb > 0.0) + .collect() + } + + fn get_disk_usage(&self) -> DiskUsage { + let mut free_gb = 0.0; + let mut total_gb = 0.0; + let disks = self + .sys + .disks() + .iter() + .map(|disk| { + let disk_total = disk.total_space() as f64 / BYTES_PER_GB; + let disk_free = disk.available_space() as f64 / BYTES_PER_GB; + total_gb += disk_total; + free_gb += disk_free; + SingleDiskUsage { + mount: disk.mount_point().to_owned(), + used_gb: disk_total - disk_free, + total_gb: disk_total, + } + }) + .collect::>(); + let used_gb = total_gb - free_gb; + let mut read_bytes = 0; + let mut write_bytes = 0; + for process in self.sys.processes().values() { + let disk_usage = process.disk_usage(); + read_bytes += disk_usage.read_bytes; + write_bytes += disk_usage.written_bytes; + } + DiskUsage { + used_gb, + total_gb, + read_kb: read_bytes as f64 / BYTES_PER_KB, + write_kb: write_bytes as f64 / BYTES_PER_KB, + disks, + } + } + + fn get_components(&self) -> Vec { + let mut comps: Vec<_> = self + .sys + .components() + .iter() + .map(|c| SystemComponent { + label: c.label().to_string(), + temp: c.temperature(), + max: c.max(), + critical: c.critical(), + }) + .collect(); + comps.sort_by(|a, b| { + if a.critical.is_some() { + if b.critical.is_some() { + let a_perc = a.temp / *a.critical.as_ref().unwrap(); + let b_perc = b.temp / *b.critical.as_ref().unwrap(); + if a_perc > b_perc { + Ordering::Less + } else { + Ordering::Greater + } + } else { + Ordering::Less + } + } else if b.critical.is_some() { + Ordering::Greater + } else { + Ordering::Equal + } + }); + comps + } + + fn get_processes(&self) -> Vec { + let mut procs: Vec<_> = self + .sys + .processes() + .iter() + .map(|(pid, p)| { + let disk_usage = p.disk_usage(); + SystemProcess { + pid: pid.as_u32(), + name: p.name().to_string(), + exe: p.exe().to_str().unwrap_or("").to_string(), + cmd: p.cmd().to_vec(), + start_time: (p.start_time() * 1000) as f64, + cpu_perc: p.cpu_usage(), + mem_mb: p.memory() as f64 / BYTES_PER_MB, + disk_read_kb: disk_usage.read_bytes as f64 / BYTES_PER_KB, + disk_write_kb: disk_usage.written_bytes as f64 / BYTES_PER_KB, + } + }) + .collect(); + procs.sort_by(|a, b| { + if a.cpu_perc > b.cpu_perc { + Ordering::Less + } else { + Ordering::Greater + } + }); + procs + } +} + +fn get_system_information(sys: &sysinfo::System) -> SystemInformation { + let cpu = sys.global_cpu_info(); + SystemInformation { + name: sys.name(), + os: sys.long_os_version(), + kernel: sys.kernel_version(), + core_count: sys.physical_core_count().map(|c| c as u32), + host_name: sys.host_name(), + cpu_brand: cpu.brand().to_string(), + } +} \ No newline at end of file diff --git a/periphery/src/main.rs b/periphery/src/main.rs index fe2851eab..34f0a26c1 100644 --- a/periphery/src/main.rs +++ b/periphery/src/main.rs @@ -16,6 +16,7 @@ mod api; mod config; mod guard; mod state; +mod helpers; async fn app() -> anyhow::Result<()> { let state = State::load().await?;