update auth

This commit is contained in:
mbecker20
2024-01-08 03:07:27 -08:00
parent 9065b6034b
commit 81a4caf23c
35 changed files with 552 additions and 711 deletions

16
Cargo.lock generated
View File

@@ -2133,6 +2133,7 @@ dependencies = [
"serde", "serde",
"serde_json", "serde_json",
"serror", "serror",
"serror_axum",
"sha2", "sha2",
"simple_logger", "simple_logger",
"slack_client_rs", "slack_client_rs",
@@ -2960,9 +2961,9 @@ dependencies = [
[[package]] [[package]]
name = "serror" name = "serror"
version = "0.1.3" version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bc462876e265831d80297a3898a173e3d5c72a1501dec9234423de2d25ac89c" checksum = "7fc001f673de08108f1602eafd1f7fb18894588004b9d91a6bf05a5e284fe50d"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"serde", "serde",
@@ -2970,6 +2971,17 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "serror_axum"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f7ab34bc2fc163055ee8e2731a7ec9bbf6ebff834a10579e44fddce2695b6ea5"
dependencies = [
"anyhow",
"axum",
"serror",
]
[[package]] [[package]]
name = "sha-1" name = "sha-1"
version = "0.10.1" version = "0.10.1"

View File

@@ -29,7 +29,8 @@ resolver_api = "0.1.6"
parse_csl = "0.1.0" parse_csl = "0.1.0"
mungos = "0.5.4" mungos = "0.5.4"
mongo_indexed = "0.2.1" mongo_indexed = "0.2.1"
serror = "0.1.3" serror = "0.1.4"
serror_axum = "0.1.2"
svi = "0.1.4" svi = "0.1.4"
# external # external
clap = { version = "4.4.13", features = ["derive"] } clap = { version = "4.4.13", features = ["derive"] }

View File

@@ -27,6 +27,7 @@ mungos.workspace = true
mongo_indexed.workspace = true mongo_indexed.workspace = true
slack.workspace = true slack.workspace = true
serror.workspace = true serror.workspace = true
serror_axum.workspace = true
# external # external
tokio.workspace = true tokio.workspace = true
tokio-util.workspace = true tokio-util.workspace = true

View File

@@ -1,9 +1,5 @@
use async_trait::async_trait; use async_trait::async_trait;
use monitor_client::api::auth::{ use monitor_client::api::auth::*;
CreateLocalUser, ExchangeForJwt, ExchangeForJwtResponse,
GetLoginOptions, GetLoginOptionsResponse, LoginLocalUser,
LoginWithSecret,
};
use resolver_api::{derive::Resolver, Resolve}; use resolver_api::{derive::Resolver, Resolve};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use typeshare::typeshare; use typeshare::typeshare;
@@ -19,7 +15,6 @@ pub enum AuthRequest {
GetLoginOptions(GetLoginOptions), GetLoginOptions(GetLoginOptions),
CreateLocalUser(CreateLocalUser), CreateLocalUser(CreateLocalUser),
LoginLocalUser(LoginLocalUser), LoginLocalUser(LoginLocalUser),
LoginWithSecret(LoginWithSecret),
ExchangeForJwt(ExchangeForJwt), ExchangeForJwt(ExchangeForJwt),
} }

View File

@@ -268,7 +268,8 @@ impl Resolve<StopContainer, RequestUser> for State {
let periphery = self.periphery_client(&server)?; let periphery = self.periphery_client(&server)?;
let inner = || async move { let inner = || async move {
let mut update = make_update(&deployment, Operation::StopContainer, &user); let mut update =
make_update(&deployment, Operation::StopContainer, &user);
update.id = self.add_update(update.clone()).await?; update.id = self.add_update(update.clone()).await?;

View File

@@ -6,14 +6,13 @@ use axum_extra::{headers::ContentType, TypedHeader};
use monitor_client::api::execute::*; use monitor_client::api::execute::*;
use resolver_api::{derive::Resolver, Resolve, Resolver}; use resolver_api::{derive::Resolver, Resolve, Resolver};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serror_axum::AppResult;
use typeshare::typeshare; use typeshare::typeshare;
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
auth::{auth_request, RequestUser, RequestUserExtension}, auth::{auth_request, RequestUser, RequestUserExtension},
helpers::into_response_error,
state::{State, StateExtension}, state::{State, StateExtension},
ResponseResult,
}; };
mod build; mod build;
@@ -66,23 +65,22 @@ pub fn router() -> Router {
user.username, user.id user.username, user.id
); );
let res = tokio::spawn(async move { let res = tokio::spawn(async move {
state.resolve_request(request, user).await let res = state.resolve_request(request, user).await;
if let Err(e) = &res {
info!("/execute request {req_id} ERROR: {e:#?}");
}
let elapsed = timer.elapsed();
info!(
"/execute request {req_id} | resolve time: {elapsed:?}"
);
res
}) })
.await .await
.context("failure in spawned execute task"); .context("failure in spawned execute task");
if let Err(e) = &res { if let Err(e) = &res {
info!("/execute request {req_id} SPAWN ERROR: {e:#?}"); info!("/execute request {req_id} SPAWN ERROR: {e:#?}",);
} }
let res = res.map_err(into_response_error)?; AppResult::Ok((TypedHeader(ContentType::json()), res??))
if let Err(e) = &res {
info!("/execute request {req_id} ERROR: {e:#?}");
}
let res = res.map_err(into_response_error)?;
let elapsed = timer.elapsed();
info!(
"/execute request {req_id} | resolve time: {elapsed:?}"
);
ResponseResult::Ok((TypedHeader(ContentType::json()), res))
}, },
), ),
) )

View File

@@ -1,8 +1,8 @@
use anyhow::Context; use anyhow::Context;
use async_trait::async_trait; use async_trait::async_trait;
use monitor_client::{ use monitor_client::{
entities::{deployment::Deployment, server::Server},
api::read::{ListAlerts, ListAlertsResponse}, api::read::{ListAlerts, ListAlertsResponse},
entities::{deployment::Deployment, server::Server},
}; };
use mungos::{ use mungos::{
find::find_collect, find::find_collect,

View File

@@ -1,11 +1,11 @@
use anyhow::Context; use anyhow::Context;
use async_trait::async_trait; use async_trait::async_trait;
use monitor_client::{ use monitor_client::{
api::read::*,
entities::{ entities::{
alerter::{Alerter, AlerterListItem}, alerter::{Alerter, AlerterListItem},
PermissionLevel, PermissionLevel,
}, },
api::read::*,
}; };
use mungos::mongodb::bson::doc; use mungos::mongodb::bson::doc;
use resolver_api::Resolve; use resolver_api::Resolve;

View File

@@ -8,14 +8,13 @@ use resolver_api::{
derive::Resolver, Resolve, ResolveToString, Resolver, derive::Resolver, Resolve, ResolveToString, Resolver,
}; };
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serror_axum::AppResult;
use typeshare::typeshare; use typeshare::typeshare;
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
auth::{auth_request, RequestUser, RequestUserExtension}, auth::{auth_request, RequestUser, RequestUserExtension},
helpers::into_response_error,
state::{State, StateExtension}, state::{State, StateExtension},
ResponseResult,
}; };
mod alert; mod alert;
@@ -149,12 +148,12 @@ pub fn router() -> Router {
if let Err(e) = &res { if let Err(e) = &res {
warn!("/read request {req_id} ERROR: {e:#?}"); warn!("/read request {req_id} ERROR: {e:#?}");
} }
let res = res.map_err(into_response_error)?; let res = res?;
let elapsed = timer.elapsed(); let elapsed = timer.elapsed();
debug!( debug!(
"/read request {req_id} | resolve time: {elapsed:?}" "/read request {req_id} | resolve time: {elapsed:?}"
); );
ResponseResult::Ok((TypedHeader(ContentType::json()), res)) AppResult::Ok((TypedHeader(ContentType::json()), res))
}, },
), ),
) )

