forked from github-starred/komodo
begin recording system stats
This commit is contained in:
@@ -258,7 +258,10 @@ async fn modify_user_enabled(
|
||||
async fn modify_user_create_server_permissions(
|
||||
Extension(state): StateExtension,
|
||||
Extension(user): RequestUserExtension,
|
||||
Json(ModifyUserCreateServerBody { user_id, create_server_permissions }): Json<ModifyUserCreateServerBody>,
|
||||
Json(ModifyUserCreateServerBody {
|
||||
user_id,
|
||||
create_server_permissions,
|
||||
}): Json<ModifyUserCreateServerBody>,
|
||||
) -> anyhow::Result<Update> {
|
||||
if !user.is_admin {
|
||||
return Err(anyhow!(
|
||||
@@ -275,16 +278,26 @@ async fn modify_user_create_server_permissions(
|
||||
state
|
||||
.db
|
||||
.users
|
||||
.update_one::<Document>(&user_id, mungos::Update::Set(doc! { "create_server_permissions": create_server_permissions }))
|
||||
.update_one::<Document>(
|
||||
&user_id,
|
||||
mungos::Update::Set(doc! { "create_server_permissions": create_server_permissions }),
|
||||
)
|
||||
.await?;
|
||||
let update_type = if create_server_permissions { "enabled" } else { "disabled" };
|
||||
let update_type = if create_server_permissions {
|
||||
"enabled"
|
||||
} else {
|
||||
"disabled"
|
||||
};
|
||||
let ts = monitor_timestamp();
|
||||
let mut update = Update {
|
||||
target: UpdateTarget::System,
|
||||
operation: Operation::ModifyUserCreateServerPermissions,
|
||||
logs: vec![Log::simple(
|
||||
"modify user create server permissions",
|
||||
format!("{update_type} create server permissions for {} (id: {})", user.username, user.id),
|
||||
format!(
|
||||
"{update_type} create server permissions for {} (id: {})",
|
||||
user.username, user.id
|
||||
),
|
||||
)],
|
||||
start_ts: ts.clone(),
|
||||
end_ts: Some(ts),
|
||||
|
||||
@@ -3,10 +3,16 @@ use std::{
|
||||
sync::{Arc, Mutex},
|
||||
};
|
||||
|
||||
use async_timing_util::{wait_until_timelength, Timelength};
|
||||
use axum::Extension;
|
||||
use db::DbClient;
|
||||
use futures_util::future::join_all;
|
||||
use mungos::doc;
|
||||
use periphery::PeripheryClient;
|
||||
use types::{BuildActionState, CoreConfig, DeploymentActionState, ServerActionState};
|
||||
use types::{
|
||||
BuildActionState, CoreConfig, DeploymentActionState, ServerActionState, SystemStatsQuery,
|
||||
SystemStatsRecord,
|
||||
};
|
||||
|
||||
use crate::ws::update::UpdateWsChannel;
|
||||
|
||||
@@ -19,25 +25,76 @@ pub struct State {
|
||||
pub db: DbClient,
|
||||
pub update: UpdateWsChannel,
|
||||
pub periphery: PeripheryClient,
|
||||
pub slack: Option<slack::Client>,
|
||||
pub build_action_states: ActionStateMap<BuildActionState>,
|
||||
pub deployment_action_states: ActionStateMap<DeploymentActionState>,
|
||||
pub server_action_states: ActionStateMap<ServerActionState>,
|
||||
}
|
||||
|
||||
impl State {
|
||||
pub async fn new(config: CoreConfig) -> State {
|
||||
State {
|
||||
pub async fn new(config: CoreConfig) -> Arc<State> {
|
||||
let state = State {
|
||||
db: DbClient::new(config.mongo.clone()).await,
|
||||
slack: config.slack_url.clone().map(|url| slack::Client::new(&url)),
|
||||
config,
|
||||
update: UpdateWsChannel::new(),
|
||||
periphery: PeripheryClient::default(),
|
||||
build_action_states: Default::default(),
|
||||
deployment_action_states: Default::default(),
|
||||
server_action_states: Default::default(),
|
||||
}
|
||||
};
|
||||
let state = Arc::new(state);
|
||||
let state_clone = state.clone();
|
||||
tokio::spawn(async move { state_clone.collect_server_stats().await });
|
||||
state
|
||||
}
|
||||
|
||||
pub async fn extension(config: CoreConfig) -> StateExtension {
|
||||
Extension(Arc::new(State::new(config).await))
|
||||
Extension(State::new(config).await)
|
||||
}
|
||||
|
||||
pub async fn collect_server_stats(&self) {
|
||||
loop {
|
||||
let ts = wait_until_timelength(Timelength::OneMinute, 0).await as i64;
|
||||
let servers = self
|
||||
.db
|
||||
.servers
|
||||
.get_some(doc! { "enabled": true }, None)
|
||||
.await;
|
||||
if let Err(e) = servers {
|
||||
eprintln!("failed to get server list from db: {e:?}");
|
||||
continue;
|
||||
}
|
||||
let futures = servers.unwrap().into_iter().map(|server| async move {
|
||||
let stats = self
|
||||
.periphery
|
||||
.get_system_stats(
|
||||
&server,
|
||||
&SystemStatsQuery {
|
||||
networks: true,
|
||||
components: true,
|
||||
processes: false,
|
||||
},
|
||||
)
|
||||
.await;
|
||||
(server, stats)
|
||||
});
|
||||
for (server, res) in join_all(futures).await {
|
||||
if let Err(e) = res {
|
||||
if let Some(slack) = &self.slack {
|
||||
let res = slack
|
||||
.send_message_with_header(format!(""), format!(""))
|
||||
.await;
|
||||
}
|
||||
continue;
|
||||
}
|
||||
let stats = res.unwrap();
|
||||
let res = self
|
||||
.db
|
||||
.stats
|
||||
.create_one(SystemStatsRecord::from_stats(server.id, ts, stats))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use anyhow::Context;
|
||||
use mungos::{Collection, Mungos};
|
||||
use types::{Build, Deployment, Group, Procedure, Server, Update, User};
|
||||
use types::{Build, Deployment, Group, Procedure, Server, SystemStatsRecord, Update, User};
|
||||
|
||||
pub async fn users_collection(mungos: &Mungos, db_name: &str) -> anyhow::Result<Collection<User>> {
|
||||
let coll = mungos.collection(db_name, "users");
|
||||
@@ -81,3 +81,17 @@ pub async fn groups_collection(
|
||||
.context("failed at creating name index")?;
|
||||
Ok(coll)
|
||||
}
|
||||
|
||||
pub async fn server_stats_collection(
|
||||
mungos: &Mungos,
|
||||
db_name: &str,
|
||||
) -> anyhow::Result<Collection<SystemStatsRecord>> {
|
||||
let coll = mungos.collection(db_name, "stats");
|
||||
coll.create_index("server_id")
|
||||
.await
|
||||
.context("failed at creating server_id index")?;
|
||||
coll.create_index("ts")
|
||||
.await
|
||||
.context("failed at creating ts index")?;
|
||||
Ok(coll)
|
||||
}
|
||||
|
||||
@@ -3,11 +3,12 @@ use std::time::Duration;
|
||||
use anyhow::{anyhow, Context};
|
||||
use collections::{
|
||||
builds_collection, deployments_collection, groups_collection, procedures_collection,
|
||||
servers_collection, updates_collection, users_collection,
|
||||
server_stats_collection, servers_collection, updates_collection, users_collection,
|
||||
};
|
||||
use mungos::{Collection, Mungos};
|
||||
use types::{
|
||||
Build, Deployment, Group, MongoConfig, PermissionLevel, Procedure, Server, Update, User,
|
||||
Build, Deployment, Group, MongoConfig, PermissionLevel, Procedure, Server, SystemStatsRecord,
|
||||
Update, User,
|
||||
};
|
||||
|
||||
mod collections;
|
||||
@@ -20,6 +21,7 @@ pub struct DbClient {
|
||||
pub procedures: Collection<Procedure>,
|
||||
pub groups: Collection<Group>,
|
||||
pub updates: Collection<Update>,
|
||||
pub stats: Collection<SystemStatsRecord>,
|
||||
}
|
||||
|
||||
impl DbClient {
|
||||
@@ -50,6 +52,9 @@ impl DbClient {
|
||||
groups: groups_collection(&mungos, db_name)
|
||||
.await
|
||||
.expect("failed to make groups collection"),
|
||||
stats: server_stats_collection(&mungos, db_name)
|
||||
.await
|
||||
.expect("failed to make stats collection"),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
use diff::{Diff, HashMapDiff, OptionDiff, VecDiff};
|
||||
|
||||
use crate::{deployment::{DockerRunArgsDiff, RestartModeDiff}, TimelengthDiff};
|
||||
use crate::{
|
||||
deployment::{DockerRunArgsDiff, RestartModeDiff},
|
||||
TimelengthDiff,
|
||||
};
|
||||
|
||||
pub fn f64_diff_no_change(f64_diff: &f64) -> bool {
|
||||
*f64_diff == 0.0
|
||||
|
||||
@@ -174,7 +174,9 @@ pub enum PermissionsTarget {
|
||||
}
|
||||
|
||||
#[typeshare]
|
||||
#[derive(Serialize, Deserialize, Debug, Display, EnumString, PartialEq, Hash, Eq, Clone, Copy, Diff)]
|
||||
#[derive(
|
||||
Serialize, Deserialize, Debug, Display, EnumString, PartialEq, Hash, Eq, Clone, Copy, Diff,
|
||||
)]
|
||||
#[serde(rename_all = "snake_case")]
|
||||
#[strum(serialize_all = "snake_case")]
|
||||
#[diff(attr(#[derive(Debug, PartialEq, Serialize)]))]
|
||||
|
||||
@@ -222,3 +222,33 @@ pub struct SystemProcess {
|
||||
pub disk_read_kb: f64,
|
||||
pub disk_write_kb: f64,
|
||||
}
|
||||
|
||||
#[typeshare]
|
||||
#[derive(Serialize, Deserialize, Debug, Default, Clone)]
|
||||
pub struct SystemStatsRecord {
|
||||
pub server_id: String,
|
||||
pub ts: i64, // unix ts milliseconds
|
||||
pub cpu_perc: f32, // in %
|
||||
pub mem_used_gb: f64, // in GB
|
||||
pub mem_total_gb: f64, // in GB
|
||||
pub disk: DiskUsage,
|
||||
pub networks: Vec<SystemNetwork>,
|
||||
pub components: Vec<SystemComponent>,
|
||||
pub polling_rate: Timelength,
|
||||
}
|
||||
|
||||
impl SystemStatsRecord {
|
||||
pub fn from_stats(server_id: String, ts: i64, stats: SystemStats) -> SystemStatsRecord {
|
||||
SystemStatsRecord {
|
||||
server_id,
|
||||
ts,
|
||||
cpu_perc: stats.cpu_perc,
|
||||
mem_used_gb: stats.mem_used_gb,
|
||||
mem_total_gb: stats.mem_total_gb,
|
||||
disk: stats.disk,
|
||||
networks: stats.networks,
|
||||
components: stats.components,
|
||||
polling_rate: stats.polling_rate,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user