move to sync mutex for action state

This commit is contained in:
mbecker20
2024-04-28 14:44:48 -07:00
parent d46ff30540
commit 677e1a3830
18 changed files with 115 additions and 84 deletions

View File

@@ -74,7 +74,7 @@ impl Resolve<RunBuild, User> for State {
// This will set action state back to default when dropped.
// Will also check to ensure build not already busy before updating.
let _action_guard =
action_state.update(|state| state.building = true).await?;
action_state.update(|state| state.building = true)?;
build.config.version.increment();

View File

@@ -58,7 +58,7 @@ impl Resolve<Deploy, User> for State {
// Will check to ensure deployment not already busy before updating, and return Err if so.
// The returned guard will set the action state back to default when dropped.
let _action_guard =
action_state.update(|state| state.deploying = true).await?;
action_state.update(|state| state.deploying = true)?;
if deployment.config.server_id.is_empty() {
return Err(anyhow!("deployment has no server configured"));
@@ -152,7 +152,7 @@ impl Resolve<StartContainer, User> for State {
// Will check to ensure deployment not already busy before updating, and return Err if so.
// The returned guard will set the action state back to default when dropped.
let _action_guard =
action_state.update(|state| state.starting = true).await?;
action_state.update(|state| state.starting = true)?;
if deployment.config.server_id.is_empty() {
return Err(anyhow!("deployment has no server configured"));
@@ -231,7 +231,7 @@ impl Resolve<StopContainer, User> for State {
// Will check to ensure deployment not already busy before updating, and return Err if so.
// The returned guard will set the action state back to default when dropped.
let _action_guard =
action_state.update(|state| state.stopping = true).await?;
action_state.update(|state| state.stopping = true)?;
if deployment.config.server_id.is_empty() {
return Err(anyhow!("deployment has no server configured"));
@@ -303,8 +303,7 @@ impl Resolve<StopAllContainers, User> for State {
// Will check to ensure server not already busy before updating, and return Err if so.
// The returned guard will set the action state back to default when dropped.
let _action_guard = action_state
.update(|state| state.stopping_containers = true)
.await?;
.update(|state| state.stopping_containers = true)?;
let deployments = find_collect(
&db_client().await.deployments,
@@ -391,7 +390,7 @@ impl Resolve<RemoveContainer, User> for State {
// Will check to ensure deployment not already busy before updating, and return Err if so.
// The returned guard will set the action state back to default when dropped.
let _action_guard =
action_state.update(|state| state.removing = true).await?;
action_state.update(|state| state.removing = true)?;
if deployment.config.server_id.is_empty() {
return Err(anyhow!("deployment has no server configured"));

View File

@@ -43,7 +43,7 @@ impl Resolve<RunProcedure, User> for State {
// This will set action state back to default when dropped.
// Will also check to ensure procedure not already busy before updating.
let _action_guard =
action_state.update(|state| state.running = true).await?;
action_state.update(|state| state.running = true)?;
let mut update =
make_update(&procedure, Operation::RunProcedure, &user);

View File

@@ -50,7 +50,7 @@ impl Resolve<CloneRepo, User> for State {
// This will set action state back to default when dropped.
// Will also check to ensure repo not already busy before updating.
let _action_guard =
action_state.update(|state| state.cloning = true).await?;
action_state.update(|state| state.cloning = true)?;
if repo.config.server_id.is_empty() {
return Err(anyhow!("repo has no server attached"));
@@ -133,7 +133,7 @@ impl Resolve<PullRepo, User> for State {
// This will set action state back to default when dropped.
// Will also check to ensure repo not already busy before updating.
let _action_guard =
action_state.update(|state| state.pulling = true).await?;
action_state.update(|state| state.pulling = true)?;
if repo.config.server_id.is_empty() {
return Err(anyhow!("repo has no server attached"));

View File

@@ -47,9 +47,8 @@ impl Resolve<PruneDockerContainers, User> for State {
// Will check to ensure server not already busy before updating, and return Err if so.
// The returned guard will set the action state back to default when dropped.
let _action_guard = action_state
.update(|state| state.stopping_containers = true)
.await?;
let _action_guard =
action_state.update(|state| state.pruning_containers = true)?;
let periphery = periphery_client(&server)?;
@@ -105,9 +104,8 @@ impl Resolve<PruneDockerNetworks, User> for State {
// Will check to ensure server not already busy before updating, and return Err if so.
// The returned guard will set the action state back to default when dropped.
let _action_guard = action_state
.update(|state| state.stopping_containers = true)
.await?;
let _action_guard =
action_state.update(|state| state.pruning_networks = true)?;
let periphery = periphery_client(&server)?;
@@ -163,9 +161,8 @@ impl Resolve<PruneDockerImages, User> for State {
// Will check to ensure server not already busy before updating, and return Err if so.
// The returned guard will set the action state back to default when dropped.
let _action_guard = action_state
.update(|state| state.stopping_containers = true)
.await?;
let _action_guard =
action_state.update(|state| state.pruning_images = true)?;
let periphery = periphery_client(&server)?;

View File

@@ -76,8 +76,7 @@ impl Resolve<GetBuildActionState, User> for State {
.get(&build.id)
.await
.unwrap_or_default()
.get()
.await;
.get()?;
Ok(action_state)
}
}

View File

@@ -197,8 +197,7 @@ impl Resolve<GetDeploymentActionState, User> for State {
.get(&deployment.id)
.await
.unwrap_or_default()
.get()
.await;
.get()?;
Ok(action_state)
}
}

View File

@@ -105,8 +105,7 @@ impl Resolve<GetProcedureActionState, User> for State {
.get(&procedure.id)
.await
.unwrap_or_default()
.get()
.await;
.get()?;
Ok(action_state)
}
}

View File

@@ -66,8 +66,7 @@ impl Resolve<GetRepoActionState, User> for State {
.get(&repo.id)
.await
.unwrap_or_default()
.get()
.await;
.get()?;
Ok(action_state)
}
}

View File

@@ -153,8 +153,7 @@ impl Resolve<GetServerActionState, User> for State {
.get(&server.id)
.await
.unwrap_or_default()
.get()
.await;
.get()?;
Ok(action_state)
}
}

View File

@@ -187,8 +187,7 @@ impl Resolve<DeleteBuild, User> for State {
.get(&id)
.await
.unwrap_or_default()
.busy()
.await
.busy()?
{
return Err(anyhow!("build busy"));
}
@@ -248,8 +247,7 @@ impl Resolve<UpdateBuild, User> for State {
.get(&id)
.await
.unwrap_or_default()
.busy()
.await
.busy()?
{
return Err(anyhow!("build busy"));
}

View File

@@ -244,7 +244,7 @@ impl Resolve<DeleteDeployment, User> for State {
// Will check to ensure deployment not already busy before updating, and return Err if so.
// The returned guard will set the action state back to default when dropped.
let _action_guard =
action_state.update(|state| state.deleting = true).await?;
action_state.update(|state| state.deleting = true)?;
let state = get_deployment_state(&deployment)
.await
@@ -347,8 +347,7 @@ impl Resolve<UpdateDeployment, User> for State {
.get(&id)
.await
.unwrap_or_default()
.busy()
.await
.busy()?
{
return Err(anyhow!("deployment busy"));
}
@@ -418,7 +417,7 @@ impl Resolve<RenameDeployment, User> for State {
// Will check to ensure deployment not already busy before updating, and return Err if so.
// The returned guard will set the action state back to default when dropped.
let _action_guard =
action_state.update(|state| state.renaming = true).await?;
action_state.update(|state| state.renaming = true)?;
let name = to_monitor_name(&name);

View File

@@ -359,8 +359,7 @@ impl Resolve<DeleteProcedure, User> for State {
.get(&id)
.await
.unwrap_or_default()
.busy()
.await
.busy()?
{
return Err(anyhow!("procedure busy"));
}

View File

@@ -216,7 +216,7 @@ impl Resolve<DeleteRepo, User> for State {
// This will set action state back to default when dropped.
// Will also check to ensure repo not already busy before updating.
let _action_guard =
action_state.update(|state| state.deleting = true).await?;
action_state.update(|state| state.deleting = true)?;
let periphery = if repo.config.server_id.is_empty() {
None
@@ -299,7 +299,7 @@ impl Resolve<UpdateRepo, User> for State {
// This will set action state back to default when dropped.
// Will also check to ensure repo not already busy before updating.
let _action_guard =
action_state.update(|state| state.updating = true).await?;
action_state.update(|state| state.updating = true)?;
update_one_by_id(
&db_client().await.repos,

View File

@@ -111,8 +111,7 @@ impl Resolve<DeleteServer, User> for State {
.get(&id)
.await
.unwrap_or_default()
.busy()
.await
.busy()?
{
return Err(anyhow!("server busy"));
}
@@ -208,8 +207,7 @@ impl Resolve<UpdateServer, User> for State {
.get(&id)
.await
.unwrap_or_default()
.busy()
.await
.busy()?
{
return Err(anyhow!("server busy"));
}

View File

@@ -1,4 +1,4 @@
use std::sync::Arc;
use std::sync::{Arc, Mutex};
use anyhow::anyhow;
use monitor_client::{
@@ -9,7 +9,6 @@ use monitor_client::{
server::ServerActionState,
},
};
use tokio::sync::Mutex;
use super::cache::Cache;
@@ -26,25 +25,43 @@ pub struct ActionStates {
/// Need to be able to check "busy" with write lock acquired.
#[derive(Default)]
pub struct ActionState<States: Default>(Mutex<States>);
pub struct ActionState<States: Default + Send + 'static>(
Mutex<States>,
);
impl<States: Default + Busy + Copy> ActionState<States> {
pub async fn get(&self) -> States {
*self.0.lock().await
impl<States: Default + Busy + Copy + Send + 'static>
ActionState<States>
{
pub fn get(&self) -> anyhow::Result<States> {
Ok(
*self
.0
.lock()
.map_err(|e| anyhow!("action state lock poisoned | {e:?}"))?,
)
}
pub async fn busy(&self) -> bool {
self.0.lock().await.busy()
pub fn busy(&self) -> anyhow::Result<bool> {
Ok(
self
.0
.lock()
.map_err(|e| anyhow!("action state lock poisoned | {e:?}"))?
.busy(),
)
}
/// Will acquire lock, check busy, and if not will
/// run the provided update function on the states.
/// Returns a guard that returns the states to default (not busy) when dropped.
pub async fn update(
pub fn update(
&self,
handler: impl Fn(&mut States),
) -> anyhow::Result<UpdateGuard<States>> {
let mut lock = self.0.lock().await;
let mut lock = self
.0
.lock()
.map_err(|e| anyhow!("action state lock poisoned | {e:?}"))?;
if lock.busy() {
return Err(anyhow!("resource is busy"));
}
@@ -57,11 +74,21 @@ impl<States: Default + Busy + Copy> ActionState<States> {
/// The inner mutex guard must already be dropped BEFORE this is dropped,
/// which is guaranteed as the inner guard is dropped by all public methods before
/// user could drop UpdateGuard.
pub struct UpdateGuard<'a, States: Default>(&'a Mutex<States>);
pub struct UpdateGuard<'a, States: Default + Send + 'static>(
&'a Mutex<States>,
);
impl<'a, States: Default> Drop for UpdateGuard<'a, States> {
impl<'a, States: Default + Send + 'static> Drop
for UpdateGuard<'a, States>
{
fn drop(&mut self) {
let mut lock = self.0.blocking_lock();
*lock = Default::default();
let mut lock = match self.0.lock() {
Ok(lock) => lock,
Err(e) => {
error!("CRITICAL: an action state lock is poisoned | {e:?}");
return;
}
};
*lock = States::default();
}
}

View File

@@ -12,6 +12,11 @@ mod execution;
mod maps;
mod sync;
fn cli_args() -> &'static CliArgs {
static CLI_ARGS: OnceLock<CliArgs> = OnceLock::new();
CLI_ARGS.get_or_init(CliArgs::parse)
}
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
struct CliArgs {
@@ -19,11 +24,8 @@ struct CliArgs {
command: Command,
#[arg(long, default_value_t = String::from("./creds.toml"))]
creds: String,
}
fn cli_args() -> &'static CliArgs {
static CLI_ARGS: OnceLock<CliArgs> = OnceLock::new();
CLI_ARGS.get_or_init(CliArgs::parse)
#[arg(long, default_value_t = false)]
verbose: bool,
}
#[derive(Debug, Clone, Subcommand)]

View File

@@ -10,7 +10,7 @@ use monitor_client::{
};
use partial_derive2::MaybeNone;
use crate::monitor_client;
use crate::{cli_args, monitor_client};
pub mod alerter;
pub mod build;
@@ -25,7 +25,11 @@ type ToCreate<T> = Vec<ResourceToml<T>>;
type UpdatesResult<T> = (ToCreate<T>, ToUpdate<T>);
pub trait ResourceSync {
type PartialConfig: Clone + Send + MaybeNone + 'static;
type PartialConfig: std::fmt::Debug
+ Clone
+ Send
+ MaybeNone
+ 'static;
type FullConfig: Clone + Send + 'static;
type FullInfo: Default;
type ListItemInfo: 'static;
@@ -88,28 +92,41 @@ pub trait ResourceSync {
}
}
let verbose = cli_args().verbose;
if !to_create.is_empty() {
println!(
"\n{} TO CREATE: {}",
Self::display(),
to_create
.iter()
.map(|item| item.name.as_str())
.collect::<Vec<_>>()
.join(", ")
);
if verbose {
println!("\n{} TO CREATE:\n{to_create:#?}", Self::display(),);
} else {
println!(
"\n{} TO CREATE: {:#?}",
Self::display(),
to_create
.iter()
.map(|item| item.name.as_str())
.collect::<Vec<_>>()
);
}
}
if !to_update.is_empty() {
println!(
"\n{} TO UPDATE: {}",
Self::display(),
to_update
.iter()
.map(|(_, item)| item.name.as_str())
.collect::<Vec<_>>()
.join(", ")
);
if verbose {
println!(
"\n{} TO UPDATE:\n{:#?}",
Self::display(),
to_update.iter().map(|(_, r)| r).collect::<Vec<_>>()
);
} else {
println!(
"\n{} TO UPDATE: {}",
Self::display(),
to_update
.iter()
.map(|(_, item)| item.name.as_str())
.collect::<Vec<_>>()
.join(", ")
);
}
}
Ok((to_create, to_update))