add tracing spans to periphery

This commit is contained in:
mbecker20
2024-04-10 18:23:35 -07:00
parent ad09fd81b4
commit 5d2082b478
16 changed files with 203 additions and 49 deletions

View File

@@ -18,6 +18,7 @@ use crate::{
#[async_trait]
impl Resolve<Build> for State {
#[instrument(name = "Build", skip(self))]
async fn resolve(
&self,
Build { build }: Build,
@@ -44,6 +45,7 @@ impl Resolve<Build> for State {
#[async_trait::async_trait]
impl Resolve<GetImageList> for State {
#[instrument(name = "GetImageList", level = "debug", skip(self))]
async fn resolve(
&self,
_: GetImageList,
@@ -57,11 +59,12 @@ impl Resolve<GetImageList> for State {
#[async_trait]
impl Resolve<PruneImages> for State {
#[instrument(name = "PruneImages", skip(self))]
async fn resolve(
&self,
_: PruneImages,
_: (),
) -> anyhow::Result<Log> {
Ok(docker::prune_images().await)
Ok(docker::build::prune_images().await)
}
}

View File

@@ -15,6 +15,11 @@ use crate::{
#[async_trait::async_trait]
impl Resolve<GetContainerList> for State {
#[instrument(
name = "GetContainerList",
level = "debug",
skip(self)
)]
async fn resolve(
&self,
_: GetContainerList,
@@ -28,6 +33,7 @@ impl Resolve<GetContainerList> for State {
#[async_trait::async_trait]
impl Resolve<GetContainerLog> for State {
#[instrument(name = "GetContainerLog", level = "debug", skip(self))]
async fn resolve(
&self,
req: GetContainerLog,
@@ -41,6 +47,11 @@ impl Resolve<GetContainerLog> for State {
#[async_trait::async_trait]
impl Resolve<GetContainerLogSearch> for State {
#[instrument(
name = "GetContainerLogSearch",
level = "debug",
skip(self)
)]
async fn resolve(
&self,
req: GetContainerLogSearch,
@@ -57,6 +68,11 @@ impl Resolve<GetContainerLogSearch> for State {
#[async_trait::async_trait]
impl Resolve<GetContainerStats> for State {
#[instrument(
name = "GetContainerStats",
level = "debug",
skip(self)
)]
async fn resolve(
&self,
req: GetContainerStats,
@@ -74,6 +90,11 @@ impl Resolve<GetContainerStats> for State {
#[async_trait::async_trait]
impl Resolve<GetContainerStatsList> for State {
#[instrument(
name = "GetContainerStatsList",
level = "debug",
skip(self)
)]
async fn resolve(
&self,
_: GetContainerStatsList,
@@ -87,6 +108,10 @@ impl Resolve<GetContainerStatsList> for State {
#[async_trait::async_trait]
impl Resolve<StartContainer> for State {
#[instrument(
name = "StartContainer",
skip(self)
)]
async fn resolve(
&self,
req: StartContainer,
@@ -100,6 +125,10 @@ impl Resolve<StartContainer> for State {
#[async_trait::async_trait]
impl Resolve<StopContainer> for State {
#[instrument(
name = "StopContainer",
skip(self)
)]
async fn resolve(
&self,
req: StopContainer,
@@ -118,6 +147,10 @@ impl Resolve<StopContainer> for State {
#[async_trait::async_trait]
impl Resolve<RemoveContainer> for State {
#[instrument(
name = "RemoveContainer",
skip(self)
)]
async fn resolve(
&self,
req: RemoveContainer,
@@ -136,6 +169,10 @@ impl Resolve<RemoveContainer> for State {
#[async_trait::async_trait]
impl Resolve<RenameContainer> for State {
#[instrument(
name = "RenameContainer",
skip(self)
)]
async fn resolve(
&self,
req: RenameContainer,
@@ -155,6 +192,10 @@ impl Resolve<RenameContainer> for State {
#[async_trait::async_trait]
impl Resolve<PruneContainers> for State {
#[instrument(
name = "PruneContainers",
skip(self)
)]
async fn resolve(
&self,
_: PruneContainers,
@@ -168,6 +209,10 @@ impl Resolve<PruneContainers> for State {
#[async_trait::async_trait]
impl Resolve<Deploy> for State {
#[instrument(
name = "Deploy",
skip(self)
)]
async fn resolve(
&self,
Deploy {

View File

@@ -6,6 +6,10 @@ use crate::{config::periphery_config, helpers::git, State};
#[async_trait::async_trait]
impl Resolve<CloneRepo> for State {
#[instrument(
name = "CloneRepo",
skip(self)
)]
async fn resolve(
&self,
CloneRepo { args }: CloneRepo,
@@ -19,6 +23,10 @@ impl Resolve<CloneRepo> for State {
#[async_trait::async_trait]
impl Resolve<PullRepo> for State {
#[instrument(
name = "PullRepo",
skip(self)
)]
async fn resolve(
&self,
PullRepo {
@@ -44,6 +52,10 @@ impl Resolve<PullRepo> for State {
#[async_trait::async_trait]
impl Resolve<DeleteRepo> for State {
#[instrument(
name = "DeleteRepo",
skip(self)
)]
async fn resolve(
&self,
DeleteRepo { name }: DeleteRepo,

View File

@@ -3,7 +3,7 @@ use async_trait::async_trait;
use monitor_client::entities::{update::Log, SystemCommand};
use periphery_client::api::{
build::*, container::*, git::*, network::*, stats::*, GetAccounts,
GetHealth, GetSecrets, GetVersion, GetVersionResponse, PruneAll,
GetHealth, GetSecrets, GetVersion, GetVersionResponse, PruneSystem,
RunCommand,
};
use resolver_api::{derive::Resolver, Resolve, ResolveToString};
@@ -71,13 +71,14 @@ pub enum PeripheryRequest {
CreateNetwork(CreateNetwork),
DeleteNetwork(DeleteNetwork),
PruneNetworks(PruneNetworks),
PruneAll(PruneAll),
PruneAll(PruneSystem),
}
//
#[async_trait]
impl ResolveToString<GetHealth> for State {
#[instrument(name = "GetHealth", level = "debug", skip(self))]
async fn resolve_to_string(
&self,
_: GetHealth,
@@ -91,6 +92,7 @@ impl ResolveToString<GetHealth> for State {
#[async_trait]
impl Resolve<GetVersion> for State {
#[instrument(name = "GetVersion", level = "debug", skip(self))]
async fn resolve(
&self,
_: GetVersion,
@@ -106,6 +108,7 @@ impl Resolve<GetVersion> for State {
#[async_trait]
impl ResolveToString<GetAccounts> for State {
#[instrument(name = "GetAccounts", level = "debug", skip(self))]
async fn resolve_to_string(
&self,
_: GetAccounts,
@@ -119,6 +122,7 @@ impl ResolveToString<GetAccounts> for State {
#[async_trait]
impl ResolveToString<GetSecrets> for State {
#[instrument(name = "GetSecrets", level = "debug", skip(self))]
async fn resolve_to_string(
&self,
_: GetSecrets,
@@ -130,6 +134,7 @@ impl ResolveToString<GetSecrets> for State {
#[async_trait]
impl Resolve<RunCommand> for State {
#[instrument(name = "RunCommand", skip(self))]
async fn resolve(
&self,
RunCommand {
@@ -151,10 +156,11 @@ impl Resolve<RunCommand> for State {
}
#[async_trait]
impl Resolve<PruneAll> for State {
impl Resolve<PruneSystem> for State {
#[instrument(name = "PruneSystem", skip(self))]
async fn resolve(
&self,
PruneAll {}: PruneAll,
PruneSystem {}: PruneSystem,
_: (),
) -> anyhow::Result<Log> {
Ok(docker::prune_system().await)

View File

@@ -16,6 +16,7 @@ use crate::{
#[async_trait]
impl Resolve<GetNetworkList> for State {
#[instrument(name = "GetNetworkList", level = "debug", skip(self))]
async fn resolve(
&self,
_: GetNetworkList,
@@ -29,6 +30,7 @@ impl Resolve<GetNetworkList> for State {
#[async_trait]
impl Resolve<CreateNetwork> for State {
#[instrument(name = "CreateNetwork", skip(self))]
async fn resolve(
&self,
CreateNetwork { name, driver }: CreateNetwork,
@@ -42,6 +44,7 @@ impl Resolve<CreateNetwork> for State {
#[async_trait]
impl Resolve<DeleteNetwork> for State {
#[instrument(name = "DeleteNetwork", skip(self))]
async fn resolve(
&self,
DeleteNetwork { name }: DeleteNetwork,
@@ -55,6 +58,7 @@ impl Resolve<DeleteNetwork> for State {
#[async_trait]
impl Resolve<PruneNetworks> for State {
#[instrument(name = "PruneNetworks", skip(self))]
async fn resolve(
&self,
_: PruneNetworks,

View File

@@ -10,6 +10,11 @@ use crate::{system_stats::stats_client, State};
#[async_trait::async_trait]
impl ResolveToString<GetSystemInformation> for State {
#[instrument(
name = "GetSystemInformation",
level = "debug",
skip(self)
)]
async fn resolve_to_string(
&self,
_: GetSystemInformation,
@@ -25,6 +30,11 @@ impl ResolveToString<GetSystemInformation> for State {
#[async_trait::async_trait]
impl ResolveToString<GetAllSystemStats> for State {
#[instrument(
name = "GetAllSystemStats",
level = "debug",
skip(self)
)]
async fn resolve_to_string(
&self,
_: GetAllSystemStats,
@@ -40,6 +50,11 @@ impl ResolveToString<GetAllSystemStats> for State {
#[async_trait::async_trait]
impl ResolveToString<GetBasicSystemStats> for State {
#[instrument(
name = "GetBasicSystemStats",
level = "debug",
skip(self)
)]
async fn resolve_to_string(
&self,
_: GetBasicSystemStats,
@@ -55,6 +70,7 @@ impl ResolveToString<GetBasicSystemStats> for State {
#[async_trait::async_trait]
impl ResolveToString<GetCpuUsage> for State {
#[instrument(name = "GetCpuUsage", level = "debug", skip(self))]
async fn resolve_to_string(
&self,
_: GetCpuUsage,
@@ -70,6 +86,7 @@ impl ResolveToString<GetCpuUsage> for State {
#[async_trait::async_trait]
impl ResolveToString<GetDiskUsage> for State {
#[instrument(name = "GetDiskUsage", level = "debug", skip(self))]
async fn resolve_to_string(
&self,
_: GetDiskUsage,
@@ -85,6 +102,7 @@ impl ResolveToString<GetDiskUsage> for State {
#[async_trait::async_trait]
impl ResolveToString<GetNetworkUsage> for State {
#[instrument(name = "GetNetworkUsage", level = "debug", skip(self))]
async fn resolve_to_string(
&self,
_: GetNetworkUsage,
@@ -100,6 +118,11 @@ impl ResolveToString<GetNetworkUsage> for State {
#[async_trait::async_trait]
impl ResolveToString<GetSystemProcesses> for State {
#[instrument(
name = "GetSystemProcesses",
level = "debug",
skip(self)
)]
async fn resolve_to_string(
&self,
_: GetSystemProcesses,
@@ -115,6 +138,11 @@ impl ResolveToString<GetSystemProcesses> for State {
#[async_trait::async_trait]
impl ResolveToString<GetSystemComponents> for State {
#[instrument(
name = "GetSystemComponents",
level = "debug",
skip(self)
)]
async fn resolve_to_string(
&self,
_: GetSystemComponents,

View File

@@ -12,6 +12,7 @@ use serde_json::Value;
use crate::config::periphery_config;
#[instrument(level = "debug")]
pub async fn guard_request_by_passkey(
req: Request<Body>,
next: Next,
@@ -58,6 +59,7 @@ pub async fn guard_request_by_passkey(
}
}
#[instrument(level = "debug")]
pub async fn guard_request_by_ip(
req: Request<Body>,
next: Next,

View File

@@ -0,0 +1,35 @@
use std::time::Instant;
use axum::Json;
use axum_extra::{headers::ContentType, TypedHeader};
use resolver_api::Resolver;
use serror::AppResult;
use uuid::Uuid;
use crate::State;
#[instrument(name = "periphery_handler")]
pub async fn handler(
Json(request): Json<crate::api::PeripheryRequest>,
) -> AppResult<(TypedHeader<ContentType>, String)> {
let timer = Instant::now();
let req_id = Uuid::new_v4();
info!("request {req_id} | {request:?}");
let res = tokio::spawn(async move {
let res = State.resolve_request(request, ()).await;
if let Err(resolver_api::Error::Serialization(e)) = &res {
warn!("request {req_id} serialization error: {e:?}");
}
if let Err(resolver_api::Error::Inner(e)) = &res {
warn!("request {req_id} error: {e:#}");
}
let elapsed = timer.elapsed();
info!("request {req_id} | resolve time: {elapsed:?}");
res
})
.await;
if let Err(e) = &res {
warn!("request {req_id} spawn error: {e:#}");
}
AppResult::Ok((TypedHeader(ContentType::json()), res??))
}

View File

@@ -10,6 +10,13 @@ use crate::{config::periphery_config, helpers::run_monitor_command};
use super::{docker_login, parse_extra_args};
/// Runs `docker image prune -a -f` through `run_monitor_command`
/// (labelled "prune images") and returns the resulting `Log`.
#[instrument]
pub async fn prune_images() -> Log {
  run_monitor_command(
    "prune images",
    "docker image prune -a -f".to_string(),
  )
  .await
}
#[instrument]
pub async fn build(
Build {
name,
@@ -63,6 +70,7 @@ pub async fn build(
if *skip_secret_interp {
let build_log =
run_monitor_command("docker build", command).await;
info!("finished building docker image");
logs.push(build_log);
} else {
let (command, replacers) = svi::interpolate_variables(

View File

@@ -20,11 +20,13 @@ use crate::{
use super::docker_login;
/// Runs `docker logs <container_name> --tail <tail>` through
/// `run_monitor_command` and returns the resulting `Log`.
#[instrument(level = "debug")]
pub async fn container_log(container_name: &str, tail: u64) -> Log {
  run_monitor_command(
    "get container log",
    format!("docker logs {container_name} --tail {tail}"),
  )
  .await
}
#[instrument(level = "debug")]
pub async fn container_log_search(
container_name: &str,
search: &str,
@@ -34,6 +36,7 @@ pub async fn container_log_search(
run_monitor_command("get container log grep", command).await
}
#[instrument(level = "debug")]
pub async fn container_stats(
container_name: Option<String>,
) -> anyhow::Result<Vec<DockerContainerStats>> {
@@ -62,17 +65,20 @@ pub async fn container_stats(
}
}
/// Runs `docker container prune -f` through `run_monitor_command`
/// (labelled "prune containers") and returns the resulting `Log`.
#[instrument]
pub async fn prune_containers() -> Log {
  run_monitor_command(
    "prune containers",
    "docker container prune -f".to_string(),
  )
  .await
}
/// Runs `docker start` on the container and returns the resulting `Log`.
///
/// The given name is first passed through `to_monitor_name`, and the
/// converted name is what ends up in the command.
#[instrument]
pub async fn start_container(container_name: &str) -> Log {
  let start_command =
    format!("docker start {}", to_monitor_name(container_name));
  run_monitor_command("docker start", start_command).await
}
#[instrument]
pub async fn stop_container(
container_name: &str,
signal: Option<TerminationSignal>,
@@ -97,6 +103,7 @@ pub async fn stop_container(
}
}
#[instrument]
pub async fn stop_and_remove_container(
container_name: &str,
signal: Option<TerminationSignal>,
@@ -144,6 +151,7 @@ fn stop_container_command(
format!("docker stop{signal}{time} {container_name}")
}
#[instrument]
pub async fn rename_container(
curr_name: &str,
new_name: &str,
@@ -154,11 +162,13 @@ pub async fn rename_container(
run_monitor_command("docker rename", command).await
}
/// Runs `docker pull <image>` through `run_monitor_command`
/// and returns the resulting `Log`.
#[instrument]
async fn pull_image(image: &str) -> Log {
  run_monitor_command("docker pull", format!("docker pull {image}"))
    .await
}
#[instrument]
pub async fn deploy(
deployment: &Deployment,
stop_signal: Option<TerminationSignal>,
@@ -200,14 +210,17 @@ pub async fn deploy(
};
let _ = pull_image(image).await;
debug!("image pulled");
let _ = stop_and_remove_container(
&deployment.name,
stop_signal,
stop_time,
)
.await;
debug!("container stopped and removed");
let command = docker_run_command(deployment, image);
debug!("docker run command: {command}");
if deployment.config.skip_secret_interp {
run_monitor_command("docker run", command).await

View File

@@ -9,11 +9,6 @@ pub mod client;
pub mod container;
pub mod network;
pub async fn prune_images() -> Log {
let command = String::from("docker image prune -a -f");
run_monitor_command("prune images", command).await
}
pub fn get_docker_username_pw(
docker_account: &Option<String>,
docker_token: &Option<String>,
@@ -53,6 +48,7 @@ pub fn parse_extra_args(extra_args: &[String]) -> String {
}
}
#[instrument]
pub async fn prune_system() -> Log {
let command = String::from("docker system prune -a -f");
run_monitor_command("prune system", command).await

View File

@@ -2,6 +2,7 @@ use monitor_client::entities::update::Log;
use crate::helpers::run_monitor_command;
#[instrument]
pub async fn create_network(
name: &str,
driver: Option<String>,
@@ -14,11 +15,13 @@ pub async fn create_network(
run_monitor_command("create network", command).await
}
/// Runs `docker network rm <name>` through `run_monitor_command`
/// (labelled "delete network") and returns the resulting `Log`.
#[instrument]
pub async fn delete_network(name: &str) -> Log {
  run_monitor_command(
    "delete network",
    format!("docker network rm {name}"),
  )
  .await
}
#[instrument]
pub async fn prune_networks() -> Log {
let command = String::from("docker network prune -f");
run_monitor_command("prune networks", command).await

View File

@@ -54,9 +54,11 @@ pub async fn pull(
logs
}
pub async fn clone(
clone_args: impl Into<CloneArgs>,
) -> anyhow::Result<Vec<Log>> {
#[instrument]
pub async fn clone<T>(clone_args: T) -> anyhow::Result<Vec<Log>>
where
T: Into<CloneArgs> + std::fmt::Debug,
{
let CloneArgs {
name,
repo,
@@ -75,10 +77,14 @@ pub async fn clone(
let clone_log =
clone_inner(repo, &repo_dir, &branch, access_token).await;
if !clone_log.success {
warn!("repo at {repo_dir:?} failed to clone");
return Ok(vec![clone_log]);
}
info!("repo at {repo_dir:?} cloned with clone_inner");
let commit_hash_log = get_commit_hash_log(&repo_dir).await?;
let mut logs = vec![clone_log, commit_hash_log];
@@ -95,6 +101,10 @@ pub async fn clone(
),
)
.await;
info!(
"run repo on_clone command | command: {} | cwd: {:?}",
command.command, on_clone_path
);
logs.push(on_clone_log);
}
}
@@ -110,12 +120,17 @@ pub async fn clone(
),
)
.await;
info!(
"run repo on_pull command | command: {} | cwd: {:?}",
command.command, on_pull_path
);
logs.push(on_pull_log);
}
}
Ok(logs)
}
#[instrument]
async fn clone_inner(
repo: &str,
destination: &Path,
@@ -158,6 +173,7 @@ async fn clone_inner(
}
}
#[instrument]
async fn get_commit_hash_log(repo_dir: &Path) -> anyhow::Result<Log> {
let start_ts = monitor_timestamp();
let command = format!("cd {} && git rev-parse --short HEAD && git rev-parse HEAD && git log -1 --pretty=%B", repo_dir.display());

View File

@@ -1,20 +1,16 @@
#[macro_use]
extern crate tracing;
use std::{net::SocketAddr, str::FromStr, time::Instant};
use std::{net::SocketAddr, str::FromStr};
use anyhow::Context;
use axum::{middleware, routing::post, Json, Router};
use axum_extra::{headers::ContentType, TypedHeader};
use resolver_api::Resolver;
use serror::AppResult;
use axum::{middleware, routing::post, Router};
use termination_signal::tokio::immediate_term_handle;
use uuid::Uuid;
mod api;
mod config;
mod guard;
mod handler;
mod helpers;
mod system_stats;
@@ -34,31 +30,7 @@ async fn app() -> anyhow::Result<()> {
.context("failed to parse socket addr")?;
let app = Router::new()
.route(
"/",
post(|Json(request): Json<api::PeripheryRequest>| async move {
let timer = Instant::now();
let req_id = Uuid::new_v4();
info!("request {req_id} | {request:?}");
let res = tokio::spawn(async move {
let res = State.resolve_request(request, ()).await;
if let Err(resolver_api::Error::Serialization(e)) = &res {
warn!("request {req_id} serialization error: {e:?}");
}
if let Err(resolver_api::Error::Inner(e)) = &res {
warn!("request {req_id} error: {e:#}");
}
let elapsed = timer.elapsed();
info!("request {req_id} | resolve time: {elapsed:?}");
res
})
.await;
if let Err(e) = &res {
warn!("request {req_id} spawn error: {e:#}");
}
AppResult::Ok((TypedHeader(ContentType::json()), res??))
}),
)
.route("/", post(handler::handler))
.layer(middleware::from_fn(guard::guard_request_by_ip))
.layer(middleware::from_fn(guard::guard_request_by_passkey));

View File

@@ -50,7 +50,7 @@ pub struct GetSecrets {}
#[derive(Serialize, Deserialize, Debug, Clone, Request)]
#[response(Log)]
pub struct PruneAll {}
pub struct PruneSystem {}
//

View File

@@ -30,14 +30,19 @@ impl PeripheryClient {
}
}
/// Sends `request` to periphery, gated on a successful health check.
///
/// Calls `health_check` first and returns its error if it fails;
/// otherwise hands the request to `request_inner`, passing `None`
/// as the second argument.
// tracing will skip self, to avoid including passkey in traces
#[tracing::instrument(skip(self))]
pub async fn request<T>(
  &self,
  request: T,
) -> anyhow::Result<T::Response>
where
  T: HasResponse,
{
  tracing::debug!("running health check");
  self.health_check().await?;

  tracing::debug!("health check passed. running inner request");
  self.request_inner(request, None).await
}
#[tracing::instrument(skip(self))]
pub async fn health_check(&self) -> anyhow::Result<()> {
self
.request_inner(api::GetHealth {}, Some(Duration::from_secs(1)))
@@ -45,6 +50,7 @@ impl PeripheryClient {
Ok(())
}
#[tracing::instrument(skip(self))]
async fn request_inner<T: HasResponse>(
&self,
request: T,
@@ -71,15 +77,20 @@ impl PeripheryClient {
"got response | type: {req_type} | {status} | body: {res:?}",
);
if status == StatusCode::OK {
res.json().await.context(format!(
"failed to parse response to json | type: {req_type} | body: {request:?}"
))
tracing::debug!("response ok, deserializing");
res.json().await.with_context(|| format!(
"failed to parse response to json | type: {req_type} | body: {request:?}"
))
} else {
tracing::debug!("response is non-200");
let text = res
.text()
.await
.context("failed to convert response to text")?;
tracing::debug!("got response text, deserializing error");
let error = deserialize_error(text)
.context(format!("request to periphery failed | {status}"));