improve the resource busy locks

This commit is contained in:
mbecker20
2024-04-27 15:09:30 -07:00
parent 0fc6e89ffe
commit fab4e8e534
65 changed files with 1179 additions and 1349 deletions

View File

@@ -20,13 +20,12 @@ use resolver_api::Resolve;
use serror::serialize_error_pretty;
use crate::{
db::db_client,
helpers::{
periphery_client,
resource::StateResource,
update::{add_update, update_update},
},
state::{action_states, State},
state::{action_states, db_client, State},
};
#[async_trait]
@@ -44,6 +43,15 @@ impl Resolve<CloneRepo, User> for State {
)
.await?;
// get the action state for the repo (or insert default).
let action_state =
action_states().repo.get_or_insert_default(&repo.id).await;
// This will set action state back to default when dropped.
// Will also check to ensure repo not already busy before updating.
let _action_guard =
action_state.update(|state| state.cloning = true).await?;
if repo.config.server_id.is_empty() {
return Err(anyhow!("repo has no server attached"));
}
@@ -52,40 +60,37 @@ impl Resolve<CloneRepo, User> for State {
let periphery = periphery_client(&server)?;
let repo_id = repo.id.clone();
let start_ts = monitor_timestamp();
let inner = || async move {
let start_ts = monitor_timestamp();
let mut update = Update {
operation: Operation::CloneRepo,
target: ResourceTarget::Repo(repo.id.clone()),
start_ts,
status: UpdateStatus::InProgress,
operator: user.id.clone(),
success: true,
..Default::default()
};
let mut update = Update {
operation: Operation::CloneRepo,
target: ResourceTarget::Repo(repo.id.clone()),
start_ts,
status: UpdateStatus::InProgress,
operator: user.id.clone(),
success: true,
..Default::default()
};
update.id = add_update(update.clone()).await?;
update.id = add_update(update.clone()).await?;
let logs = match periphery
.request(api::git::CloneRepo {
args: (&repo).into(),
})
.await
{
Ok(logs) => logs,
Err(e) => {
vec![Log::error("clone repo", serialize_error_pretty(&e))]
}
};
let logs = match periphery
.request(api::git::CloneRepo {
args: (&repo).into(),
})
.await
{
Ok(logs) => logs,
Err(e) => {
vec![Log::error("clone repo", serialize_error_pretty(&e))]
}
};
update.logs.extend(logs);
update.finalize();
update.logs.extend(logs);
update.finalize();
if update.success {
let res = db_client().await
if update.success {
let res = db_client().await
.repos
.update_one(
doc! { "_id": ObjectId::from_str(&repo.id)? },
@@ -93,39 +98,16 @@ impl Resolve<CloneRepo, User> for State {
None,
)
.await;
if let Err(e) = res {
warn!(
if let Err(e) = res {
warn!(
"failed to update repo last_pulled_at | repo id: {} | {e:#}",
repo.id
);
}
}
update_update(update.clone()).await?;
Ok(update)
};
if action_states().repo.busy(&repo_id).await {
return Err(anyhow!("repo busy"));
}
action_states()
.repo
.update_entry(&repo_id, |entry| {
entry.cloning = true;
})
.await;
let res = inner().await;
action_states()
.repo
.update_entry(repo_id, |entry| {
entry.cloning = false;
})
.await;
res
update_update(update.clone()).await?;
Ok(update)
}
}
@@ -144,6 +126,15 @@ impl Resolve<PullRepo, User> for State {
)
.await?;
// get the action state for the repo (or insert default).
let action_state =
action_states().repo.get_or_insert_default(&repo.id).await;
// This will set action state back to default when dropped.
// Will also check to ensure repo not already busy before updating.
let _action_guard =
action_state.update(|state| state.pulling = true).await?;
if repo.config.server_id.is_empty() {
return Err(anyhow!("repo has no server attached"));
}
@@ -152,43 +143,40 @@ impl Resolve<PullRepo, User> for State {
let periphery = periphery_client(&server)?;
let repo_id = repo.id.clone();
let start_ts = monitor_timestamp();
let inner = || async move {
let start_ts = monitor_timestamp();
let mut update = Update {
operation: Operation::PullRepo,
target: ResourceTarget::Repo(repo.id.clone()),
start_ts,
status: UpdateStatus::InProgress,
operator: user.id.clone(),
success: true,
..Default::default()
};
let mut update = Update {
operation: Operation::PullRepo,
target: ResourceTarget::Repo(repo.id.clone()),
start_ts,
status: UpdateStatus::InProgress,
operator: user.id.clone(),
success: true,
..Default::default()
};
update.id = add_update(update.clone()).await?;
update.id = add_update(update.clone()).await?;
let logs = match periphery
.request(api::git::PullRepo {
name: repo.name,
branch: optional_string(&repo.config.branch),
on_pull: repo.config.on_pull.into_option(),
})
.await
{
Ok(logs) => logs,
Err(e) => {
vec![Log::error("pull repo", serialize_error_pretty(&e))]
}
};
let logs = match periphery
.request(api::git::PullRepo {
name: repo.name,
branch: optional_string(&repo.config.branch),
on_pull: repo.config.on_pull.into_option(),
})
.await
{
Ok(logs) => logs,
Err(e) => {
vec![Log::error("pull repo", serialize_error_pretty(&e))]
}
};
update.logs.extend(logs);
update.logs.extend(logs);
update.finalize();
update.finalize();
if update.success {
let res = db_client().await
if update.success {
let res = db_client().await
.repos
.update_one(
doc! { "_id": ObjectId::from_str(&repo.id)? },
@@ -196,38 +184,15 @@ impl Resolve<PullRepo, User> for State {
None,
)
.await;
if let Err(e) = res {
warn!(
if let Err(e) = res {
warn!(
"failed to update repo last_pulled_at | repo id: {} | {e:#}",
repo.id
);
}
}
update_update(update.clone()).await?;
Ok(update)
};
if action_states().repo.busy(&repo_id).await {
return Err(anyhow!("repo busy"));
}
action_states()
.repo
.update_entry(&repo_id, |entry| {
entry.pulling = true;
})
.await;
let res = inner().await;
action_states()
.repo
.update_entry(repo_id, |entry| {
entry.pulling = false;
})
.await;
res
update_update(update.clone()).await?;
Ok(update)
}
}