move core connection handlers into core binary

This commit is contained in:
mbecker20
2025-09-21 16:04:56 -07:00
parent 52453d1320
commit 0be51dc784
14 changed files with 164 additions and 187 deletions

6
Cargo.lock generated
View File

@@ -2928,6 +2928,7 @@ dependencies = [
"slack_client_rs",
"svi",
"tokio",
"tokio-tungstenite 0.27.0",
"tokio-util",
"toml",
"toml_pretty",
@@ -2935,6 +2936,7 @@ dependencies = [
"tracing",
"transport",
"typeshare",
"url",
"urlencoding",
"uuid",
"wildcard",
@@ -3852,22 +3854,18 @@ version = "2.0.0-dev-0"
>>>>>>> 6fbead30 (noise library and cli key utilities)
dependencies = [
"anyhow",
"axum",
"bytes",
"cache",
"futures-util",
"komodo_client",
"resolver_api",
"rustls 0.23.32",
"serde",
"serde_json",
"serror",
"tokio",
"tokio-tungstenite 0.27.0",
"tokio-util",
"tracing",
"transport",
"url",
"uuid",
]

View File

@@ -40,6 +40,7 @@ slack.workspace = true
svi.workspace = true
# external
aws-credential-types.workspace = true
tokio-tungstenite.workspace = true
english-to-cron.workspace = true
openidconnect.workspace = true
jsonwebtoken.workspace = true
@@ -83,3 +84,4 @@ rand.workspace = true
hmac.workspace = true
sha2.workspace = true
hex.workspace = true
url.workspace = true

View File

@@ -34,6 +34,7 @@ use periphery_client::{
use resolver_api::Resolve;
use tokio::fs;
use crate::connection::client::spawn_client_connection;
use crate::{
config::core_config,
helpers::{
@@ -438,12 +439,15 @@ async fn get_on_host_periphery(
PeripheryClient::new_with_spawned_client_connection(
ObjectId::new().to_hex(),
&config.address,
if config.private_key.is_empty() {
core_config().private_key.clone()
} else {
config.private_key
|server_id, address| async move {
spawn_client_connection(
server_id,
address,
config.private_key,
optional_string(config.public_key),
)
.await
},
optional_string(config.public_key),
)
.await?;
// Poll for connection to be estalished

View File

@@ -1,8 +1,14 @@
use std::{collections::HashMap, sync::Arc, time::Duration};
use crate::{
all_server_channels,
config::core_config,
connection::{MessageHandler, PeripheryConnection},
};
use anyhow::Context;
use axum::http::HeaderValue;
use komodo_client::entities::{optional_string, server::Server};
use periphery_client::periphery_connections;
use rustls::{ClientConfig, client::danger::ServerCertVerifier};
use tokio_tungstenite::Connector;
use tracing::{info, warn};
@@ -12,18 +18,8 @@ use transport::{
websocket::tungstenite::TungsteniteWebsocket,
};
use crate::{
all_server_channels,
connection::{
MessageHandler, PeripheryConnection, periphery_connections,
},
};
/// Managed connections to exactly those specified by specs (ServerId -> Address)
pub async fn manage_client_connections(
servers: &[Server],
default_private_key: &'static str,
) {
pub async fn manage_client_connections(servers: &[Server]) {
let periphery_connections = periphery_connections();
let periphery_channels = all_server_channels();
@@ -97,11 +93,7 @@ pub async fn manage_client_connections(
if let Err(e) = spawn_client_connection(
server_id.clone(),
address,
if private_key.is_empty() {
default_private_key.to_string()
} else {
private_key.clone()
},
private_key.clone(),
optional_string(expected_public_key),
)
.await
@@ -139,6 +131,15 @@ pub async fn spawn_client_connection(
existing_connection.cancel();
}
let config = core_config();
let private_key = if private_key.is_empty() {
config.private_key.clone()
} else {
private_key
};
let expected_public_key = expected_public_key
.or_else(|| config.periphery_public_key.clone());
tokio::spawn(async move {
loop {
let ws = tokio::select! {

View File

@@ -1,21 +1,15 @@
use std::sync::{
Arc, OnceLock,
atomic::{self, AtomicBool},
};
use std::sync::Arc;
use anyhow::anyhow;
use bytes::Bytes;
use cache::CloneCache;
use tokio::sync::{
RwLock,
mpsc::{Sender, error::SendError},
};
use tokio_util::sync::CancellationToken;
use periphery_client::{PeripheryConnection, all_server_channels};
use tokio::sync::mpsc::Sender;
use tracing::{info, warn};
use transport::{
auth::{ConnectionIdentifiers, LoginFlow, PublicKeyValidator},
bytes::id_from_transport_bytes,
channel::{BufferedReceiver, buffered_channel},
channel::BufferedReceiver,
websocket::{
Websocket, WebsocketMessage, WebsocketReceiver as _,
WebsocketSender as _,
@@ -23,8 +17,6 @@ use transport::{
};
use uuid::Uuid;
use crate::all_server_channels;
pub mod client;
pub mod server;
@@ -181,77 +173,3 @@ impl MessageHandler {
}
}
}
/// server id => connection
pub type PeripheryConnections =
CloneCache<String, Arc<PeripheryConnection>>;
pub fn periphery_connections() -> &'static PeripheryConnections {
static CONNECTIONS: OnceLock<PeripheryConnections> =
OnceLock::new();
CONNECTIONS.get_or_init(Default::default)
}
#[derive(Debug)]
pub struct PeripheryConnection {
// Inbound connections have this as None
pub address: Option<String>,
pub write_sender: Sender<Bytes>,
pub connected: AtomicBool,
pub error: RwLock<Option<serror::Serror>>,
pub cancel: CancellationToken,
}
impl PeripheryConnection {
fn new(
address: Option<String>,
) -> (Arc<PeripheryConnection>, BufferedReceiver<Bytes>) {
let (write_sender, write_receiver) = buffered_channel(1000);
(
PeripheryConnection {
address,
write_sender,
connected: AtomicBool::new(false),
error: RwLock::new(None),
cancel: CancellationToken::new(),
}
.into(),
write_receiver,
)
}
pub async fn send(
&self,
value: Bytes,
) -> Result<(), SendError<Bytes>> {
self.write_sender.send(value).await
}
pub fn set_connected(&self, connected: bool) {
self.connected.store(connected, atomic::Ordering::Relaxed);
}
pub fn connected(&self) -> bool {
self.connected.load(atomic::Ordering::Relaxed)
}
pub async fn error(&self) -> Option<serror::Serror> {
self.error.read().await.clone()
}
pub async fn set_error(&self, e: anyhow::Error) {
let mut error = self.error.write().await;
*error = Some(e.into());
}
pub async fn clear_error(&self) {
let mut error = self.error.write().await;
*error = None;
}
pub fn cancel(&self) {
// TODO: remove logs
info!("Cancelling connection");
self.cancel.cancel();
}
}

View File

@@ -1,34 +1,40 @@
use anyhow::Context;
use axum::{
extract::WebSocketUpgrade,
extract::{Query, WebSocketUpgrade},
http::{HeaderMap, StatusCode},
response::Response,
};
use komodo_client::entities::server::Server;
use periphery_client::periphery_connections;
use serror::AddStatusCode;
use tracing::warn;
use transport::{
PeripheryConnectionQuery,
auth::{ServerHeaderIdentifiers, ServerLoginFlow},
websocket::axum::AxumWebsocket,
};
use crate::connection::{
MessageHandler, PeripheryConnection, periphery_connections,
use crate::{
config::core_config,
connection::{MessageHandler, PeripheryConnection},
};
pub async fn handler(
server: Server,
default_private_key: String,
default_public_key: Option<String>,
Query(PeripheryConnectionQuery { server: _server }): Query<
PeripheryConnectionQuery,
>,
mut headers: HeaderMap,
query: String,
ws: WebSocketUpgrade,
) -> serror::Result<Response> {
let identifiers = ServerHeaderIdentifiers::extract(&mut headers)
.status_code(StatusCode::UNAUTHORIZED)?;
let server = crate::resource::get::<Server>(&_server)
.await
.status_code(StatusCode::BAD_REQUEST)?;
let expected_public_key = if server.config.public_key.is_empty() {
default_public_key.context("Must either configure Server 'Periphery Public Key' or set KOMODO_PERIPHERY_PUBLIC_KEY")?
core_config().periphery_public_key.clone().context("Must either configure Server 'Periphery Public Key' or set KOMODO_PERIPHERY_PUBLIC_KEY")?
} else {
server.config.public_key
};
@@ -46,12 +52,13 @@ pub async fn handler(
}
Ok(ws.on_upgrade(|socket| async move {
let query = format!("server={}", urlencoding::encode(&_server));
let handler = super::WebsocketHandler {
label: &server.id,
socket: AxumWebsocket(socket),
connection_identifiers: identifiers.build(query.as_bytes()),
private_key: if server.config.private_key.is_empty() {
&default_private_key
&core_config().private_key
} else {
&server.config.private_key
},

View File

@@ -23,7 +23,7 @@ use crate::{
terminate_ec2_instance_with_retry,
},
},
config::core_config,
connection::client::spawn_client_connection,
helpers::update::update_update,
resource,
};
@@ -54,12 +54,15 @@ pub async fn get_builder_periphery(
PeripheryClient::new_with_spawned_client_connection(
ObjectId::new().to_hex(),
&config.address,
if config.private_key.is_empty() {
core_config().private_key.clone()
} else {
config.private_key
|server_id, address| async move {
spawn_client_connection(
server_id,
address,
config.private_key,
optional_string(config.public_key),
)
.await
},
optional_string(config.public_key),
)
.await?;
periphery
@@ -120,12 +123,15 @@ async fn get_aws_builder(
PeripheryClient::new_with_spawned_client_connection(
ObjectId::new().to_hex(),
&periphery_address,
if config.private_key.is_empty() {
core_config().private_key.clone()
} else {
config.private_key
|server_id, address| async move {
spawn_client_connection(
server_id,
address,
config.private_key,
optional_string(config.public_key),
)
.await
},
optional_string(config.public_key),
)
.await?;

View File

@@ -21,6 +21,7 @@ mod api;
mod auth;
mod cloud;
mod config;
mod connection;
mod helpers;
mod listener;
mod monitor;

View File

@@ -113,11 +113,8 @@ async fn refresh_server_cache(ts: i64) {
return;
}
};
periphery_client::connection::client::manage_client_connections(
&servers,
&core_config().private_key,
)
.await;
crate::connection::client::manage_client_connections(&servers)
.await;
let futures = servers.into_iter().map(|server| async move {
update_cache_for_server(&server, false).await;
});

View File

@@ -28,7 +28,6 @@ use uuid::Uuid;
mod container;
mod deployment;
mod periphery;
mod stack;
mod terminal;
mod update;
@@ -36,7 +35,7 @@ mod update;
pub fn router() -> Router {
Router::new()
// Periphery facing
.route("/periphery", get(periphery::handler))
.route("/periphery", get(crate::connection::server::handler))
// User facing
.route("/update", get(update::handler))
.route("/terminal", get(terminal::handler))

View File

@@ -1,29 +0,0 @@
use axum::{
extract::{Query, WebSocketUpgrade},
http::HeaderMap,
response::Response,
};
use komodo_client::entities::server::Server;
use transport::PeripheryConnectionQuery;
use crate::config::core_config;
pub async fn handler(
Query(PeripheryConnectionQuery { server: _server }): Query<
PeripheryConnectionQuery,
>,
headers: HeaderMap,
ws: WebSocketUpgrade,
) -> serror::Result<Response> {
let server = crate::resource::get::<Server>(&_server).await?;
let query = format!("server={}", urlencoding::encode(&_server));
periphery_client::connection::server::handler(
server,
core_config().private_key.clone(),
core_config().periphery_public_key.clone(),
headers,
query,
ws,
)
.await
}

View File

@@ -18,16 +18,12 @@ cache.workspace = true
resolver_api.workspace = true
serror.workspace = true
# external
tokio-tungstenite.workspace = true
futures-util.workspace = true
tokio-util.workspace = true
serde_json.workspace = true
tracing.workspace = true
anyhow.workspace = true
rustls.workspace = true
bytes.workspace = true
tokio.workspace = true
serde.workspace = true
axum.workspace = true
uuid.workspace = true
url.workspace = true

View File

@@ -1,5 +1,8 @@
use std::{
sync::{Arc, OnceLock},
sync::{
Arc, OnceLock,
atomic::{self, AtomicBool},
},
time::Duration,
};
@@ -10,17 +13,21 @@ use resolver_api::HasResponse;
use serde::{Serialize, de::DeserializeOwned};
use serde_json::json;
use serror::{deserialize_error_bytes, serror_into_anyhow_error};
use tokio::sync::mpsc::{self, Sender};
use tracing::warn;
use tokio::sync::{
RwLock,
mpsc::{self, Sender, error::SendError},
};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};
use transport::{
MessageState,
bytes::{from_transport_bytes, to_transport_bytes},
channel::{BufferedReceiver, buffered_channel},
fix_ws_address,
};
use uuid::Uuid;
pub mod api;
pub mod connection;
pub mod terminal;
pub type ServerChannels = CloneCache<Uuid, Sender<Bytes>>;
@@ -47,23 +54,19 @@ impl PeripheryClient {
}
}
pub async fn new_with_spawned_client_connection(
pub async fn new_with_spawned_client_connection<
F: Future<Output = anyhow::Result<()>>,
>(
server_id: String,
address: &str,
private_key: String,
expected_public_key: Option<String>,
// (Server id, address)
spawn: impl FnOnce(String, String) -> F,
) -> anyhow::Result<PeripheryClient> {
if address.is_empty() {
return Err(anyhow!("Server address cannot be empty"));
}
let periphery = PeripheryClient::new(server_id.clone()).await;
connection::client::spawn_client_connection(
server_id,
fix_ws_address(address),
private_key,
expected_public_key,
)
.await?;
spawn(server_id, fix_ws_address(address)).await?;
Ok(periphery)
}
@@ -86,7 +89,7 @@ impl PeripheryClient {
T: std::fmt::Debug + Serialize + HasResponse,
T::Response: DeserializeOwned,
{
let connection = connection::periphery_connections()
let connection = periphery_connections()
.get(&self.server_id)
.await
.with_context(|| {
@@ -183,3 +186,77 @@ impl PeripheryClient {
}
}
}
/// server id => connection
pub type PeripheryConnections =
CloneCache<String, Arc<PeripheryConnection>>;
pub fn periphery_connections() -> &'static PeripheryConnections {
static CONNECTIONS: OnceLock<PeripheryConnections> =
OnceLock::new();
CONNECTIONS.get_or_init(Default::default)
}
#[derive(Debug)]
pub struct PeripheryConnection {
// Inbound connections have this as None
pub address: Option<String>,
pub write_sender: Sender<Bytes>,
pub connected: AtomicBool,
pub error: RwLock<Option<serror::Serror>>,
pub cancel: CancellationToken,
}
impl PeripheryConnection {
pub fn new(
address: Option<String>,
) -> (Arc<PeripheryConnection>, BufferedReceiver<Bytes>) {
let (write_sender, write_receiver) = buffered_channel(1000);
(
PeripheryConnection {
address,
write_sender,
connected: AtomicBool::new(false),
error: RwLock::new(None),
cancel: CancellationToken::new(),
}
.into(),
write_receiver,
)
}
pub async fn send(
&self,
value: Bytes,
) -> Result<(), SendError<Bytes>> {
self.write_sender.send(value).await
}
pub fn set_connected(&self, connected: bool) {
self.connected.store(connected, atomic::Ordering::Relaxed);
}
pub fn connected(&self) -> bool {
self.connected.load(atomic::Ordering::Relaxed)
}
pub async fn error(&self) -> Option<serror::Serror> {
self.error.read().await.clone()
}
pub async fn set_error(&self, e: anyhow::Error) {
let mut error = self.error.write().await;
*error = Some(e.into());
}
pub async fn clear_error(&self) {
let mut error = self.error.write().await;
*error = None;
}
pub fn cancel(&self) {
// TODO: remove logs
info!("Cancelling connection");
self.cancel.cancel();
}
}

View File

@@ -14,7 +14,7 @@ use uuid::Uuid;
use crate::{
PeripheryClient, all_server_channels, api::terminal::*,
connection::periphery_connections,
periphery_connections,
};
impl PeripheryClient {