refactor caching to use custom Cache struct

This commit is contained in:
mbecker20
2023-05-28 08:31:18 +00:00
parent ab945aadde
commit 9c0be07ae1
11 changed files with 293 additions and 274 deletions

View File

@@ -7,10 +7,8 @@ use futures_util::future::join_all;
use helpers::{all_logs_success, to_monitor_name};
use mungos::mongodb::bson::{doc, to_bson};
use types::{
monitor_timestamp,
traits::{Busy, Permissioned},
AwsBuilderBuildConfig, Build, DockerContainerState, Log, Operation, PermissionLevel, Update,
UpdateStatus, UpdateTarget, Version,
monitor_timestamp, traits::Permissioned, AwsBuilderBuildConfig, Build, DockerContainerState,
Log, Operation, PermissionLevel, Update, UpdateStatus, UpdateTarget, Version,
};
use crate::{
@@ -43,13 +41,6 @@ impl State {
}
}
pub async fn build_busy(&self, id: &str) -> bool {
match self.build_action_states.lock().await.get(id) {
Some(a) => a.busy(),
None => false,
}
}
pub async fn create_build(&self, name: &str, user: &RequestUser) -> anyhow::Result<Build> {
if !user.is_admin && !user.create_build_permissions {
return Err(anyhow!("user does not have permission to create builds"));
@@ -117,7 +108,7 @@ impl State {
}
pub async fn delete_build(&self, build_id: &str, user: &RequestUser) -> anyhow::Result<Build> {
if self.build_busy(build_id).await {
if self.build_action_states.busy(build_id).await {
return Err(anyhow!("build busy"));
}
let build = self
@@ -147,21 +138,23 @@ impl State {
new_build: Build,
user: &RequestUser,
) -> anyhow::Result<Build> {
if self.build_busy(&new_build.id).await {
if self.build_action_states.busy(&new_build.id).await {
return Err(anyhow!("build busy"));
}
let id = new_build.id.clone();
{
let mut lock = self.build_action_states.lock().await;
let entry = lock.entry(id.clone()).or_default();
entry.updating = true;
}
self.build_action_states
.update_entry(id.clone(), |entry| {
entry.updating = true;
})
.await;
let res = self.update_build_inner(new_build, user).await;
{
let mut lock = self.build_action_states.lock().await;
let entry = lock.entry(id).or_default();
entry.updating = false;
}
self.build_action_states
.update_entry(id.clone(), |entry| {
entry.updating = false;
})
.await;
res
}
@@ -252,20 +245,20 @@ impl State {
}
pub async fn build(&self, build_id: &str, user: &RequestUser) -> anyhow::Result<Update> {
if self.build_busy(build_id).await {
if self.build_action_states.busy(build_id).await {
return Err(anyhow!("build busy"));
}
{
let mut lock = self.build_action_states.lock().await;
let entry = lock.entry(build_id.to_string()).or_default();
entry.building = true;
}
self.build_action_states
.update_entry(build_id.to_string(), |entry| {
entry.building = true;
})
.await;
let res = self.build_inner(build_id, user).await;
{
let mut lock = self.build_action_states.lock().await;
let entry = lock.entry(build_id.to_string()).or_default();
entry.building = false;
}
self.build_action_states
.update_entry(build_id.to_string(), |entry| {
entry.building = false;
})
.await;
res
}

View File

@@ -2,10 +2,8 @@ use anyhow::{anyhow, Context};
use diff::Diff;
use helpers::all_logs_success;
use types::{
monitor_timestamp,
traits::{Busy, Permissioned},
Log, Operation, PeripheryCommand, PeripheryCommandBuilder, PermissionLevel, Update,
UpdateStatus, UpdateTarget,
monitor_timestamp, traits::Permissioned, Log, Operation, PeripheryCommand,
PeripheryCommandBuilder, PermissionLevel, Update, UpdateStatus, UpdateTarget,
};
use crate::{auth::RequestUser, state::State};
@@ -28,13 +26,6 @@ impl State {
}
}
pub async fn command_busy(&self, id: &str) -> bool {
match self.command_action_states.lock().await.get(id) {
Some(a) => a.busy(),
None => false,
}
}
pub async fn create_command(
&self,
name: &str,
@@ -103,7 +94,7 @@ impl State {
command_id: &str,
user: &RequestUser,
) -> anyhow::Result<PeripheryCommand> {
if self.command_busy(command_id).await {
if self.command_action_states.busy(command_id).await {
return Err(anyhow!("command busy"));
}
let command = self
@@ -181,20 +172,20 @@ impl State {
command_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.command_busy(command_id).await {
if self.command_action_states.busy(command_id).await {
return Err(anyhow!("command busy"));
}
{
let mut lock = self.command_action_states.lock().await;
let entry = lock.entry(command_id.to_string()).or_default();
entry.running = true;
}
self.command_action_states
.update_entry(command_id.to_string(), |entry| {
entry.running = true;
})
.await;
let res = self.run_command_inner(command_id, user).await;
{
let mut lock = self.command_action_states.lock().await;
let entry = lock.entry(command_id.to_string()).or_default();
entry.running = false;
}
self.command_action_states
.update_entry(command_id.to_string(), |entry| {
entry.running = false;
})
.await;
res
}

View File

@@ -3,11 +3,9 @@ use diff::Diff;
use helpers::{all_logs_success, to_monitor_name};
use mungos::mongodb::bson::doc;
use types::{
monitor_timestamp,
traits::{Busy, Permissioned},
Deployment, DeploymentBuilder, DeploymentWithContainerState, DockerContainerState, Log,
Operation, PermissionLevel, ServerStatus, ServerWithStatus, TerminationSignal, Update,
UpdateStatus, UpdateTarget,
monitor_timestamp, traits::Permissioned, Deployment, DeploymentBuilder,
DeploymentWithContainerState, DockerContainerState, Log, Operation, PermissionLevel,
ServerStatus, ServerWithStatus, TerminationSignal, Update, UpdateStatus, UpdateTarget,
};
use crate::{
@@ -34,13 +32,6 @@ impl State {
}
}
pub async fn deployment_busy(&self, id: &str) -> bool {
match self.deployment_action_states.lock().await.get(id) {
Some(a) => a.busy(),
None => false,
}
}
pub async fn create_deployment(
&self,
name: &str,
@@ -118,7 +109,7 @@ impl State {
stop_signal: Option<TerminationSignal>,
stop_time: Option<i32>,
) -> anyhow::Result<Deployment> {
if self.deployment_busy(deployment_id).await {
if self.deployment_action_states.busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
let deployment = self
@@ -169,21 +160,25 @@ impl State {
new_deployment: Deployment,
user: &RequestUser,
) -> anyhow::Result<Deployment> {
if self.deployment_busy(&new_deployment.id).await {
if self.deployment_action_states.busy(&new_deployment.id).await {
return Err(anyhow!("deployment busy"));
}
let id = new_deployment.id.clone();
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(id.clone()).or_default();
entry.updating = true;
}
self.deployment_action_states
.update_entry(id.clone(), |entry| {
entry.updating = true;
})
.await;
let res = self.update_deployment_inner(new_deployment, user).await;
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(id).or_default();
entry.updating = false;
}
self.deployment_action_states
.update_entry(id.clone(), |entry| {
entry.updating = false;
})
.await;
res
}
@@ -287,22 +282,25 @@ impl State {
new_name: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.deployment_busy(&deployment_id).await {
if self.deployment_action_states.busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.renaming = true;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.renaming = true;
})
.await;
let res = self
.rename_deployment_inner(deployment_id, new_name, user)
.await;
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.renaming = false;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.renaming = false;
})
.await;
res
}
@@ -453,20 +451,23 @@ impl State {
user: &RequestUser,
check_deployment_busy: bool,
) -> anyhow::Result<Update> {
if check_deployment_busy && self.deployment_busy(deployment_id).await {
if check_deployment_busy && self.deployment_action_states.busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.recloning = true;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.recloning = true;
})
.await;
let res = self.reclone_deployment_inner(deployment_id, user).await;
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.recloning = false;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.recloning = false;
})
.await;
res
}
@@ -517,22 +518,24 @@ impl State {
stop_signal: Option<TerminationSignal>,
stop_time: Option<i32>,
) -> anyhow::Result<Update> {
if self.deployment_busy(deployment_id).await {
if self.deployment_action_states.busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.deploying = true;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.deploying = true;
})
.await;
let res = self
.deploy_container_inner(deployment_id, user, stop_signal, stop_time)
.await;
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.deploying = false;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.deploying = false;
})
.await;
res
}
@@ -606,20 +609,22 @@ impl State {
deployment_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.deployment_busy(deployment_id).await {
if self.deployment_action_states.busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.starting = true;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.starting = true;
})
.await;
let res = self.start_container_inner(deployment_id, user).await;
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.starting = false;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.starting = false;
})
.await;
res
}
@@ -678,22 +683,24 @@ impl State {
stop_signal: Option<TerminationSignal>,
stop_time: Option<i32>,
) -> anyhow::Result<Update> {
if self.deployment_busy(deployment_id).await {
if self.deployment_action_states.busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.stopping = true;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.stopping = true;
})
.await;
let res = self
.stop_container_inner(deployment_id, user, stop_signal, stop_time)
.await;
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.stopping = false;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.stopping = false;
})
.await;
res
}
@@ -757,22 +764,24 @@ impl State {
stop_signal: Option<TerminationSignal>,
stop_time: Option<i32>,
) -> anyhow::Result<Update> {
if self.deployment_busy(deployment_id).await {
if self.deployment_action_states.busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.removing = true;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.removing = true;
})
.await;
let res = self
.remove_container_inner(deployment_id, user, stop_signal, stop_time)
.await;
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.removing = false;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.removing = false;
})
.await;
res
}
@@ -834,20 +843,22 @@ impl State {
deployment_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.deployment_busy(deployment_id).await {
if self.deployment_action_states.busy(deployment_id).await {
return Err(anyhow!("deployment busy"));
}
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.pulling = true;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.pulling = true;
})
.await;
let res = self.pull_deployment_repo_inner(deployment_id, user).await;
{
let mut lock = self.deployment_action_states.lock().await;
let entry = lock.entry(deployment_id.to_string()).or_default();
entry.pulling = false;
}
self.deployment_action_states
.update_entry(deployment_id.to_string(), |entry| {
entry.pulling = false;
})
.await;
res
}

