diff --git a/Cargo.lock b/Cargo.lock index 4570582a7..1a36af8d9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -360,6 +360,7 @@ dependencies = [ "db_client", "dotenv", "envy", + "futures-util", "helpers", "hmac", "jwt", @@ -372,6 +373,7 @@ dependencies = [ "sha2", "slack_client_rs", "tokio", + "tokio-util", "tower", "tower-http", "types", diff --git a/core/Cargo.toml b/core/Cargo.toml index 85026fefc..164fb27e9 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -11,6 +11,7 @@ db = { package = "db_client", path = "../lib/db_client" } types = { path = "../lib/types" } periphery = { package = "periphery_client", path = "../lib/periphery_client" } tokio = { version = "1.21", features = ["full"] } +tokio-util = "0.7" axum = { version = "0.5", features = ["ws", "json"] } axum-extra = { version = "0.3", features = ["spa"] } tower = { version = "0.4", features = ["full"] } @@ -28,4 +29,5 @@ bcrypt = "0.13" jwt = "0.16" hmac = "0.12" sha2 = "0.10" -async_timing_util = "0.1.11" \ No newline at end of file +async_timing_util = "0.1.11" +futures-util = "0.3" \ No newline at end of file diff --git a/core/src/api/mod.rs b/core/src/api/mod.rs index 7810e0064..20fcbcba1 100644 --- a/core/src/api/mod.rs +++ b/core/src/api/mod.rs @@ -57,6 +57,6 @@ async fn add_update( .context("failed to insert update into db. the create build process was completed.")?; update.id = Some(ObjectId::from_str(&update_id).context("failed at attaching update id")?); let update_msg = serde_json::to_string(&update).unwrap(); - let _ = update_ws.lock().await.send((update, update_msg)); + let _ = update_ws.lock().await.send(update); Ok(()) } diff --git a/core/src/api/server.rs b/core/src/api/server.rs index a8b85ce94..53d5dd51c 100644 --- a/core/src/api/server.rs +++ b/core/src/api/server.rs @@ -1,6 +1,9 @@ use anyhow::{anyhow, Context}; use async_timing_util::unix_timestamp_ms; -use axum::{routing::post, Extension, Json, Router}; +use axum::{ + routing::{get, post}, + Extension, Json, Router, +}; use db::DbExtension; use helpers::handle_anyhow_error; use types::{EntityType, Operation, PermissionLevel, Server, Update}; @@ -10,14 +13,44 @@ use crate::{auth::RequestUserExtension, ws::update}; use super::add_update; pub fn router() -> Router { - Router::new().route( - "/create", - post(|db, user, update_ws, server| async { - create(db, user, update_ws, server) - .await - .map_err(handle_anyhow_error) - }), - ) + Router::new() + .route( + "/list", + get(|db, user| async { list(db, user).await.map_err(handle_anyhow_error) }), + ) + .route( + "/create", + post(|db, user, update_ws, server| async { + create(db, user, update_ws, server) + .await + .map_err(handle_anyhow_error) + }), + ) +} + +async fn list( + Extension(db): DbExtension, + Extension(user): RequestUserExtension, +) -> anyhow::Result>> { + let mut servers: Vec = db + .servers + .get_some(None, None) + .await + .context("failed at get all servers query")? + .into_iter() + .filter(|s| { + if user.is_admin { + true + } else { + match s.permissions.get(&user.id) { + Some(permissions) => *permissions != PermissionLevel::None, + None => false, + } + } + }) + .collect(); + servers.sort_by(|a, b| a.name.to_lowercase().cmp(&b.name.to_lowercase())); + Ok(Json(servers)) } async fn create( diff --git a/core/src/ws/update.rs b/core/src/ws/update.rs index f7638ee3c..307fa642f 100644 --- a/core/src/ws/update.rs +++ b/core/src/ws/update.rs @@ -10,23 +10,54 @@ use axum::{ Extension, }; use db::{DbClient, DbExtension}; +use futures_util::{SinkExt, StreamExt}; +use mungos::Serialize; use serde_json::json; +use tokio::{ + select, + sync::{ + watch::{self, Receiver, Sender}, + Mutex, + }, +}; +use tokio_util::sync::CancellationToken; use types::{EntityType, Update, User}; use crate::auth::{JwtClient, JwtExtension}; -use tokio::sync::{ - watch::{self, Receiver, Sender}, - Mutex, -}; use super::user_permissions; -pub type WsSender = Arc>>; +pub type WsSender = Arc>>; pub type WsSenderExtension = Extension; -pub type WsReciever = Receiver<(Update, String)>; +pub type WsReciever = Receiver; pub type WsRecieverExtension = Extension; +#[derive(Serialize)] +struct UpdateMsg { + #[serde(rename = "type")] + msg_type: MsgType, + update: Update, +} + +#[derive(Serialize)] +#[serde(rename_all = "UPPERCASE")] +enum MsgType { + Login, + Close, + Update, +} + +impl UpdateMsg { + fn from_update(update: Update) -> Message { + let msg = UpdateMsg { + msg_type: MsgType::Update, + update, + }; + Message::Text(serde_json::to_string(&msg).unwrap()) + } +} + pub fn make_update_ws_sender_reciver() -> (WsSenderExtension, WsRecieverExtension) { let (sender, reciever) = watch::channel(Default::default()); ( @@ -43,38 +74,72 @@ pub async fn ws_handler( ) -> impl IntoResponse { ws.on_upgrade(|socket| async move { match login(socket, &jwt_client, &db_client).await { - Some((mut socket, user_id)) => loop { - let _ = reciever.changed().await; - let user = db_client.users.find_one_by_id(&user_id).await; - if user.is_err() - || user.as_ref().unwrap().is_none() - || !user.as_ref().unwrap().as_ref().unwrap().enabled - { - let _ = socket - .send(Message::Text(json!({ "type": "INVALID_USER" }).to_string())) - .await; - let _ = socket.close().await; - return; - } - let user = user.unwrap().unwrap(); // already handle cases where this panics in the above early return - let (update, update_msg) = reciever.borrow().to_owned(); - match user_can_see_update( - &user, - &user_id, - update.entity_type, - &update.entity_id, - &db_client, - ) - .await - { - Ok(_) => { - let _ = socket.send(Message::Text(update_msg)).await; + Some((mut socket, user_id)) => { + let (ws_sender, mut ws_reciever) = socket.split(); + let ws_sender = Arc::new(Mutex::new(ws_sender)); + let ws_sender_clone = ws_sender.clone(); + let cancel = CancellationToken::new(); + let cancel_clone = cancel.clone(); + tokio::spawn(async move { + loop { + select! { + _ = cancel_clone.cancelled() => break, + _ = reciever.changed() => {} + }; + let user = db_client.users.find_one_by_id(&user_id).await; + if user.is_err() + || user.as_ref().unwrap().is_none() + || !user.as_ref().unwrap().as_ref().unwrap().enabled + { + let _ = ws_sender + .lock() + .await + .send(Message::Text(json!({ "type": "INVALID_USER" }).to_string())) + .await; + let _ = ws_sender.lock().await.close().await; + return; + } + let user = user.unwrap().unwrap(); // already handle cases where this panics in the above early return + let update = reciever.borrow().to_owned(); + match user_can_see_update( + &user, + &user_id, + update.entity_type, + &update.entity_id, + &db_client, + ) + .await + { + Ok(_) => { + let _ = ws_sender + .lock() + .await + .send(UpdateMsg::from_update(update)) + .await; + } + Err(_) => { + // make these error visible in some way + } + } } - Err(_) => { - // make these error visible in some way + }); + + while let Some(msg) = ws_reciever.next().await { + match msg { + Ok(msg) => match msg { + Message::Close(_) => { + cancel.cancel(); + return; + } + _ => {} + }, + Err(_) => { + cancel.cancel(); + return; + } } } - }, + } None => {} } }) @@ -89,7 +154,14 @@ async fn login( match jwt { Ok(jwt) => match jwt { Message::Text(jwt) => match jwt_client.auth_jwt(&jwt, db_client).await { - Ok(user) => Some((socket, user.id)), + Ok(user) => { + socket + .send(Message::Text( + json!({ "type": "LOGIN", "success": true }).to_string(), + )) + .await; + Some((socket, user.id)) + } Err(e) => { let _ = socket .send(Message::Text(format!( @@ -100,7 +172,13 @@ async fn login( None } }, - _ => None, + msg => { + let _ = socket + .send(Message::Text(format!("invalid login msg: {msg:#?}"))) + .await; + let _ = socket.close().await; + None + } }, Err(e) => { let _ = socket diff --git a/lib/types/src/lib.rs b/lib/types/src/lib.rs index bbd959a9d..d3b6668cf 100644 --- a/lib/types/src/lib.rs +++ b/lib/types/src/lib.rs @@ -45,12 +45,16 @@ pub struct Server { pub id: Option, pub name: String, pub address: String, + #[serde(default)] pub permissions: PermissionsMap, + #[serde(default)] pub to_notify: Vec, + #[serde(default = "default_cpu_alert")] pub cpu_alert: f64, + #[serde(default = "default_mem_alert")] pub mem_alert: f64, + #[serde(default = "default_disk_alert")] pub disk_alert: f64, - pub is_builder: bool, #[serde(skip_serializing_if = "Option::is_none")] pub stats_interval: Option, #[serde(skip_serializing_if = "Option::is_none")] @@ -67,10 +71,9 @@ impl Default for Server { address: String::new(), permissions: HashMap::new(), to_notify: Vec::new(), - cpu_alert: 50.0, - mem_alert: 75.0, - disk_alert: 75.0, - is_builder: false, + cpu_alert: default_cpu_alert(), + mem_alert: default_mem_alert(), + disk_alert: default_disk_alert(), stats_interval: None, region: None, instance_id: None, @@ -78,6 +81,18 @@ impl Default for Server { } } +fn default_cpu_alert() -> f64 { + 50.0 +} + +fn default_mem_alert() -> f64 { + 75.0 +} + +fn default_disk_alert() -> f64 { + 75.0 +} + #[derive(Serialize, Deserialize, Debug, Clone)] pub struct Deployment { #[serde(rename = "_id", skip_serializing_if = "Option::is_none")]