should fix procedure

This commit is contained in:
mbecker20
2024-06-05 22:42:04 -07:00
parent ac449e38d5
commit 25fcca7246
2 changed files with 44 additions and 36 deletions

View File

@@ -32,7 +32,7 @@ impl Resolve<RunProcedure, (User, Update)> for State {
fn resolve_inner(
procedure: String,
user: User,
update: Update,
mut update: Update,
) -> Pin<
Box<
dyn std::future::Future<Output = anyhow::Result<Update>> + Send,
@@ -46,6 +46,14 @@ fn resolve_inner(
)
.await?;
// Need to push the initial log, as execute_procedure
// assumes first log is already created
// and will panic otherwise.
update.push_simple_log(
"execute_procedure",
format!("executing procedure {}", procedure.name),
);
// get the action state for the procedure (or insert default).
let action_state = action_states()
.procedure

View File

@@ -11,12 +11,13 @@ use monitor_client::{
};
use resolver_api::Resolve;
use tokio::sync::Mutex;
use tracing::Instrument;
use crate::{api::execute::ExecuteRequest, state::State};
use super::update::{init_execution_update, update_update};
#[instrument]
#[instrument(skip_all)]
pub async fn execute_procedure(
procedure: &Procedure,
update: &Mutex<Update>,
@@ -65,7 +66,39 @@ pub async fn execute_procedure(
Ok(())
}
#[instrument]
#[instrument(skip(update))]
async fn execute_stage(
executions: Vec<Execution>,
parent_id: &str,
parent_name: &str,
update: &Mutex<Update>,
) -> anyhow::Result<()> {
let futures = executions.into_iter().map(|execution| async move {
let now = Instant::now();
add_line_to_update(update, &format!("executing: {execution:?}"))
.await;
let fail_log = format!("failed on {execution:?}");
let res =
execute_execution(execution.clone(), parent_id, parent_name)
.await
.context(fail_log);
add_line_to_update(
update,
&format!(
"finished execution in {:?}: {execution:?}",
now.elapsed()
),
)
.await;
res
});
join_all(futures)
.await
.into_iter()
.collect::<anyhow::Result<_>>()?;
Ok(())
}
async fn execute_execution(
execution: Execution,
// used to prevent recursive procedure
@@ -224,39 +257,6 @@ async fn execute_execution(
}
}
#[instrument(skip(update))]
async fn execute_stage(
executions: Vec<Execution>,
parent_id: &str,
parent_name: &str,
update: &Mutex<Update>,
) -> anyhow::Result<()> {
let futures = executions.into_iter().map(|execution| async move {
let now = Instant::now();
add_line_to_update(update, &format!("executing: {execution:?}"))
.await;
let fail_log = format!("failed on {execution:?}");
let res =
execute_execution(execution.clone(), parent_id, parent_name)
.await
.context(fail_log);
add_line_to_update(
update,
&format!(
"finished execution in {:?}: {execution:?}",
now.elapsed()
),
)
.await;
res
});
join_all(futures)
.await
.into_iter()
.collect::<anyhow::Result<_>>()?;
Ok(())
}
/// ASSUMES FIRST LOG IS ALREADY CREATED
#[instrument(level = "debug")]
async fn add_line_to_update(update: &Mutex<Update>, line: &str) {