close failed procedure execution updates

This commit is contained in:
mbecker20
2024-07-19 23:21:21 -07:00
parent 8c6f38cafb
commit c2f9e29605
2 changed files with 155 additions and 61 deletions

View File

@@ -79,9 +79,6 @@ impl Resolve<RunBuild, (User, Update)> for State {
)
.await?;
let (registry_token, aws_ecr) =
validate_account_extract_registry_token_aws_ecr(&build).await?;
// get the action state for the build (or insert default).
let action_state =
action_states().build.get_or_insert_default(&build.id).await;
@@ -91,6 +88,9 @@ impl Resolve<RunBuild, (User, Update)> for State {
let _action_guard =
action_state.update(|state| state.building = true)?;
let (registry_token, aws_ecr) =
validate_account_extract_registry_token_aws_ecr(&build).await?;
build.config.version.increment();
update.version = build.config.version;
update_update(update.clone()).await?;

View File

@@ -1,18 +1,24 @@
use std::time::{Duration, Instant};
use anyhow::{anyhow, Context, Ok};
use formatting::{bold, colored, muted, Color};
use anyhow::{anyhow, Context};
use formatting::{bold, colored, format_serror, muted, Color};
use futures::future::join_all;
use monitor_client::{
api::execute::Execution,
entities::{
procedure::Procedure, update::Update, user::procedure_user,
procedure::Procedure,
update::{Log, Update},
user::procedure_user,
},
};
use mungos::by_id::find_one_by_id;
use resolver_api::Resolve;
use tokio::sync::Mutex;
use crate::{api::execute::ExecuteRequest, state::State};
use crate::{
api::execute::ExecuteRequest,
state::{db_client, State},
};
use super::update::{init_execution_update, update_update};
@@ -49,8 +55,7 @@ pub async fn execute_procedure(
.await
.with_context(|| {
format!(
"{}: failed stage '{}' execution after {:?}",
colored("ERROR", Color::Red),
"failed stage '{}' execution after {:?}",
bold(&stage.name),
timer.elapsed(),
)
@@ -130,10 +135,15 @@ async fn execute_execution(
let ExecuteRequest::RunProcedure(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at RunProcedure")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at RunProcedure"),
&update_id,
)
.await?
}
Execution::RunBuild(req) => {
let req = ExecuteRequest::RunBuild(req);
@@ -141,10 +151,15 @@ async fn execute_execution(
let ExecuteRequest::RunBuild(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at RunBuild")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at RunBuild"),
&update_id,
)
.await?
}
Execution::Deploy(req) => {
let req = ExecuteRequest::Deploy(req);
@@ -152,10 +167,15 @@ async fn execute_execution(
let ExecuteRequest::Deploy(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at Deploy")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at Deploy"),
&update_id,
)
.await?
}
Execution::StartContainer(req) => {
let req = ExecuteRequest::StartContainer(req);
@@ -163,10 +183,15 @@ async fn execute_execution(
let ExecuteRequest::StartContainer(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at StartContainer")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at StartContainer"),
&update_id,
)
.await?
}
Execution::StopContainer(req) => {
let req = ExecuteRequest::StopContainer(req);
@@ -174,10 +199,15 @@ async fn execute_execution(
let ExecuteRequest::StopContainer(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at StopContainer")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at StopContainer"),
&update_id,
)
.await?
}
Execution::StopAllContainers(req) => {
let req = ExecuteRequest::StopAllContainers(req);
@@ -185,10 +215,15 @@ async fn execute_execution(
let ExecuteRequest::StopAllContainers(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at StopAllContainers")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at StopAllContainers"),
&update_id,
)
.await?
}
Execution::RemoveContainer(req) => {
let req = ExecuteRequest::RemoveContainer(req);
@@ -196,10 +231,15 @@ async fn execute_execution(
let ExecuteRequest::RemoveContainer(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at RemoveContainer")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at RemoveContainer"),
&update_id,
)
.await?
}
Execution::CloneRepo(req) => {
let req = ExecuteRequest::CloneRepo(req);
@@ -207,10 +247,15 @@ async fn execute_execution(
let ExecuteRequest::CloneRepo(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at CloneRepo")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at CloneRepo"),
&update_id,
)
.await?
}
Execution::PullRepo(req) => {
let req = ExecuteRequest::PullRepo(req);
@@ -218,10 +263,15 @@ async fn execute_execution(
let ExecuteRequest::PullRepo(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at PullRepo")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at PullRepo"),
&update_id,
)
.await?
}
Execution::PruneNetworks(req) => {
let req = ExecuteRequest::PruneNetworks(req);
@@ -229,10 +279,15 @@ async fn execute_execution(
let ExecuteRequest::PruneNetworks(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at PruneNetworks")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at PruneNetworks"),
&update_id,
)
.await?
}
Execution::PruneImages(req) => {
let req = ExecuteRequest::PruneImages(req);
@@ -240,10 +295,15 @@ async fn execute_execution(
let ExecuteRequest::PruneImages(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at PruneImages")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at PruneImages"),
&update_id,
)
.await?
}
Execution::PruneContainers(req) => {
let req = ExecuteRequest::PruneContainers(req);
@@ -251,10 +311,15 @@ async fn execute_execution(
let ExecuteRequest::PruneContainers(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at PruneContainers")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at PruneContainers"),
&update_id,
)
.await?
}
Execution::RunSync(req) => {
let req = ExecuteRequest::RunSync(req);
@@ -262,10 +327,15 @@ async fn execute_execution(
let ExecuteRequest::RunSync(req) = req else {
unreachable!()
};
State
.resolve(req, (user, update))
.await
.context("failed at RunSync")?
let update_id = update.id.clone();
handle_resolve_result(
State
.resolve(req, (user, update))
.await
.context("failed at RunSync"),
&update_id,
)
.await?
}
Execution::Sleep(req) => {
tokio::time::sleep(Duration::from_millis(
@@ -289,6 +359,30 @@ async fn execute_execution(
}
}
/// If the call to .resolve returns Err, the update may not be closed.
/// This will ensure it is closed with error log attached.
async fn handle_resolve_result(
res: anyhow::Result<Update>,
update_id: &str,
) -> anyhow::Result<Update> {
match res {
Ok(res) => Ok(res),
Err(e) => {
let log =
Log::error("execution error", format_serror(&e.into()));
let mut update =
find_one_by_id(&db_client().await.updates, update_id)
.await
.context("failed to query to db")?
.context("no update exists with given id")?;
update.logs.push(log);
update.finalize();
update_update(update.clone()).await?;
Ok(update)
}
}
}
/// ASSUMES FIRST LOG IS ALREADY CREATED
#[instrument(level = "debug")]
async fn add_line_to_update(update: &Mutex<Update>, line: &str) {