implement the ws

This commit is contained in:
mbecker20
2023-06-19 07:34:36 +00:00
parent 636023167d
commit 6320d6fbeb
20 changed files with 1027 additions and 81 deletions

2
Cargo.lock generated
View File

@@ -1459,6 +1459,7 @@ dependencies = [
"bcrypt",
"dotenv",
"envy",
"futures",
"hmac",
"jwt",
"log",
@@ -1477,6 +1478,7 @@ dependencies = [
"simple_logger",
"termination_signal",
"tokio",
"tokio-util",
"tower",
"tower-http",
"urlencoding",

View File

@@ -25,6 +25,9 @@ clap = { version = "4.3", features = ["derive"] }
uuid = { version = "1.3", features = ["v4", "fast-rng", "serde"] }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio-util = "0.7"
futures = "0.3"
futures-util = "0.3"
dotenv = "0.15"
envy = "0.4"
anyhow = "1.0"

View File

@@ -14,6 +14,7 @@ monitor_helpers.workspace = true
periphery_client.workspace = true
# external
tokio.workspace = true
tokio-util.workspace = true
axum.workspace = true
tower.workspace = true
tower-http.workspace = true
@@ -33,6 +34,7 @@ hmac.workspace = true
sha2.workspace = true
bcrypt.workspace = true
async-trait.workspace = true
futures.workspace = true
# mogh
async_timing_util.workspace = true
merge_config_files.workspace = true

View File

@@ -1,14 +1,16 @@
use monitor_types::entities::{build::Build, deployment::Deployment, server::Server, user::User};
use monitor_types::entities::{
build::Build, deployment::Deployment, server::Server, update::Update, user::User,
};
use mungos::{Collection, Indexed, Mungos};
use crate::config::CoreConfig;
pub struct DbClient {
// mungos: Mungos,
pub users: Collection<User>,
pub servers: Collection<Server>,
pub deployments: Collection<Deployment>,
pub builds: Collection<Build>,
pub updates: Collection<Update>,
}
impl DbClient {
@@ -23,6 +25,7 @@ impl DbClient {
servers: Server::collection(&mungos, &config.mongo.db_name, true).await?,
deployments: Deployment::collection(&mungos, &config.mongo.db_name, true).await?,
builds: Build::collection(&mungos, &config.mongo.db_name, true).await?,
updates: Update::collection(&mungos, &config.mongo.db_name, true).await?,
// mungos,
};
Ok(client)

View File

@@ -1,8 +1,19 @@
use std::collections::HashMap;
use anyhow::{anyhow, Context};
use monitor_types::{
entities::{server::Server, user::User, PermissionLevel},
busy::Busy,
entities::{
build::Build,
deployment::Deployment,
server::{self, Server},
update::Update,
user::User,
PermissionLevel,
},
permissioned::Permissioned,
};
use tokio::sync::RwLock;
use crate::{auth::RequestUser, state::State};
@@ -15,6 +26,14 @@ impl State {
.context(format!("no user exists with id {user_id}"))
}
pub async fn get_server(&self, server_id: &str) -> anyhow::Result<Server> {
self.db
.servers
.find_one_by_id(server_id)
.await?
.context(format!("did not find any server with id {server_id}"))
}
pub async fn get_server_check_permissions(
&self,
server_id: &str,
@@ -36,4 +55,179 @@ impl State {
))
}
}
pub async fn get_user_permission_on_server(
&self,
user_id: &str,
server_id: &str,
) -> anyhow::Result<PermissionLevel> {
let server = self.get_server(server_id).await?;
Ok(server.get_user_permissions(user_id))
}
pub async fn get_deployment(&self, deployment_id: &str) -> anyhow::Result<Deployment> {
self.db
.deployments
.find_one_by_id(deployment_id)
.await?
.context(format!(
"did not find any deployment with id {deployment_id}"
))
}
pub async fn get_deployment_check_permissions(
&self,
deployment_id: &str,
user: &RequestUser,
permission_level: PermissionLevel,
) -> anyhow::Result<Deployment> {
let deployment = self
.db
.deployments
.find_one_by_id(deployment_id)
.await?
.context(format!(
"did not find any deployment with id {deployment_id}"
))?;
let permissions = deployment.get_user_permissions(&user.id);
if user.is_admin || permissions >= permission_level {
Ok(deployment)
} else {
Err(anyhow!(
"user does not have required permissions on this deployment"
))
}
}
pub async fn get_user_permission_on_deployment(
&self,
user_id: &str,
deployment_id: &str,
) -> anyhow::Result<PermissionLevel> {
let deployment = self.get_deployment(deployment_id).await?;
Ok(deployment.get_user_permissions(user_id))
}
pub async fn get_build(&self, build_id: &str) -> anyhow::Result<Build> {
self.db
.builds
.find_one_by_id(build_id)
.await?
.context(format!("did not find any build with id {build_id}"))
}
pub async fn get_build_check_permissions(
&self,
build_id: &str,
user: &RequestUser,
permission_level: PermissionLevel,
) -> anyhow::Result<Build> {
let build = self
.db
.builds
.find_one_by_id(build_id)
.await?
.context(format!("did not find any build with id {build_id}"))?;
let permissions = build.get_user_permissions(&user.id);
if user.is_admin || permissions >= permission_level {
Ok(build)
} else {
Err(anyhow!(
"user does not have required permissions on this build"
))
}
}
pub async fn get_user_permission_on_build(
&self,
user_id: &str,
build_id: &str,
) -> anyhow::Result<PermissionLevel> {
let build = self.get_build(build_id).await?;
Ok(build.get_user_permissions(user_id))
}
pub async fn send_update(&self, update: Update) -> anyhow::Result<()> {
self.update.sender.lock().await.send(update)?;
Ok(())
}
pub async fn add_update(&self, mut update: Update) -> anyhow::Result<String> {
update.id = self
.db
.updates
.create_one(update.clone())
.await
.context("failed to insert update into db")?;
let id = update.id.clone();
let _ = self.send_update(update).await;
Ok(id)
}
pub async fn update_update(&self, mut update: Update) -> anyhow::Result<()> {
let mut update_id = String::new();
std::mem::swap(&mut update.id, &mut update_id);
self.db
.updates
.update_one(&update_id, mungos::Update::Regular(update.clone()))
.await
.context("failed to update the update on db. the update build process was deleted")?;
std::mem::swap(&mut update.id, &mut update_id);
let _ = self.send_update(update).await;
Ok(())
}
}
#[derive(Default)]
pub struct Cache<T: Clone + Default> {
cache: RwLock<HashMap<String, T>>,
}
impl<T: Clone + Default> Cache<T> {
pub async fn get(&self, key: &str) -> Option<T> {
self.cache.read().await.get(key).cloned()
}
pub async fn get_or_default(&self, key: String) -> T {
let mut cache = self.cache.write().await;
cache.entry(key).or_default().clone()
}
pub async fn _get_list(&self, filter: Option<impl Fn(&String, &T) -> bool>) -> Vec<T> {
let cache = self.cache.read().await;
match filter {
Some(filter) => cache
.iter()
.filter(|(k, v)| filter(k, v))
.map(|(_, e)| e.clone())
.collect(),
None => cache.iter().map(|(_, e)| e.clone()).collect(),
}
}
pub async fn insert(&self, key: String, val: T) {
self.cache.write().await.insert(key, val);
}
pub async fn update_entry(&self, key: String, handler: impl Fn(&mut T)) {
let mut cache = self.cache.write().await;
handler(cache.entry(key).or_default());
}
pub async fn clear(&self) {
self.cache.write().await.clear();
}
pub async fn remove(&self, key: &str) {
self.cache.write().await.remove(key);
}
}
impl<T: Clone + Default + Busy> Cache<T> {
pub async fn busy(&self, id: &str) -> bool {
match self.get(id).await {
Some(state) => state.busy(),
None => false,
}
}
}