View File

@@ -20,9 +20,7 @@ impl Resolve<GetUser, RequestUser> for State {
.await .await
.context("failed at mongo query")? .context("failed at mongo query")?
.context("no user found with id")?; .context("no user found with id")?;
for secret in &mut user.secrets { user.sanitize();
secret.hash = String::new();
}
Ok(user) Ok(user)
} }
} }
@@ -55,13 +53,10 @@ impl Resolve<GetUsers, RequestUser> for State {
if !user.is_admin { if !user.is_admin {
return Err(anyhow!("this route is only accessable by admins")); return Err(anyhow!("this route is only accessable by admins"));
} }
let mut users = find_collect(&self.db.users, None, None) let mut users = find_collect(&self.db.users, None, None)
.await .await
.context("failed to pull users from db")?; .context("failed to pull users from db")?;
users.iter_mut().for_each(|user| { users.iter_mut().for_each(|user| user.sanitize());
user.secrets = Vec::new();
});
Ok(users) Ok(users)
} }
} }

View File

@@ -0,0 +1,75 @@
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use monitor_client::{
api::write::*,
entities::{api_key::ApiKey, monitor_timestamp},
};
use mungos::mongodb::bson::doc;
use resolver_api::Resolve;
use crate::{
auth::{random_string, RequestUser},
state::State,
};
const SECRET_LENGTH: usize = 40;
const BCRYPT_COST: u32 = 10;
#[async_trait]
impl Resolve<CreateApiKey, RequestUser> for State {
async fn resolve(
&self,
CreateApiKey { name, expires }: CreateApiKey,
user: RequestUser,
) -> anyhow::Result<CreateApiKeyResponse> {
let user = self.get_user(&user.id).await?;
let key = format!("K-{}", random_string(SECRET_LENGTH));
let secret = format!("S-{}", random_string(SECRET_LENGTH));
let secret_hash = bcrypt::hash(&secret, BCRYPT_COST)
.context("failed at hashing secret string")?;
let api_key = ApiKey {
name,
key: key.clone(),
secret: secret_hash,
user_id: user.id.clone(),
created_at: monitor_timestamp(),
expires,
};
self
.db
.api_keys
.insert_one(api_key, None)
.await
.context("failed to create api key on db")?;
Ok(CreateApiKeyResponse { key, secret })
}
}
#[async_trait]
impl Resolve<DeleteApiKey, RequestUser> for State {
async fn resolve(
&self,
DeleteApiKey { key }: DeleteApiKey,
user: RequestUser,
) -> anyhow::Result<DeleteApiKeyResponse> {
let key = self
.db
.api_keys
.find_one(doc! { "key": &key }, None)
.await
.context("failed at db query")?
.context("no api key with key found")?;
if user.id != key.user_id {
return Err(anyhow!("api key does not belong to user"));
}
self
.db
.api_keys
.delete_one(doc! { "key": key.key }, None)
.await
.context("failed to delete api key from db")?;
Ok(DeleteApiKeyResponse {})
}
}

View File

@@ -1,22 +1,21 @@
use std::time::Instant; use std::time::Instant;
use anyhow::Context;
use axum::{middleware, routing::post, Extension, Json, Router}; use axum::{middleware, routing::post, Extension, Json, Router};
use axum_extra::{headers::ContentType, TypedHeader}; use axum_extra::{headers::ContentType, TypedHeader};
use monitor_client::api::write::*; use monitor_client::api::write::*;
use resolver_api::{derive::Resolver, Resolve, Resolver}; use resolver_api::{derive::Resolver, Resolve, Resolver};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serror_axum::AppResult;
use typeshare::typeshare; use typeshare::typeshare;
use uuid::Uuid; use uuid::Uuid;
use crate::{ use crate::{
auth::{auth_request, RequestUser, RequestUserExtension}, auth::{auth_request, RequestUser, RequestUserExtension},
helpers::into_response_error,
state::{State, StateExtension}, state::{State, StateExtension},
ResponseResult,
}; };
mod alerter; mod alerter;
mod api_key;
mod build; mod build;
mod builder; mod builder;
mod deployment; mod deployment;
@@ -25,7 +24,6 @@ mod launch;
mod permissions; mod permissions;
mod procedure; mod procedure;
mod repo; mod repo;
mod secret;
mod server; mod server;
mod tag; mod tag;
mod user; mod user;
@@ -36,9 +34,9 @@ mod user;
#[resolver_args(RequestUser)] #[resolver_args(RequestUser)]
#[serde(tag = "type", content = "params")] #[serde(tag = "type", content = "params")]
enum WriteRequest { enum WriteRequest {
// ==== SECRET ==== // ==== API KEY ====
CreateLoginSecret(CreateLoginSecret), CreateApiKey(CreateApiKey),
DeleteLoginSecret(DeleteLoginSecret), DeleteApiKey(DeleteApiKey),
// ==== USER ==== // ==== USER ====
PushRecentlyViewed(PushRecentlyViewed), PushRecentlyViewed(PushRecentlyViewed),
@@ -118,23 +116,21 @@ pub fn router() -> Router {
user.username, user.id user.username, user.id
); );
let res = tokio::spawn(async move { let res = tokio::spawn(async move {
state.resolve_request(request, user).await let res = state.resolve_request(request, user).await;
if let Err(e) = &res {
info!("/write request {req_id} ERROR: {e:#?}");
}
let elapsed = timer.elapsed();
info!(
"/write request {req_id} | resolve time: {elapsed:?}"
);
res
}) })
.await .await;
.context("failure in spawned write task");
if let Err(e) = &res { if let Err(e) = &res {
info!("/write request {req_id} SPAWN ERROR: {e:#?}"); info!("/write request {req_id} SPAWN ERROR: {e:#?}");
} }
let res = res.map_err(into_response_error)?; AppResult::Ok((TypedHeader(ContentType::json()), res??))
if let Err(e) = &res {
info!("/write request {req_id} ERROR: {e:#?}");
}
let res = res.map_err(into_response_error)?;
let elapsed = timer.elapsed();
info!(
"/write request {req_id} | resolve time: {elapsed:?}"
);
ResponseResult::Ok((TypedHeader(ContentType::json()), res))
}, },
), ),
) )

View File

