broadcast updates to clients, and implement create build

This commit is contained in:
beckerinj
2022-11-20 19:52:23 -05:00
parent 6f03bd5827
commit 052756140b
13 changed files with 386 additions and 197 deletions

1
Cargo.lock generated
View File

@@ -1222,6 +1222,7 @@ dependencies = [
"jwt",
"mungos",
"oauth2",
"periphery_client",
"serde",
"serde_derive",
"serde_json",

View File

@@ -9,6 +9,7 @@ edition = "2021"
helpers = { path = "../lib/helpers" }
db = { package = "db_client", path = "../lib/db_client" }
types = { path = "../lib/types" }
periphery = { package = "periphery_client", path = "../lib/periphery_client" }
tokio = { version = "1.21", features = ["full"] }
axum = { version = "0.5", features = ["ws", "json"] }
axum-extra = { version = "0.3", features = ["spa"] }

60
core/src/api/build.rs Normal file
View File

@@ -0,0 +1,60 @@
use std::str::FromStr;
use anyhow::{anyhow, Context};
use async_timing_util::unix_timestamp_ms;
use axum::{Extension, Json, Router};
use db::DbExtension;
use mungos::ObjectId;
use types::{Build, EntityType, Operation, PermissionLevel, Update};
use crate::{auth::RequestUserExtension, ws::update};
use super::PeripheryExtension;
pub fn router() -> Router {
Router::new()
}
async fn create(
Extension(db): DbExtension,
Extension(user): RequestUserExtension,
Extension(update_ws): update::WsSenderExtension,
Json(mut build): Json<Build>,
) -> anyhow::Result<()> {
let build_server = db
.servers
.find_one_by_id(&build.server_id)
.await
.context("failed at query to find server")?
.ok_or(anyhow!("did not find server with server_id given on build"))?;
let permissions = *build_server.permissions.get(&user.id).ok_or(anyhow!(
"user does not have permissions to create build on this server"
))?;
if permissions != PermissionLevel::Write {
return Err(anyhow!(
"user does not have permissions to create build on this server"
));
}
build.permissions = [(user.id.clone(), PermissionLevel::Write)]
.into_iter()
.collect();
let start_ts = unix_timestamp_ms() as i64;
let build_id = db.builds.create_one(build).await?;
let mut update = Update {
entity_type: EntityType::Build,
entity_id: Some(build_id),
operation: Operation::CreateBuild,
start_ts,
end_ts: unix_timestamp_ms() as i64,
..Default::default()
};
let update_id = db
.updates
.create_one(update.clone())
.await
.context("failed to insert update into db. the create build process was completed.")?;
update.id = Some(ObjectId::from_str(&update_id).context("failed at attaching update id")?);
let update_msg = serde_json::to_string(&update).unwrap();
let _ = update_ws.lock().await.send((update, update_msg));
Ok(())
}

View File

@@ -1,13 +1,17 @@
use std::sync::Arc;
use anyhow::anyhow;
use axum::{http::StatusCode, middleware, routing::get, Extension, Json, Router};
use axum::{middleware, routing::get, Extension, Json, Router};
use db::DbExtension;
use helpers::handle_anyhow_error;
use types::{User, UserId};
use periphery::PeripheryClient;
use types::User;
use crate::{
auth::{auth_request, RequestUserExtension},
ResponseResult,
};
use crate::auth::{auth_request, RequestUserExtension};
mod build;
type PeripheryExtension = Extension<Arc<PeripheryClient>>;
pub fn router() -> Router {
Router::new()
@@ -15,6 +19,8 @@ pub fn router() -> Router {
"/user",
get(|user, db| async { get_user(user, db).await.map_err(handle_anyhow_error) }),
)
.nest("/build", build::router())
.layer(Extension(Arc::new(PeripheryClient::new())))
.layer(middleware::from_fn(auth_request))
}

View File

@@ -1,14 +1,14 @@
use std::sync::Arc;
use anyhow::{anyhow, Context};
use async_timing_util::{get_timelength_in_ms, unix_timestamp_ms, Timelength};
use async_timing_util::{get_timelength_in_ms, unix_timestamp_ms};
use axum::{body::Body, http::Request, Extension};
use db::DbClient;
use hmac::{Hmac, Mac};
use jwt::{SignWithKey, VerifyWithKey};
use mungos::{Deserialize, Serialize};
use sha2::Sha256;
use types::{CoreConfig, User, UserId};
use types::{CoreConfig, UserId};
pub type JwtExtension = Extension<Arc<JwtClient>>;
pub type RequestUserExtension = Extension<Arc<RequestUser>>;

View File

@@ -1,87 +1,87 @@
use anyhow::{anyhow, Context};
use axum::{extract::Json, routing::post, Extension, Router};
use db::{DbClient, DbExtension};
use helpers::handle_anyhow_error;
use mungos::{doc, Deserialize};
use types::{User, UserCredentials};
use super::jwt::JwtExtension;
const BCRYPT_COST: u32 = 10;
pub fn router() -> Router {
Router::new()
.route(
"/create_user",
post(|db, jwt, body| async {
create_user_handler(db, jwt, body)
.await
.map_err(handle_anyhow_error)
}),
)
.route(
"/login",
post(|db, jwt, body| async {
login_handler(db, jwt, body)
.await
.map_err(handle_anyhow_error)
}),
)
}
async fn create_user_handler(
Extension(db): DbExtension,
Extension(jwt): JwtExtension,
Json(UserCredentials { username, password }): Json<UserCredentials>,
) -> anyhow::Result<String> {
let password = bcrypt::hash(password, BCRYPT_COST).context("failed to hash password")?;
let user = User {
username,
password: Some(password),
..Default::default()
};
let user_id = db
.users
.create_one(user)
.await
.context("failed to create user")?;
let jwt = jwt
.generate(user_id)
.context("failed to generate jwt for user")?;
Ok(jwt)
}
async fn login_handler(
Extension(db): DbExtension,
Extension(jwt): JwtExtension,
Json(UserCredentials { username, password }): Json<UserCredentials>,
) -> anyhow::Result<String> {
let user = db
.users
.find_one(doc! { "username": &username }, None)
.await
.context("failed at mongo query")?
.ok_or(anyhow!("did not find user with username {username}"))?;
let user_pw_hash = user
.password
.ok_or(anyhow!("invalid login, user does not have password login"))?;
let verified = bcrypt::verify(password, &user_pw_hash).context("failed at verify password")?;
if !verified {
return Err(anyhow!("invalid credentials"));
}
let user_id = user.id.ok_or(anyhow!("user does not have id"))?.to_string();
let jwt = jwt
.generate(user_id)
.context("failed at generating jwt for user")?;
Ok(jwt)
}
use anyhow::{anyhow, Context};
use axum::{extract::Json, routing::post, Extension, Router};
use db::DbExtension;
use helpers::handle_anyhow_error;
use mungos::doc;
use types::{User, UserCredentials};
use super::jwt::JwtExtension;
const BCRYPT_COST: u32 = 10;
pub fn router() -> Router {
Router::new()
.route(
"/create_user",
post(|db, jwt, body| async {
create_user_handler(db, jwt, body)
.await
.map_err(handle_anyhow_error)
}),
)
.route(
"/login",
post(|db, jwt, body| async {
login_handler(db, jwt, body)
.await
.map_err(handle_anyhow_error)
}),
)
}
async fn create_user_handler(
Extension(db): DbExtension,
Extension(jwt): JwtExtension,
Json(UserCredentials { username, password }): Json<UserCredentials>,
) -> anyhow::Result<String> {
let password = bcrypt::hash(password, BCRYPT_COST).context("failed to hash password")?;
let user = User {
username,
password: Some(password),
..Default::default()
};
let user_id = db
.users
.create_one(user)
.await
.context("failed to create user")?;
let jwt = jwt
.generate(user_id)
.context("failed to generate jwt for user")?;
Ok(jwt)
}
async fn login_handler(
Extension(db): DbExtension,
Extension(jwt): JwtExtension,
Json(UserCredentials { username, password }): Json<UserCredentials>,
) -> anyhow::Result<String> {
let user = db
.users
.find_one(doc! { "username": &username }, None)
.await
.context("failed at mongo query")?
.ok_or(anyhow!("did not find user with username {username}"))?;
let user_pw_hash = user
.password
.ok_or(anyhow!("invalid login, user does not have password login"))?;
let verified = bcrypt::verify(password, &user_pw_hash).context("failed at verify password")?;
if !verified {
return Err(anyhow!("invalid credentials"));
}
let user_id = user.id.ok_or(anyhow!("user does not have id"))?.to_string();
let jwt = jwt
.generate(user_id)
.context("failed at generating jwt for user")?;
Ok(jwt)
}

View File

@@ -1,14 +1,13 @@
use std::sync::Arc;
use anyhow::Context;
use axum::{
body::Body,
http::{header, Request, StatusCode},
http::{Request, StatusCode},
middleware::Next,
response::{IntoResponse, Response},
response::Response,
Router,
};
use types::{CoreConfig, UserId};
use types::CoreConfig;
mod github;
mod jwt;

View File

@@ -1,15 +1,6 @@
use std::{
env,
fs::File,
net::{IpAddr, SocketAddr},
str::FromStr,
time::Duration,
};
use db::{DbClient, DbExtension};
use dotenv::dotenv;
use helpers::parse_config_file;
use mungos::{Deserialize, Mungos};
use mungos::Deserialize;
use types::CoreConfig;
#[derive(Deserialize, Debug)]

View File

@@ -1,5 +1,3 @@
use types::CoreConfig;
#[macro_export]
macro_rules! response {
($x:expr) => {

View File

@@ -1,10 +1,10 @@
#![allow(unused)]
use ::helpers::{docker::DockerClient, get_socket_addr};
use ::helpers::get_socket_addr;
use auth::JwtClient;
use axum::{http::StatusCode, Router, routing::get};
use axum::Router;
use db::DbClient;
use ws::{make_ws_sender_reciver, ws_handler};
use ws::make_update_ws_sender_reciver;
mod api;
mod auth;
@@ -12,18 +12,16 @@ mod config;
mod helpers;
mod ws;
type ResponseResult<T> = Result<T, (StatusCode, String)>;
#[tokio::main]
async fn main() {
let config = config::load();
let (sender, reciever) = make_ws_sender_reciver();
let (sender, reciever) = make_update_ws_sender_reciver();
let app = Router::new()
.nest("/api", api::router())
.nest("/auth", auth::router(&config))
.route("/ws", get(ws_handler))
.nest("/ws", ws::router())
.layer(sender)
.layer(reciever)
.layer(DbClient::extension(config.mongo.clone()).await)

View File

@@ -1,84 +1,21 @@
use std::sync::Arc;
use anyhow::anyhow;
use axum::{routing::get, Router};
use types::{PermissionLevel, PermissionsMap};
use anyhow::{anyhow, Context};
use axum::{
extract::{
ws::{Message, WebSocket},
WebSocketUpgrade,
},
http::StatusCode,
response::IntoResponse,
Extension,
};
use db::{DbClient, DbExtension};
use serde_json::Value;
pub mod update;
use crate::auth::{JwtClient, JwtExtension};
use tokio::sync::watch::{self, error::SendError, Receiver, Sender};
pub use update::make_update_ws_sender_reciver;
pub type WsSender = Arc<Sender<String>>;
pub type WsSenderExtension = Extension<WsSender>;
pub type WsReciever = Receiver<String>;
pub type WsRecieverExtension = Extension<WsReciever>;
pub fn make_ws_sender_reciver() -> (WsSenderExtension, WsRecieverExtension) {
let (sender, reciever) = watch::channel(String::new());
(Extension(Arc::new(sender)), Extension(reciever))
pub fn router() -> Router {
Router::new().route("/update", get(update::ws_handler))
}
pub async fn ws_handler(
Extension(jwt_client): JwtExtension,
Extension(db_client): DbExtension,
Extension(mut reciever): WsRecieverExtension,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
ws.on_upgrade(|socket| async move {
match login(socket, &jwt_client, &db_client).await {
Some((ws, user_id)) => {
loop {
reciever.changed().await;
let msg = serde_json::from_str::<Value>(&reciever.borrow()).unwrap();
todo!()
}
}
None => {}
}
})
}
async fn login(
mut socket: WebSocket,
jwt_client: &JwtClient,
db_client: &DbClient,
) -> Option<(WebSocket, String)> {
if let Some(jwt) = socket.recv().await {
match jwt {
Ok(jwt) => match jwt {
Message::Text(jwt) => match jwt_client.auth_jwt(&jwt, db_client).await {
Ok(user) => Some((socket, user.id)),
Err(e) => {
let _ = socket
.send(Message::Text(format!(
"failed to authenticate user | {e:#?}"
)))
.await;
let _ = socket.close().await;
None
}
},
_ => None,
},
Err(e) => {
let _ = socket
.send(Message::Text(format!("failed to get message: {e:#?}")))
.await;
let _ = socket.close().await;
None
}
}
} else {
None
fn user_permissions(user_id: &str, permissions: &PermissionsMap) -> anyhow::Result<()> {
let permission_level = *permissions
.get(user_id)
.ok_or(anyhow!("user has no permissions"))?;
match permission_level {
PermissionLevel::None => Err(anyhow!("user has None permission level")),
_ => Ok(()),
}
}

View File

@@ -0,0 +1,172 @@
use std::sync::Arc;
use anyhow::{anyhow, Context};
use axum::{
extract::{
ws::{Message, WebSocket},
WebSocketUpgrade,
},
response::IntoResponse,
Extension,
};
use db::{DbClient, DbExtension};
use serde_json::json;
use types::{EntityType, Update, User};
use crate::auth::{JwtClient, JwtExtension};
use tokio::sync::{
watch::{self, Receiver, Sender},
Mutex,
};
use super::user_permissions;
pub type WsSender = Arc<Mutex<Sender<(Update, String)>>>;
pub type WsSenderExtension = Extension<WsSender>;
pub type WsReciever = Receiver<(Update, String)>;
pub type WsRecieverExtension = Extension<WsReciever>;
pub fn make_update_ws_sender_reciver() -> (WsSenderExtension, WsRecieverExtension) {
let (sender, reciever) = watch::channel(Default::default());
(
Extension(Arc::new(Mutex::new((sender)))),
Extension(reciever),
)
}
pub async fn ws_handler(
Extension(jwt_client): JwtExtension,
Extension(db_client): DbExtension,
Extension(mut reciever): WsRecieverExtension,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
ws.on_upgrade(|socket| async move {
match login(socket, &jwt_client, &db_client).await {
Some((mut socket, user_id)) => loop {
let _ = reciever.changed().await;
let user = db_client.users.find_one_by_id(&user_id).await;
if user.is_err()
|| user.as_ref().unwrap().is_none()
|| !user.as_ref().unwrap().as_ref().unwrap().enabled
{
let _ = socket
.send(Message::Text(json!({ "type": "INVALID_USER" }).to_string()))
.await;
let _ = socket.close().await;
return;
}
let user = user.unwrap().unwrap(); // already handle cases where this panics in the above early return
let (update, update_msg) = reciever.borrow().to_owned();
match user_can_see_update(
&user,
&user_id,
update.entity_type,
&update.entity_id,
&db_client,
)
.await
{
Ok(_) => {
let _ = socket.send(Message::Text(update_msg)).await;
}
Err(_) => {
// make these error visible in some way
}
}
},
None => {}
}
})
}
async fn login(
mut socket: WebSocket,
jwt_client: &JwtClient,
db_client: &DbClient,
) -> Option<(WebSocket, String)> {
if let Some(jwt) = socket.recv().await {
match jwt {
Ok(jwt) => match jwt {
Message::Text(jwt) => match jwt_client.auth_jwt(&jwt, db_client).await {
Ok(user) => Some((socket, user.id)),
Err(e) => {
let _ = socket
.send(Message::Text(format!(
"failed to authenticate user | {e:#?}"
)))
.await;
let _ = socket.close().await;
None
}
},
_ => None,
},
Err(e) => {
let _ = socket
.send(Message::Text(format!("failed to get jwt message: {e:#?}")))
.await;
let _ = socket.close().await;
None
}
}
} else {
None
}
}
async fn user_can_see_update(
user: &User,
user_id: &str,
entity_type: EntityType,
entity_id: &Option<String>,
db_client: &DbClient,
) -> anyhow::Result<()> {
match entity_type {
EntityType::System => {
if user.admin {
Ok(())
} else {
Err(anyhow!("user not admin, can't recieve system updates"))
}
}
EntityType::Server => {
let server_id = entity_id
.as_ref()
.ok_or(anyhow!("must pass entity_id for {entity_type}"))?;
let server = db_client
.servers
.find_one_by_id(server_id)
.await
.context(format!("failed at query to get server at {server_id}"))?
.ok_or(anyhow!("did not server with id {server_id}"))?;
user_permissions(user_id, &server.permissions)
}
EntityType::Deployment => {
let deployment_id = entity_id
.as_ref()
.ok_or(anyhow!("must pass entity_id for {entity_type}"))?;
let deployment = db_client
.deployments
.find_one_by_id(deployment_id)
.await
.context(format!(
"failed at query to get deployment at {deployment_id}"
))?
.ok_or(anyhow!("did not deployment with id {deployment_id}"))?;
user_permissions(user_id, &deployment.permissions)
}
EntityType::Build => {
let build_id = entity_id
.as_ref()
.ok_or(anyhow!("must pass entity_id for {entity_type}"))?;
let build = db_client
.builds
.find_one_by_id(build_id)
.await
.context(format!("failed at query to get build at {build_id}"))?
.ok_or(anyhow!("did not build with id {build_id}"))?;
user_permissions(user_id, &build.permissions)
}
}
}

View File

@@ -104,6 +104,7 @@ pub struct Build {
pub id: Option<ObjectId>,
pub name: String,
pub permissions: PermissionsMap,
pub server_id: String, // server which this image should be built on
pub version: Version,
// git related
@@ -128,7 +129,7 @@ pub struct BuildRecord {
pub version: Option<Version>,
}
#[derive(Serialize, Deserialize, Debug, Clone)]
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct Update {
#[serde(rename = "_id", skip_serializing_if = "Option::is_none")]
pub id: Option<ObjectId>,
@@ -371,20 +372,38 @@ pub enum EntityType {
Server,
}
impl Default for EntityType {
fn default() -> Self {
EntityType::System
}
}
#[derive(Serialize, Deserialize, Debug, Display, EnumString, PartialEq, Hash, Eq, Clone, Copy)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum Operation {
// do nothing
None,
// server
CreateServer,
UpdateServer,
DeleteServer,
PruneImagesServer,
PruneContainersServer,
PruneNetworksServer,
// build
CreateBuild,
UpdateBuild,
DeleteBuild,
BuildBuild,
RecloneBuild,
// deployment
CreateDeployment,
UpdateDeployment,
DeleteDeployment,
DeployDeployment,
StopDeployment,
StartDeployment,
@@ -392,10 +411,17 @@ pub enum Operation {
RecloneDeployment,
}
impl Default for Operation {
fn default() -> Self {
Operation::None
}
}
#[derive(Serialize, Deserialize, Debug, Display, EnumString, PartialEq, Hash, Eq, Clone, Copy)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum PermissionLevel {
None,
Read,
Write,
}
@@ -429,4 +455,4 @@ impl Default for RestartMode {
fn default() -> RestartMode {
RestartMode::NoRestart
}
}
}