procedure execution api

This commit is contained in:
mbecker20
2024-01-07 01:12:50 -08:00
parent 9f4cf475b6
commit e658cb3aa0
21 changed files with 631 additions and 60 deletions

34
Cargo.lock generated
View File

@@ -112,6 +112,17 @@ version = "1.0.79"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "080e9890a082662b09c1ad45f567faeeb47f22b5fb23895fbe1e651e718e25ca"
[[package]]
name = "async-recursion"
version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.48",
]
[[package]]
name = "async-trait"
version = "0.1.77"
@@ -2093,6 +2104,7 @@ name = "monitor_core"
version = "1.0.1"
dependencies = [
"anyhow",
"async-recursion",
"async-trait",
"async_timing_util",
"aws-config",
@@ -2174,28 +2186,6 @@ dependencies = [
"uuid",
]
[[package]]
name = "monitor_types"
version = "1.0.1"
dependencies = [
"anyhow",
"async-trait",
"async_timing_util",
"bollard",
"derive_builder",
"derive_empty_traits",
"derive_variants",
"mongo_indexed",
"mungos",
"partial_derive2",
"resolver_api",
"serde",
"serde_json",
"strum 0.25.0",
"strum_macros 0.25.3",
"typeshare",
]
[[package]]
name = "mungos"
version = "0.5.4"

View File

@@ -61,6 +61,7 @@ strum = "0.25.0"
strum_macros = "0.25.3"
sysinfo = "0.30.5"
async-trait = "0.1.77"
async-recursion = "1.0.5"
urlencoding = "2.1.3"
rand = "0.8.5"
jwt = "0.16.0"

View File

@@ -51,6 +51,7 @@ sha2.workspace = true
bcrypt.workspace = true
hex.workspace = true
async-trait.workspace = true
async-recursion.workspace = true
futures.workspace = true
aws-config.workspace = true
aws-sdk-ec2.workspace = true

View File

@@ -268,17 +268,7 @@ impl Resolve<StopContainer, RequestUser> for State {
let periphery = self.periphery_client(&server)?;
let inner = || async move {
let start_ts = monitor_timestamp();
let mut update = Update {
target: ResourceTarget::Deployment(deployment.id.clone()),
operation: Operation::StopContainer,
start_ts,
status: UpdateStatus::InProgress,
success: true,
operator: user.id.clone(),
..Default::default()
};
let mut update = make_update(&deployment, Operation::StopContainer, &user);
update.id = self.add_update(update.clone()).await?;

View File

@@ -18,6 +18,7 @@ use crate::{
mod build;
mod deployment;
mod procedure;
mod repo;
mod server;
@@ -45,6 +46,9 @@ enum ExecuteRequest {
// ==== REPO ====
CloneRepo(CloneRepo),
PullRepo(PullRepo),
// ==== PROCEDURE ====
RunProcedure(RunProcedure),
}
pub fn router() -> Router {

View File

@@ -0,0 +1,68 @@
use async_trait::async_trait;
use monitor_client::{
api::execute::RunProcedure,
entities::{
procedure::Procedure, update::Update, Operation, PermissionLevel,
},
};
use resolver_api::Resolve;
use serror::serialize_error_pretty;
use tokio::sync::Mutex;
use crate::{
auth::RequestUser,
helpers::{make_update, resource::StateResource},
state::State,
};
#[async_trait]
impl Resolve<RunProcedure, RequestUser> for State {
async fn resolve(
&self,
RunProcedure { procedure_id }: RunProcedure,
user: RequestUser,
) -> anyhow::Result<Update> {
let procedure: Procedure = self
.get_resource_check_permissions(
&procedure_id,
&user,
PermissionLevel::Execute,
)
.await?;
let map = self.make_procedure_map(&procedure).await?;
let mut update =
make_update(&procedure, Operation::StopContainer, &user);
update.in_progress();
update.push_simple_log(
"execute procedure",
format!("Executing procedure: {}", procedure.name),
);
update.id = self.add_update(update.clone()).await?;
let update = Mutex::new(update);
let res = self.execute_procedure(&procedure, &map, &update).await;
let mut update = update.into_inner();
match res {
Ok(_) => {
update.push_simple_log(
"execution ok",
"the procedure has completed with no errors",
);
}
Err(e) => update
.push_error_log("execution error", serialize_error_pretty(e)),
}
update.finalize();
self.update_update(update.clone()).await?;
Ok(update)
}
}

View File

@@ -9,6 +9,7 @@ use monitor_client::{
build::Build,
builder::Builder,
deployment::Deployment,
procedure::Procedure,
repo::Repo,
server::Server,
update::{ResourceTarget, Update, UpdateListItem},
@@ -191,6 +192,15 @@ impl Resolve<GetUpdate, RequestUser> for State {
)
.await?;
}
ResourceTarget::Procedure(id) => {
let _: Procedure = self
.get_resource_check_permissions(
id,
&user,
PermissionLevel::Read,
)
.await?;
}
}
Ok(update)
}

View File

@@ -4,8 +4,8 @@ use monitor_client::{
api::write::{UpdateDescription, UpdateDescriptionResponse},
entities::{
alerter::Alerter, build::Build, builder::Builder,
deployment::Deployment, repo::Repo, server::Server,
update::ResourceTarget,
deployment::Deployment, procedure::Procedure, repo::Repo,
server::Server, update::ResourceTarget,
},
};
use resolver_api::Resolve;
@@ -84,6 +84,15 @@ impl Resolve<UpdateDescription, RequestUser> for State {
)
.await?;
}
ResourceTarget::Procedure(id) => {
<State as StateResource<Procedure>>::update_description(
self,
&id,
&description,
&user,
)
.await?;
}
}
Ok(UpdateDescriptionResponse {})
}

