prune state and alerts based on config

This commit is contained in:
mbecker20
2023-08-28 02:33:27 -04:00
parent d2a706fd9d
commit 3da88135c2
3 changed files with 74 additions and 10 deletions

View File

@@ -55,7 +55,11 @@ pub struct CoreConfig {
// number of days to keep stats, or 0 to disable pruning. stats older than this number of days are deleted on a daily cycle
#[serde(default)]
pub keep_stats_for_days: u64,
pub keep_stats_for_days: u128,
// number of days to keep alerts, or 0 to disable pruning. alerts older than this number of days are deleted on a daily cycle
#[serde(default)]
pub keep_alerts_for_days: u128,
// used to verify validity from github webhooks
#[serde(default)]

View File

@@ -1,9 +1,8 @@
#[macro_use]
extern crate log;
use axum::{Extension, Router, http::StatusCode, TypedHeader, headers::ContentType};
use axum::{headers::ContentType, http::StatusCode, Extension, Router, TypedHeader};
use termination_signal::tokio::immediate_term_handle;
use tower_http::cors::{Any, CorsLayer};
mod auth;
mod cloud;
@@ -31,13 +30,8 @@ async fn app() -> anyhow::Result<()> {
.nest("/execute", requests::execute::router())
.nest("/listener", listener::router())
.nest("/ws", ws::router())
.layer(Extension(state))
.layer(
CorsLayer::new()
.allow_origin(Any)
.allow_methods(Any)
.allow_headers(Any),
);
.layer(state.cors()?)
.layer(Extension(state));
info!("starting monitor core on {socket_addr}");

View File

@@ -1,6 +1,7 @@
use std::{net::SocketAddr, str::FromStr, sync::Arc};
use anyhow::Context;
use async_timing_util::{unix_timestamp_ms, wait_until_timelength, Timelength, ONE_DAY_MS};
use axum::Extension;
use monitor_types::entities::{
build::BuildActionState,
@@ -9,6 +10,8 @@ use monitor_types::entities::{
server::ServerActionState,
update::UpdateListItem,
};
use mungos::mongodb::bson::doc;
use tower_http::cors::{Any, CorsLayer};
use crate::{
auth::{GithubOauthClient, GoogleOauthClient, JwtClient},
@@ -70,14 +73,77 @@ impl State {
let state_clone = state.clone();
tokio::spawn(async move { state_clone.monitor().await });
let state_clone = state.clone();
tokio::spawn(async move { state_clone.prune().await });
Ok(state)
}
async fn prune(&self) {
loop {
wait_until_timelength(Timelength::OneDay, 5000).await;
let (stats_res, alerts_res) = tokio::join!(self.prune_stats(), self.prune_alerts());
if let Err(e) = stats_res {
error!("error in pruning stats | {e:#?}");
}
if let Err(e) = alerts_res {
error!("error in pruning alerts | {e:#?}");
}
}
}
async fn prune_stats(&self) -> anyhow::Result<()> {
if self.config.keep_stats_for_days == 0 {
return Ok(());
}
let delete_before_ts =
(unix_timestamp_ms() - self.config.keep_stats_for_days * ONE_DAY_MS) as i64;
let res = self
.db
.stats
.delete_many(doc! {
"ts": { "$lt": delete_before_ts }
})
.await?;
info!("deleted {} stats from db", res.deleted_count);
Ok(())
}
async fn prune_alerts(&self) -> anyhow::Result<()> {
if self.config.keep_alerts_for_days == 0 {
return Ok(());
}
let delete_before_ts =
(unix_timestamp_ms() - self.config.keep_alerts_for_days * ONE_DAY_MS) as i64;
let res = self
.db
.alerts
.delete_many(doc! {
"ts": { "$lt": delete_before_ts }
})
.await?;
info!("deleted {} alerts from db", res.deleted_count);
Ok(())
}
pub fn socket_addr(&self) -> anyhow::Result<SocketAddr> {
SocketAddr::from_str(&format!("0.0.0.0:{}", self.config.port))
.context("failed to parse socket addr")
}
pub fn cors(&self) -> anyhow::Result<CorsLayer> {
let cors = CorsLayer::new()
.allow_origin(
// self.config
// .host
// .parse::<HeaderValue>()
// .context("failed to parse host into origin")?,
Any,
)
.allow_methods(Any)
.allow_headers(Any);
Ok(cors)
}
}
#[derive(Default)]