@@ -1,80 +0,0 @@
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use monitor_client::{
api::write::*,
entities::{monitor_timestamp, user::ApiSecret},
};
use mungos::{
by_id::update_one_by_id,
mongodb::bson::{doc, to_bson},
};
use resolver_api::Resolve;
use crate::{
auth::{random_string, RequestUser},
state::State,
};
const SECRET_LENGTH: usize = 40;
const BCRYPT_COST: u32 = 10;
#[async_trait]
impl Resolve<CreateLoginSecret, RequestUser> for State {
async fn resolve(
&self,
secret: CreateLoginSecret,
user: RequestUser,
) -> anyhow::Result<CreateLoginSecretResponse> {
let user = self.get_user(&user.id).await?;
for s in &user.secrets {
if s.name == secret.name {
return Err(anyhow!(
"secret with name {} already exists",
secret.name
));
}
}
let secret_str = random_string(SECRET_LENGTH);
let api_secret = ApiSecret {
name: secret.name,
created_at: monitor_timestamp(),
expires: secret.expires,
hash: bcrypt::hash(&secret_str, BCRYPT_COST)
.context("failed at hashing secret string")?,
};
update_one_by_id(&self.db.users, &user.id, doc! {
"$push": {
"secrets": to_bson(&api_secret).context("failed at converting secret to bson")?
}
}, None)
.await
.context("failed at mongo update query")?;
Ok(CreateLoginSecretResponse { secret: secret_str })
}
}
#[async_trait]
impl Resolve<DeleteLoginSecret, RequestUser> for State {
async fn resolve(
&self,
DeleteLoginSecret { name }: DeleteLoginSecret,
user: RequestUser,
) -> anyhow::Result<DeleteLoginSecretResponse> {
update_one_by_id(
&self.db.users,
&user.id,
doc! {
"$pull": {
"secrets": {
"name": name
}
}
},
None,
)
.await
.context("failed at mongo update query")?;
Ok(DeleteLoginSecretResponse {})
}
}

View File

@@ -1,11 +1,11 @@
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use axum::{ use axum::{
extract::Query, http::StatusCode, response::Redirect, routing::get, extract::Query, response::Redirect, routing::get, Router,
Router,
}; };
use monitor_client::entities::{monitor_timestamp, user::User}; use monitor_client::entities::{monitor_timestamp, user::User};
use mungos::mongodb::bson::doc; use mungos::mongodb::bson::doc;
use serde::Deserialize; use serde::Deserialize;
use serror_axum::AppError;
use crate::state::StateExtension; use crate::state::StateExtension;
@@ -28,10 +28,8 @@ pub fn router() -> Router {
.route( .route(
"/callback", "/callback",
get(|state, query| async { get(|state, query| async {
let redirect = callback(state, query).await.map_err(|e| { let redirect = callback(state, query).await?;
(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#?}")) Result::<_, AppError>::Ok(redirect)
})?;
Result::<_, (StatusCode, String)>::Ok(redirect)
}), }),
) )
} }

View File

@@ -1,12 +1,12 @@
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use async_timing_util::unix_timestamp_ms; use async_timing_util::unix_timestamp_ms;
use axum::{ use axum::{
extract::Query, http::StatusCode, response::Redirect, routing::get, extract::Query, response::Redirect, routing::get, Router,
Router,
}; };
use monitor_client::entities::user::User; use monitor_client::entities::user::User;
use mungos::mongodb::bson::doc; use mungos::mongodb::bson::doc;
use serde::Deserialize; use serde::Deserialize;
use serror_axum::AppError;
use crate::state::StateExtension; use crate::state::StateExtension;
@@ -30,10 +30,8 @@ pub fn router() -> Router {
.route( .route(
"/callback", "/callback",
get(|state, query| async { get(|state, query| async {
let redirect = callback(state, query).await.map_err(|e| { let redirect = callback(state, query).await?;
(StatusCode::INTERNAL_SERVER_ERROR, format!("{e:#?}")) Result::<_, AppError>::Ok(redirect)
})?;
Result::<_, (StatusCode, String)>::Ok(redirect)
}), }),
) )
} }

View File

@@ -7,7 +7,10 @@ use async_timing_util::{
use axum::{http::HeaderMap, Extension}; use axum::{http::HeaderMap, Extension};
use hmac::{Hmac, Mac}; use hmac::{Hmac, Mac};
use jwt::{SignWithKey, VerifyWithKey}; use jwt::{SignWithKey, VerifyWithKey};
use monitor_client::entities::config::CoreConfig; use monitor_client::entities::{
config::CoreConfig, monitor_timestamp,
};
use mungos::mongodb::bson::doc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sha2::Sha256; use sha2::Sha256;
use tokio::sync::Mutex; use tokio::sync::Mutex;
@@ -118,17 +121,64 @@ impl State {
&self, &self,
headers: &HeaderMap, headers: &HeaderMap,
) -> anyhow::Result<RequestUser> { ) -> anyhow::Result<RequestUser> {
let jwt = headers let user_id = match (
.get("authorization") headers.get("authorization"),
.context("no authorization header provided. must be Bearer <jwt_token>")? headers.get("x-api-key"),
.to_str()? headers.get("x-api-secret"),
.replace("Bearer ", "") ) {
.replace("bearer ", ""); (Some(jwt), _, _) => {
let user = self // USE JWT
.auth_jwt_check_enabled(&jwt) let jwt = jwt
.await .to_str()
.context("failed to authenticate jwt")?; .context("jwt is not str")?
Ok(user) .replace("Bearer ", "")
.replace("bearer ", "");
self
.auth_jwt_get_user_id(&jwt)
.await
.context("failed to authenticate jwt")?
}
(None, Some(key), Some(secret)) => {
// USE API KEY / SECRET
let key = key.to_str().context("key is not str")?;
let secret = secret.to_str().context("secret is not str")?;
self
.auth_api_key_get_user_id(key, secret)
.await
.context("failed to authenticate api key")?
}
_ => {
// AUTH FAIL
return Err(anyhow!("must attach either AUTHORIZATION header with jwt OR pass X-API-KEY and X-API-SECRET"));
}
};
let user = self.get_user(&user_id).await?;
if user.enabled {
let user = InnerRequestUser {
id: user_id,
username: user.username,
is_admin: user.admin,
create_server_permissions: user.create_server_permissions,
create_build_permissions: user.create_build_permissions,
};
Ok(user.into())
} else {
Err(anyhow!("user not enabled"))
}
}
pub async fn auth_jwt_get_user_id(
&self,
jwt: &str,
) -> anyhow::Result<String> {
let claims: JwtClaims = jwt
.verify_with_key(&self.jwt.key)
.context("failed to verify claims")?;
if claims.exp > unix_timestamp_ms() {
Ok(claims.id)
} else {
Err(anyhow!("token has expired"))
}
} }
pub async fn auth_jwt_check_enabled( pub async fn auth_jwt_check_enabled(
@@ -156,4 +206,30 @@ impl State {
Err(anyhow!("token has expired")) Err(anyhow!("token has expired"))
} }
} }
pub async fn auth_api_key_get_user_id(
&self,
key: &str,
secret: &str,
) -> anyhow::Result<String> {
let key = self
.db
.api_keys
.find_one(doc! { "key": key }, None)
.await
.context("failed to query db")?
.context("no api key matching key")?;
if key.expires != 0 && key.expires < monitor_timestamp() {
return Err(anyhow!("api key expired"));
}
if bcrypt::verify(secret, &key.secret)
.context("failed to verify secret hash")?
{
// secret matches
Ok(key.user_id)
} else {
// secret mismatch
Err(anyhow!("invalid api secret"))
}
}
} }