View File

@@ -3,9 +3,8 @@ use diff::Diff;
use futures_util::future::join_all;
use mungos::mongodb::bson::doc;
use types::{
monitor_timestamp,
traits::{Busy, Permissioned},
Log, Operation, PermissionLevel, Server, Update, UpdateStatus, UpdateTarget,
monitor_timestamp, traits::Permissioned, Log, Operation, PermissionLevel, Server, Update,
UpdateStatus, UpdateTarget,
};
use crate::{auth::RequestUser, state::State};
@@ -28,13 +27,6 @@ impl State {
}
}
pub async fn server_busy(&self, id: &str) -> bool {
match self.server_action_states.lock().await.get(id) {
Some(a) => a.busy(),
None => false,
}
}
pub async fn create_server(
&self,
name: &str,
@@ -96,7 +88,7 @@ impl State {
server_id: &str,
user: &RequestUser,
) -> anyhow::Result<Server> {
if self.server_busy(server_id).await {
if self.server_action_states.busy(server_id).await {
return Err(anyhow!("server busy"));
}
let server = self
@@ -164,7 +156,7 @@ impl State {
mut new_server: Server,
user: &RequestUser,
) -> anyhow::Result<Server> {
if self.server_busy(&new_server.id).await {
if self.server_action_states.busy(&new_server.id).await {
return Err(anyhow!("server busy"));
}
let current_server = self
@@ -208,20 +200,22 @@ impl State {
server_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.server_busy(server_id).await {
if self.server_action_states.busy(server_id).await {
return Err(anyhow!("server busy"));
}
{
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_networks = true;
}
self.server_action_states
.update_entry(server_id.to_string(), |entry| {
entry.pruning_networks = true;
})
.await;
let res = self.prune_networks_inner(server_id, user).await;
{
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_networks = false;
}
self.server_action_states
.update_entry(server_id.to_string(), |entry| {
entry.pruning_networks = false;
})
.await;
res
}
@@ -269,20 +263,22 @@ impl State {
server_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.server_busy(server_id).await {
if self.server_action_states.busy(server_id).await {
return Err(anyhow!("server busy"));
}
{
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_images = true;
}
self.server_action_states
.update_entry(server_id.to_string(), |entry| {
entry.pruning_images = true;
})
.await;
let res = self.prune_images_inner(server_id, user).await;
{
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_images = false;
}
self.server_action_states
.update_entry(server_id.to_string(), |entry| {
entry.pruning_images = false;
})
.await;
res
}
@@ -331,20 +327,22 @@ impl State {
server_id: &str,
user: &RequestUser,
) -> anyhow::Result<Update> {
if self.server_busy(server_id).await {
if self.server_action_states.busy(server_id).await {
return Err(anyhow!("server busy"));
}
{
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_containers = true;
}
self.server_action_states
.update_entry(server_id.to_string(), |entry| {
entry.pruning_containers = true;
})
.await;
let res = self.prune_containers_inner(server_id, user).await;
{
let mut lock = self.server_action_states.lock().await;
let entry = lock.entry(server_id.to_string()).or_default();
entry.pruning_containers = false;
}
self.server_action_states
.update_entry(server_id.to_string(), |entry| {
entry.pruning_containers = false;
})
.await;
res
}

View File

@@ -297,13 +297,7 @@ impl State {
) -> anyhow::Result<BuildActionState> {
self.get_build_check_permissions(&id, &user, PermissionLevel::Read)
.await?;
let action_state = self
.build_action_states
.lock()
.await
.entry(id)
.or_default()
.clone();
let action_state = self.build_action_states.get_or_default(id).await;
Ok(action_state)
}

View File

@@ -1,12 +1,21 @@
use anyhow::Context;
use axum::{Router, routing::{get, post, delete, patch}, Json, extract::{Path, Query}};
use axum::{
extract::{Path, Query},
routing::{delete, get, patch, post},
Json, Router,
};
use helpers::handle_anyhow_error;
use mungos::mongodb::bson::Document;
use serde::{Serialize, Deserialize};
use types::{PeripheryCommand, PermissionLevel, traits::Permissioned, CommandActionState};
use serde::{Deserialize, Serialize};
use types::{traits::Permissioned, CommandActionState, PeripheryCommand, PermissionLevel};
use typeshare::typeshare;
use crate::{state::{State, StateExtension}, auth::{RequestUser, RequestUserExtension}, response, api::spawn_request_action};
use crate::{
api::spawn_request_action,
auth::{RequestUser, RequestUserExtension},
response,
state::{State, StateExtension},
};
#[derive(Serialize, Deserialize)]
pub struct CommandId {
@@ -205,13 +214,7 @@ impl State {
) -> anyhow::Result<CommandActionState> {
self.get_command_check_permissions(&id, &user, PermissionLevel::Read)
.await?;
let action_state = self
.command_action_states
.lock()
.await
.entry(id)
.or_default()
.clone();
let action_state = self.command_action_states.get_or_default(id).await;
Ok(action_state)
}
}
}

View File

@@ -455,13 +455,7 @@ impl State {
) -> anyhow::Result<DeploymentActionState> {
self.get_deployment_check_permissions(&id, &user, PermissionLevel::Read)
.await?;
let action_state = self
.deployment_action_states
.lock()
.await
.entry(id)
.or_default()
.clone();
let action_state = self.deployment_action_states.get_or_default(id).await;
Ok(action_state)
}

View File

@@ -680,13 +680,7 @@ impl State {
) -> anyhow::Result<ServerActionState> {
self.get_server_check_permissions(&id, &user, PermissionLevel::Read)
.await?;
let action_state = self
.server_action_states
.lock()
.await
.entry(id)
.or_default()
.clone();
let action_state = self.server_action_states.get_or_default(id).await;
Ok(action_state)
}
}

View File

@@ -1,9 +1,10 @@
use std::str::FromStr;
use std::{collections::HashMap, str::FromStr};
use anyhow::anyhow;
use diff::{Diff, OptionDiff};
use helpers::to_monitor_name;
use types::Build;
use tokio::sync::RwLock;
use types::{traits::Busy, Build};
#[macro_export]
macro_rules! response {
@@ -66,3 +67,37 @@ pub fn empty_or_only_spaces(word: &str) -> bool {
}
return true;
}
#[derive(Default)]
pub struct Cache<T: Clone + Default> {
cache: RwLock<HashMap<String, T>>,
}
impl<T: Clone + Default> Cache<T> {
pub async fn get(&self, key: &str) -> Option<T> {
self.cache.read().await.get(key).map(|e| e.clone())
}
pub async fn get_or_default(&self, key: String) -> T {
let mut cache = self.cache.write().await;
cache.entry(key).or_default().clone()
}
pub async fn update_entry(&self, key: String, handler: impl Fn(&mut T) -> ()) {
let mut cache = self.cache.write().await;
handler(cache.entry(key).or_default());
}
pub async fn clear(&self) {
self.cache.write().await.clear();
}
}
impl<T: Clone + Default + Busy> Cache<T> {
pub async fn busy(&self, id: &str) -> bool {
match self.get(id).await {
Some(state) => state.busy(),
None => false,
}
}
}

View File

@@ -10,7 +10,7 @@ use types::{Server, SystemStats, SystemStatsQuery, SystemStatsRecord};
use crate::state::State;
#[derive(Default)]
#[derive(Default, Clone)]
pub struct AlertStatus {
cpu_alert: bool,
mem_alert: bool,
@@ -100,16 +100,16 @@ impl State {
}
async fn check_cpu(&self, server: &Server, stats: &SystemStats) {
let server_alert_status = self.server_alert_status.lock().await;
if self.slack.is_none()
|| server_alert_status
|| self
.server_alert_status
.get(&server.id)
.await
.map(|s| s.cpu_alert)
.unwrap_or(false)
{
return;
}
drop(server_alert_status);
if stats.cpu_perc > server.cpu_alert {
let region = if let Some(region) = &server.region {
format!(" ({region})")
@@ -171,24 +171,26 @@ impl State {
server.name, stats.cpu_perc
)
} else {
let mut lock = self.server_alert_status.lock().await;
let entry = lock.entry(server.id.clone()).or_default();
entry.cpu_alert = true;
self.server_alert_status
.update_entry(server.id.clone(), |entry| {
entry.cpu_alert = true;
})
.await;
}
}
}
async fn check_mem(&self, server: &Server, stats: &SystemStats) {
let server_alert_status = self.server_alert_status.lock().await;
if self.slack.is_none()
|| server_alert_status
|| self
.server_alert_status
.get(&server.id)
.await
.map(|s| s.mem_alert)
.unwrap_or(false)
{
return;
}
drop(server_alert_status);
let usage_perc = (stats.mem_used_gb / stats.mem_total_gb) * 100.0;
if usage_perc > server.mem_alert {
let region = if let Some(region) = &server.region {
@@ -254,25 +256,27 @@ impl State {
server.name, stats.mem_used_gb, stats.mem_total_gb,
)
} else {
let mut lock = self.server_alert_status.lock().await;
let entry = lock.entry(server.id.clone()).or_default();
entry.mem_alert = true;
self.server_alert_status
.update_entry(server.id.clone(), |entry| {
entry.mem_alert = true;
})
.await;
}
}
}
async fn check_disk(&self, server: &Server, stats: &SystemStats) {
for disk in &stats.disk.disks {
let server_alert_status = self.server_alert_status.lock().await;
if self.slack.is_none()
|| server_alert_status
|| self
.server_alert_status
.get(&server.id)
.await
.map(|s| *s.disk_alert.get(&disk.mount).unwrap_or(&false))
.unwrap_or(false)
{
return;
}
drop(server_alert_status);
let usage_perc = (disk.used_gb / disk.total_gb) * 100.0;
if usage_perc > server.disk_alert {
let region = if let Some(region) = &server.region {
@@ -315,25 +319,27 @@ impl State {
server.name, stats.disk.used_gb, stats.disk.total_gb,
)
} else {
let mut lock = self.server_alert_status.lock().await;
let entry = lock.entry(server.id.clone()).or_default();
entry.disk_alert.insert(disk.mount.clone(), true);
self.server_alert_status
.update_entry(server.id.clone(), |entry| {
entry.disk_alert.insert(disk.mount.clone(), true);
})
.await;
}
}
}
}
async fn check_components(&self, server: &Server, stats: &SystemStats) {
let lock = self.server_alert_status.lock().await;
if self.slack.is_none()
|| lock
|| self
.server_alert_status
.get(&server.id)
.await
.map(|s| s.component_alert)
.unwrap_or(false)
{
return;
}
drop(lock);
let info = stats
.components
.iter()
@@ -393,9 +399,11 @@ impl State {
info.join(" | "),
)
} else {
let mut lock = self.server_alert_status.lock().await;
let entry = lock.entry(server.id.clone()).or_default();
entry.component_alert = true;
self.server_alert_status
.update_entry(server.id.clone(), |entry| {
entry.component_alert = true;
})
.await;
}
}
}
@@ -487,7 +495,7 @@ impl State {
);
}
{
self.server_alert_status.lock().await.clear();
self.server_alert_status.clear().await;
}
}
}

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use std::sync::Arc;
use async_timing_util::{unix_timestamp_ms, wait_until_timelength, Timelength, ONE_HOUR_MS};
use axum::Extension;
@@ -6,28 +6,26 @@ use db::DbClient;
use futures_util::future::join_all;
use mungos::mongodb::bson::doc;
use periphery::PeripheryClient;
use tokio::sync::Mutex;
use types::{
BuildActionState, CommandActionState, CoreConfig, DeploymentActionState, ServerActionState,
};
use crate::{monitoring::AlertStatus, ws::update::UpdateWsChannel};
use crate::{helpers::Cache, monitoring::AlertStatus, ws::update::UpdateWsChannel};
pub type StateExtension = Extension<Arc<State>>;
pub type ActionStateMap<T> = Mutex<HashMap<String, T>>;
// pub type Cache<T> = RwLock<HashMap<String, T>>;
pub struct State {
pub config: CoreConfig,
pub db: DbClient,
pub update: UpdateWsChannel,
pub periphery: PeripheryClient,
pub slack: Option<slack::Client>,
pub build_action_states: ActionStateMap<BuildActionState>,
pub deployment_action_states: ActionStateMap<DeploymentActionState>,
pub server_action_states: ActionStateMap<ServerActionState>,
pub command_action_states: ActionStateMap<CommandActionState>,
pub server_alert_status: Mutex<HashMap<String, AlertStatus>>, // (server_id, AlertStatus)
pub build_action_states: Cache<BuildActionState>,
pub deployment_action_states: Cache<DeploymentActionState>,
pub server_action_states: Cache<ServerActionState>,
pub command_action_states: Cache<CommandActionState>,
pub server_alert_status: Cache<AlertStatus>, // (server_id, AlertStatus)
}
impl State {