View File

@@ -7,6 +7,7 @@ use monitor_client::{
entities::{
alerter::Alerter,
monitor_timestamp,
procedure::Procedure,
repo::Repo,
server::Server,
update::{Log, ResourceTarget, Update, UpdateStatus},
@@ -236,6 +237,22 @@ impl Resolve<UpdateUserPermissionsOnTarget, RequestUser> for State {
user.username, permission, alerter.name
)
}
ResourceTarget::Procedure(id) => {
let procedure: Procedure = self.get_resource(id).await?;
update_one_by_id(
&self.db.procedures,
id,
mungos::update::Update::Set(doc! {
format!("permissions.{}", user_id): permission.to_string()
}),
None,
)
.await?;
format!(
"user {} given {} permissions on procedure {}",
user.username, permission, procedure.id
)
}
};
let mut update = Update {
operation: Operation::UpdateUserPermissionsOnTarget,

View File

@@ -30,6 +30,18 @@ pub struct InnerRequestUser {
pub create_build_permissions: bool,
}
impl InnerRequestUser {
pub fn admin() -> InnerRequestUser {
InnerRequestUser {
id: String::from("admin"),
username: String::from("admin"),
is_admin: true,
create_build_permissions: true,
create_server_permissions: true,
}
}
}
#[derive(Serialize, Deserialize)]
pub struct JwtClaims {
pub id: String,

View File

@@ -1,12 +1,10 @@
use std::str::FromStr;
use anyhow::{anyhow, Context};
use async_timing_util::unix_timestamp_ms;
use axum::async_trait;
use monitor_client::api::auth::{
LoginWithSecret, LoginWithSecretResponse,
};
use mungos::mongodb::bson::{doc, oid::ObjectId};
use mungos::{by_id::update_one_by_id, mongodb::bson::doc};
use resolver_api::Resolve;
use crate::state::State;
@@ -29,16 +27,14 @@ impl Resolve<LoginWithSecret> for State {
for s in user.secrets {
if let Some(expires) = s.expires {
if expires < ts {
self
.db
.users
.update_one(
doc! { "_id": ObjectId::from_str(&user.id).context("user id is not valid ObjectId")? },
doc! { "$pull": { "secrets": { "name": s.name } } },
None,
)
.await
.context("failed to remove expired secret")?;
update_one_by_id(
&self.db.users,
&user.id,
doc! { "$pull": { "secrets": { "name": s.name } } },
None,
)
.await
.context("failed to remove expired secret")?;
continue;
}
}

View File

@@ -1,4 +1,4 @@
use std::{str::FromStr, time::Duration};
use std::time::Duration;
use anyhow::{anyhow, Context};
use axum::http::StatusCode;
@@ -13,8 +13,8 @@ use monitor_client::entities::{
Operation,
};
use mungos::{
by_id::find_one_by_id,
mongodb::bson::{doc, oid::ObjectId, to_bson},
by_id::{find_one_by_id, update_one_by_id},
mongodb::bson::{doc, to_bson, to_document},
};
use periphery_client::{requests, PeripheryClient};
use rand::{thread_rng, Rng};
@@ -27,6 +27,7 @@ use self::resource::StateResource;
pub mod alert;
pub mod cache;
pub mod channel;
pub mod procedure;
pub mod resource;
pub fn empty_or_only_spaces(word: &str) -> bool {
@@ -211,9 +212,7 @@ impl State {
&self,
update: Update,
) -> anyhow::Result<()> {
self.db
.updates
.update_one(doc! { "_id": ObjectId::from_str(&update.id)? }, doc! { "$set": to_bson(&update)? }, None)
update_one_by_id(&self.db.updates, &update.id, mungos::update::Update::Set(to_document(&update)?), None)
.await
.context("failed to update the update on db. the update build process was deleted")?;
let update = self.update_list_item(update).await?;

View File

@@ -0,0 +1,320 @@
use std::{
collections::HashMap, str::FromStr, sync::Arc, time::Duration,
};
use anyhow::{anyhow, Context, Ok};
use async_recursion::async_recursion;
use futures::future::join_all;
use monitor_client::{
api::execute::Execution,
entities::{
monitor_timestamp,
procedure::{Procedure, ProcedureConfig},
update::Update,
},
};
use mungos::{
find::find_collect,
mongodb::bson::{doc, oid::ObjectId},
};
use resolver_api::Resolve;
use tokio::sync::Mutex;
use crate::{auth::InnerRequestUser, state::State};
impl State {
/// ASSUMES FIRST LOG IS ALREADY CREATED
async fn add_line_to_update(
&self,
update: &Mutex<Update>,
line: &str,
) {
let mut lock = update.lock().await;
let log = &mut lock.logs[0];
log.stdout.push('\n');
log.stdout.push_str(line);
let update = lock.clone();
drop(lock);
if let Err(e) = self.update_update(update).await {
error!("failed to update an update during procedure | {e:#?}");
};
}
#[async_recursion]
pub async fn execute_procedure(
&self,
procedure: &Procedure,
map: &HashMap<String, Procedure>,
update: &Mutex<Update>,
) -> anyhow::Result<()> {
let start_ts = monitor_timestamp();
use ProcedureConfig::*;
match &procedure.config {
Execution(execution) => {
self
.add_line_to_update(
update,
&format!(
"executing: {} ({})",
procedure.name, procedure.id
),
)
.await;
self
.execute_execution(execution.to_owned())
.await
.with_context(|| {
let time = Duration::from_millis(
(monitor_timestamp() - start_ts) as u64,
);
format!(
"failed execution after {time:?}. {} ({})",
procedure.name, procedure.id
)
})?;
let time = Duration::from_millis(
(monitor_timestamp() - start_ts) as u64,
);
self
.add_line_to_update(
update,
&format!(
"finished execution in {time:?}: {} ({}) ✅",
procedure.name, procedure.id
),
)
.await;
Ok(())
}
Sequence(ids) => {
self
.add_line_to_update(
update,
&format!(
"executing sequence: {} ({})",
procedure.name, procedure.id
),
)
.await;
self.execute_sequence(ids, map, update).await.with_context(
|| {
let time = Duration::from_millis(
(monitor_timestamp() - start_ts) as u64,
);
format!(
"failed sequence execution after {time:?}. {} ({})",
procedure.name, procedure.id
)
},
)?;
let time = Duration::from_millis(
(monitor_timestamp() - start_ts) as u64,
);
self
.add_line_to_update(
update,
&format!(
"finished sequence execution in {time:?}: {} ({}) ✅",
procedure.name, procedure.id
),
)
.await;
Ok(())
}
Parallel(ids) => {
self
.add_line_to_update(
update,
&format!(
"executing parallel: {} ({})",
procedure.name, procedure.id
),
)
.await;
self.execute_parallel(ids, map, update).await.with_context(
|| {
let time = Duration::from_millis(
(monitor_timestamp() - start_ts) as u64,
);
format!(
"failed parallel execution after {time:?}. {} ({})",
procedure.name, procedure.id
)
},
)?;
let time = Duration::from_millis(
(monitor_timestamp() - start_ts) as u64,
);
self
.add_line_to_update(
update,
&format!(
"finished parallel execution in {time:?}: {} ({}) ✅",
procedure.name, procedure.id
),
)
.await;
Ok(())
}
}
}
async fn execute_execution(
&self,
execution: Execution,
) -> anyhow::Result<()> {
let user: Arc<_> = InnerRequestUser::admin().into();
let update =
match execution {
Execution::RunBuild(req) => self
.resolve(req, user)
.await
.context("failed at RunBuild")?,
Execution::Deploy(req) => {
self.resolve(req, user).await.context("failed at Deploy")?
}
Execution::StartContainer(req) => self
.resolve(req, user)
.await
.context("failed at StartContainer")?,
Execution::StopContainer(req) => self
.resolve(req, user)
.await
.context("failed at StopContainer")?,
Execution::StopAllContainers(req) => self
.resolve(req, user)
.await
.context("failed at StopAllContainers")?,
Execution::RemoveContainer(req) => self
.resolve(req, user)
.await
.context("failed at RemoveContainer")?,
Execution::CloneRepo(req) => self
.resolve(req, user)
.await
.context("failed at CloneRepo")?,
Execution::PullRepo(req) => self
.resolve(req, user)
.await
.context("failed at PullRepo")?,
Execution::PruneDockerNetworks(req) => self
.resolve(req, user)
.await
.context("failed at PruneDockerNetworks")?,
Execution::PruneDockerImages(req) => self
.resolve(req, user)
.await
.context("failed at PruneDockerImages")?,
Execution::PruneDockerContainers(req) => self
.resolve(req, user)
.await
.context("failed at PruneDockerContainers")?,
Execution::RunProcedure(req) => self
.resolve(req, user)
.await
.context("failed at RunProcedure")?,
};
if update.success {
Ok(())
} else {
Err(anyhow!(
"execution not successful. see update {}",
update.id
))
}
}
async fn execute_sequence(
&self,
ids: &[String],
map: &HashMap<String, Procedure>,
update: &Mutex<Update>,
) -> anyhow::Result<()> {
for id in ids {
let procedure = map.get(id).with_context(|| {
format!("no procedure on map with id {id}")
})?;
self.execute_procedure(procedure, map, update).await?;
}
Ok(())
}
async fn execute_parallel(
&self,
ids: &[String],
map: &HashMap<String, Procedure>,
update: &Mutex<Update>,
) -> anyhow::Result<()> {
let futures = ids.iter().map(|id| async {
let procedure = map.get(id).context("no procedure on map")?;
self.execute_procedure(procedure, map, update).await
});
join_all(futures)
.await
.into_iter()
.collect::<anyhow::Result<_>>()?;
Ok(())
}
pub async fn make_procedure_map(
&self,
procedure: &Procedure,
) -> anyhow::Result<HashMap<String, Procedure>> {
let map = Default::default();
self.make_procedure_map_rec(procedure, &map).await?;
Ok(map.into_inner())
}
#[async_recursion]
async fn make_procedure_map_rec(
&self,
procedure: &Procedure,
map: &Mutex<HashMap<String, Procedure>>,
) -> anyhow::Result<()> {
use ProcedureConfig::*;
let more = match &procedure.config {
Execution(_) => return Ok(()),
Sequence(sequence) => sequence,
Parallel(parallel) => parallel,
};
let more_ids = more
.iter()
.map(|id| ObjectId::from_str(id).context("id is not ObjectId"))
.collect::<anyhow::Result<Vec<_>>>()?;
let procedures = find_collect(
&self.db.procedures,
doc! { "_id": { "$in": &more_ids } },
None,
)
.await
.context("failed to find procedures from db")?
.into_iter()
.map(|proc| (proc.id.clone(), proc))
.collect::<HashMap<_, _>>();
// make sure we aren't missing any procedures
for more in more {
if !procedures.contains_key(more) {
return Err(anyhow!(
"did not find a procedure with id {more}"
));
}
}
let futures = procedures.values().map(|procedure| async {
self.make_procedure_map_rec(procedure, map).await
});
join_all(futures)
.await
.into_iter()
.collect::<anyhow::Result<Vec<_>>>()?;
map.lock().await.extend(procedures);
Ok(())
}
}

View File

@@ -16,6 +16,9 @@ use monitor_client::{
Deployment, DeploymentImage, DeploymentListItem,
DeploymentListItemInfo,
},
procedure::{
Procedure, ProcedureListItem, ProcedureListItemInfo,
},
repo::{Repo, RepoInfo, RepoListItem},
server::{Server, ServerListItem, ServerListItemInfo},
update::ResourceTargetVariant,
@@ -390,3 +393,31 @@ impl StateResource<Alerter> for State {
})
}
}
#[async_trait]
impl StateResource<Procedure> for State {
type ListItem = ProcedureListItem;
fn name() -> &'static str {
"procedure"
}
fn coll(&self) -> &Collection<Procedure> {
&self.db.procedures
}
async fn to_list_item(
&self,
procedure: Procedure,
) -> anyhow::Result<ProcedureListItem> {
Ok(ProcedureListItem {
id: procedure.id,
name: procedure.name,
tags: procedure.tags,
resource_type: ResourceTargetVariant::Alerter,
info: ProcedureListItemInfo {
procedure_type: (&procedure.config).into(),
},
})
}
}

View File

@@ -15,6 +15,7 @@ use monitor_client::{
build::Build,
builder::Builder,
deployment::Deployment,
procedure::Procedure,
repo::Repo,
server::Server,
update::{ResourceTarget, ResourceTargetVariant},
@@ -224,6 +225,14 @@ impl State {
ResourceTargetVariant::Alerter,
)
}
ResourceTarget::Procedure(prodecure_id) => {
let resource: Procedure =
self.get_resource(prodecure_id).await?;
(
resource.get_user_permissions(&user.id),
ResourceTargetVariant::Procedure,
)
}
ResourceTarget::System(_) => {
return Err(anyhow!(
"user not admin, can't recieve system updates"

View File

@@ -1,12 +1,44 @@
use resolver_api::HasResponse;
use serde::{Deserialize, Serialize};
use typeshare::typeshare;
mod build;
mod deployment;
mod procedure;
mod repo;
mod server;
pub use build::*;
pub use deployment::*;
pub use procedure::*;
pub use repo::*;
use resolver_api::HasResponse;
pub use server::*;
pub trait MonitorExecuteRequest: HasResponse {}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "type", content = "params")]
pub enum Execution {
// PROCEDURE
RunProcedure(RunProcedure),
// BUILD
RunBuild(RunBuild),
// DEPLOYMENT
Deploy(Deploy),
StartContainer(StartContainer),
StopContainer(StopContainer),
StopAllContainers(StopAllContainers),
RemoveContainer(RemoveContainer),
// REPO
CloneRepo(CloneRepo),
PullRepo(PullRepo),
// SERVER
PruneDockerNetworks(PruneDockerNetworks),
PruneDockerImages(PruneDockerImages),
PruneDockerContainers(PruneDockerContainers),
}

View File

@@ -0,0 +1,18 @@
use derive_empty_traits::EmptyTraits;
use resolver_api::derive::Request;
use serde::{Deserialize, Serialize};
use typeshare::typeshare;
use crate::entities::update::Update;
use super::MonitorExecuteRequest;
#[typeshare]
#[derive(
Serialize, Deserialize, Debug, Clone, Request, EmptyTraits,
)]
#[empty_traits(MonitorExecuteRequest)]
#[response(Update)]
pub struct RunProcedure {
pub procedure_id: String,
}

View File

@@ -12,6 +12,7 @@ pub mod build;
pub mod builder;
pub mod config;
pub mod deployment;
pub mod procedure;
pub mod repo;
pub mod resource;
pub mod server;
@@ -392,6 +393,9 @@ pub enum Operation {
UpdateAlerter,
DeleteAlerter,
// procedure
RunProcedure,
// user
UpdateUserPermissions,
UpdateUserPermissionsOnTarget,

View File

@@ -0,0 +1,47 @@
use derive_variants::EnumVariants;
use serde::{Deserialize, Serialize};
use typeshare::typeshare;
use crate::api::execute::Execution;
use super::resource::{Resource, ResourceListItem};
#[typeshare]
pub type Procedure = Resource<ProcedureConfig, ()>;
#[typeshare]
pub type ProcedureListItem = ResourceListItem<ProcedureListItemInfo>;
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ProcedureListItemInfo {
pub procedure_type: ProcedureConfigVariant,
}
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, EnumVariants)]
#[variant_derive(Serialize, Deserialize, Debug, Clone, Copy)]
#[serde(tag = "type", content = "data")]
pub enum ProcedureConfig {
Execution(Execution),
/// Vec<ProcedureId>
Sequence(Vec<String>),
/// Vec<ProdecureId>
Parallel(Vec<String>),
}
impl From<&ProcedureConfig> for ProcedureConfigVariant {
fn from(value: &ProcedureConfig) -> Self {
match value {
ProcedureConfig::Execution(_) => {
ProcedureConfigVariant::Execution
}
ProcedureConfig::Parallel(_) => {
ProcedureConfigVariant::Parallel
}
ProcedureConfig::Sequence(_) => {
ProcedureConfigVariant::Sequence
}
}
}
}

