diff --git a/Cargo.lock b/Cargo.lock index 56ad7e0d2..4f7a47a0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1222,6 +1222,7 @@ dependencies = [ "jwt", "mungos", "oauth2", + "periphery_client", "serde", "serde_derive", "serde_json", diff --git a/core/Cargo.toml b/core/Cargo.toml index af9c85297..3ddd9c1cf 100644 --- a/core/Cargo.toml +++ b/core/Cargo.toml @@ -9,6 +9,7 @@ edition = "2021" helpers = { path = "../lib/helpers" } db = { package = "db_client", path = "../lib/db_client" } types = { path = "../lib/types" } +periphery = { package = "periphery_client", path = "../lib/periphery_client" } tokio = { version = "1.21", features = ["full"] } axum = { version = "0.5", features = ["ws", "json"] } axum-extra = { version = "0.3", features = ["spa"] } diff --git a/core/src/api/build.rs b/core/src/api/build.rs new file mode 100644 index 000000000..68b3d6791 --- /dev/null +++ b/core/src/api/build.rs @@ -0,0 +1,60 @@ +use std::str::FromStr; + +use anyhow::{anyhow, Context}; +use async_timing_util::unix_timestamp_ms; +use axum::{Extension, Json, Router}; +use db::DbExtension; +use mungos::ObjectId; +use types::{Build, EntityType, Operation, PermissionLevel, Update}; + +use crate::{auth::RequestUserExtension, ws::update}; + +use super::PeripheryExtension; + +pub fn router() -> Router { + Router::new() +} + +async fn create( + Extension(db): DbExtension, + Extension(user): RequestUserExtension, + Extension(update_ws): update::WsSenderExtension, + Json(mut build): Json, +) -> anyhow::Result<()> { + let build_server = db + .servers + .find_one_by_id(&build.server_id) + .await + .context("failed at query to find server")? + .ok_or(anyhow!("did not find server with server_id given on build"))?; + let permissions = *build_server.permissions.get(&user.id).ok_or(anyhow!( + "user does not have permissions to create build on this server" + ))?; + if permissions != PermissionLevel::Write { + return Err(anyhow!( + "user does not have permissions to create build on this server" + )); + } + build.permissions = [(user.id.clone(), PermissionLevel::Write)] + .into_iter() + .collect(); + let start_ts = unix_timestamp_ms() as i64; + let build_id = db.builds.create_one(build).await?; + let mut update = Update { + entity_type: EntityType::Build, + entity_id: Some(build_id), + operation: Operation::CreateBuild, + start_ts, + end_ts: unix_timestamp_ms() as i64, + ..Default::default() + }; + let update_id = db + .updates + .create_one(update.clone()) + .await + .context("failed to insert update into db. the create build process was completed.")?; + update.id = Some(ObjectId::from_str(&update_id).context("failed at attaching update id")?); + let update_msg = serde_json::to_string(&update).unwrap(); + let _ = update_ws.lock().await.send((update, update_msg)); + Ok(()) +} diff --git a/core/src/api/mod.rs b/core/src/api/mod.rs index 9c15e1cdc..c46d419ff 100644 --- a/core/src/api/mod.rs +++ b/core/src/api/mod.rs @@ -1,13 +1,17 @@ +use std::sync::Arc; + use anyhow::anyhow; -use axum::{http::StatusCode, middleware, routing::get, Extension, Json, Router}; +use axum::{middleware, routing::get, Extension, Json, Router}; use db::DbExtension; use helpers::handle_anyhow_error; -use types::{User, UserId}; +use periphery::PeripheryClient; +use types::User; -use crate::{ - auth::{auth_request, RequestUserExtension}, - ResponseResult, -}; +use crate::auth::{auth_request, RequestUserExtension}; + +mod build; + +type PeripheryExtension = Extension>; pub fn router() -> Router { Router::new() @@ -15,6 +19,8 @@ pub fn router() -> Router { "/user", get(|user, db| async { get_user(user, db).await.map_err(handle_anyhow_error) }), ) + .nest("/build", build::router()) + .layer(Extension(Arc::new(PeripheryClient::new()))) .layer(middleware::from_fn(auth_request)) } diff --git a/core/src/auth/jwt.rs b/core/src/auth/jwt.rs index 10d334c8b..b243ea290 100644 --- a/core/src/auth/jwt.rs +++ b/core/src/auth/jwt.rs @@ -1,14 +1,14 @@ use std::sync::Arc; use anyhow::{anyhow, Context}; -use async_timing_util::{get_timelength_in_ms, unix_timestamp_ms, Timelength}; +use async_timing_util::{get_timelength_in_ms, unix_timestamp_ms}; use axum::{body::Body, http::Request, Extension}; use db::DbClient; use hmac::{Hmac, Mac}; use jwt::{SignWithKey, VerifyWithKey}; use mungos::{Deserialize, Serialize}; use sha2::Sha256; -use types::{CoreConfig, User, UserId}; +use types::{CoreConfig, UserId}; pub type JwtExtension = Extension>; pub type RequestUserExtension = Extension>; diff --git a/core/src/auth/local.rs b/core/src/auth/local.rs index 6cbf05c4b..4e4e01f01 100644 --- a/core/src/auth/local.rs +++ b/core/src/auth/local.rs @@ -1,87 +1,87 @@ -use anyhow::{anyhow, Context}; -use axum::{extract::Json, routing::post, Extension, Router}; -use db::{DbClient, DbExtension}; -use helpers::handle_anyhow_error; -use mungos::{doc, Deserialize}; -use types::{User, UserCredentials}; - -use super::jwt::JwtExtension; - -const BCRYPT_COST: u32 = 10; - -pub fn router() -> Router { - Router::new() - .route( - "/create_user", - post(|db, jwt, body| async { - create_user_handler(db, jwt, body) - .await - .map_err(handle_anyhow_error) - }), - ) - .route( - "/login", - post(|db, jwt, body| async { - login_handler(db, jwt, body) - .await - .map_err(handle_anyhow_error) - }), - ) -} - -async fn create_user_handler( - Extension(db): DbExtension, - Extension(jwt): JwtExtension, - Json(UserCredentials { username, password }): Json, -) -> anyhow::Result { - let password = bcrypt::hash(password, BCRYPT_COST).context("failed to hash password")?; - - let user = User { - username, - password: Some(password), - ..Default::default() - }; - - let user_id = db - .users - .create_one(user) - .await - .context("failed to create user")?; - - let jwt = jwt - .generate(user_id) - .context("failed to generate jwt for user")?; - - Ok(jwt) -} - -async fn login_handler( - Extension(db): DbExtension, - Extension(jwt): JwtExtension, - Json(UserCredentials { username, password }): Json, -) -> anyhow::Result { - let user = db - .users - .find_one(doc! { "username": &username }, None) - .await - .context("failed at mongo query")? - .ok_or(anyhow!("did not find user with username {username}"))?; - - let user_pw_hash = user - .password - .ok_or(anyhow!("invalid login, user does not have password login"))?; - - let verified = bcrypt::verify(password, &user_pw_hash).context("failed at verify password")?; - - if !verified { - return Err(anyhow!("invalid credentials")); - } - - let user_id = user.id.ok_or(anyhow!("user does not have id"))?.to_string(); - - let jwt = jwt - .generate(user_id) - .context("failed at generating jwt for user")?; - - Ok(jwt) -} +use anyhow::{anyhow, Context}; +use axum::{extract::Json, routing::post, Extension, Router}; +use db::DbExtension; +use helpers::handle_anyhow_error; +use mungos::doc; +use types::{User, UserCredentials}; + +use super::jwt::JwtExtension; + +const BCRYPT_COST: u32 = 10; + +pub fn router() -> Router { + Router::new() + .route( + "/create_user", + post(|db, jwt, body| async { + create_user_handler(db, jwt, body) + .await + .map_err(handle_anyhow_error) + }), + ) + .route( + "/login", + post(|db, jwt, body| async { + login_handler(db, jwt, body) + .await + .map_err(handle_anyhow_error) + }), + ) +} + +async fn create_user_handler( + Extension(db): DbExtension, + Extension(jwt): JwtExtension, + Json(UserCredentials { username, password }): Json, +) -> anyhow::Result { + let password = bcrypt::hash(password, BCRYPT_COST).context("failed to hash password")?; + + let user = User { + username, + password: Some(password), + ..Default::default() + }; + + let user_id = db + .users + .create_one(user) + .await + .context("failed to create user")?; + + let jwt = jwt + .generate(user_id) + .context("failed to generate jwt for user")?; + + Ok(jwt) +} + +async fn login_handler( + Extension(db): DbExtension, + Extension(jwt): JwtExtension, + Json(UserCredentials { username, password }): Json, +) -> anyhow::Result { + let user = db + .users + .find_one(doc! { "username": &username }, None) + .await + .context("failed at mongo query")? + .ok_or(anyhow!("did not find user with username {username}"))?; + + let user_pw_hash = user + .password + .ok_or(anyhow!("invalid login, user does not have password login"))?; + + let verified = bcrypt::verify(password, &user_pw_hash).context("failed at verify password")?; + + if !verified { + return Err(anyhow!("invalid credentials")); + } + + let user_id = user.id.ok_or(anyhow!("user does not have id"))?.to_string(); + + let jwt = jwt + .generate(user_id) + .context("failed at generating jwt for user")?; + + Ok(jwt) +} diff --git a/core/src/auth/mod.rs b/core/src/auth/mod.rs index 7fb869d00..457cb04eb 100644 --- a/core/src/auth/mod.rs +++ b/core/src/auth/mod.rs @@ -1,14 +1,13 @@ use std::sync::Arc; -use anyhow::Context; use axum::{ body::Body, - http::{header, Request, StatusCode}, + http::{Request, StatusCode}, middleware::Next, - response::{IntoResponse, Response}, + response::Response, Router, }; -use types::{CoreConfig, UserId}; +use types::CoreConfig; mod github; mod jwt; diff --git a/core/src/config.rs b/core/src/config.rs index 76d2cb05f..6dd6c84c3 100644 --- a/core/src/config.rs +++ b/core/src/config.rs @@ -1,15 +1,6 @@ -use std::{ - env, - fs::File, - net::{IpAddr, SocketAddr}, - str::FromStr, - time::Duration, -}; - -use db::{DbClient, DbExtension}; use dotenv::dotenv; use helpers::parse_config_file; -use mungos::{Deserialize, Mungos}; +use mungos::Deserialize; use types::CoreConfig; #[derive(Deserialize, Debug)] diff --git a/core/src/helpers.rs b/core/src/helpers.rs index e5081390b..9fadd9b54 100644 --- a/core/src/helpers.rs +++ b/core/src/helpers.rs @@ -1,5 +1,3 @@ -use types::CoreConfig; - #[macro_export] macro_rules! response { ($x:expr) => { diff --git a/core/src/main.rs b/core/src/main.rs index 7f6fbc883..3a4c564cf 100644 --- a/core/src/main.rs +++ b/core/src/main.rs @@ -1,10 +1,10 @@ #![allow(unused)] -use ::helpers::{docker::DockerClient, get_socket_addr}; +use ::helpers::get_socket_addr; use auth::JwtClient; -use axum::{http::StatusCode, Router, routing::get}; +use axum::Router; use db::DbClient; -use ws::{make_ws_sender_reciver, ws_handler}; +use ws::make_update_ws_sender_reciver; mod api; mod auth; @@ -12,18 +12,16 @@ mod config; mod helpers; mod ws; -type ResponseResult = Result; - #[tokio::main] async fn main() { let config = config::load(); - let (sender, reciever) = make_ws_sender_reciver(); + let (sender, reciever) = make_update_ws_sender_reciver(); let app = Router::new() .nest("/api", api::router()) .nest("/auth", auth::router(&config)) - .route("/ws", get(ws_handler)) + .nest("/ws", ws::router()) .layer(sender) .layer(reciever) .layer(DbClient::extension(config.mongo.clone()).await) diff --git a/core/src/ws/mod.rs b/core/src/ws/mod.rs index 630f34ccb..52d6a7e62 100644 --- a/core/src/ws/mod.rs +++ b/core/src/ws/mod.rs @@ -1,84 +1,21 @@ -use std::sync::Arc; +use anyhow::anyhow; +use axum::{routing::get, Router}; +use types::{PermissionLevel, PermissionsMap}; -use anyhow::{anyhow, Context}; -use axum::{ - extract::{ - ws::{Message, WebSocket}, - WebSocketUpgrade, - }, - http::StatusCode, - response::IntoResponse, - Extension, -}; -use db::{DbClient, DbExtension}; -use serde_json::Value; +pub mod update; -use crate::auth::{JwtClient, JwtExtension}; -use tokio::sync::watch::{self, error::SendError, Receiver, Sender}; +pub use update::make_update_ws_sender_reciver; -pub type WsSender = Arc>; -pub type WsSenderExtension = Extension; - -pub type WsReciever = Receiver; -pub type WsRecieverExtension = Extension; - -pub fn make_ws_sender_reciver() -> (WsSenderExtension, WsRecieverExtension) { - let (sender, reciever) = watch::channel(String::new()); - (Extension(Arc::new(sender)), Extension(reciever)) +pub fn router() -> Router { + Router::new().route("/update", get(update::ws_handler)) } -pub async fn ws_handler( - Extension(jwt_client): JwtExtension, - Extension(db_client): DbExtension, - Extension(mut reciever): WsRecieverExtension, - ws: WebSocketUpgrade, -) -> impl IntoResponse { - ws.on_upgrade(|socket| async move { - match login(socket, &jwt_client, &db_client).await { - Some((ws, user_id)) => { - loop { - reciever.changed().await; - let msg = serde_json::from_str::(&reciever.borrow()).unwrap(); - - todo!() - } - } - None => {} - } - }) -} - -async fn login( - mut socket: WebSocket, - jwt_client: &JwtClient, - db_client: &DbClient, -) -> Option<(WebSocket, String)> { - if let Some(jwt) = socket.recv().await { - match jwt { - Ok(jwt) => match jwt { - Message::Text(jwt) => match jwt_client.auth_jwt(&jwt, db_client).await { - Ok(user) => Some((socket, user.id)), - Err(e) => { - let _ = socket - .send(Message::Text(format!( - "failed to authenticate user | {e:#?}" - ))) - .await; - let _ = socket.close().await; - None - } - }, - _ => None, - }, - Err(e) => { - let _ = socket - .send(Message::Text(format!("failed to get message: {e:#?}"))) - .await; - let _ = socket.close().await; - None - } - } - } else { - None +fn user_permissions(user_id: &str, permissions: &PermissionsMap) -> anyhow::Result<()> { + let permission_level = *permissions + .get(user_id) + .ok_or(anyhow!("user has no permissions"))?; + match permission_level { + PermissionLevel::None => Err(anyhow!("user has None permission level")), + _ => Ok(()), } } diff --git a/core/src/ws/update.rs b/core/src/ws/update.rs index e69de29bb..e695cfd1c 100644 --- a/core/src/ws/update.rs +++ b/core/src/ws/update.rs @@ -0,0 +1,172 @@ +use std::sync::Arc; + +use anyhow::{anyhow, Context}; +use axum::{ + extract::{ + ws::{Message, WebSocket}, + WebSocketUpgrade, + }, + response::IntoResponse, + Extension, +}; +use db::{DbClient, DbExtension}; +use serde_json::json; +use types::{EntityType, Update, User}; + +use crate::auth::{JwtClient, JwtExtension}; +use tokio::sync::{ + watch::{self, Receiver, Sender}, + Mutex, +}; + +use super::user_permissions; + +pub type WsSender = Arc>>; +pub type WsSenderExtension = Extension; + +pub type WsReciever = Receiver<(Update, String)>; +pub type WsRecieverExtension = Extension; + +pub fn make_update_ws_sender_reciver() -> (WsSenderExtension, WsRecieverExtension) { + let (sender, reciever) = watch::channel(Default::default()); + ( + Extension(Arc::new(Mutex::new((sender)))), + Extension(reciever), + ) +} + +pub async fn ws_handler( + Extension(jwt_client): JwtExtension, + Extension(db_client): DbExtension, + Extension(mut reciever): WsRecieverExtension, + ws: WebSocketUpgrade, +) -> impl IntoResponse { + ws.on_upgrade(|socket| async move { + match login(socket, &jwt_client, &db_client).await { + Some((mut socket, user_id)) => loop { + let _ = reciever.changed().await; + let user = db_client.users.find_one_by_id(&user_id).await; + if user.is_err() + || user.as_ref().unwrap().is_none() + || !user.as_ref().unwrap().as_ref().unwrap().enabled + { + let _ = socket + .send(Message::Text(json!({ "type": "INVALID_USER" }).to_string())) + .await; + let _ = socket.close().await; + return; + } + let user = user.unwrap().unwrap(); // already handle cases where this panics in the above early return + let (update, update_msg) = reciever.borrow().to_owned(); + match user_can_see_update( + &user, + &user_id, + update.entity_type, + &update.entity_id, + &db_client, + ) + .await + { + Ok(_) => { + let _ = socket.send(Message::Text(update_msg)).await; + } + Err(_) => { + // make these error visible in some way + } + } + }, + None => {} + } + }) +} + +async fn login( + mut socket: WebSocket, + jwt_client: &JwtClient, + db_client: &DbClient, +) -> Option<(WebSocket, String)> { + if let Some(jwt) = socket.recv().await { + match jwt { + Ok(jwt) => match jwt { + Message::Text(jwt) => match jwt_client.auth_jwt(&jwt, db_client).await { + Ok(user) => Some((socket, user.id)), + Err(e) => { + let _ = socket + .send(Message::Text(format!( + "failed to authenticate user | {e:#?}" + ))) + .await; + let _ = socket.close().await; + None + } + }, + _ => None, + }, + Err(e) => { + let _ = socket + .send(Message::Text(format!("failed to get jwt message: {e:#?}"))) + .await; + let _ = socket.close().await; + None + } + } + } else { + None + } +} + +async fn user_can_see_update( + user: &User, + user_id: &str, + entity_type: EntityType, + entity_id: &Option, + db_client: &DbClient, +) -> anyhow::Result<()> { + match entity_type { + EntityType::System => { + if user.admin { + Ok(()) + } else { + Err(anyhow!("user not admin, can't recieve system updates")) + } + } + EntityType::Server => { + let server_id = entity_id + .as_ref() + .ok_or(anyhow!("must pass entity_id for {entity_type}"))?; + let server = db_client + .servers + .find_one_by_id(server_id) + .await + .context(format!("failed at query to get server at {server_id}"))? + .ok_or(anyhow!("did not server with id {server_id}"))?; + user_permissions(user_id, &server.permissions) + } + EntityType::Deployment => { + let deployment_id = entity_id + .as_ref() + .ok_or(anyhow!("must pass entity_id for {entity_type}"))?; + let deployment = db_client + .deployments + .find_one_by_id(deployment_id) + .await + .context(format!( + "failed at query to get deployment at {deployment_id}" + ))? + .ok_or(anyhow!("did not deployment with id {deployment_id}"))?; + user_permissions(user_id, &deployment.permissions) + } + EntityType::Build => { + let build_id = entity_id + .as_ref() + .ok_or(anyhow!("must pass entity_id for {entity_type}"))?; + let build = db_client + .builds + .find_one_by_id(build_id) + .await + .context(format!("failed at query to get build at {build_id}"))? + .ok_or(anyhow!("did not build with id {build_id}"))?; + user_permissions(user_id, &build.permissions) + } + } +} diff --git a/lib/types/src/lib.rs b/lib/types/src/lib.rs index 3ad3e4d7f..85c1b456f 100644 --- a/lib/types/src/lib.rs +++ b/lib/types/src/lib.rs @@ -104,6 +104,7 @@ pub struct Build { pub id: Option, pub name: String, pub permissions: PermissionsMap, + pub server_id: String, // server which this image should be built on pub version: Version, // git related @@ -128,7 +129,7 @@ pub struct BuildRecord { pub version: Option, } -#[derive(Serialize, Deserialize, Debug, Clone)] +#[derive(Serialize, Deserialize, Debug, Clone, Default)] pub struct Update { #[serde(rename = "_id", skip_serializing_if = "Option::is_none")] pub id: Option, @@ -371,20 +372,38 @@ pub enum EntityType { Server, } +impl Default for EntityType { + fn default() -> Self { + EntityType::System + } +} + #[derive(Serialize, Deserialize, Debug, Display, EnumString, PartialEq, Hash, Eq, Clone, Copy)] #[serde(rename_all = "snake_case")] #[strum(serialize_all = "snake_case")] pub enum Operation { + // do nothing + None, + // server + CreateServer, + UpdateServer, + DeleteServer, PruneImagesServer, PruneContainersServer, PruneNetworksServer, // build + CreateBuild, + UpdateBuild, + DeleteBuild, BuildBuild, RecloneBuild, // deployment + CreateDeployment, + UpdateDeployment, + DeleteDeployment, DeployDeployment, StopDeployment, StartDeployment, @@ -392,10 +411,17 @@ pub enum Operation { RecloneDeployment, } +impl Default for Operation { + fn default() -> Self { + Operation::None + } +} + #[derive(Serialize, Deserialize, Debug, Display, EnumString, PartialEq, Hash, Eq, Clone, Copy)] #[serde(rename_all = "snake_case")] #[strum(serialize_all = "snake_case")] pub enum PermissionLevel { + None, Read, Write, } @@ -429,4 +455,4 @@ impl Default for RestartMode { fn default() -> RestartMode { RestartMode::NoRestart } -} \ No newline at end of file +}