list servers

This commit is contained in:
beckerinj
2022-11-20 23:46:36 -05:00
parent 56269b2fcf
commit 37d5606eb3
6 changed files with 183 additions and 53 deletions

2
Cargo.lock generated
View File

@@ -360,6 +360,7 @@ dependencies = [
"db_client",
"dotenv",
"envy",
"futures-util",
"helpers",
"hmac",
"jwt",
@@ -372,6 +373,7 @@ dependencies = [
"sha2",
"slack_client_rs",
"tokio",
"tokio-util",
"tower",
"tower-http",
"types",

View File

@@ -11,6 +11,7 @@ db = { package = "db_client", path = "../lib/db_client" }
types = { path = "../lib/types" }
periphery = { package = "periphery_client", path = "../lib/periphery_client" }
tokio = { version = "1.21", features = ["full"] }
tokio-util = "0.7"
axum = { version = "0.5", features = ["ws", "json"] }
axum-extra = { version = "0.3", features = ["spa"] }
tower = { version = "0.4", features = ["full"] }
@@ -28,4 +29,5 @@ bcrypt = "0.13"
jwt = "0.16"
hmac = "0.12"
sha2 = "0.10"
async_timing_util = "0.1.11"
async_timing_util = "0.1.11"
futures-util = "0.3"

View File

@@ -57,6 +57,6 @@ async fn add_update(
.context("failed to insert update into db. the create build process was completed.")?;
update.id = Some(ObjectId::from_str(&update_id).context("failed at attaching update id")?);
let update_msg = serde_json::to_string(&update).unwrap();
let _ = update_ws.lock().await.send((update, update_msg));
let _ = update_ws.lock().await.send(update);
Ok(())
}

View File

@@ -1,6 +1,9 @@
use anyhow::{anyhow, Context};
use async_timing_util::unix_timestamp_ms;
use axum::{routing::post, Extension, Json, Router};
use axum::{
routing::{get, post},
Extension, Json, Router,
};
use db::DbExtension;
use helpers::handle_anyhow_error;
use types::{EntityType, Operation, PermissionLevel, Server, Update};
@@ -10,14 +13,44 @@ use crate::{auth::RequestUserExtension, ws::update};
use super::add_update;
pub fn router() -> Router {
Router::new().route(
"/create",
post(|db, user, update_ws, server| async {
create(db, user, update_ws, server)
.await
.map_err(handle_anyhow_error)
}),
)
Router::new()
.route(
"/list",
get(|db, user| async { list(db, user).await.map_err(handle_anyhow_error) }),
)
.route(
"/create",
post(|db, user, update_ws, server| async {
create(db, user, update_ws, server)
.await
.map_err(handle_anyhow_error)
}),
)
}
async fn list(
Extension(db): DbExtension,
Extension(user): RequestUserExtension,
) -> anyhow::Result<Json<Vec<Server>>> {
let mut servers: Vec<Server> = db
.servers
.get_some(None, None)
.await
.context("failed at get all servers query")?
.into_iter()
.filter(|s| {
if user.is_admin {
true
} else {
match s.permissions.get(&user.id) {
Some(permissions) => *permissions != PermissionLevel::None,
None => false,
}
}
})
.collect();
servers.sort_by(|a, b| a.name.to_lowercase().cmp(&b.name.to_lowercase()));
Ok(Json(servers))
}
async fn create(

View File

@@ -10,23 +10,54 @@ use axum::{
Extension,
};
use db::{DbClient, DbExtension};
use futures_util::{SinkExt, StreamExt};
use mungos::Serialize;
use serde_json::json;
use tokio::{
select,
sync::{
watch::{self, Receiver, Sender},
Mutex,
},
};
use tokio_util::sync::CancellationToken;
use types::{EntityType, Update, User};
use crate::auth::{JwtClient, JwtExtension};
use tokio::sync::{
watch::{self, Receiver, Sender},
Mutex,
};
use super::user_permissions;
pub type WsSender = Arc<Mutex<Sender<(Update, String)>>>;
pub type WsSender = Arc<Mutex<Sender<Update>>>;
pub type WsSenderExtension = Extension<WsSender>;
pub type WsReciever = Receiver<(Update, String)>;
pub type WsReciever = Receiver<Update>;
pub type WsRecieverExtension = Extension<WsReciever>;
#[derive(Serialize)]
struct UpdateMsg {
#[serde(rename = "type")]
msg_type: MsgType,
update: Update,
}
#[derive(Serialize)]
#[serde(rename_all = "UPPERCASE")]
enum MsgType {
Login,
Close,
Update,
}
impl UpdateMsg {
fn from_update(update: Update) -> Message {
let msg = UpdateMsg {
msg_type: MsgType::Update,
update,
};
Message::Text(serde_json::to_string(&msg).unwrap())
}
}
pub fn make_update_ws_sender_reciver() -> (WsSenderExtension, WsRecieverExtension) {
let (sender, reciever) = watch::channel(Default::default());
(
@@ -43,38 +74,72 @@ pub async fn ws_handler(
) -> impl IntoResponse {
ws.on_upgrade(|socket| async move {
match login(socket, &jwt_client, &db_client).await {
Some((mut socket, user_id)) => loop {
let _ = reciever.changed().await;
let user = db_client.users.find_one_by_id(&user_id).await;
if user.is_err()
|| user.as_ref().unwrap().is_none()
|| !user.as_ref().unwrap().as_ref().unwrap().enabled
{
let _ = socket
.send(Message::Text(json!({ "type": "INVALID_USER" }).to_string()))
.await;
let _ = socket.close().await;
return;
}
let user = user.unwrap().unwrap(); // already handle cases where this panics in the above early return
let (update, update_msg) = reciever.borrow().to_owned();
match user_can_see_update(
&user,
&user_id,
update.entity_type,
&update.entity_id,
&db_client,
)
.await
{
Ok(_) => {
let _ = socket.send(Message::Text(update_msg)).await;
Some((mut socket, user_id)) => {
let (ws_sender, mut ws_reciever) = socket.split();
let ws_sender = Arc::new(Mutex::new(ws_sender));
let ws_sender_clone = ws_sender.clone();
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
tokio::spawn(async move {
loop {
select! {
_ = cancel_clone.cancelled() => break,
_ = reciever.changed() => {}
};
let user = db_client.users.find_one_by_id(&user_id).await;
if user.is_err()
|| user.as_ref().unwrap().is_none()
|| !user.as_ref().unwrap().as_ref().unwrap().enabled
{
let _ = ws_sender
.lock()
.await
.send(Message::Text(json!({ "type": "INVALID_USER" }).to_string()))
.await;
let _ = ws_sender.lock().await.close().await;
return;
}
let user = user.unwrap().unwrap(); // already handle cases where this panics in the above early return
let update = reciever.borrow().to_owned();
match user_can_see_update(
&user,
&user_id,
update.entity_type,
&update.entity_id,
&db_client,
)
.await
{
Ok(_) => {
let _ = ws_sender
.lock()
.await
.send(UpdateMsg::from_update(update))
.await;
}
Err(_) => {
// make these error visible in some way
}
}
}
Err(_) => {
// make these error visible in some way
});
while let Some(msg) = ws_reciever.next().await {
match msg {
Ok(msg) => match msg {
Message::Close(_) => {
cancel.cancel();
return;
}
_ => {}
},
Err(_) => {
cancel.cancel();
return;
}
}
}
},
}
None => {}
}
})
@@ -89,7 +154,14 @@ async fn login(
match jwt {
Ok(jwt) => match jwt {
Message::Text(jwt) => match jwt_client.auth_jwt(&jwt, db_client).await {
Ok(user) => Some((socket, user.id)),
Ok(user) => {
socket
.send(Message::Text(
json!({ "type": "LOGIN", "success": true }).to_string(),
))
.await;
Some((socket, user.id))
}
Err(e) => {
let _ = socket
.send(Message::Text(format!(
@@ -100,7 +172,13 @@ async fn login(
None
}
},
_ => None,
msg => {
let _ = socket
.send(Message::Text(format!("invalid login msg: {msg:#?}")))
.await;
let _ = socket.close().await;
None
}
},
Err(e) => {
let _ = socket

View File

@@ -45,12 +45,16 @@ pub struct Server {
pub id: Option<ObjectId>,
pub name: String,
pub address: String,
#[serde(default)]
pub permissions: PermissionsMap,
#[serde(default)]
pub to_notify: Vec<String>,
#[serde(default = "default_cpu_alert")]
pub cpu_alert: f64,
#[serde(default = "default_mem_alert")]
pub mem_alert: f64,
#[serde(default = "default_disk_alert")]
pub disk_alert: f64,
pub is_builder: bool,
#[serde(skip_serializing_if = "Option::is_none")]
pub stats_interval: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
@@ -67,10 +71,9 @@ impl Default for Server {
address: String::new(),
permissions: HashMap::new(),
to_notify: Vec::new(),
cpu_alert: 50.0,
mem_alert: 75.0,
disk_alert: 75.0,
is_builder: false,
cpu_alert: default_cpu_alert(),
mem_alert: default_mem_alert(),
disk_alert: default_disk_alert(),
stats_interval: None,
region: None,
instance_id: None,
@@ -78,6 +81,18 @@ impl Default for Server {
}
}
fn default_cpu_alert() -> f64 {
50.0
}
fn default_mem_alert() -> f64 {
75.0
}
fn default_disk_alert() -> f64 {
75.0
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Deployment {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]