View File

@@ -1,29 +1,23 @@
use std::{sync::Arc, time::Instant}; use std::{sync::Arc, time::Instant};
use axum::{ use axum::{
extract::Request, extract::Request, http::HeaderMap, middleware::Next,
http::{HeaderMap, StatusCode}, response::Response, routing::post, Extension, Json, Router,
middleware::Next,
response::Response,
routing::post,
Extension, Json, Router,
}; };
use axum_extra::{headers::ContentType, TypedHeader}; use axum_extra::{headers::ContentType, TypedHeader};
use rand::{distributions::Alphanumeric, thread_rng, Rng}; use rand::{distributions::Alphanumeric, thread_rng, Rng};
use resolver_api::Resolver; use resolver_api::Resolver;
use serror_axum::{AppError, AuthError};
use uuid::Uuid; use uuid::Uuid;
mod github; mod github;
mod google; mod google;
mod jwt; mod jwt;
mod local; mod local;
mod secret;
use crate::{ use crate::{
helpers::into_response_error,
api::auth::AuthRequest, api::auth::AuthRequest,
state::{State, StateExtension}, state::{State, StateExtension},
ResponseResult,
}; };
pub use self::jwt::{ pub use self::jwt::{
@@ -37,17 +31,8 @@ pub async fn auth_request(
headers: HeaderMap, headers: HeaderMap,
mut req: Request, mut req: Request,
next: Next, next: Next,
) -> ResponseResult<Response> { ) -> Result<Response, AuthError> {
let user = state let user = state.authenticate_check_enabled(&headers).await?;
.authenticate_check_enabled(&headers)
.await
.map_err(|e| {
(
StatusCode::UNAUTHORIZED,
TypedHeader(ContentType::json()),
format!("{e:#?}"),
)
})?;
req.extensions_mut().insert(user); req.extensions_mut().insert(user);
Ok(next.run(req).await) Ok(next.run(req).await)
} }
@@ -64,11 +49,11 @@ pub fn router(state: &State) -> Router {
if let Err(e) = &res { if let Err(e) = &res {
info!("/auth request {req_id} | ERROR: {e:?}"); info!("/auth request {req_id} | ERROR: {e:?}");
} }
let res = res.map_err(into_response_error)?; let res = res?;
let elapsed = timer.elapsed(); let elapsed = timer.elapsed();
info!("/auth request {req_id} | resolve time: {elapsed:?}"); info!("/auth request {req_id} | resolve time: {elapsed:?}");
debug!("/auth request {req_id} | RESPONSE: {res}"); debug!("/auth request {req_id} | RESPONSE: {res}");
ResponseResult::Ok((TypedHeader(ContentType::json()), res)) Result::<_, AppError>::Ok((TypedHeader(ContentType::json()), res))
}, },
), ),
); );

View File

@@ -1,98 +0,0 @@
use anyhow::{anyhow, Context};
use async_timing_util::unix_timestamp_ms;
use axum::async_trait;
use monitor_client::api::auth::{
LoginWithSecret, LoginWithSecretResponse,
};
use mungos::{by_id::update_one_by_id, mongodb::bson::doc};
use resolver_api::Resolve;
use crate::state::State;
#[async_trait]
impl Resolve<LoginWithSecret> for State {
async fn resolve(
&self,
LoginWithSecret { username, secret }: LoginWithSecret,
_: (),
) -> anyhow::Result<LoginWithSecretResponse> {
let user = self
.db
.users
.find_one(doc! { "username": &username }, None)
.await
.context("failed at mongo query")?
.ok_or(anyhow!("did not find user with username {username}"))?;
let ts = unix_timestamp_ms() as i64;
for s in user.secrets {
if let Some(expires) = s.expires {
if expires < ts {
update_one_by_id(
&self.db.users,
&user.id,
doc! { "$pull": { "secrets": { "name": s.name } } },
None,
)
.await
.context("failed to remove expired secret")?;
continue;
}
}
if bcrypt::verify(&secret, &s.hash)
.context("failed at verifying hash")?
{
let jwt = self
.jwt
.generate(user.id)
.context("failed at generating jwt for user")?;
return Ok(LoginWithSecretResponse { jwt });
}
}
Err(anyhow!("invalid secret"))
}
}
// pub fn router() -> Router {
// Router::new().route(
// "/login",
// post(|db, jwt, body| async { login(db, jwt, body).await.map_err(|e| ()) }),
// )
// }
// pub async fn login(
// state: StateExtension,
// Json(SecretLoginBody { username, secret }): Json<SecretLoginBody>,
// ) -> anyhow::Result<String> {
// let user = state
// .db
// .users
// .find_one(doc! { "username": &username }, None)
// .await
// .context("failed at mongo query")?
// .ok_or(anyhow!("did not find user with username {username}"))?;
// let ts = unix_timestamp_ms() as i64;
// for s in user.secrets {
// if let Some(expires) = s.expires {
// let expires = unix_from_monitor_ts(&expires)?;
// if expires < ts {
// state
// .db
// .users
// .update_one::<Document>(
// &user.id,
// Update::Custom(doc! { "$pull": { "secrets": { "name": s.name } } }),
// )
// .await
// .context("failed to remove expired secret")?;
// continue;
// }
// }
// if bcrypt::verify(&secret, &s.hash).context("failed at verifying hash")? {
// let jwt = jwt
// .generate(user.id)
// .context("failed at generating jwt for user")?;
// return Ok(jwt);
// }
// }
// Err(anyhow!("invalid secret"))
// }

View File

