diff --git a/bin/core/src/listener/github.rs b/bin/core/src/listener/github.rs index f2f48b92c..6788b76a0 100644 --- a/bin/core/src/listener/github.rs +++ b/bin/core/src/listener/github.rs @@ -1,3 +1,5 @@ +use std::sync::{Arc, OnceLock}; + use anyhow::{anyhow, Context}; use axum::{extract::Path, http::HeaderMap, routing::post, Router}; use hex::ToHex; @@ -9,11 +11,12 @@ use monitor_client::{ use resolver_api::Resolve; use serde::Deserialize; use sha2::Sha256; +use tokio::sync::Mutex; use tracing::Instrument; use crate::{ config::core_config, - helpers::{random_duration, resource::StateResource}, + helpers::{cache::Cache, random_duration, resource::StateResource}, state::State, }; @@ -116,6 +119,12 @@ async fn handle_build_webhook( headers: HeaderMap, body: String, ) -> anyhow::Result<()> { + // Acquire and hold lock to make a task queue for + // subsequent listener calls on same resource. + // It would fail if we let it go through from action state busy. + let lock = build_locks().get_or_insert_default(&build_id).await; + let _lock = lock.lock().await; + verify_gh_signature(headers, &body).await?; let request_branch = extract_branch(&body)?; let build = Build::get_resource(&build_id).await?; @@ -136,6 +145,12 @@ async fn handle_repo_clone_webhook( headers: HeaderMap, body: String, ) -> anyhow::Result<()> { + // Acquire and hold lock to make a task queue for + // subsequent listener calls on same resource. + // It would fail if we let it go through from action state busy. + let lock = repo_locks().get_or_insert_default(&repo_id).await; + let _lock = lock.lock().await; + verify_gh_signature(headers, &body).await?; let request_branch = extract_branch(&body)?; let repo = Repo::get_resource(&repo_id).await?; @@ -156,6 +171,12 @@ async fn handle_repo_pull_webhook( headers: HeaderMap, body: String, ) -> anyhow::Result<()> { + // Acquire and hold lock to make a task queue for + // subsequent listener calls on same resource. + // It would fail if we let it go through from action state busy. + let lock = repo_locks().get_or_insert_default(&repo_id).await; + let _lock = lock.lock().await; + verify_gh_signature(headers, &body).await?; let request_branch = extract_branch(&body)?; let repo = Repo::get_resource(&repo_id).await?; @@ -177,6 +198,12 @@ async fn handle_procedure_webhook( headers: HeaderMap, body: String, ) -> anyhow::Result<()> { + // Acquire and hold lock to make a task queue for + // subsequent listener calls on same resource. + // It would fail if we let it go through from action state busy. + let lock = procedure_locks().get_or_insert_default(&procedure_id).await; + let _lock = lock.lock().await; + verify_gh_signature(headers, &body).await?; let request_branch = extract_branch(&body)?; if request_branch != target_branch { @@ -236,3 +263,20 @@ fn extract_branch(body: &str) -> anyhow::Result { .replace("refs/heads/", ""); Ok(branch) } + +type ListenerLockCache = Cache>>; + +fn build_locks() -> &'static ListenerLockCache { + static BUILD_LOCKS: OnceLock = OnceLock::new(); + BUILD_LOCKS.get_or_init(Default::default) +} + +fn repo_locks() -> &'static ListenerLockCache { + static REPO_LOCKS: OnceLock = OnceLock::new(); + REPO_LOCKS.get_or_init(Default::default) +} + +fn procedure_locks() -> &'static ListenerLockCache { + static BUILD_LOCKS: OnceLock = OnceLock::new(); + BUILD_LOCKS.get_or_init(Default::default) +}