View File

@@ -14,7 +14,8 @@ use crate::entities::{
use super::{
alerter::Alerter, build::Build, builder::Builder,
deployment::Deployment, repo::Repo, server::Server, Version,
deployment::Deployment, procedure::Procedure, repo::Repo,
server::Server, Version,
};
#[typeshare]
@@ -162,6 +163,7 @@ pub enum ResourceTarget {
Server(String),
Repo(String),
Alerter(String),
Procedure(String),
}
impl Default for ResourceTarget {
@@ -206,6 +208,12 @@ impl From<&Alerter> for ResourceTarget {
}
}
impl From<&Procedure> for ResourceTarget {
fn from(procedure: &Procedure) -> Self {
Self::Procedure(procedure.id.clone())
}
}
#[typeshare]
#[derive(
Serialize,

View File

@@ -9,6 +9,7 @@ use monitor_client::entities::{
builder::Builder,
config::MongoConfig,
deployment::Deployment,
procedure::Procedure,
repo::Repo,
server::{stats::SystemStatsRecord, Server},
tag::CustomTag,
@@ -22,16 +23,19 @@ use mungos::{
pub struct DbClient {
pub users: Collection<User>,
pub servers: Collection<Server>,
pub tags: Collection<CustomTag>,
pub updates: Collection<Update>,
pub alerts: Collection<Alert>,
pub stats: Collection<SystemStatsRecord>,
// RESOURCES
pub servers: Collection<Server>,
pub deployments: Collection<Deployment>,
pub builds: Collection<Build>,
pub builders: Collection<Builder>,
pub repos: Collection<Repo>,
pub tags: Collection<CustomTag>,
pub procedures: Collection<Procedure>,
pub alerters: Collection<Alerter>,
pub updates: Collection<Update>,
pub alerts: Collection<Alert>,
//
pub db: Database,
}
@@ -82,6 +86,7 @@ impl DbClient {
builders: resource_collection(&db, "Builder").await?,
repos: resource_collection(&db, "Repo").await?,
alerters: resource_collection(&db, "Alerter").await?,
procedures: resource_collection(&db, "Procedure").await?,
db,
};
Ok(client)