move system info to server cache

This commit is contained in:
mbecker20
2025-09-26 01:26:43 -07:00
parent 2749d49435
commit a032f0f4ff
8 changed files with 153 additions and 163 deletions

View File

@@ -43,14 +43,13 @@ use periphery_client::api::{
network::InspectNetwork,
volume::InspectVolume,
};
use reqwest::StatusCode;
use resolver_api::Resolve;
use serror::AddStatusCode;
use tokio::sync::Mutex;
use crate::{
helpers::{
periphery_client,
query::{get_all_tags, get_system_info},
},
helpers::{periphery_client, query::get_all_tags},
permission::get_check_permissions,
resource,
stack::compose_container_match_regex,
@@ -235,8 +234,16 @@ impl Resolve<ReadArgs> for GetSystemInformation {
user,
PermissionLevel::Read.into(),
)
.await?;
get_system_info(&server).await.map_err(Into::into)
.await
.status_code(StatusCode::BAD_REQUEST)?;
let info = server_status_cache()
.get(&server.id)
.await
.context("No status found for server")?
.info
.clone()
.context("No info found for server")?;
Ok(info)
}
}
@@ -289,7 +296,8 @@ impl Resolve<ReadArgs> for ListSystemProcesses {
cached.0.clone()
}
_ => {
let stats = periphery_client(&server).await?
let stats = periphery_client(&server)
.await?
.request(periphery::stats::GetSystemProcesses {})
.await?;
lock.insert(
@@ -478,7 +486,8 @@ impl Resolve<ReadArgs> for InspectDockerContainer {
.into(),
);
}
let res = periphery_client(&server).await?
let res = periphery_client(&server)
.await?
.request(InspectContainer {
name: self.container,
})
@@ -506,7 +515,8 @@ impl Resolve<ReadArgs> for GetContainerLog {
PermissionLevel::Read.logs(),
)
.await?;
let res = periphery_client(&server).await?
let res = periphery_client(&server)
.await?
.request(periphery::container::GetContainerLog {
name: container,
tail: cmp::min(tail, MAX_LOG_LENGTH),
@@ -537,7 +547,8 @@ impl Resolve<ReadArgs> for SearchContainerLog {
PermissionLevel::Read.logs(),
)
.await?;
let res = periphery_client(&server).await?
let res = periphery_client(&server)
.await?
.request(periphery::container::GetContainerLogSearch {
name: container,
terms,
@@ -657,7 +668,8 @@ impl Resolve<ReadArgs> for InspectDockerNetwork {
.into(),
);
}
let res = periphery_client(&server).await?
let res = periphery_client(&server)
.await?
.request(InspectNetwork { name: self.network })
.await?;
Ok(res)
@@ -706,7 +718,8 @@ impl Resolve<ReadArgs> for InspectDockerImage {
.into(),
);
}
let res = periphery_client(&server).await?
let res = periphery_client(&server)
.await?
.request(InspectImage { name: self.image })
.await?;
Ok(res)
@@ -736,7 +749,8 @@ impl Resolve<ReadArgs> for ListDockerImageHistory {
.into(),
);
}
let res = periphery_client(&server).await?
let res = periphery_client(&server)
.await?
.request(ImageHistory { name: self.image })
.await?;
Ok(res)
@@ -785,7 +799,8 @@ impl Resolve<ReadArgs> for InspectDockerVolume {
.into(),
);
}
let res = periphery_client(&server).await?
let res = periphery_client(&server)
.await?
.request(InspectVolume { name: self.volume })
.await?;
Ok(res)
@@ -865,7 +880,8 @@ impl Resolve<ReadArgs> for ListTerminals {
let cache = terminals_cache().get_or_insert(server.id.clone());
let mut cache = cache.lock().await;
if self.fresh || komodo_timestamp() > cache.ttl {
cache.list = periphery_client(&server).await?
cache.list = periphery_client(&server)
.await?
.request(periphery_client::api::terminal::ListTerminals {})
.await
.context("Failed to get fresh terminal list")?;

View File

@@ -1,11 +1,6 @@
use std::{
collections::HashMap,
str::FromStr,
sync::{Arc, OnceLock},
};
use std::{collections::HashMap, str::FromStr};
use anyhow::{Context, anyhow};
use async_timing_util::{ONE_MIN_MS, unix_timestamp_ms};
use database::mungos::{
find::find_collect,
mongodb::{
@@ -30,7 +25,6 @@ use komodo_client::{
repo::Repo,
server::{Server, ServerState},
stack::{Stack, StackServiceNames, StackState},
stats::SystemInformation,
sync::ResourceSync,
tag::Tag,
update::Update,
@@ -39,8 +33,6 @@ use komodo_client::{
variable::Variable,
},
};
use periphery_client::api::stats;
use tokio::sync::Mutex;
use crate::{
config::core_config,
@@ -54,8 +46,6 @@ use crate::{
},
};
use super::periphery_client;
// user: Id or username
#[instrument(level = "debug")]
pub async fn get_user(user: &str) -> anyhow::Result<User> {
@@ -85,7 +75,8 @@ pub async fn get_server_state(server: &Server) -> ServerState {
return ServerState::Disabled;
}
// Unwrap ok: Server disabled check above
match super::periphery_client(server).await
match super::periphery_client(server)
.await
.unwrap()
.request(periphery_client::api::GetHealth {})
.await
@@ -413,39 +404,6 @@ pub async fn get_variables_and_secrets()
Ok(VariablesAndSecrets { variables, secrets })
}
// This protects the peripheries from spam requests
const SYSTEM_INFO_EXPIRY: u128 = ONE_MIN_MS;
type SystemInfoCache =
Mutex<HashMap<String, Arc<(SystemInformation, u128)>>>;
fn system_info_cache() -> &'static SystemInfoCache {
static SYSTEM_INFO_CACHE: OnceLock<SystemInfoCache> =
OnceLock::new();
SYSTEM_INFO_CACHE.get_or_init(Default::default)
}
pub async fn get_system_info(
server: &Server,
) -> anyhow::Result<SystemInformation> {
let mut lock = system_info_cache().lock().await;
let res = match lock.get(&server.id) {
Some(cached) if cached.1 > unix_timestamp_ms() => {
cached.0.clone()
}
_ => {
let stats = periphery_client(server).await?
.request(stats::GetSystemInformation {})
.await?;
lock.insert(
server.id.clone(),
(stats.clone(), unix_timestamp_ms() + SYSTEM_INFO_EXPIRY)
.into(),
);
stats
}
};
Ok(res)
}
/// Get last time procedure / action was run using Update query.
/// Ignored whether run was successful.
pub async fn get_last_run_at<R: KomodoResource>(

View File

@@ -11,7 +11,7 @@ use komodo_client::entities::{
ServerState,
},
stack::{ComposeProject, Stack, StackState},
stats::{SingleDiskUsage, SystemStats},
stats::{SingleDiskUsage, SystemInformation, SystemStats},
};
use serror::Serror;
@@ -104,6 +104,7 @@ pub async fn insert_server_status(
server: &Server,
state: ServerState,
version: String,
info: Option<SystemInformation>,
stats: Option<SystemStats>,
(containers, networks, images, volumes, projects): DockerLists,
err: impl Into<Option<Serror>>,
@@ -116,6 +117,7 @@ pub async fn insert_server_status(
id: server.id.clone(),
state,
version,
info,
stats,
health,
containers,

View File

@@ -14,7 +14,7 @@ use komodo_client::entities::{
komodo_timestamp, optional_string,
server::{Server, ServerHealth, ServerState},
stack::{ComposeProject, StackService, StackState},
stats::SystemStats,
stats::{SystemInformation, SystemStats},
};
use periphery_client::api::{self, git::GetLatestCommit};
use serror::Serror;
@@ -49,8 +49,9 @@ pub struct CachedServerStatus {
pub id: String,
pub state: ServerState,
pub version: String,
pub stats: Option<SystemStats>,
pub health: Option<ServerHealth>,
pub info: Option<SystemInformation>,
pub stats: Option<SystemStats>,
pub containers: Option<Vec<ContainerListItem>>,
pub networks: Option<Vec<NetworkListItem>>,
pub images: Option<Vec<ImageListItem>>,
@@ -187,7 +188,8 @@ pub async fn update_cache_for_server(server: &Server, force: bool) {
insert_server_status(
server,
ServerState::Disabled,
String::from("unknown"),
String::from("Unknown"),
None,
None,
(None, None, None, None, None),
None,
@@ -200,8 +202,11 @@ pub async fn update_cache_for_server(server: &Server, force: bool) {
return;
};
let version = match periphery.request(api::GetVersion {}).await {
Ok(version) => version.version,
let info = match periphery
.request(api::stats::GetSystemInformation {})
.await
{
Ok(info) => info,
Err(e) => {
insert_deployments_status_unknown(deployments).await;
insert_stacks_status_unknown(stacks).await;
@@ -211,6 +216,7 @@ pub async fn update_cache_for_server(server: &Server, force: bool) {
ServerState::NotOk,
String::from("Unknown"),
None,
None,
(None, None, None, None, None),
Serror::from(&e),
)
@@ -229,7 +235,8 @@ pub async fn update_cache_for_server(server: &Server, force: bool) {
insert_server_status(
server,
ServerState::NotOk,
String::from("unknown"),
info.version.clone(),
Some(info),
None,
(None, None, None, None, None),
Serror::from(&e),
@@ -265,7 +272,8 @@ pub async fn update_cache_for_server(server: &Server, force: bool) {
insert_server_status(
server,
ServerState::Ok,
version,
info.version.clone(),
Some(info),
stats,
(
Some(containers.clone()),
@@ -284,7 +292,8 @@ pub async fn update_cache_for_server(server: &Server, force: bool) {
insert_server_status(
server,
ServerState::Ok,
version,
info.version.clone(),
Some(info),
stats,
(None, None, None, None, None),
Some(e.into()),

View File

@@ -16,7 +16,6 @@ use komodo_client::entities::{
use crate::{
config::core_config,
connection::PeripheryConnectionArgs,
helpers::query::get_system_info,
monitor::update_cache_for_server,
periphery::PeripheryClient,
state::{
@@ -62,11 +61,14 @@ impl super::KomodoResource for Server {
server: Resource<Self::Config, Self::Info>,
) -> Self::ListItem {
let status = server_status_cache().get(&server.id).await;
let (terminals_disabled, container_exec_disabled) =
get_system_info(&server)
.await
.map(|i| (i.terminals_disabled, i.container_exec_disabled))
.unwrap_or((true, true));
let (terminals_disabled, container_exec_disabled) = status
.as_ref()
.and_then(|s| {
s.info
.as_ref()
.map(|i| (i.terminals_disabled, i.container_exec_disabled))
})
.unwrap_or((true, true));
ServerListItem {
name: server.name,
id: server.id,

View File

@@ -62,6 +62,94 @@ pub fn jwt_client() -> &'static JwtClient {
})
}
/// server id => connection
pub fn periphery_connections() -> &'static PeripheryConnections {
static CONNECTIONS: OnceLock<PeripheryConnections> =
OnceLock::new();
CONNECTIONS.get_or_init(Default::default)
}
pub fn action_states() -> &'static ActionStates {
static ACTION_STATES: OnceLock<ActionStates> = OnceLock::new();
ACTION_STATES.get_or_init(ActionStates::default)
}
pub type ServerStatusCache =
CloneCache<String, Arc<CachedServerStatus>>;
pub fn server_status_cache() -> &'static ServerStatusCache {
static SERVER_STATUS_CACHE: OnceLock<ServerStatusCache> =
OnceLock::new();
SERVER_STATUS_CACHE.get_or_init(Default::default)
}
pub type StackStatusCache =
CloneCache<String, Arc<History<CachedStackStatus, StackState>>>;
pub fn stack_status_cache() -> &'static StackStatusCache {
static STACK_STATUS_CACHE: OnceLock<StackStatusCache> =
OnceLock::new();
STACK_STATUS_CACHE.get_or_init(Default::default)
}
/// Cache of ids to status
pub type DeploymentStatusCache = CloneCache<
String,
Arc<History<CachedDeploymentStatus, DeploymentState>>,
>;
/// Cache of ids to status
pub fn deployment_status_cache() -> &'static DeploymentStatusCache {
static DEPLOYMENT_STATUS_CACHE: OnceLock<DeploymentStatusCache> =
OnceLock::new();
DEPLOYMENT_STATUS_CACHE.get_or_init(Default::default)
}
pub type BuildStateCache = CloneCache<String, BuildState>;
pub fn build_state_cache() -> &'static BuildStateCache {
static BUILD_STATE_CACHE: OnceLock<BuildStateCache> =
OnceLock::new();
BUILD_STATE_CACHE.get_or_init(Default::default)
}
pub type RepoStatusCache = CloneCache<String, Arc<CachedRepoStatus>>;
pub fn repo_status_cache() -> &'static RepoStatusCache {
static REPO_STATUS_CACHE: OnceLock<RepoStatusCache> =
OnceLock::new();
REPO_STATUS_CACHE.get_or_init(Default::default)
}
pub type RepoStateCache = CloneCache<String, RepoState>;
pub fn repo_state_cache() -> &'static RepoStateCache {
static REPO_STATE_CACHE: OnceLock<RepoStateCache> = OnceLock::new();
REPO_STATE_CACHE.get_or_init(Default::default)
}
pub type ProcedureStateCache = CloneCache<String, ProcedureState>;
pub fn procedure_state_cache() -> &'static ProcedureStateCache {
static PROCEDURE_STATE_CACHE: OnceLock<ProcedureStateCache> =
OnceLock::new();
PROCEDURE_STATE_CACHE.get_or_init(Default::default)
}
pub type ActionStateCache = CloneCache<String, ActionState>;
pub fn action_state_cache() -> &'static ActionStateCache {
static ACTION_STATE_CACHE: OnceLock<ActionStateCache> =
OnceLock::new();
ACTION_STATE_CACHE.get_or_init(Default::default)
}
pub fn all_resources_cache() -> &'static ArcSwap<AllResourcesById> {
static ALL_RESOURCES: OnceLock<ArcSwap<AllResourcesById>> =
OnceLock::new();
ALL_RESOURCES.get_or_init(Default::default)
}
pub fn github_client()
-> Option<&'static HashMap<String, octorust::Client>> {
static GITHUB_CLIENT: OnceLock<
@@ -131,91 +219,3 @@ pub fn github_client()
})
.as_ref()
}
pub fn action_states() -> &'static ActionStates {
static ACTION_STATES: OnceLock<ActionStates> = OnceLock::new();
ACTION_STATES.get_or_init(ActionStates::default)
}
/// Cache of ids to status
pub type DeploymentStatusCache = CloneCache<
String,
Arc<History<CachedDeploymentStatus, DeploymentState>>,
>;
/// Cache of ids to status
pub fn deployment_status_cache() -> &'static DeploymentStatusCache {
static DEPLOYMENT_STATUS_CACHE: OnceLock<DeploymentStatusCache> =
OnceLock::new();
DEPLOYMENT_STATUS_CACHE.get_or_init(Default::default)
}
pub type StackStatusCache =
CloneCache<String, Arc<History<CachedStackStatus, StackState>>>;
pub fn stack_status_cache() -> &'static StackStatusCache {
static STACK_STATUS_CACHE: OnceLock<StackStatusCache> =
OnceLock::new();
STACK_STATUS_CACHE.get_or_init(Default::default)
}
pub type ServerStatusCache =
CloneCache<String, Arc<CachedServerStatus>>;
pub fn server_status_cache() -> &'static ServerStatusCache {
static SERVER_STATUS_CACHE: OnceLock<ServerStatusCache> =
OnceLock::new();
SERVER_STATUS_CACHE.get_or_init(Default::default)
}
pub type RepoStatusCache = CloneCache<String, Arc<CachedRepoStatus>>;
pub fn repo_status_cache() -> &'static RepoStatusCache {
static REPO_STATUS_CACHE: OnceLock<RepoStatusCache> =
OnceLock::new();
REPO_STATUS_CACHE.get_or_init(Default::default)
}
pub type BuildStateCache = CloneCache<String, BuildState>;
pub fn build_state_cache() -> &'static BuildStateCache {
static BUILD_STATE_CACHE: OnceLock<BuildStateCache> =
OnceLock::new();
BUILD_STATE_CACHE.get_or_init(Default::default)
}
pub type RepoStateCache = CloneCache<String, RepoState>;
pub fn repo_state_cache() -> &'static RepoStateCache {
static REPO_STATE_CACHE: OnceLock<RepoStateCache> = OnceLock::new();
REPO_STATE_CACHE.get_or_init(Default::default)
}
pub type ProcedureStateCache = CloneCache<String, ProcedureState>;
pub fn procedure_state_cache() -> &'static ProcedureStateCache {
static PROCEDURE_STATE_CACHE: OnceLock<ProcedureStateCache> =
OnceLock::new();
PROCEDURE_STATE_CACHE.get_or_init(Default::default)
}
pub type ActionStateCache = CloneCache<String, ActionState>;
pub fn action_state_cache() -> &'static ActionStateCache {
static ACTION_STATE_CACHE: OnceLock<ActionStateCache> =
OnceLock::new();
ACTION_STATE_CACHE.get_or_init(Default::default)
}
pub fn all_resources_cache() -> &'static ArcSwap<AllResourcesById> {
static ALL_RESOURCES: OnceLock<ArcSwap<AllResourcesById>> =
OnceLock::new();
ALL_RESOURCES.get_or_init(Default::default)
}
/// server id => connection
pub fn periphery_connections() -> &'static PeripheryConnections {
static CONNECTIONS: OnceLock<PeripheryConnections> =
OnceLock::new();
CONNECTIONS.get_or_init(Default::default)
}

View File

@@ -217,5 +217,6 @@ fn get_system_information(
terminals_disabled: config.disable_terminals,
container_exec_disabled: config.disable_container_exec,
public_key: periphery_public_key().clone(),
version: env!("CARGO_PKG_VERSION").to_string(),
}
}

View File

@@ -27,6 +27,8 @@ pub struct SystemInformation {
pub container_exec_disabled: bool,
/// The public key of the Periphery agent
pub public_key: String,
/// The version of the Periphery agent
pub version: String,
}
/// System stats stored on the database.