View File

@@ -19,8 +19,10 @@ mod auth;
mod config;
mod db;
mod helpers;
mod monitoring;
mod requests;
mod state;
mod ws;
async fn app() -> anyhow::Result<()> {
let state = state::State::load().await?;
@@ -32,6 +34,7 @@ async fn app() -> anyhow::Result<()> {
let app = Router::new()
.nest("/auth", auth::router(&state))
.nest("/api", api())
.nest("/ws", ws::router())
.layer(Extension(state));
info!("starting server on {}", socket_addr);

151
core/src/monitoring.rs Normal file
View File

@@ -0,0 +1,151 @@
use async_timing_util::{wait_until_timelength, Timelength};
use futures::future::join_all;
use monitor_types::entities::{
deployment::{BasicContainerInfo, Deployment, DockerContainerState},
server::{stats::AllSystemStats, Server, ServerStatus},
};
use mungos::mongodb::bson::doc;
use periphery_client::{requests, PeripheryClient};
use crate::state::State;
#[derive(Default)]
pub struct CachedServerStatus {
pub id: String,
pub status: ServerStatus,
pub version: String,
pub stats: Option<AllSystemStats>,
}
#[derive(Default)]
pub struct CachedDeploymentStatus {
pub id: String,
pub state: DockerContainerState,
pub container: Option<BasicContainerInfo>,
}
impl State {
pub async fn monitor(&self) {
loop {
wait_until_timelength(Timelength::FiveSeconds, 500).await;
let servers = self.db.servers.get_some(None, None).await;
if let Err(e) = &servers {
error!("failed to get server list (manage status cache) | {e:#?}")
}
let servers = servers.unwrap();
let futures = servers
.into_iter()
.map(|server| async move { self.update_cache(&server).await });
join_all(futures).await;
}
}
pub async fn update_cache(&self, server: &Server) {
let deployments = self
.db
.deployments
.get_some(doc! { "config.server_id": &server.id }, None)
.await;
if let Err(e) = &deployments {
error!("failed to get deployments list from mongo (update status cache) | server id: {} | {e:#?}", server.id);
return;
}
let deployments = deployments.unwrap();
if !server.config.enabled {
self.insert_deployments_status_unknown(deployments).await;
self.insert_server_status(
server,
ServerStatus::Disabled,
String::from("unknown"),
None,
)
.await;
return;
}
let periphery = PeripheryClient::new(&server.config.address, &self.config.passkey);
let version = periphery.request(requests::GetVersion {}).await;
if version.is_err() {
self.insert_deployments_status_unknown(deployments).await;
self.insert_server_status(server, ServerStatus::NotOk, String::from("unknown"), None)
.await;
return;
}
let stats = periphery.request(requests::GetAllSystemStats {}).await;
if stats.is_err() {
self.insert_deployments_status_unknown(deployments).await;
self.insert_server_status(server, ServerStatus::NotOk, String::from("unknown"), None)
.await;
return;
}
self.insert_server_status(
server,
ServerStatus::Ok,
version.unwrap().version,
stats.unwrap().into(),
)
.await;
let containers = periphery.request(requests::GetContainerList {}).await;
if containers.is_err() {
self.insert_deployments_status_unknown(deployments).await;
return;
}
let containers = containers.unwrap();
for deployment in deployments {
let container = containers
.iter()
.find(|c| c.name == deployment.name)
.cloned();
self.deployment_status_cache
.insert(
deployment.id.clone(),
CachedDeploymentStatus {
id: deployment.id,
state: container
.as_ref()
.map(|c| c.state)
.unwrap_or(DockerContainerState::NotDeployed),
container,
}
.into(),
)
.await;
}
}
async fn insert_deployments_status_unknown(&self, deployments: Vec<Deployment>) {
for deployment in deployments {
self.deployment_status_cache
.insert(
deployment.id.clone(),
CachedDeploymentStatus {
id: deployment.id,
state: DockerContainerState::Unknown,
container: None,
}
.into(),
)
.await;
}
}
async fn insert_server_status(
&self,
server: &Server,
status: ServerStatus,
version: String,
stats: Option<AllSystemStats>,
) {
self.server_status_cache
.insert(
server.id.clone(),
CachedServerStatus {
id: server.id.clone(),
status,
version,
stats,
}
.into(),
)
.await;
}
}

View File

@@ -1,4 +1,9 @@
use monitor_types::requests::api::{CreateLoginSecret, DeleteLoginSecret, GetPeripheryVersion, GetServer, ListServers, CreateServer, DeleteServer, UpdateServer};
use monitor_types::requests::api::{
CreateLoginSecret, CreateServer, DeleteLoginSecret, DeleteServer, GetAllSystemStats,
GetBasicSystemStats, GetCpuUsage, GetDiskUsage, GetNetworkUsage, GetPeripheryVersion,
GetServer, GetSystemComponents, GetSystemInformation, GetSystemProcesses, ListServers,
RenameServer, UpdateServer,
};
use resolver_api::{derive::Resolver, Resolve};
use serde::{Deserialize, Serialize};
@@ -13,13 +18,28 @@ mod server;
#[serde(tag = "type", content = "params")]
#[allow(clippy::enum_variant_names, clippy::large_enum_variant)]
pub enum ApiRequest {
// ==== SECRET ====
CreateLoginSecret(CreateLoginSecret),
DeleteLoginSecret(DeleteLoginSecret),
// SERVER
//
// ==== SERVER ====
//
GetPeripheryVersion(GetPeripheryVersion),
GetSystemInformation(GetSystemInformation),
GetServer(GetServer),
ListServers(ListServers),
// CRUD
CreateServer(CreateServer),
DeleteServer(DeleteServer),
UpdateServer(UpdateServer),
RenameServer(RenameServer),
// Stats
GetAllSystemStats(GetAllSystemStats),
GetBasicSystemStats(GetBasicSystemStats),
GetCpuUsage(GetCpuUsage),
GetDiskUsage(GetDiskUsage),
GetNetworkUsage(GetNetworkUsage),
GetSystemProcesses(GetSystemProcesses),
GetSystemComponents(GetSystemComponents),
}

View File

@@ -1,9 +1,24 @@
use anyhow::{anyhow, Context};
use async_timing_util::unix_timestamp_ms;
use async_trait::async_trait;
use monitor_types::{
entities::{server::Server, PermissionLevel},
entities::{
server::{
stats::{
AllSystemStats, BasicSystemStats, CpuUsage, DiskUsage, NetworkUsage,
SystemComponent, SystemInformation, SystemProcess,
},
Server, ServerBuilder,
},
update::{Update, UpdateTarget},
Operation, PermissionLevel,
},
permissioned::Permissioned,
requests::api::{
CreateServer, DeleteServer, GetPeripheryVersion, GetPeripheryVersionResponse, GetServer,
ListServers, UpdateServer,
CreateServer, DeleteServer, GetAllSystemStats, GetBasicSystemStats, GetCpuUsage,
GetDiskUsage, GetNetworkUsage, GetPeripheryVersion, GetPeripheryVersionResponse, GetServer,
GetSystemComponents, GetSystemInformation, GetSystemProcesses, ListServers, RenameServer,
UpdateServer,
},
};
use resolver_api::Resolve;
@@ -14,35 +29,93 @@ use crate::{auth::RequestUser, state::State};
impl Resolve<GetPeripheryVersion, RequestUser> for State {
async fn resolve(
&self,
GetPeripheryVersion { server_id }: GetPeripheryVersion,
req: GetPeripheryVersion,
user: RequestUser,
) -> anyhow::Result<GetPeripheryVersionResponse> {
let server = self
.get_server_check_permissions(&server_id, &user, PermissionLevel::Read)
self.get_server_check_permissions(&req.server_id, &user, PermissionLevel::Read)
.await?;
todo!()
let version = self
.server_status_cache
.get(&req.server_id)
.await
.map(|s| s.version.clone())
.unwrap_or(String::from("unknown"));
Ok(GetPeripheryVersionResponse { version })
}
}
#[async_trait]
impl Resolve<GetServer, RequestUser> for State {
async fn resolve(&self, req: GetServer, user: RequestUser) -> anyhow::Result<Server> {
todo!()
self.get_server_check_permissions(&req.id, &user, PermissionLevel::Read)
.await
}
}
#[async_trait]
impl Resolve<ListServers, RequestUser> for State {
async fn resolve(&self, req: ListServers, user: RequestUser) -> anyhow::Result<Vec<Server>> {
todo!()
async fn resolve(&self, _: ListServers, user: RequestUser) -> anyhow::Result<Vec<Server>> {
let servers = self
.db
.servers
.get_some(None, None)
.await
.context("failed to pull servers from mongo")?;
let servers = if user.is_admin {
servers
} else {
servers
.into_iter()
.filter(|server| server.get_user_permissions(&user.id) > PermissionLevel::None)
.collect()
};
Ok(servers)
}
}
#[async_trait]
impl Resolve<CreateServer, RequestUser> for State {
async fn resolve(&self, req: CreateServer, user: RequestUser) -> anyhow::Result<Server> {
todo!()
if !user.is_admin && !user.create_server_permissions {
return Err(anyhow!("user does not have create server permissions"));
}
let start_ts = unix_timestamp_ms() as i64;
let server = Server {
id: Default::default(),
name: req.name,
created_at: start_ts,
updated_at: start_ts,
permissions: [(user.id.clone(), PermissionLevel::Update)]
.into_iter()
.collect(),
description: Default::default(),
tags: Default::default(),
config: req.config.into(),
};
let server_id = self
.db
.servers
.create_one(server)
.await
.context("failed to add server to db")?;
let server = self.get_server(&server_id).await?;
let update = Update {
target: UpdateTarget::Server(server_id),
operation: Operation::CreateServer,
start_ts,
end_ts: Some(unix_timestamp_ms() as i64),
operator: user.id.clone(),
success: true,
..Default::default()
};
self.add_update(update).await?;
self.update_cache(&server).await;
Ok(server)
}
}
@@ -59,3 +132,90 @@ impl Resolve<UpdateServer, RequestUser> for State {
todo!()
}
}
#[async_trait]
impl Resolve<RenameServer, RequestUser> for State {
async fn resolve(&self, req: RenameServer, args: RequestUser) -> anyhow::Result<Server> {
todo!()
}
}
#[async_trait]
impl Resolve<GetSystemInformation, RequestUser> for State {
async fn resolve(
&self,
req: GetSystemInformation,
args: RequestUser,
) -> anyhow::Result<SystemInformation> {
todo!()
}
}
#[async_trait]
impl Resolve<GetAllSystemStats, RequestUser> for State {
async fn resolve(
&self,
req: GetAllSystemStats,
args: RequestUser,
) -> anyhow::Result<AllSystemStats> {
todo!()
}
}
#[async_trait]
impl Resolve<GetBasicSystemStats, RequestUser> for State {
async fn resolve(
&self,
req: GetBasicSystemStats,
args: RequestUser,
) -> anyhow::Result<BasicSystemStats> {
todo!()
}
}
#[async_trait]
impl Resolve<GetCpuUsage, RequestUser> for State {
async fn resolve(&self, req: GetCpuUsage, args: RequestUser) -> anyhow::Result<CpuUsage> {
todo!()
}
}
#[async_trait]
impl Resolve<GetDiskUsage, RequestUser> for State {
async fn resolve(&self, req: GetDiskUsage, args: RequestUser) -> anyhow::Result<DiskUsage> {
todo!()
}
}
#[async_trait]
impl Resolve<GetNetworkUsage, RequestUser> for State {
async fn resolve(
&self,
req: GetNetworkUsage,
args: RequestUser,
) -> anyhow::Result<NetworkUsage> {
todo!()
}
}
#[async_trait]
impl Resolve<GetSystemProcesses, RequestUser> for State {
async fn resolve(
&self,
req: GetSystemProcesses,
args: RequestUser,
) -> anyhow::Result<Vec<SystemProcess>> {
todo!()
}
}
#[async_trait]
impl Resolve<GetSystemComponents, RequestUser> for State {
async fn resolve(
&self,
req: GetSystemComponents,
args: RequestUser,
) -> anyhow::Result<Vec<SystemComponent>> {
todo!()
}
}

View File

@@ -2,13 +2,21 @@ use std::{net::SocketAddr, str::FromStr, sync::Arc};
use anyhow::Context;
use axum::Extension;
use monitor_types::requests::auth::GetLoginOptionsResponse;
use monitor_types::{
entities::{
build::BuildActionState, deployment::DeploymentActionState, server::ServerActionState,
},
requests::auth::GetLoginOptionsResponse,
};
use simple_logger::SimpleLogger;
use crate::{
auth::{GithubOauthClient, GoogleOauthClient, JwtClient},
config::{CoreConfig, Env},
db::DbClient,
helpers::Cache,
monitoring::{CachedDeploymentStatus, CachedServerStatus},
ws::UpdateWsChannel,
};
pub type StateExtension = Extension<Arc<State>>;
@@ -17,9 +25,19 @@ pub struct State {
pub env: Env,
pub config: CoreConfig,
pub db: DbClient,
pub update: UpdateWsChannel,
// auth
pub jwt: JwtClient,
pub github_auth: Option<GithubOauthClient>,
pub google_auth: Option<GoogleOauthClient>,
// cache
pub action_states: ActionStates,
pub deployment_status_cache: Cache<Arc<CachedDeploymentStatus>>,
pub server_status_cache: Cache<Arc<CachedServerStatus>>,
// cached responses
pub login_options_response: String,
}
@@ -36,17 +54,25 @@ impl State {
.init()
.context("failed to configure logger")?;
let state = State {
let state: Arc<State> = State {
env,
db: DbClient::new(&config).await?,
jwt: JwtClient::new(&config),
github_auth: GithubOauthClient::new(&config),
google_auth: GoogleOauthClient::new(&config),
login_options_response: login_options_response(&config)?,
action_states: Default::default(),
deployment_status_cache: Default::default(),
server_status_cache: Default::default(),
update: UpdateWsChannel::new(),
config,
};
}
.into();
Ok(state.into())
let state_clone = state.clone();
tokio::spawn(async move { state_clone.monitor().await });
Ok(state)
}
pub fn socket_addr(&self) -> anyhow::Result<SocketAddr> {
@@ -67,3 +93,11 @@ pub fn login_options_response(config: &CoreConfig) -> anyhow::Result<String> {
};
serde_json::to_string(&options).context("failed to serialize login options")
}
#[derive(Default)]
pub struct ActionStates {
pub build: Cache<BuildActionState>,
pub deployment: Cache<DeploymentActionState>,
pub server: Cache<ServerActionState>,
// pub command: Cache<CommandActionState>,
}

213
core/src/ws.rs Normal file
View File

@@ -0,0 +1,213 @@
use anyhow::anyhow;
use axum::{
extract::{
ws::{Message, WebSocket},
WebSocketUpgrade,
},
response::IntoResponse,
routing::get,
Router,
};
use futures::{SinkExt, StreamExt};
use monitor_types::entities::{
update::{Update, UpdateTarget},
user::User,
PermissionLevel,
};
use serde_json::json;
use tokio::{
select,
sync::{
broadcast::{self, Receiver, Sender},
Mutex,
},
};
use tokio_util::sync::CancellationToken;
use crate::{
auth::RequestUser,
state::{State, StateExtension},
};
pub type UpdateWsSender = Mutex<Sender<Update>>;
pub type UpdateWsReciever = Receiver<Update>;
pub fn router() -> Router {
Router::new().route("/update", get(ws_handler))
}
pub struct UpdateWsChannel {
pub sender: UpdateWsSender,
pub reciever: UpdateWsReciever,
}
impl UpdateWsChannel {
pub fn new() -> UpdateWsChannel {
let (sender, reciever) = broadcast::channel(16);
UpdateWsChannel {
sender: Mutex::new(sender),
reciever,
}
}
}
async fn ws_handler(state: StateExtension, ws: WebSocketUpgrade) -> impl IntoResponse {
let mut reciever = state.update.reciever.resubscribe();
ws.on_upgrade(|socket| async move {
let login_res = state.ws_login(socket).await;
if login_res.is_none() {
return;
}
let (socket, user) = login_res.unwrap();
let (mut ws_sender, mut ws_reciever) = socket.split();
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
tokio::spawn(async move {
loop {
let update = select! {
_ = cancel_clone.cancelled() => break,
update = reciever.recv() => {update.expect("failed to recv update msg")}
};
let user = state.db.users.find_one_by_id(&user.id).await;
if user.is_err()
|| user.as_ref().unwrap().is_none()
|| !user.as_ref().unwrap().as_ref().unwrap().enabled
{
let _ = ws_sender
.send(Message::Text(json!({ "type": "INVALID_USER" }).to_string()))
.await;
let _ = ws_sender.close().await;
return;
}
let user = user.unwrap().unwrap(); // already handle cases where this panics in the above early return
let res = state
.user_can_see_update(&user, &user.id, &update.target)
.await;
if let Err(_e) = res {
// handle
return;
} else {
let _ = ws_sender
.send(Message::Text(serde_json::to_string(&update).unwrap()))
.await;
}
}
});
while let Some(msg) = ws_reciever.next().await {
match msg {
Ok(msg) => {
if let Message::Close(_) = msg {
cancel.cancel();
return;
}
}
Err(_) => {
cancel.cancel();
return;
}
}
}
})
}
impl State {
pub async fn ws_login(&self, mut socket: WebSocket) -> Option<(WebSocket, RequestUser)> {
if let Some(jwt) = socket.recv().await {
match jwt {
Ok(jwt) => match jwt {
Message::Text(jwt) => match self.auth_jwt_check_enabled(&jwt).await {
Ok(user) => {
let _ = socket.send(Message::Text("LOGGED_IN".to_string())).await;
Some((socket, user))
}
Err(e) => {
let _ = socket
.send(Message::Text(format!(
"failed to authenticate user | {e:#?}"
)))
.await;
let _ = socket.close().await;
None
}
},
msg => {
let _ = socket
.send(Message::Text(format!("invalid login msg: {msg:#?}")))
.await;
let _ = socket.close().await;
None
}
},
Err(e) => {
let _ = socket
.send(Message::Text(format!("failed to get jwt message: {e:#?}")))
.await;
let _ = socket.close().await;
None
}
}
} else {
let _ = socket
.send(Message::Text(String::from("failed to get jwt message")))
.await;
let _ = socket.close().await;
None
}
}
async fn user_can_see_update(
&self,
user: &User,
user_id: &str,
update_target: &UpdateTarget,
) -> anyhow::Result<()> {
if user.admin {
return Ok(());
}
let (permissions, target) = match update_target {
UpdateTarget::Server(server_id) => {
let permissions = self
.get_user_permission_on_server(user_id, server_id)
.await?;
(permissions, "server")
}
UpdateTarget::Deployment(deployment_id) => {
let permissions = self
.get_user_permission_on_deployment(user_id, deployment_id)
.await?;
(permissions, "deployment")
}
UpdateTarget::Build(build_id) => {
let permissions = self.get_user_permission_on_build(user_id, build_id).await?;
(permissions, "build")
}
// UpdateTarget::Procedure(procedure_id) => {
// let permissions = db_client
// .get_user_permission_on_procedure(user_id, procedure_id)
// .await?;
// (permissions, "procedure")
// }
// UpdateTarget::Group(group_id) => {
// let permissions = db_client
// .get_user_permission_on_group(user_id, group_id)
// .await?;
// (permissions, "group")
// }
// UpdateTarget::Command(command_id) => {
// let permissions = db_client
// .get_user_permission_on_command(user_id, command_id)
// .await?;
// (permissions, "command")
// }
UpdateTarget::System => {
return Err(anyhow!("user not admin, can't recieve system updates"))
}
};
if permissions != PermissionLevel::None {
Ok(())
} else {
Err(anyhow!("user does not have permissions on {target}"))
}
}
}

30
lib/types/src/busy.rs Normal file
View File

@@ -0,0 +1,30 @@
use crate::entities::{server::ServerActionState, deployment::DeploymentActionState, build::BuildActionState};
pub trait Busy {
fn busy(&self) -> bool;
}
impl Busy for ServerActionState {
fn busy(&self) -> bool {
self.pruning_containers || self.pruning_images || self.pruning_networks
}
}
impl Busy for DeploymentActionState {
fn busy(&self) -> bool {
self.deploying
|| self.pulling
|| self.recloning
|| self.removing
|| self.starting
|| self.stopping
|| self.updating
|| self.renaming
}
}
impl Busy for BuildActionState {
fn busy(&self) -> bool {
self.building || self.updating
}
}

View File

@@ -5,6 +5,8 @@ use partial_derive2::Partial;
use serde::{Deserialize, Serialize};
use typeshare::typeshare;
use crate::{I64, i64_is_zero};
use super::{EnvironmentVar, PermissionsMap, SystemCommand, Version};
#[typeshare]
@@ -30,13 +32,13 @@ pub struct Build {
#[builder(setter(skip))]
pub permissions: PermissionsMap,
#[serde(default, skip_serializing_if = "String::is_empty")]
#[serde(default, skip_serializing_if = "i64_is_zero")]
#[builder(setter(skip))]
pub created_at: String,
pub created_at: I64,
#[serde(default)]
#[builder(setter(skip))]
pub updated_at: String,
pub updated_at: I64,
#[serde(default)]
#[builder(default)]
@@ -107,3 +109,10 @@ pub struct BuildConfig {
#[builder(default)]
pub use_buildx: bool,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct BuildActionState {
pub building: bool,
pub updating: bool,
}

View File

@@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize};
use strum_macros::{Display, EnumString};
use typeshare::typeshare;
use crate::{i64_is_zero, I64};
use super::{EnvironmentVar, PermissionsMap};
#[typeshare]
@@ -31,13 +33,13 @@ pub struct Deployment {
#[builder(setter(skip))]
pub permissions: PermissionsMap,
#[serde(default, skip_serializing_if = "String::is_empty")]
#[serde(default, skip_serializing_if = "i64_is_zero")]
#[builder(setter(skip))]
pub created_at: String,
pub created_at: I64,
#[serde(default)]
#[builder(setter(skip))]
pub updated_at: String,
pub updated_at: I64,
#[serde(default)]
pub tags: Vec<String>,
@@ -73,7 +75,7 @@ pub struct DeploymentConfig {
pub redeploy_on_build: bool,
#[serde(default = "default_term_signal_labels")]
#[builder(default = "vec![TerminationSignalLabel::default()]")]
#[builder(default = "default_term_signal_labels()")]
pub term_signal_labels: Vec<TerminationSignalLabel>,
#[serde(default)]
@@ -130,14 +132,6 @@ fn default_network() -> String {
String::from("host")
}
// #[typeshare]
// #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, MungosIndexed)]
// #[serde(tag = "type", content = "params")]
// pub enum DeploymentImage {
// Image { image: String },
// Build { id: String, version: String }, // empty version string means latest
// }
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Default, PartialEq)]
pub struct Conversion {
@@ -179,18 +173,16 @@ pub struct DockerContainerStats {
Serialize,
Deserialize,
Debug,
Display,
EnumString,
PartialEq,
Hash,
Eq,
Clone,
Copy,
Default,
Display,
EnumString,
MungosIndexed,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum DockerContainerState {
#[default]
Unknown,
@@ -209,14 +201,14 @@ pub enum DockerContainerState {
Serialize,
Deserialize,
Debug,
Display,
EnumString,
PartialEq,
Hash,
Eq,
Clone,
Copy,
Default,
Display,
EnumString,
MungosIndexed,
)]
pub enum RestartMode {
@@ -240,14 +232,14 @@ pub enum RestartMode {
Serialize,
Deserialize,
Debug,
Display,
EnumString,
PartialEq,
Hash,
Eq,
Clone,
Copy,
Default,
Display,
EnumString,
MungosIndexed,
)]
#[serde(rename_all = "UPPERCASE")]
@@ -272,3 +264,16 @@ pub struct TerminationSignalLabel {
#[builder(default)]
pub label: String,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct DeploymentActionState {
pub deploying: bool,
pub stopping: bool,
pub starting: bool,
pub removing: bool,
pub pulling: bool,
pub recloning: bool,
pub updating: bool,
pub renaming: bool,
}

View File

@@ -199,10 +199,17 @@ impl Default for &PermissionLevel {
#[typeshare]
#[derive(
Serialize, Deserialize, Debug, Default, Display, EnumString, PartialEq, Hash, Eq, Clone, Copy,
Serialize,
Deserialize,
Debug,
Default,
PartialEq,
Hash,
Eq,
Clone,
Copy,
MungosIndexed,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum Operation {
// do nothing
#[default]

View File

@@ -5,6 +5,8 @@ use partial_derive2::Partial;
use serde::{Deserialize, Serialize};
use typeshare::typeshare;
use crate::{I64, i64_is_zero};
use super::PermissionsMap;
pub mod docker_image;
@@ -34,13 +36,13 @@ pub struct Server {
#[builder(setter(skip))]
pub permissions: PermissionsMap,
#[serde(default, skip_serializing_if = "String::is_empty")]
#[serde(default, skip_serializing_if = "i64_is_zero")]
#[builder(setter(skip))]
pub created_at: String,
pub created_at: I64,
#[serde(default)]
#[builder(setter(skip))]
pub updated_at: String,
pub updated_at: I64,
#[serde(default)]
pub tags: Vec<String>,
@@ -105,3 +107,35 @@ fn default_mem_alert() -> f64 {
fn default_disk_alert() -> f64 {
75.0
}
impl From<PartialServerConfig> for ServerConfig {
fn from(value: PartialServerConfig) -> ServerConfig {
ServerConfig {
address: value.address.unwrap_or_default(),
enabled: value.enabled.unwrap_or(default_enabled()),
auto_prune: value.auto_prune.unwrap_or(default_auto_prune()),
region: value.region.unwrap_or_default(),
cpu_alert: value.cpu_alert.unwrap_or(default_cpu_alert()),
mem_alert: value.mem_alert.unwrap_or(default_mem_alert()),
disk_alert: value.disk_alert.unwrap_or(default_disk_alert()),
to_notify: value.to_notify.unwrap_or_default()
}
}
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ServerActionState {
pub pruning_networks: bool,
pub pruning_containers: bool,
pub pruning_images: bool,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, PartialEq, Hash, Eq, Clone, Copy, Default)]
pub enum ServerStatus {
#[default]
NotOk,
Ok,
Disabled,
}

View File

@@ -1,7 +1,7 @@
use async_timing_util::unix_timestamp_ms;
use bson::serde_helpers::hex_string_as_object_id;
use mungos::MungosIndexed;
use serde::{Deserialize, Serialize};
use strum_macros::{Display, EnumString};
use typeshare::typeshare;
use crate::{entities::Operation, I64};
@@ -9,7 +9,7 @@ use crate::{entities::Operation, I64};
use super::Version;
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
#[derive(Serialize, Deserialize, Debug, Clone, Default, MungosIndexed)]
pub struct Update {
#[serde(
default,
@@ -26,7 +26,7 @@ pub struct Update {
pub status: UpdateStatus,
pub success: bool,
pub operator: String,
pub version: Option<Version>,
pub version: Version,
}
#[typeshare]
@@ -68,7 +68,7 @@ impl Log {
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Default)]
#[derive(Serialize, Deserialize, Debug, Clone, Default, MungosIndexed)]
#[serde(tag = "type", content = "id")]
pub enum UpdateTarget {
#[default]
@@ -76,9 +76,9 @@ pub enum UpdateTarget {
Build(String),
Deployment(String),
Server(String),
Procedure(String),
Group(String),
Command(String),
// Procedure(String),
// Group(String),
// Command(String),
}
// impl From<&Build> for UpdateTarget {
@@ -143,10 +143,8 @@ pub enum UpdateTarget {
#[typeshare]
#[derive(
Serialize, Deserialize, Debug, Display, EnumString, PartialEq, Hash, Eq, Clone, Copy, Default,
Serialize, Deserialize, Debug, PartialEq, Hash, Eq, Clone, Copy, Default, MungosIndexed,
)]
#[serde(rename_all = "snake_case")]
#[strum(serialize_all = "snake_case")]
pub enum UpdateStatus {
Queued,
InProgress,

View File

@@ -3,7 +3,7 @@ use mungos::MungosIndexed;
use serde::{Deserialize, Serialize};
use typeshare::typeshare;
use crate::I64;
use crate::{I64, i64_is_zero};
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Default, MungosIndexed)]
@@ -59,7 +59,3 @@ pub struct ApiSecret {
pub created_at: I64,
pub expires: Option<I64>,
}
fn i64_is_zero(n: &I64) -> bool {
*n == 0
}

View File

@@ -3,8 +3,13 @@ use typeshare::typeshare;
pub mod entities;
pub mod requests;
pub mod permissioned;
pub mod busy;
#[typeshare(serialized_as = "number")]
pub type I64 = i64;
#[typeshare(serialized_as = "any")]
pub type MongoDocument = mungos::mongodb::bson::Document;
fn i64_is_zero(n: &I64) -> bool {
*n == 0
}

View File

@@ -2,22 +2,7 @@ use resolver_api::derive::Request;
use serde::{Serialize, Deserialize};
use typeshare::typeshare;
use crate::{entities::server::{Server, PartialServerConfig}, MongoDocument};
//
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(GetPeripheryVersionResponse)]
pub struct GetPeripheryVersion {
pub server_id: String,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GetPeripheryVersionResponse {
pub version: String,
}
use crate::{entities::server::{Server, PartialServerConfig, stats::{AllSystemStats, SystemInformation, BasicSystemStats, CpuUsage, DiskUsage, NetworkUsage, SystemProcess, SystemComponent}}, MongoDocument};
//
@@ -65,3 +50,95 @@ pub struct UpdateServer {
pub config: PartialServerConfig,
}
//
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(Server)]
pub struct RenameServer {
pub id: String,
pub name: String,
}
//
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(GetPeripheryVersionResponse)]
pub struct GetPeripheryVersion {
pub server_id: String,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct GetPeripheryVersionResponse {
pub version: String,
}
//
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(SystemInformation)]
pub struct GetSystemInformation {
pub server_id: String,
}
//
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(AllSystemStats)]
pub struct GetAllSystemStats {
pub server_id: String,
}
//
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(BasicSystemStats)]
pub struct GetBasicSystemStats {
pub server_id: String,
}
//
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(CpuUsage)]
pub struct GetCpuUsage {
pub server_id: String,
}
//
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(DiskUsage)]
pub struct GetDiskUsage {
pub server_id: String,
}
//
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(NetworkUsage)]
pub struct GetNetworkUsage {
pub server_id: String,
}
//
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(Vec<SystemProcess>)]
pub struct GetSystemProcesses {
pub server_id: String,
}
//
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(Vec<SystemComponent>)]
pub struct GetSystemComponents {
pub server_id: String,
}
//