finish instrumenting

This commit is contained in:
mbecker20
2024-04-11 03:37:57 -07:00
parent 34176c336e
commit f26c937747
10 changed files with 80 additions and 27 deletions

View File

@@ -66,6 +66,7 @@ pub fn router() -> Router {
)
}
#[instrument(skip(headers, body))]
async fn handle_build_webhook(
build_id: String,
headers: HeaderMap,
@@ -86,6 +87,7 @@ async fn handle_build_webhook(
Ok(())
}
#[instrument(skip(headers, body))]
async fn handle_repo_clone_webhook(
repo_id: String,
headers: HeaderMap,
@@ -106,6 +108,7 @@ async fn handle_repo_clone_webhook(
Ok(())
}
#[instrument(skip(headers, body))]
async fn handle_repo_pull_webhook(
repo_id: String,
headers: HeaderMap,
@@ -126,6 +129,7 @@ async fn handle_repo_pull_webhook(
Ok(())
}
#[instrument(skip_all)]
async fn verify_gh_signature(
headers: HeaderMap,
body: &str,

View File

@@ -15,6 +15,7 @@ use crate::{
},
};
#[instrument(level = "debug")]
pub async fn alert_deployments(
ts: i64,
server_names: HashMap<String, String>,
@@ -66,8 +67,6 @@ pub async fn alert_deployments(
send_alerts(&alerts).await;
let res = db_client().await.alerts.insert_many(alerts, None).await;
if let Err(e) = res {
error!(
"failed to record deployment status alerts to db | {e:#}"
);
error!("failed to record deployment status alerts to db | {e:#}");
}
}

View File

@@ -13,6 +13,7 @@ use monitor_client::entities::{
use crate::helpers::resource::StateResource;
// called after cache update
#[instrument(level = "debug")]
pub async fn check_alerts(ts: i64) {
let servers = get_all_servers_map().await;
@@ -29,6 +30,7 @@ pub async fn check_alerts(ts: i64) {
);
}
#[instrument(level = "debug")]
async fn get_all_servers_map() -> anyhow::Result<(
HashMap<String, ServerListItem>,
HashMap<String, String>,

View File

@@ -27,6 +27,7 @@ type OpenAlertMap<T = AlertDataVariant> =
type OpenDiskAlertMap = OpenAlertMap<PathBuf>;
type OpenTempAlertMap = OpenAlertMap<String>;
#[instrument(level = "debug")]
pub async fn alert_servers(
ts: i64,
mut servers: HashMap<String, ServerListItem>,
@@ -439,6 +440,7 @@ pub async fn alert_servers(
);
}
#[instrument(level = "debug")]
async fn open_alerts(alerts: &[(Alert, SendAlerts)]) {
if alerts.is_empty() {
return;
@@ -486,6 +488,7 @@ async fn open_alerts(alerts: &[(Alert, SendAlerts)]) {
send_alerts(&alerts).await
}
#[instrument(level = "debug")]
async fn update_alerts(alerts: &[(Alert, SendAlerts)]) {
if alerts.is_empty() {
return;
@@ -533,6 +536,7 @@ async fn update_alerts(alerts: &[(Alert, SendAlerts)]) {
}
}
#[instrument(level = "debug")]
async fn resolve_alerts(alert_ids: &[(String, SendAlerts)]) {
if alert_ids.is_empty() {
return;
@@ -598,6 +602,7 @@ async fn resolve_alerts(alert_ids: &[(String, SendAlerts)]) {
}
}
#[instrument(level = "debug")]
async fn get_open_alerts(
) -> anyhow::Result<(OpenAlertMap, OpenDiskAlertMap, OpenTempAlertMap)>
{

View File

@@ -16,6 +16,7 @@ use crate::helpers::cache::{
use super::{CachedDeploymentStatus, CachedServerStatus, History};
#[instrument(level = "debug", skip_all)]
pub async fn insert_deployments_status_unknown(
deployments: Vec<Deployment>,
) {
@@ -40,6 +41,7 @@ pub async fn insert_deployments_status_unknown(
}
}
#[instrument(level = "debug", skip_all)]
pub async fn insert_server_status(
server: &Server,
status: ServerStatus,

View File

@@ -75,6 +75,7 @@ pub fn spawn_monitor_loop() {
});
}
#[instrument(level = "debug")]
pub async fn update_cache_for_server(server: &Server) {
let deployments = match find_collect(
&db_client().await.deployments,

View File

@@ -4,6 +4,7 @@ use monitor_client::entities::server::stats::{
use crate::{db::db_client, helpers::cache::server_status_cache};
#[instrument(level = "debug")]
pub async fn record_server_stats(ts: i64) {
let status = server_status_cache().get_list().await;
let records = status

View File

@@ -42,7 +42,7 @@ impl PeripheryClient {
self.request_inner(request, None).await
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
pub async fn health_check(&self) -> anyhow::Result<()> {
self
.request_inner(api::GetHealth {}, Some(Duration::from_secs(1)))
@@ -50,7 +50,7 @@ impl PeripheryClient {
Ok(())
}
#[tracing::instrument(skip(self))]
#[tracing::instrument(level = "debug", skip(self))]
async fn request_inner<T: HasResponse>(
&self,
request: T,
@@ -63,8 +63,8 @@ impl PeripheryClient {
let mut req = http_client()
.post(&self.address)
.json(&json!({
"type": req_type,
"params": request
"type": req_type,
"params": request
}))
.header("authorization", &self.passkey);
if let Some(timeout) = timeout {

View File

@@ -1,13 +1,12 @@
use std::time::Duration;
use anyhow::Context;
use opentelemetry_otlp::WithExportConfig;
use serde::{Deserialize, Serialize};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{
layer::SubscriberExt, util::SubscriberInitExt,
};
mod opentelemetry;
#[derive(Debug, Clone, Default, Deserialize)]
pub struct LogConfig {
/// The logging level. default: info
@@ -18,23 +17,15 @@ pub struct LogConfig {
#[serde(default)]
pub stdio: StdioLogMode,
/// Enable opentelemetry experting
/// Enable opentelemetry exporting
pub otlp_endpoint: Option<String>,
#[serde(default = "default_opentelemetry_service_name")]
pub opentelemetry_service_name: String,
}
macro_rules! opentelemetry_layer {
($endpoint:expr) => {{
let tracer = opentelemetry_otlp::new_pipeline()
.tracing()
.with_exporter(
opentelemetry_otlp::new_exporter()
.tonic()
.with_endpoint($endpoint)
.with_timeout(Duration::from_secs(3)),
)
.install_batch(opentelemetry_sdk::runtime::Tokio)?;
tracing_opentelemetry::layer().with_tracer(tracer)
}};
/// Default value for `LogConfig::opentelemetry_service_name`
/// (the `service.name` reported to the collector).
fn default_opentelemetry_service_name() -> String {
  "Monitor".to_string()
}
pub fn init(config: &LogConfig) -> anyhow::Result<()> {
@@ -46,16 +37,25 @@ pub fn init(config: &LogConfig) -> anyhow::Result<()> {
match (config.stdio, &config.otlp_endpoint) {
(StdioLogMode::Standard, Some(endpoint)) => registry
.with(tracing_subscriber::fmt::layer())
.with(opentelemetry_layer!(endpoint))
.with(opentelemetry::layer(
endpoint,
config.opentelemetry_service_name.clone(),
))
.try_init()
.context("failed to init logger"),
(StdioLogMode::Json, Some(endpoint)) => registry
.with(tracing_subscriber::fmt::layer().json())
.with(opentelemetry_layer!(endpoint))
.with(opentelemetry::layer(
endpoint,
config.opentelemetry_service_name.clone(),
))
.try_init()
.context("failed to init logger"),
(StdioLogMode::None, Some(endpoint)) => registry
.with(opentelemetry_layer!(endpoint))
.with(opentelemetry::layer(
endpoint,
config.opentelemetry_service_name.clone(),
))
.try_init()
.context("failed to init logger"),

View File

@@ -0,0 +1,39 @@
use std::time::Duration;
use opentelemetry::KeyValue;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{
trace::{self, RandomIdGenerator, Sampler, Tracer},
Resource,
};
use tracing_opentelemetry::OpenTelemetryLayer;
/// Builds a `tracing-subscriber` layer that exports spans to an OTLP
/// collector over gRPC (tonic).
///
/// - `endpoint`: the OTLP collector endpoint to export to.
/// - `service_name`: attached as the `service.name` resource attribute
///   on every exported span.
///
/// # Panics
/// Panics if the batch exporter pipeline cannot be installed — e.g. when
/// called outside a Tokio runtime context.
pub fn layer<S>(
  endpoint: &str,
  service_name: String,
) -> OpenTelemetryLayer<S, Tracer>
where
  S: tracing::Subscriber,
  for<'span> S: tracing_subscriber::registry::LookupSpan<'span>,
{
  let tracer = opentelemetry_otlp::new_pipeline()
    .tracing()
    .with_exporter(
      opentelemetry_otlp::new_exporter()
        .tonic()
        .with_endpoint(endpoint)
        // Fail fast if the collector is unreachable rather than
        // blocking export indefinitely.
        .with_timeout(Duration::from_secs(3)),
    )
    .with_trace_config(
      trace::config()
        .with_sampler(Sampler::AlwaysOn)
        .with_id_generator(RandomIdGenerator::default())
        .with_resource(Resource::new(vec![KeyValue::new(
          "service.name",
          service_name,
        )])),
    )
    .install_batch(opentelemetry_sdk::runtime::Tokio)
    // Fix: was `.expect("")`, which panics with an empty, unhelpful
    // message. State the failure condition so the panic is diagnosable.
    .expect(
      "failed to install opentelemetry batch exporter on the Tokio runtime",
    );
  tracing_opentelemetry::layer().with_tracer(tracer)
}