add queue to listeners using locks

This commit is contained in:
mbecker20
2024-05-06 00:33:19 -07:00
parent 7634b9fa1d
commit f0e1f253f4

View File

@@ -1,3 +1,5 @@
use std::sync::{Arc, OnceLock};
use anyhow::{anyhow, Context};
use axum::{extract::Path, http::HeaderMap, routing::post, Router};
use hex::ToHex;
@@ -9,11 +11,12 @@ use monitor_client::{
use resolver_api::Resolve;
use serde::Deserialize;
use sha2::Sha256;
use tokio::sync::Mutex;
use tracing::Instrument;
use crate::{
config::core_config,
helpers::{random_duration, resource::StateResource},
helpers::{cache::Cache, random_duration, resource::StateResource},
state::State,
};
@@ -116,6 +119,12 @@ async fn handle_build_webhook(
headers: HeaderMap,
body: String,
) -> anyhow::Result<()> {
// Acquire and hold lock to make a task queue for
// subsequent listener calls on same resource.
// It would fail if we let it go through from action state busy.
let lock = build_locks().get_or_insert_default(&build_id).await;
let _lock = lock.lock().await;
verify_gh_signature(headers, &body).await?;
let request_branch = extract_branch(&body)?;
let build = Build::get_resource(&build_id).await?;
@@ -136,6 +145,12 @@ async fn handle_repo_clone_webhook(
headers: HeaderMap,
body: String,
) -> anyhow::Result<()> {
// Acquire and hold lock to make a task queue for
// subsequent listener calls on same resource.
// It would fail if we let it go through from action state busy.
let lock = repo_locks().get_or_insert_default(&repo_id).await;
let _lock = lock.lock().await;
verify_gh_signature(headers, &body).await?;
let request_branch = extract_branch(&body)?;
let repo = Repo::get_resource(&repo_id).await?;
@@ -156,6 +171,12 @@ async fn handle_repo_pull_webhook(
headers: HeaderMap,
body: String,
) -> anyhow::Result<()> {
// Acquire and hold lock to make a task queue for
// subsequent listener calls on same resource.
// It would fail if we let it go through from action state busy.
let lock = repo_locks().get_or_insert_default(&repo_id).await;
let _lock = lock.lock().await;
verify_gh_signature(headers, &body).await?;
let request_branch = extract_branch(&body)?;
let repo = Repo::get_resource(&repo_id).await?;
@@ -177,6 +198,12 @@ async fn handle_procedure_webhook(
headers: HeaderMap,
body: String,
) -> anyhow::Result<()> {
// Acquire and hold lock to make a task queue for
// subsequent listener calls on same resource.
// It would fail if we let it go through from action state busy.
let lock = procedure_locks().get_or_insert_default(&procedure_id).await;
let _lock = lock.lock().await;
verify_gh_signature(headers, &body).await?;
let request_branch = extract_branch(&body)?;
if request_branch != target_branch {
@@ -236,3 +263,20 @@ fn extract_branch(body: &str) -> anyhow::Result<String> {
.replace("refs/heads/", "");
Ok(branch)
}
type ListenerLockCache = Cache<String, Arc<Mutex<()>>>;
fn build_locks() -> &'static ListenerLockCache {
static BUILD_LOCKS: OnceLock<ListenerLockCache> = OnceLock::new();
BUILD_LOCKS.get_or_init(Default::default)
}
fn repo_locks() -> &'static ListenerLockCache {
static REPO_LOCKS: OnceLock<ListenerLockCache> = OnceLock::new();
REPO_LOCKS.get_or_init(Default::default)
}
fn procedure_locks() -> &'static ListenerLockCache {
static BUILD_LOCKS: OnceLock<ListenerLockCache> = OnceLock::new();
BUILD_LOCKS.get_or_init(Default::default)
}