clean up periphery

This commit is contained in:
mbecker20
2024-03-19 02:12:04 -07:00
parent c32905cca4
commit 12315e90de
22 changed files with 390 additions and 356 deletions

2
Cargo.lock generated
View File

@@ -1862,7 +1862,7 @@ checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f"
name = "logger"
version = "1.0.1"
dependencies = [
"anyhow",
"serde",
"tracing",
"tracing-subscriber",
]

View File

@@ -98,11 +98,7 @@ impl State {
)
.await;
self
.execute_sequence(
&filter_list_by_enabled(ids),
map,
update,
)
.execute_sequence(&filter_list_by_enabled(ids), map, update)
.await
.with_context(|| {
let time = Duration::from_millis(
@@ -138,11 +134,7 @@ impl State {
)
.await;
self
.execute_parallel(
&filter_list_by_enabled(ids),
map,
update,
)
.execute_parallel(&filter_list_by_enabled(ids), map, update)
.await
.with_context(|| {
let time = Duration::from_millis(

View File

@@ -1,20 +1,35 @@
use std::{collections::HashMap, net::IpAddr, path::PathBuf};
use std::{
collections::HashMap, net::IpAddr, path::PathBuf, sync::OnceLock,
};
use anyhow::Context;
use clap::Parser;
use merge_config_files::parse_config_paths;
use monitor_client::entities::Timelength;
use parse_csl::parse_comma_seperated;
use serde::{Deserialize, Serialize};
use serde_json::json;
#[derive(Deserialize)]
struct Env {
#[serde(default = "default_config_paths")]
config_paths: Vec<String>,
#[serde(default)]
config_keywords: Vec<String>,
port: Option<u16>,
}
fn default_config_paths() -> Vec<String> {
vec!["~/.config/monitor/periphery.config.toml".to_string()]
}
#[derive(Parser)]
#[command(author, about, version)]
pub struct CliArgs {
struct CliArgs {
/// Sets the path of a config file or directory to use. can use multiple times
#[arg(short, long)]
pub config_path: Option<Vec<String>>,
/// Sets the keywords to match directory periphery config file names on. can use multiple times. default "periphery" and "config"
/// Sets the keywords to match directory periphery config file names on.
/// can use multiple times. default "periphery" and "config"
#[arg(long)]
pub config_keyword: Option<Vec<String>>,
@@ -31,66 +46,16 @@ pub struct CliArgs {
pub log_level: tracing::Level,
}
#[derive(Deserialize)]
pub struct Env {
#[serde(default = "default_config_path")]
config_paths: String,
#[serde(default)]
config_keywords: String,
port: Option<u16>,
}
impl Env {
pub fn load() -> anyhow::Result<Env> {
dotenv::dotenv().ok();
envy::from_env().context("failed to parse environment")
}
}
fn default_config_path() -> String {
"~/.config/monitor.periphery.config.toml".to_string()
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PeripheryConfig {
#[serde(default = "default_periphery_port")]
pub port: u16,
#[serde(default = "default_repo_dir")]
pub repo_dir: PathBuf,
#[serde(default = "default_stats_refresh_interval")]
pub stats_polling_rate: Timelength,
#[serde(default)]
pub allowed_ips: Vec<IpAddr>,
#[serde(default)]
pub passkeys: Vec<String>,
#[serde(default)]
pub secrets: HashMap<String, String>,
#[serde(default)]
pub github_accounts: HashMap<String, String>,
#[serde(default)]
pub docker_accounts: HashMap<String, String>,
}
impl PeripheryConfig {
pub fn load(
env: &Env,
args: &CliArgs,
) -> anyhow::Result<PeripheryConfig> {
let env_config_paths = parse_comma_seperated(&env.config_paths)
.context("failed to parse config paths on environment into comma seperated list")?;
let config_paths = args
.config_path
.as_ref()
.unwrap_or(&env_config_paths)
.to_vec();
let env_match_keywords = parse_comma_seperated::<String>(&env.config_keywords)
.context("failed to parse environemt CONFIG_KEYWORDS into comma seperated list")?;
let match_keywords = args
.config_keyword
.as_ref()
.unwrap_or(&env_match_keywords)
.iter()
.map(|kw| kw.as_str());
pub fn periphery_config() -> &'static PeripheryConfig {
static PERIPHERY_CONFIG: OnceLock<PeripheryConfig> =
OnceLock::new();
PERIPHERY_CONFIG.get_or_init(|| {
let env: Env = envy::from_env()
.expect("failed to parse periphery environment");
let args = CliArgs::parse();
let config_paths = args.config_path.unwrap_or(env.config_paths);
let match_keywords =
args.config_keyword.unwrap_or(env.config_keywords);
let mut config = parse_config_paths::<PeripheryConfig>(
config_paths,
match_keywords,
@@ -101,14 +66,75 @@ impl PeripheryConfig {
if let Some(port) = env.port {
config.port = port;
}
Ok(config)
}
config
})
}
pub fn accounts_response() -> &'static String {
static ACCOUNTS_RESPONSE: OnceLock<String> = OnceLock::new();
ACCOUNTS_RESPONSE.get_or_init(|| json!({
"docker": periphery_config().docker_accounts.keys().collect::<Vec<_>>(),
"github": periphery_config().github_accounts.keys().collect::<Vec<_>>(),
}).to_string())
}
pub fn secrets_response() -> &'static String {
static SECRETS_RESPONSE: OnceLock<String> = OnceLock::new();
SECRETS_RESPONSE.get_or_init(|| {
serde_json::to_string(
&periphery_config().secrets.keys().collect::<Vec<_>>(),
)
.unwrap()
})
}
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct PeripheryConfig {
/// The port periphery will run on
#[serde(default = "default_periphery_port")]
pub port: u16,
/// Configure the logging level: error, warn, info, debug, trace
#[serde(default = "default_log_level")]
pub log_level: logger::LogLevel,
/// The system directory where monitor managed repos will be cloned
#[serde(default = "default_repo_dir")]
pub repo_dir: PathBuf,
/// The rate at which the system stats will be polled to update the cache
#[serde(default = "default_stats_refresh_interval")]
pub stats_polling_rate: Timelength,
/// Limits which IPv4 addresses are allowed to call the api
#[serde(default)]
pub allowed_ips: Vec<IpAddr>,
/// Limits the accepted passkeys
#[serde(default)]
pub passkeys: Vec<String>,
/// Mapping on local periphery secrets. These can be interpolated into eg. Deployment environment variables.
#[serde(default)]
pub secrets: HashMap<String, String>,
/// Mapping of github usernames to access tokens
#[serde(default)]
pub github_accounts: HashMap<String, String>,
/// Mapping of docker usernames to access tokens
#[serde(default)]
pub docker_accounts: HashMap<String, String>,
}
fn default_periphery_port() -> u16 {
8000
}
fn default_log_level() -> logger::LogLevel {
logger::LogLevel::Info
}
fn default_repo_dir() -> PathBuf {
"/repos".parse().unwrap()
}

View File

@@ -1,4 +1,4 @@
use std::{net::SocketAddr, sync::Arc};
use std::net::SocketAddr;
use axum::{
body::Body,
@@ -10,17 +10,13 @@ use axum::{
};
use serde_json::Value;
use crate::state::State;
use crate::config::periphery_config;
pub async fn guard_request_by_passkey(
req: Request<Body>,
next: Next,
) -> Result<Response, (StatusCode, String)> {
let state = req.extensions().get::<Arc<State>>().ok_or((
StatusCode::INTERNAL_SERVER_ERROR,
"could not get state extension".to_string(),
))?;
if state.config.passkeys.is_empty() {
if periphery_config().passkeys.is_empty() {
return Ok(next.run(req).await);
}
let req_passkey = req.headers().get("authorization");
@@ -31,16 +27,16 @@ pub async fn guard_request_by_passkey(
));
}
let req_passkey = req_passkey
.unwrap()
.to_str()
.map_err(|e| {
(
StatusCode::UNAUTHORIZED,
format!("failed to get passkey from authorization header as str: {e:?}"),
)
})?
.replace("Bearer ", "");
if state.config.passkeys.contains(&req_passkey) {
.unwrap()
.to_str()
.map_err(|e| {
(
StatusCode::UNAUTHORIZED,
format!("failed to get passkey from authorization header as str: {e:?}"),
)
})?
.replace("Bearer ", "");
if periphery_config().passkeys.contains(&req_passkey) {
Ok(next.run(req).await)
} else {
let ConnectInfo(socket_addr) =
@@ -68,11 +64,7 @@ pub async fn guard_request_by_ip(
req: Request<Body>,
next: Next,
) -> Result<Response, (StatusCode, String)> {
let state = req.extensions().get::<Arc<State>>().ok_or((
StatusCode::INTERNAL_SERVER_ERROR,
"could not get state extension".to_string(),
))?;
if state.config.allowed_ips.is_empty() {
if periphery_config().allowed_ips.is_empty() {
return Ok(next.run(req).await);
}
let ConnectInfo(socket_addr) =
@@ -81,7 +73,7 @@ pub async fn guard_request_by_ip(
"could not get socket addr of request".to_string(),
))?;
let ip = socket_addr.ip();
if state.config.allowed_ips.contains(&ip) {
if periphery_config().allowed_ips.contains(&ip) {
Ok(next.run(req).await)
} else {
let body = req
@@ -90,7 +82,7 @@ pub async fn guard_request_by_ip(
.ok()
.map(|Json(body)| body);
warn!(
"unauthorized request from {ip} (bad passkey) | body: {body:?}"
"unauthorized request from {ip} (unknown ip) | body: {body:?}"
);
Err((
StatusCode::UNAUTHORIZED,

View File

@@ -1,5 +1,3 @@
use std::{collections::HashMap, path::PathBuf};
use anyhow::Context;
use monitor_client::entities::{
build::{Build, BuildConfig},
@@ -8,15 +6,10 @@ use monitor_client::entities::{
EnvironmentVar, Version,
};
use crate::helpers::run_monitor_command;
use crate::{config::periphery_config, helpers::run_monitor_command};
use super::{docker_login, parse_extra_args};
pub async fn prune_images() -> Log {
let command = String::from("docker image prune -a -f");
run_monitor_command("prune images", command).await
}
pub async fn build(
Build {
name,
@@ -35,9 +28,7 @@ pub async fn build(
},
..
}: &Build,
mut repo_dir: PathBuf,
docker_token: Option<String>,
secrets: &HashMap<String, String>,
) -> anyhow::Result<Vec<Log>> {
let mut logs = Vec::new();
let name = to_monitor_name(name);
@@ -45,8 +36,8 @@ pub async fn build(
docker_login(&optional_string(docker_account), &docker_token)
.await
.context("failed to login to docker")?;
repo_dir.push(&name);
let build_dir = repo_dir.join(build_path);
let build_dir =
periphery_config().repo_dir.join(&name).join(build_path);
let dockerfile_path = match optional_string(dockerfile_path) {
Some(dockerfile_path) => dockerfile_path.to_owned(),
None => "Dockerfile".to_owned(),
@@ -66,9 +57,9 @@ pub async fn build(
String::new()
};
let command = format!(
"cd {} && docker{buildx} build{build_args}{extra_args}{image_tags} -f {dockerfile_path} .{docker_push}",
build_dir.display()
);
"cd {} && docker{buildx} build{build_args}{extra_args}{image_tags} -f {dockerfile_path} .{docker_push}",
build_dir.display()
);
if *skip_secret_interp {
let build_log =
run_monitor_command("docker build", command).await;
@@ -76,7 +67,7 @@ pub async fn build(
} else {
let (command, replacers) = svi::interpolate_variables(
&command,
secrets,
&periphery_config().secrets,
svi::Interpolator::DoubleBrackets,
)
.context(

View File

@@ -1,4 +1,6 @@
use anyhow::{anyhow, Context};
use std::sync::OnceLock;
use anyhow::Context;
use bollard::{container::ListContainersOptions, Docker};
use monitor_client::entities::{
deployment::ContainerSummary,
@@ -7,6 +9,11 @@ use monitor_client::entities::{
},
};
pub fn docker_client() -> &'static DockerClient {
static DOCKER_CLIENT: OnceLock<DockerClient> = OnceLock::new();
DOCKER_CLIENT.get_or_init(Default::default)
}
pub struct DockerClient {
docker: Docker,
}
@@ -37,9 +44,9 @@ impl DockerClient {
id: s.id.unwrap_or_default(),
name: s
.names
.ok_or(anyhow!("no names on container"))?
.context("no names on container")?
.pop()
.ok_or(anyhow!("no names on container (empty vec)"))?
.context("no names on container (empty vec)")?
.replace('/', ""),
image: s.image.unwrap_or(String::from("unknown")),
state: s

View File

@@ -1,5 +1,3 @@
use std::collections::HashMap;
use anyhow::{anyhow, Context};
use monitor_client::entities::{
deployment::{
@@ -12,7 +10,12 @@ use monitor_client::entities::{
};
use run_command::async_run_command;
use crate::helpers::{docker::parse_extra_args, run_monitor_command};
use crate::{
config::periphery_config,
helpers::{
docker::parse_extra_args, get_docker_token, run_monitor_command,
},
};
use super::docker_login;
@@ -157,19 +160,25 @@ async fn pull_image(image: &str) -> Log {
pub async fn deploy(
deployment: &Deployment,
docker_token: &Option<String>,
secrets: &HashMap<String, String>,
stop_signal: Option<TerminationSignal>,
stop_time: Option<i32>,
) -> Log {
let docker_token = match get_docker_token(&optional_string(
&deployment.config.docker_account,
)) {
Ok(token) => token,
Err(e) => return Log::error("docker login", format!("{e:#?}")),
};
if let Err(e) = docker_login(
&optional_string(&deployment.config.docker_account),
docker_token,
&docker_token,
)
.await
{
return Log::error("docker login", format!("{e:#?}"));
}
let image = if let DeploymentImage::Image { image } =
&deployment.config.image
{
@@ -186,6 +195,7 @@ pub async fn deploy(
String::from("deployment does not have image attached"),
);
};
let _ = pull_image(image).await;
let _ = stop_and_remove_container(
&deployment.name,
@@ -193,13 +203,15 @@ pub async fn deploy(
stop_time,
)
.await;
let command = docker_run_command(deployment, image);
if deployment.config.skip_secret_interp {
run_monitor_command("docker run", command).await
} else {
let command = svi::interpolate_variables(
&command,
secrets,
&periphery_config().secrets,
svi::Interpolator::DoubleBrackets,
)
.context("failed to interpolate secrets into docker run command");

View File

@@ -1,28 +1,32 @@
mod build;
mod client;
mod container;
mod network;
use anyhow::anyhow;
pub use build::*;
pub use client::*;
pub use container::*;
pub use network::*;
use monitor_client::entities::update::Log;
use run_command::async_run_command;
use super::run_monitor_command;
pub mod build;
pub mod client;
pub mod container;
pub mod network;
pub async fn prune_images() -> Log {
let command = String::from("docker image prune -a -f");
run_monitor_command("prune images", command).await
}
pub fn get_docker_username_pw(
docker_account: &Option<String>,
docker_token: &Option<String>,
) -> anyhow::Result<Option<(String, String)>> {
match docker_account {
Some(docker_account) => match docker_token {
Some(docker_token) => Ok(Some((docker_account.to_owned(), docker_token.to_owned()))),
None => Err(anyhow!(
"docker token for account {docker_account} has not been configured on this client"
)),
},
None => Ok(None),
}
Some(docker_account) => match docker_token {
Some(docker_token) => Ok(Some((docker_account.to_owned(), docker_token.to_owned()))),
None => Err(anyhow!(
"docker token for account {docker_account} has not been configured on this client"
)),
},
None => Ok(None),
}
}
pub async fn docker_login(

View File

@@ -8,7 +8,9 @@ use monitor_client::entities::{
};
use run_command::async_run_command;
use super::run_monitor_command;
use crate::config::periphery_config;
use super::{get_github_token, run_monitor_command};
pub async fn pull(
mut path: PathBuf,
@@ -45,8 +47,6 @@ pub async fn pull(
pub async fn clone(
clone_args: impl Into<CloneArgs>,
mut repo_dir: PathBuf,
access_token: Option<String>,
) -> anyhow::Result<Vec<Log>> {
let CloneArgs {
name,
@@ -54,19 +54,23 @@ pub async fn clone(
branch,
on_clone,
on_pull,
..
github_account,
} = clone_args.into();
let access_token = get_github_token(&github_account)?;
let repo =
repo.as_ref().ok_or(anyhow!("build has no repo attached"))?;
let name = to_monitor_name(&name);
let mut repo_dir = periphery_config().repo_dir.clone();
repo_dir.push(name);
let destination = repo_dir.display().to_string();
let clone_destination = repo_dir.display().to_string();
let clone_log =
clone_inner(repo, &destination, &branch, access_token).await;
clone_inner(repo, &clone_destination, &branch, access_token)
.await;
if !clone_log.success {
return Ok(vec![clone_log]);
}
let commit_hash_log = get_commit_hash_log(&destination).await?;
let commit_hash_log =
get_commit_hash_log(&clone_destination).await?;
let mut logs = vec![clone_log, commit_hash_log];
if let Some(command) = on_clone {
if !command.path.is_empty() && !command.command.is_empty() {

View File

@@ -3,42 +3,40 @@ use async_timing_util::unix_timestamp_ms;
use monitor_client::entities::update::Log;
use run_command::{async_run_command, CommandOutput};
use crate::state::State;
use crate::config::periphery_config;
pub mod docker;
pub mod git;
pub mod stats;
impl State {
pub fn get_github_token(
&self,
github_account: &Option<String>,
) -> anyhow::Result<Option<String>> {
match github_account {
Some(account) => match self.config.github_accounts.get(account)
{
pub fn get_github_token(
github_account: &Option<String>,
) -> anyhow::Result<Option<String>> {
match github_account {
Some(account) => {
match periphery_config().github_accounts.get(account) {
Some(token) => Ok(Some(token.to_owned())),
None => Err(anyhow!(
"did not find token in config for github account {account} "
)),
},
None => Ok(None),
}
}
None => Ok(None),
}
}
pub fn get_docker_token(
&self,
docker_account: &Option<String>,
) -> anyhow::Result<Option<String>> {
match docker_account {
Some(account) => match self.config.docker_accounts.get(account) {
pub fn get_docker_token(
docker_account: &Option<String>,
) -> anyhow::Result<Option<String>> {
match docker_account {
Some(account) => {
match periphery_config().docker_accounts.get(account) {
Some(token) => Ok(Some(token.to_owned())),
None => Err(anyhow!(
"did not find token in config for docker account {account} "
)),
},
None => Ok(None),
"did not find token in config for docker account {account} "
)),
}
}
None => Ok(None),
}
}

View File

@@ -5,4 +5,6 @@ pub mod config;
pub mod guard;
pub mod helpers;
pub mod requests;
pub mod state;
pub mod system_stats;
pub struct State;

View File

@@ -1,10 +1,10 @@
#[macro_use]
extern crate tracing;
use std::{net::SocketAddr, sync::Arc, time::Instant};
use std::{net::SocketAddr, str::FromStr, time::Instant};
use anyhow::Context;
use axum::{middleware, routing::post, Extension, Json, Router};
use axum::{middleware, routing::post, Json, Router};
use axum_extra::{headers::ContentType, TypedHeader};
use resolver_api::Resolver;
@@ -12,33 +12,35 @@ use serror::AppResult;
use termination_signal::tokio::immediate_term_handle;
use uuid::Uuid;
mod config;
mod guard;
mod helpers;
mod requests;
mod state;
use requests::PeripheryRequest;
use state::State;
use monitor_periphery::*;
async fn app() -> anyhow::Result<()> {
let state = State::load().await?;
dotenv::dotenv().ok();
let config = config::periphery_config();
logger::init(config.log_level);
info!("version: v{}", env!("CARGO_PKG_VERSION"));
let socket_addr = state.socket_addr()?;
system_stats::spawn_system_stats_polling_threads();
let socket_addr =
SocketAddr::from_str(&format!("0.0.0.0:{}", config.port))
.context("failed to parse socket addr")?;
let app = Router::new()
.route(
"/",
post(
|state: Extension<Arc<State>>,
Json(request): Json<PeripheryRequest>| async move {
|Json(request): Json<
monitor_periphery::requests::PeripheryRequest,
>| async move {
let timer = Instant::now();
let req_id = Uuid::new_v4();
info!("request {req_id} | {request:?}");
let res = tokio::spawn(async move {
let res = state.resolve_request(request, ()).await;
let res = monitor_periphery::State
.resolve_request(request, ())
.await;
if let Err(e) = &res {
debug!("request {req_id} ERROR: {e:#?}");
}
@@ -57,8 +59,7 @@ async fn app() -> anyhow::Result<()> {
),
)
.layer(middleware::from_fn(guard::guard_request_by_ip))
.layer(middleware::from_fn(guard::guard_request_by_passkey))
.layer(Extension(state));
.layer(middleware::from_fn(guard::guard_request_by_passkey));
info!("starting server on {}", socket_addr);

View File

@@ -5,7 +5,13 @@ use monitor_client::entities::{
use resolver_api::{derive::Request, Resolve};
use serde::{Deserialize, Serialize};
use crate::{helpers::docker, state::State};
use crate::{
helpers::{
docker::{self, client::docker_client},
get_docker_token,
},
State,
};
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(Vec<Log>)]
@@ -20,15 +26,11 @@ impl Resolve<Build> for State {
Build { build }: Build,
_: (),
) -> anyhow::Result<Vec<Log>> {
let secrets = self.secrets.clone();
let repo_dir = self.config.repo_dir.clone();
let log = match self.get_docker_token(&optional_string(
let log = match get_docker_token(&optional_string(
&build.config.docker_account,
)) {
Ok(docker_token) => {
match docker::build(&build, repo_dir, docker_token, &secrets)
.await
{
match docker::build::build(&build, docker_token).await {
Ok(logs) => logs,
Err(e) => {
vec![Log::error("build", format!("{e:#?}"))]
@@ -54,7 +56,7 @@ impl Resolve<GetImageList> for State {
_: GetImageList,
_: (),
) -> anyhow::Result<Vec<ImageSummary>> {
self.docker.list_images().await
docker_client().list_images().await
}
}

View File

@@ -4,13 +4,15 @@ use monitor_client::entities::{
ContainerSummary, Deployment, DockerContainerStats,
TerminationSignal,
},
optional_string,
update::Log,
};
use resolver_api::{derive::Request, Resolve};
use serde::{Deserialize, Serialize};
use crate::{helpers::docker, state::State};
use crate::{
helpers::docker::{self, client::docker_client},
State,
};
//
@@ -25,7 +27,7 @@ impl Resolve<GetContainerList> for State {
_: GetContainerList,
_: (),
) -> anyhow::Result<Vec<ContainerSummary>> {
self.docker.list_containers().await
docker_client().list_containers().await
}
}
@@ -50,7 +52,7 @@ impl Resolve<GetContainerLog> for State {
req: GetContainerLog,
_: (),
) -> anyhow::Result<Log> {
Ok(docker::container_log(&req.name, req.tail).await)
Ok(docker::container::container_log(&req.name, req.tail).await)
}
}
@@ -70,7 +72,10 @@ impl Resolve<GetContainerLogSearch> for State {
req: GetContainerLogSearch,
_: (),
) -> anyhow::Result<Log> {
Ok(docker::container_log_search(&req.name, &req.search).await)
Ok(
docker::container::container_log_search(&req.name, &req.search)
.await,
)
}
}
@@ -90,7 +95,8 @@ impl Resolve<GetContainerStats> for State {
_: (),
) -> anyhow::Result<DockerContainerStats> {
let error = anyhow!("no stats matching {}", req.name);
let mut stats = docker::container_stats(Some(req.name)).await?;
let mut stats =
docker::container::container_stats(Some(req.name)).await?;
let stats = stats.pop().ok_or(error)?;
Ok(stats)
}
@@ -109,7 +115,7 @@ impl Resolve<GetContainerStatsList> for State {
_: GetContainerStatsList,
_: (),
) -> anyhow::Result<Vec<DockerContainerStats>> {
docker::container_stats(None).await
docker::container::container_stats(None).await
}
}
@@ -128,7 +134,7 @@ impl Resolve<StartContainer> for State {
req: StartContainer,
_: (),
) -> anyhow::Result<Log> {
Ok(docker::start_container(&req.name).await)
Ok(docker::container::start_container(&req.name).await)
}
}
@@ -149,7 +155,12 @@ impl Resolve<StopContainer> for State {
req: StopContainer,
_: (),
) -> anyhow::Result<Log> {
Ok(docker::stop_container(&req.name, req.signal, req.time).await)
Ok(
docker::container::stop_container(
&req.name, req.signal, req.time,
)
.await,
)
}
}
@@ -171,7 +182,7 @@ impl Resolve<RemoveContainer> for State {
_: (),
) -> anyhow::Result<Log> {
Ok(
docker::stop_and_remove_container(
docker::container::stop_and_remove_container(
&req.name, req.signal, req.time,
)
.await,
@@ -195,7 +206,13 @@ impl Resolve<RenameContainer> for State {
req: RenameContainer,
_: (),
) -> anyhow::Result<Log> {
Ok(docker::rename_container(&req.curr_name, &req.new_name).await)
Ok(
docker::container::rename_container(
&req.curr_name,
&req.new_name,
)
.await,
)
}
}
@@ -212,7 +229,7 @@ impl Resolve<PruneContainers> for State {
_: PruneContainers,
_: (),
) -> anyhow::Result<Log> {
Ok(docker::prune_containers().await)
Ok(docker::container::prune_containers().await)
}
}
@@ -237,28 +254,20 @@ impl Resolve<Deploy> for State {
}: Deploy,
_: (),
) -> anyhow::Result<Log> {
let secrets = self.secrets.clone();
let log = match self.get_docker_token(&optional_string(
&deployment.config.docker_account,
)) {
Ok(docker_token) => tokio::spawn(async move {
docker::deploy(
&deployment,
&docker_token,
&secrets,
stop_signal
.unwrap_or(deployment.config.termination_signal)
.into(),
stop_time
.unwrap_or(deployment.config.termination_timeout)
.into(),
)
.await
})
let log = tokio::spawn(async move {
docker::container::deploy(
&deployment,
stop_signal
.unwrap_or(deployment.config.termination_signal)
.into(),
stop_time
.unwrap_or(deployment.config.termination_timeout)
.into(),
)
.await
.context("failed at spawn thread for deploy")?,
Err(e) => Log::error("docker login", format!("{e:#?}")),
};
})
.await
.context("failed at spawn thread for deploy")?;
Ok(log)
}
}

View File

@@ -4,7 +4,7 @@ use monitor_client::entities::{
use resolver_api::{derive::Request, Resolve};
use serde::{Deserialize, Serialize};
use crate::{helpers::git, state::State};
use crate::{config::periphery_config, helpers::git, State};
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(Vec<Log>)]
@@ -19,8 +19,7 @@ impl Resolve<CloneRepo> for State {
CloneRepo { args }: CloneRepo,
_: (),
) -> anyhow::Result<Vec<Log>> {
let access_token = self.get_github_token(&args.github_account)?;
git::clone(args, self.config.repo_dir.clone(), access_token).await
git::clone(args).await
}
}
@@ -47,8 +46,12 @@ impl Resolve<PullRepo> for State {
) -> anyhow::Result<Vec<Log>> {
let name = to_monitor_name(&name);
Ok(
git::pull(self.config.repo_dir.join(name), &branch, &on_pull)
.await,
git::pull(
periphery_config().repo_dir.join(name),
&branch,
&on_pull,
)
.await,
)
}
}
@@ -69,8 +72,9 @@ impl Resolve<DeleteRepo> for State {
_: (),
) -> anyhow::Result<Log> {
let name = to_monitor_name(&name);
let deleted =
std::fs::remove_dir_all(self.config.repo_dir.join(&name));
let deleted = std::fs::remove_dir_all(
periphery_config().repo_dir.join(&name),
);
let msg = match deleted {
Ok(_) => format!("deleted repo {name}"),
Err(_) => format!("no repo at {name} to delete"),

View File

@@ -7,7 +7,11 @@ use resolver_api::{
};
use serde::{Deserialize, Serialize};
use crate::{helpers::run_monitor_command, state::State};
use crate::{
config::{accounts_response, secrets_response},
helpers::run_monitor_command,
State,
};
mod stats;
pub use stats::*;
@@ -139,7 +143,7 @@ impl ResolveToString<GetAccounts> for State {
_: GetAccounts,
_: (),
) -> anyhow::Result<String> {
Ok(self.accounts_response.clone())
Ok(accounts_response().clone())
}
}
@@ -156,7 +160,7 @@ impl ResolveToString<GetSecrets> for State {
_: GetSecrets,
_: (),
) -> anyhow::Result<String> {
Ok(self.secrets_response.clone())
Ok(secrets_response().clone())
}
}

View File

@@ -5,7 +5,10 @@ use monitor_client::entities::{
use resolver_api::{derive::Request, Resolve};
use serde::{Deserialize, Serialize};
use crate::{helpers::docker, state::State};
use crate::{
helpers::docker::{self, client::docker_client},
State,
};
//
@@ -20,7 +23,7 @@ impl Resolve<GetNetworkList> for State {
_: GetNetworkList,
_: (),
) -> anyhow::Result<Vec<DockerNetwork>> {
self.docker.list_networks().await
docker_client().list_networks().await
}
}
@@ -40,7 +43,7 @@ impl Resolve<CreateNetwork> for State {
CreateNetwork { name, driver }: CreateNetwork,
_: (),
) -> anyhow::Result<Log> {
Ok(docker::create_network(&name, driver).await)
Ok(docker::network::create_network(&name, driver).await)
}
}
@@ -59,7 +62,7 @@ impl Resolve<DeleteNetwork> for State {
DeleteNetwork { name }: DeleteNetwork,
_: (),
) -> anyhow::Result<Log> {
Ok(docker::delete_network(&name).await)
Ok(docker::network::delete_network(&name).await)
}
}
@@ -76,6 +79,6 @@ impl Resolve<PruneNetworks> for State {
_: PruneNetworks,
_: (),
) -> anyhow::Result<Log> {
Ok(docker::prune_networks().await)
Ok(docker::network::prune_networks().await)
}
}

View File

@@ -6,7 +6,7 @@ use monitor_client::entities::server::stats::{
use resolver_api::{derive::Request, ResolveToString};
use serde::{Deserialize, Serialize};
use crate::state::State;
use crate::{system_stats::stats_client, State};
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(SystemInformation)]
@@ -19,7 +19,7 @@ impl ResolveToString<GetSystemInformation> for State {
_: GetSystemInformation,
_: (),
) -> anyhow::Result<String> {
let info = &self.stats.read().await.info;
let info = &stats_client().read().await.info;
serde_json::to_string(info)
.context("failed to serialize response to string")
}
@@ -38,7 +38,7 @@ impl ResolveToString<GetAllSystemStats> for State {
_: GetAllSystemStats,
_: (),
) -> anyhow::Result<String> {
let stats = &self.stats.read().await.stats;
let stats = &stats_client().read().await.stats;
serde_json::to_string(stats)
.context("failed to serialize response to string")
}
@@ -57,7 +57,7 @@ impl ResolveToString<GetBasicSystemStats> for State {
_: GetBasicSystemStats,
_: (),
) -> anyhow::Result<String> {
let stats = &self.stats.read().await.stats.basic;
let stats = &stats_client().read().await.stats.basic;
serde_json::to_string(stats)
.context("failed to serialize response to string")
}
@@ -76,7 +76,7 @@ impl ResolveToString<GetCpuUsage> for State {
_: GetCpuUsage,
_: (),
) -> anyhow::Result<String> {
let stats = &self.stats.read().await.stats.cpu;
let stats = &stats_client().read().await.stats.cpu;
serde_json::to_string(stats)
.context("failed to serialize response to string")
}
@@ -95,7 +95,7 @@ impl ResolveToString<GetDiskUsage> for State {
_: GetDiskUsage,
_: (),
) -> anyhow::Result<String> {
let stats = &self.stats.read().await.stats.disk;
let stats = &stats_client().read().await.stats.disk;
serde_json::to_string(stats)
.context("failed to serialize response to string")
}
@@ -114,7 +114,7 @@ impl ResolveToString<GetNetworkUsage> for State {
_: GetNetworkUsage,
_: (),
) -> anyhow::Result<String> {
let stats = &self.stats.read().await.stats.network;
let stats = &stats_client().read().await.stats.network;
serde_json::to_string(&stats)
.context("failed to serialize response to string")
}
@@ -133,7 +133,7 @@ impl ResolveToString<GetSystemProcesses> for State {
_: GetSystemProcesses,
_: (),
) -> anyhow::Result<String> {
let stats = &self.stats.read().await.stats.processes;
let stats = &stats_client().read().await.stats.processes;
serde_json::to_string(&stats)
.context("failed to serialize response to string")
}
@@ -152,7 +152,7 @@ impl ResolveToString<GetSystemComponents> for State {
_: GetSystemComponents,
_: (),
) -> anyhow::Result<String> {
let stats = &self.stats.read().await.stats.components;
let stats = &stats_client().read().await.stats.components;
serde_json::to_string(&stats)
.context("failed to serialize response to string")
}

View File

@@ -1,54 +0,0 @@
use std::{
collections::HashMap, net::SocketAddr, str::FromStr, sync::Arc,
};
use anyhow::Context;
use clap::Parser;
use serde_json::json;
use crate::{
config::{CliArgs, Env, PeripheryConfig},
helpers::{
docker::DockerClient,
stats::{InnerStatsClient, StatsClient},
},
};
pub struct State {
pub config: PeripheryConfig,
pub secrets: Arc<HashMap<String, String>>,
pub stats: StatsClient,
pub docker: DockerClient,
pub accounts_response: String,
pub secrets_response: String,
}
impl State {
pub async fn load() -> anyhow::Result<Arc<State>> {
let env = Env::load()?;
let args = CliArgs::parse();
logger::init(args.log_level);
let config = PeripheryConfig::load(&env, &args)?;
let state = State {
secrets: config.secrets.clone().into(),
docker: DockerClient::default(),
stats: InnerStatsClient::new(config.stats_polling_rate),
accounts_response: serde_json::to_string(&json!({
"docker": config.docker_accounts.keys().collect::<Vec<_>>(),
"github": config.github_accounts.keys().collect::<Vec<_>>(),
}))
.unwrap(),
secrets_response: serde_json::to_string(
&config.secrets.keys().collect::<Vec<_>>(),
)
.unwrap(),
config,
};
Ok(state.into())
}
pub fn socket_addr(&self) -> anyhow::Result<SocketAddr> {
SocketAddr::from_str(&format!("0.0.0.0:{}", self.config.port))
.context("failed to parse socket addr")
}
}

View File

@@ -1,22 +1,58 @@
use std::{cmp::Ordering, sync::Arc};
use std::{cmp::Ordering, sync::OnceLock};
use async_timing_util::wait_until_timelength;
use monitor_client::entities::{
server::stats::{
AllSystemStats, BasicSystemStats, CpuUsage, DiskUsage,
LoadAverage, NetworkUsage, SingleCpuUsage, SingleDiskUsage,
SystemComponent, SystemInformation, SystemNetwork, SystemProcess,
},
Timelength,
};
use monitor_client::entities::server::stats::*;
use sysinfo::System;
use tokio::sync::RwLock;
pub type StatsClient = Arc<RwLock<InnerStatsClient>>;
use crate::config::periphery_config;
pub struct InnerStatsClient {
pub info: SystemInformation,
pub fn stats_client() -> &'static RwLock<StatsClient> {
static STATS_CLIENT: OnceLock<RwLock<StatsClient>> =
OnceLock::new();
STATS_CLIENT.get_or_init(|| RwLock::new(StatsClient::default()))
}
/// This should be called before starting the server in main.rs.
/// Keeps the caches stats up to date
pub fn spawn_system_stats_polling_threads() {
tokio::spawn(async move {
let client = stats_client();
loop {
let ts = wait_until_timelength(
async_timing_util::Timelength::FiveMinutes,
0,
)
.await;
let mut client = client.write().await;
client.refresh_lists();
client.stats.refresh_list_ts = ts as i64;
}
});
tokio::spawn(async move {
let polling_rate = periphery_config()
.stats_polling_rate
.to_string()
.parse()
.expect("invalid stats polling rate");
let client = stats_client();
loop {
let ts = wait_until_timelength(polling_rate, 1).await;
let mut client = client.write().await;
client.refresh();
client.stats = client.get_all_stats();
client.stats.refresh_ts = ts as i64;
}
});
}
pub struct StatsClient {
/// Cached system stats
pub stats: AllSystemStats,
/// Cached system information
pub info: SystemInformation,
// the handles used to get the stats
system: sysinfo::System,
disks: sysinfo::Disks,
components: sysinfo::Components,
@@ -27,52 +63,28 @@ const BYTES_PER_GB: f64 = 1073741824.0;
const BYTES_PER_MB: f64 = 1048576.0;
const BYTES_PER_KB: f64 = 1024.0;
impl InnerStatsClient {
pub fn new(polling_rate: Timelength) -> StatsClient {
impl Default for StatsClient {
fn default() -> Self {
let system = sysinfo::System::new_all();
let disks = sysinfo::Disks::new_with_refreshed_list();
let components = sysinfo::Components::new_with_refreshed_list();
let networks = sysinfo::Networks::new_with_refreshed_list();
let stats = AllSystemStats {
polling_rate,
polling_rate: periphery_config().stats_polling_rate,
..Default::default()
};
let client = InnerStatsClient {
StatsClient {
info: get_system_information(&system),
system,
disks,
components,
networks,
stats,
};
let client = Arc::new(RwLock::new(client));
let clone = client.clone();
tokio::spawn(async move {
loop {
let ts = wait_until_timelength(
async_timing_util::Timelength::FiveMinutes,
0,
)
.await;
let mut client = clone.write().await;
client.refresh_lists();
client.stats.refresh_list_ts = ts as i64;
}
});
let clone = client.clone();
tokio::spawn(async move {
let polling_rate = polling_rate.to_string().parse().unwrap();
loop {
let ts = wait_until_timelength(polling_rate, 1).await;
let mut client = clone.write().await;
client.refresh();
client.stats = client.get_all_stats();
client.stats.refresh_ts = ts as i64;
}
});
client
}
}
}
impl StatsClient {
fn refresh(&mut self) {
self.system.refresh_cpu();
self.system.refresh_memory();

View File

@@ -8,6 +8,6 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
serde.workspace = true
tracing.workspace = true
tracing-subscriber.workspace = true
anyhow.workspace = true

View File

@@ -1,7 +1,32 @@
use serde::{Deserialize, Serialize};
use tracing::level_filters::LevelFilter;
pub fn init(log_level: tracing::Level) {
pub fn init(log_level: impl Into<tracing::Level>) {
let log_level: tracing::Level = log_level.into();
tracing_subscriber::fmt()
.with_max_level(LevelFilter::from(log_level))
.init()
}
#[derive(
Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize,
)]
pub enum LogLevel {
Trace,
Debug,
Info,
Warn,
Error,
}
impl From<LogLevel> for tracing::Level {
fn from(value: LogLevel) -> Self {
match value {
LogLevel::Trace => tracing::Level::TRACE,
LogLevel::Debug => tracing::Level::DEBUG,
LogLevel::Info => tracing::Level::INFO,
LogLevel::Warn => tracing::Level::WARN,
LogLevel::Error => tracing::Level::ERROR,
}
}
}