@@ -1,8 +1,6 @@
use std::time::Duration; use std::time::Duration;
use anyhow::{anyhow, Context}; use anyhow::{anyhow, Context};
use axum::http::StatusCode;
use axum_extra::{headers::ContentType, TypedHeader};
use monitor_client::entities::{ use monitor_client::entities::{
deployment::{Deployment, DockerContainerState}, deployment::{Deployment, DockerContainerState},
monitor_timestamp, monitor_timestamp,
@@ -18,7 +16,6 @@ use mungos::{
}; };
use periphery_client::{requests, PeripheryClient}; use periphery_client::{requests, PeripheryClient};
use rand::{thread_rng, Rng}; use rand::{thread_rng, Rng};
use serror::serialize_error_pretty;
use crate::{auth::RequestUser, state::State}; use crate::{auth::RequestUser, state::State};
@@ -61,16 +58,6 @@ pub fn make_update(
} }
} }
pub fn into_response_error(
e: anyhow::Error,
) -> (StatusCode, TypedHeader<ContentType>, String) {
(
StatusCode::INTERNAL_SERVER_ERROR,
TypedHeader(ContentType::json()),
serialize_error_pretty(e),
)
}
impl State { impl State {
pub async fn get_user( pub async fn get_user(
&self, &self,

View File

@@ -2,8 +2,7 @@
extern crate log; extern crate log;
use anyhow::Context; use anyhow::Context;
use axum::{http::StatusCode, Extension, Router}; use axum::{Extension, Router};
use axum_extra::{headers::ContentType, TypedHeader};
use termination_signal::tokio::immediate_term_handle; use termination_signal::tokio::immediate_term_handle;
mod api; mod api;
@@ -16,9 +15,6 @@ mod monitor;
mod state; mod state;
mod ws; mod ws;
type ResponseResult<T> =
Result<T, (StatusCode, TypedHeader<ContentType>, String)>;
async fn app() -> anyhow::Result<()> { async fn app() -> anyhow::Result<()> {
let state = state::State::load().await?; let state = state::State::load().await?;

View File

@@ -60,22 +60,22 @@ pub struct ApiSecret {
pub expires: Option<String>, pub expires: Option<String>,
} }
impl TryFrom<ApiSecret> // impl TryFrom<ApiSecret>
for monitor_client::entities::user::ApiSecret // for monitor_client::entities::user::ApiSecret
{ // {
type Error = anyhow::Error; // type Error = anyhow::Error;
fn try_from(value: ApiSecret) -> Result<Self, Self::Error> { // fn try_from(value: ApiSecret) -> Result<Self, Self::Error> {
let secret = Self { // let secret = Self {
name: value.name, // name: value.name,
hash: value.hash, // hash: value.hash,
created_at: unix_from_monitor_ts(&value.created_at)?, // created_at: unix_from_monitor_ts(&value.created_at)?,
expires: value // expires: value
.expires // .expires
.and_then(|exp| unix_from_monitor_ts(&exp).ok()), // .and_then(|exp| unix_from_monitor_ts(&exp).ok()),
}; // };
Ok(secret) // Ok(secret)
} // }
} // }
impl TryFrom<User> for monitor_client::entities::user::User { impl TryFrom<User> for monitor_client::entities::user::User {
type Error = anyhow::Error; type Error = anyhow::Error;
@@ -88,11 +88,6 @@ impl TryFrom<User> for monitor_client::entities::user::User {
create_server_permissions: value.create_server_permissions, create_server_permissions: value.create_server_permissions,
create_build_permissions: value.create_build_permissions, create_build_permissions: value.create_build_permissions,
avatar: value.avatar, avatar: value.avatar,
secrets: value
.secrets
.into_iter()
.map(|s| s.try_into())
.collect::<anyhow::Result<_>>()?,
password: value.password, password: value.password,
github_id: value.github_id, github_id: value.github_id,
google_id: value.google_id, google_id: value.google_id,

View File

@@ -6,7 +6,6 @@ use monitor_client::{
server::PartialServerConfig, server::PartialServerConfig,
}, },
}; };
use serde::Deserialize;
#[allow(unused)] #[allow(unused)]
pub async fn tests() -> anyhow::Result<()> { pub async fn tests() -> anyhow::Result<()> {
@@ -118,32 +117,32 @@ async fn create_server(
Ok(()) Ok(())
} }
#[derive(Deserialize)] // #[derive(Deserialize)]
struct CreateSecretEnv { // struct CreateSecretEnv {
monitor_address: String, // monitor_address: String,
monitor_username: String, // monitor_username: String,
monitor_password: String, // monitor_password: String,
} // }
#[allow(unused)] // #[allow(unused)]
async fn create_secret() -> anyhow::Result<()> { // async fn create_secret() -> anyhow::Result<()> {
let env: CreateSecretEnv = envy::from_env()?; // let env: CreateSecretEnv = envy::from_env()?;
let monitor = MonitorClient::new_with_new_account( // let monitor = MonitorClient::new_with_new_account(
env.monitor_address, // env.monitor_address,
env.monitor_username, // env.monitor_username,
env.monitor_password, // env.monitor_password,
) // )
.await?; // .await?;
let secret = monitor // let secret = monitor
.write(write::CreateLoginSecret { // .write(write::CreateLoginSecret {
name: "tests".to_string(), // name: "tests".to_string(),
expires: None, // expires: None,
}) // })
.await?; // .await?;
println!("{secret:#?}"); // println!("{secret:#?}");
Ok(()) // Ok(())
} // }

View File

@@ -11,16 +11,16 @@ async fn app() -> anyhow::Result<()> {
let monitor = MonitorClient::new_from_env().await?; let monitor = MonitorClient::new_from_env().await?;
let (mut rx, _) = monitor.subscribe_to_updates(1000, 5); // let (mut rx, _) = monitor.subscribe_to_updates(1000, 5);
loop { // loop {
let msg = rx.recv().await; // let msg = rx.recv().await;
if let Err(e) = msg { // if let Err(e) = msg {
error!("🚨 recv error | {e:#?}"); // error!("🚨 recv error | {e:#?}");
break; // break;
} // }
info!("{msg:#?}") // info!("{msg:#?}")
} // }
Ok(()) Ok(())
} }

View File

@@ -80,22 +80,3 @@ pub struct ExchangeForJwtResponse {
} }
// //
#[typeshare]
#[derive(
Serialize, Deserialize, Debug, Clone, Request, EmptyTraits,
)]
#[empty_traits(MonitorAuthRequest)]
#[response(LoginWithSecretResponse)]
pub struct LoginWithSecret {
pub username: String,
pub secret: String,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct LoginWithSecretResponse {
pub jwt: String,
}
//

View File

@@ -27,7 +27,7 @@ pub use server::*;
pub use tag::*; pub use tag::*;
pub use update::*; pub use update::*;
use crate::entities::{user::User, Timelength}; use crate::entities::{api_key::ApiKey, user::User, Timelength};
pub trait MonitorReadRequest: HasResponse {} pub trait MonitorReadRequest: HasResponse {}
@@ -62,6 +62,19 @@ pub type GetUserResponse = User;
// //
#[typeshare]
#[derive(
Serialize, Deserialize, Debug, Clone, Request, EmptyTraits,
)]
#[empty_traits(MonitorReadRequest)]
#[response(ListApiKeysResponse)]
pub struct ListApiKeys {}
#[typeshare]
pub type ListApiKeysResponse = Vec<ApiKey>;
//
#[typeshare] #[typeshare]
#[derive( #[derive(
Serialize, Deserialize, Debug, Clone, Request, EmptyTraits, Serialize, Deserialize, Debug, Clone, Request, EmptyTraits,

View File

@@ -14,15 +14,22 @@ use super::MonitorWriteRequest;
Serialize, Deserialize, Debug, Clone, Request, EmptyTraits, Serialize, Deserialize, Debug, Clone, Request, EmptyTraits,
)] )]
#[empty_traits(MonitorWriteRequest)] #[empty_traits(MonitorWriteRequest)]
#[response(CreateLoginSecretResponse)] #[response(CreateApiKeyResponse)]
pub struct CreateLoginSecret { pub struct CreateApiKey {
pub name: String, pub name: String,
pub expires: Option<I64>,
#[serde(default)]
pub expires: I64,
} }
#[typeshare] #[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CreateLoginSecretResponse { pub struct CreateApiKeyResponse {
/// X-API-KEY
pub key: String,
/// X-API-SECRET
/// There is no way to get the secret again after it is distributed in this message
pub secret: String, pub secret: String,
} }
@@ -33,11 +40,11 @@ pub struct CreateLoginSecretResponse {
Serialize, Deserialize, Debug, Clone, Request, EmptyTraits, Serialize, Deserialize, Debug, Clone, Request, EmptyTraits,
)] )]
#[empty_traits(MonitorWriteRequest)] #[empty_traits(MonitorWriteRequest)]
#[response(DeleteLoginSecretResponse)] #[response(DeleteApiKeyResponse)]
pub struct DeleteLoginSecret { pub struct DeleteApiKey {
pub name: String, pub key: String,
} }
#[typeshare] #[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)] #[derive(Serialize, Deserialize, Debug, Clone)]
pub struct DeleteLoginSecretResponse {} pub struct DeleteApiKeyResponse {}

View File

@@ -1,4 +1,5 @@
mod alerter; mod alerter;
mod api_key;
mod build; mod build;
mod builder; mod builder;
mod deployment; mod deployment;
@@ -7,12 +8,12 @@ mod launch;
mod permissions; mod permissions;
mod procedure; mod procedure;
mod repo; mod repo;
mod secret;
mod server; mod server;
mod tags; mod tags;
mod user; mod user;
pub use alerter::*; pub use alerter::*;
pub use api_key::*;
pub use build::*; pub use build::*;
pub use builder::*; pub use builder::*;
pub use deployment::*; pub use deployment::*;
@@ -21,7 +22,6 @@ pub use launch::*;
pub use permissions::*; pub use permissions::*;
pub use procedure::*; pub use procedure::*;
pub use repo::*; pub use repo::*;
pub use secret::*;
pub use server::*; pub use server::*;
pub use tags::*; pub use tags::*;
pub use user::*; pub use user::*;

View File

@@ -0,0 +1,37 @@
use mongo_indexed::derive::MongoIndexed;
use serde::{Deserialize, Serialize};
use typeshare::typeshare;
use super::I64;
#[typeshare]
#[derive(
Serialize, Deserialize, Debug, Clone, Default, MongoIndexed,
)]
pub struct ApiKey {
/// UNIQUE KEY ASSOCIATED WITH SECRET
#[unique_index]
pub key: String,
/// HASH OF THE SECRET
pub secret: String,
/// USER ASSOCIATED WITH THE API KEY
#[index]
pub user_id: String,
/// NAME ASSOCIATED WITH THE API KEY FOR MANAGEMENT
pub name: String,
/// TIMESTAMP OF KEY CREATION
pub created_at: I64,
/// EXPIRY OF KEY, OR 0 IF NEVER EXPIRES
pub expires: I64,
}
impl ApiKey {
pub fn sanitize(&mut self) {
self.secret.clear()
}
}

View File

@@ -8,6 +8,7 @@ use typeshare::typeshare;
pub mod alert; pub mod alert;
pub mod alerter; pub mod alerter;
pub mod api_key;
pub mod build; pub mod build;
pub mod builder; pub mod builder;
pub mod config; pub mod config;

View File

@@ -49,5 +49,5 @@ impl From<&ProcedureConfig> for ProcedureConfigVariant {
#[typeshare] #[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Default)] #[derive(Serialize, Deserialize, Debug, Clone, Default)]
pub struct ProcedureActionState { pub struct ProcedureActionState {
pub running: bool pub running: bool,
} }

View File

@@ -38,9 +38,6 @@ pub struct User {
pub avatar: Option<String>, pub avatar: Option<String>,
#[serde(default)]
pub secrets: Vec<ApiSecret>,
pub password: Option<String>, pub password: Option<String>,
#[sparse_index] #[sparse_index]
@@ -59,14 +56,9 @@ pub struct User {
pub updated_at: I64, pub updated_at: I64,
} }
#[typeshare] impl User {
#[derive( /// Prepares user object for transport by removing any sensitive fields
Serialize, Deserialize, Debug, Clone, Default, PartialEq, pub fn sanitize(&mut self) {
)] self.password = None;
pub struct ApiSecret { }
pub name: String,
#[serde(default, skip_serializing_if = "String::is_empty")]
pub hash: String,
pub created_at: I64,
pub expires: Option<I64>,
} }

View File

@@ -1,4 +1,5 @@
use anyhow::{anyhow, Context}; use anyhow::Context;
use api::read::GetVersion;
use serde::Deserialize; use serde::Deserialize;
pub mod api; pub mod api;
@@ -12,156 +13,46 @@ mod subscribe;
#[derive(Deserialize)] #[derive(Deserialize)]
struct MonitorEnv { struct MonitorEnv {
monitor_address: String, monitor_address: String,
monitor_token: Option<String>, monitor_api_key: String,
monitor_username: Option<String>, monitor_api_secret: String,
monitor_password: Option<String>,
monitor_secret: Option<String>,
} }
#[derive(Clone)] #[derive(Clone)]
pub struct MonitorClient { pub struct MonitorClient {
reqwest: reqwest::Client, reqwest: reqwest::Client,
address: String, address: String,
jwt: String, key: String,
creds: Option<RefreshTokenCreds>,
}
#[derive(Clone)]
struct RefreshTokenCreds {
username: String,
secret: String, secret: String,
} }
impl MonitorClient { impl MonitorClient {
pub fn new_with_token( pub async fn new(
address: impl Into<String>, address: impl Into<String>,
token: impl Into<String>, key: impl Into<String>,
) -> MonitorClient {
MonitorClient {
reqwest: Default::default(),
address: address.into(),
jwt: token.into(),
creds: None,
}
}
pub async fn new_with_password(
address: impl Into<String>,
username: impl Into<String>,
password: impl Into<String>,
) -> anyhow::Result<MonitorClient> {
let mut client = MonitorClient {
reqwest: Default::default(),
address: address.into(),
jwt: Default::default(),
creds: None,
};
let api::auth::LoginLocalUserResponse { jwt } = client
.auth(api::auth::LoginLocalUser {
username: username.into(),
password: password.into(),
})
.await?;
client.jwt = jwt;
Ok(client)
}
pub async fn new_with_new_account(
address: impl Into<String>,
username: impl Into<String>,
password: impl Into<String>,
) -> anyhow::Result<MonitorClient> {
let mut client = MonitorClient {
reqwest: Default::default(),
address: address.into(),
jwt: Default::default(),
creds: None,
};
let api::auth::CreateLocalUserResponse { jwt } = client
.auth(api::auth::CreateLocalUser {
username: username.into(),
password: password.into(),
})
.await?;
client.jwt = jwt;
Ok(client)
}
pub async fn new_with_secret(
address: impl Into<String>,
username: impl Into<String>,
secret: impl Into<String>, secret: impl Into<String>,
) -> anyhow::Result<MonitorClient> { ) -> anyhow::Result<MonitorClient> {
let mut client = MonitorClient { let client = MonitorClient {
reqwest: Default::default(), reqwest: Default::default(),
address: address.into(), address: address.into(),
jwt: Default::default(), key: key.into(),
creds: RefreshTokenCreds { secret: secret.into(),
username: username.into(),
secret: secret.into(),
}
.into(),
}; };
client.read(GetVersion {}).await?;
client.refresh_jwt().await?;
Ok(client) Ok(client)
} }
pub async fn new_from_env() -> anyhow::Result<MonitorClient> { pub async fn new_from_env() -> anyhow::Result<MonitorClient> {
let env = envy::from_env::<MonitorEnv>() let MonitorEnv {
monitor_address,
monitor_api_key,
monitor_api_secret,
} = envy::from_env()
.context("failed to parse environment for monitor client")?; .context("failed to parse environment for monitor client")?;
if let Some(token) = env.monitor_token { MonitorClient::new(
Ok(MonitorClient::new_with_token(&env.monitor_address, token)) monitor_address,
} else if let Some(password) = env.monitor_password { monitor_api_key,
let username = env.monitor_username.ok_or(anyhow!( monitor_api_secret,
"must provide MONITOR_USERNAME to authenticate with MONITOR_PASSWORD" )
))?; .await
MonitorClient::new_with_password(
&env.monitor_address,
username,
password,
)
.await
} else if let Some(secret) = env.monitor_secret {
let username = env.monitor_username.ok_or(anyhow!(
"must provide MONITOR_USERNAME to authenticate with MONITOR_SECRET"
))?;
MonitorClient::new_with_secret(
&env.monitor_address,
username,
secret,
)
.await
} else {
Err(anyhow!("failed to initialize monitor client from env | must provide one of: (MONITOR_TOKEN), (MONITOR_USERNAME and MONITOR_PASSWORD), (MONITOR_USERNAME and MONITOR_SECRET)"))
}
}
pub async fn refresh_jwt(&mut self) -> anyhow::Result<()> {
if self.creds.is_none() {
return Err(anyhow!(
"only clients initialized using the secret login method can refresh their jwt"
));
}
let creds = self.creds.clone().unwrap();
let api::auth::LoginWithSecretResponse { jwt } = self
.auth(api::auth::LoginWithSecret {
username: creds.username,
secret: creds.secret,
})
.await?;
self.jwt = jwt;
Ok(())
} }
} }

View File

@@ -17,12 +17,11 @@ impl MonitorClient {
&self, &self,
request: T, request: T,
) -> anyhow::Result<T::Response> { ) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self self
.post( .post(
"/auth", "/auth",
json!({ json!({
"type": req_type, "type": T::req_type(),
"params": request "params": request
}), }),
) )
@@ -33,13 +32,12 @@ impl MonitorClient {
&self, &self,
request: T, request: T,
) -> anyhow::Result<T::Response> { ) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self self
.post( .post(
"/read", "/read",
json!({ json!({
"type": req_type, "type": T::req_type(),
"params": request "params": request
}), }),
) )
.await .await
@@ -49,13 +47,12 @@ impl MonitorClient {
&self, &self,
request: T, request: T,
) -> anyhow::Result<T::Response> { ) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self self
.post( .post(
"/write", "/write",
json!({ json!({
"type": req_type, "type": T::req_type(),
"params": request "params": request
}), }),
) )
.await .await
@@ -65,13 +62,12 @@ impl MonitorClient {
&self, &self,
request: T, request: T,
) -> anyhow::Result<T::Response> { ) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self self
.post( .post(
"/execute", "/execute",
json!({ json!({
"type": req_type, "type": T::req_type(),
"params": request "params": request
}), }),
) )
.await .await
@@ -85,7 +81,8 @@ impl MonitorClient {
let req = self let req = self
.reqwest .reqwest
.post(format!("{}{endpoint}", self.address)) .post(format!("{}{endpoint}", self.address))
.header("Authorization", format!("Bearer {}", self.jwt)); .header("x-api-key", &self.key)
.header("x-api-secret", &self.secret);
let req = if let Some(body) = body.into() { let req = if let Some(body) = body.into() {
req.header("Content-Type", "application/json").json(&body) req.header("Content-Type", "application/json").json(&body)
} else { } else {

View File

@@ -1,185 +1,175 @@
use std::time::Duration; use std::time::Duration;
use anyhow::Context; // use anyhow::Context;
use futures::{SinkExt, TryStreamExt}; // use futures::{SinkExt, TryStreamExt};
use serror::serialize_error; // use serror::serialize_error;
use thiserror::Error; // use thiserror::Error;
use tokio::sync::broadcast; // use tokio::sync::broadcast;
use tokio_tungstenite::{connect_async, tungstenite::Message}; // use tokio_tungstenite::{connect_async, tungstenite::Message};
use tokio_util::sync::CancellationToken; // use tokio_util::sync::CancellationToken;
use crate::{entities::update::UpdateListItem, MonitorClient}; // use crate::{entities::update::UpdateListItem, MonitorClient};
#[derive(Debug, Clone)] // #[derive(Debug, Clone)]
pub enum UpdateWsMessage { // pub enum UpdateWsMessage {
Update(UpdateListItem), // Update(UpdateListItem),
Error(UpdateWsError), // Error(UpdateWsError),
Disconnected, // Disconnected,
Reconnected, // Reconnected,
} // }
#[derive(Error, Debug, Clone)] // #[derive(Error, Debug, Clone)]
pub enum UpdateWsError { // pub enum UpdateWsError {
#[error("failed to connect | {0}")] // #[error("failed to connect | {0}")]
ConnectionError(String), // ConnectionError(String),
#[error("failed to login | {0}")] // #[error("failed to login | {0}")]
LoginError(String), // LoginError(String),
#[error("failed to recieve message | {0}")] // #[error("failed to recieve message | {0}")]
MessageError(String), // MessageError(String),
#[error("did not recognize message | {0}")] // #[error("did not recognize message | {0}")]
MessageUnrecognized(String), // MessageUnrecognized(String),
} // }
impl MonitorClient { // impl MonitorClient {
pub fn subscribe_to_updates( // pub fn subscribe_to_updates(
&self, // &self,
capacity: usize, // capacity: usize,
retry_cooldown_secs: u64, // retry_cooldown_secs: u64,
) -> (broadcast::Receiver<UpdateWsMessage>, CancellationToken) { // ) -> (broadcast::Receiver<UpdateWsMessage>, CancellationToken) {
let (tx, rx) = broadcast::channel(capacity); // let (tx, rx) = broadcast::channel(capacity);
let cancel = CancellationToken::new(); // let cancel = CancellationToken::new();
let cancel_clone = cancel.clone(); // let cancel_clone = cancel.clone();
let address = // let address =
format!("{}/ws/update", self.address.replacen("http", "ws", 1)); // format!("{}/ws/update", self.address.replacen("http", "ws", 1));
let mut client = self.clone(); // let mut client = self.clone();
tokio::spawn(async move { // tokio::spawn(async move {
loop { // loop {
// OUTER LOOP (LONG RECONNECT) // // OUTER LOOP (LONG RECONNECT)
if cancel.is_cancelled() { // if cancel.is_cancelled() {
break; // break;
} // }
loop { // loop {
// INNER LOOP (SHORT RECONNECT) // // INNER LOOP (SHORT RECONNECT)
if cancel.is_cancelled() { // if cancel.is_cancelled() {
break; // break;
} // }
if client.creds.is_some() { // let res = connect_async(&address)
let res = client.refresh_jwt().await; // .await
if let Err(e) = res { // .context("failed to connect to websocket endpoint");
let _ = tx.send(UpdateWsMessage::Error(
UpdateWsError::LoginError(serialize_error(e)),
));
break;
}
}
let res = connect_async(&address) // if let Err(e) = res {
.await // let _ = tx.send(UpdateWsMessage::Error(
.context("failed to connect to websocket endpoint"); // UpdateWsError::ConnectionError(serialize_error(e)),
// ));
// break;
// }
if let Err(e) = res { // let (mut ws, _) = res.unwrap();
let _ = tx.send(UpdateWsMessage::Error(
UpdateWsError::ConnectionError(serialize_error(e)),
));
break;
}
let (mut ws, _) = res.unwrap(); // // ==================
// // SEND LOGIN MSG
// // ==================
// let login_send_res = ws
// .send(Message::Text(client.jwt.clone()))
// .await
// .context("failed to send login message");
// ================== // if let Err(e) = login_send_res {
// SEND LOGIN MSG // let _ = tx.send(UpdateWsMessage::Error(
// ================== // UpdateWsError::LoginError(serialize_error(e)),
let login_send_res = ws // ));
.send(Message::Text(client.jwt.clone())) // break;
.await // }
.context("failed to send login message");
if let Err(e) = login_send_res { // // ==================
let _ = tx.send(UpdateWsMessage::Error( // // HANDLE LOGIN RES
UpdateWsError::LoginError(serialize_error(e)), // // ==================
)); // match ws.try_next().await {
break; // Ok(Some(Message::Text(msg))) => {
} // if msg != "LOGGED_IN" {
// let _ = tx.send(UpdateWsMessage::Error(
// UpdateWsError::LoginError(msg),
// ));
// let _ = ws.close(None).await;
// break;
// }
// }
// Ok(Some(msg)) => {
// let _ = tx.send(UpdateWsMessage::Error(
// UpdateWsError::LoginError(format!("{msg:#?}")),
// ));
// let _ = ws.close(None).await;
// break;
// }
// Ok(None) => {
// let _ = tx.send(UpdateWsMessage::Error(
// UpdateWsError::LoginError(String::from(
// "got None message after login message",
// )),
// ));
// let _ = ws.close(None).await;
// break;
// }
// Err(e) => {
// let _ = tx.send(UpdateWsMessage::Error(
// UpdateWsError::LoginError(format!(
// "failed to recieve message | {e:#?}"
// )),
// ));
// let _ = ws.close(None).await;
// break;
// }
// }
// ================== // let _ = tx.send(UpdateWsMessage::Reconnected);
// HANDLE LOGIN RES
// ==================
match ws.try_next().await {
Ok(Some(Message::Text(msg))) => {
if msg != "LOGGED_IN" {
let _ = tx.send(UpdateWsMessage::Error(
UpdateWsError::LoginError(msg),
));
let _ = ws.close(None).await;
break;
}
}
Ok(Some(msg)) => {
let _ = tx.send(UpdateWsMessage::Error(
UpdateWsError::LoginError(format!("{msg:#?}")),
));
let _ = ws.close(None).await;
break;
}
Ok(None) => {
let _ = tx.send(UpdateWsMessage::Error(
UpdateWsError::LoginError(String::from(
"got None message after login message",
)),
));
let _ = ws.close(None).await;
break;
}
Err(e) => {
let _ = tx.send(UpdateWsMessage::Error(
UpdateWsError::LoginError(format!(
"failed to recieve message | {e:#?}"
)),
));
let _ = ws.close(None).await;
break;
}
}
let _ = tx.send(UpdateWsMessage::Reconnected); // // ==================
// // HANLDE MSGS
// // ==================
// loop {
// match ws
// .try_next()
// .await
// .context("failed to recieve message")
// {
// Ok(Some(Message::Text(msg))) => {
// match serde_json::from_str::<UpdateListItem>(&msg) {
// Ok(msg) => {
// let _ = tx.send(UpdateWsMessage::Update(msg));
// }
// Err(_) => {
// let _ = tx.send(UpdateWsMessage::Error(
// UpdateWsError::MessageUnrecognized(msg),
// ));
// }
// }
// }
// Ok(Some(Message::Close(_))) => {
// let _ = tx.send(UpdateWsMessage::Disconnected);
// let _ = ws.close(None).await;
// break;
// }
// Err(e) => {
// let _ = tx.send(UpdateWsMessage::Error(
// UpdateWsError::MessageError(serialize_error(e)),
// ));
// let _ = tx.send(UpdateWsMessage::Disconnected);
// let _ = ws.close(None).await;
// break;
// }
// Ok(_) => {
// // ignore
// }
// }
// }
// }
// tokio::time::sleep(Duration::from_secs(retry_cooldown_secs))
// .await;
// }
// });
// ================== // (rx, cancel_clone)
// HANLDE MSGS // }
// ================== // }
loop {
match ws
.try_next()
.await
.context("failed to recieve message")
{
Ok(Some(Message::Text(msg))) => {
match serde_json::from_str::<UpdateListItem>(&msg) {
Ok(msg) => {
let _ = tx.send(UpdateWsMessage::Update(msg));
}
Err(_) => {
let _ = tx.send(UpdateWsMessage::Error(
UpdateWsError::MessageUnrecognized(msg),
));
}
}
}
Ok(Some(Message::Close(_))) => {
let _ = tx.send(UpdateWsMessage::Disconnected);
let _ = ws.close(None).await;
break;
}
Err(e) => {
let _ = tx.send(UpdateWsMessage::Error(
UpdateWsError::MessageError(serialize_error(e)),
));
let _ = tx.send(UpdateWsMessage::Disconnected);
let _ = ws.close(None).await;
break;
}
Ok(_) => {
// ignore
}
}
}
}
tokio::time::sleep(Duration::from_secs(retry_cooldown_secs))
.await;
}
});
(rx, cancel_clone)
}
}

View File

@@ -5,6 +5,7 @@ use mongo_indexed::{create_index, create_unique_index, Indexed};
use monitor_client::entities::{ use monitor_client::entities::{
alert::Alert, alert::Alert,
alerter::Alerter, alerter::Alerter,
api_key::ApiKey,
build::Build, build::Build,
builder::Builder, builder::Builder,
config::MongoConfig, config::MongoConfig,
@@ -23,6 +24,7 @@ use mungos::{
pub struct DbClient { pub struct DbClient {
pub users: Collection<User>, pub users: Collection<User>,
pub api_keys: Collection<ApiKey>,
pub tags: Collection<CustomTag>, pub tags: Collection<CustomTag>,
pub updates: Collection<Update>, pub updates: Collection<Update>,
pub alerts: Collection<Alert>, pub alerts: Collection<Alert>,
@@ -76,6 +78,7 @@ impl DbClient {
let client = DbClient { let client = DbClient {
users: User::collection(&db, true).await?, users: User::collection(&db, true).await?,
api_keys: ApiKey::collection(&db, true).await?,
tags: CustomTag::collection(&db, true).await?, tags: CustomTag::collection(&db, true).await?,
updates: Update::collection(&db, true).await?, updates: Update::collection(&db, true).await?,
alerts: Alert::collection(&db, true).await?, alerts: Alert::collection(&db, true).await?,