fix Core -> Periphery reconnection

This commit is contained in:
mbecker20
2025-09-21 18:17:28 -07:00
parent 4675dfa736
commit 33501dac3e
13 changed files with 49 additions and 65 deletions

View File

@@ -441,7 +441,6 @@ async fn get_on_host_periphery(
&config.address,
|server_id, address| async move {
spawn_client_connection(
format!("Builder {}", builder.name),
server_id,
address,
config.private_key,

View File

@@ -30,7 +30,6 @@ pub async fn manage_client_connections(servers: &[Server]) {
(
&s.id,
(
&s.name,
&s.config.address,
&s.config.private_key,
&s.config.public_key,
@@ -56,7 +55,7 @@ pub async fn manage_client_connections(servers: &[Server]) {
// Apply latest connection specs
for (
server_id,
(name, address, private_key, expected_public_key),
(address, private_key, expected_public_key),
) in specs
{
let address = if address.is_empty() {
@@ -94,7 +93,6 @@ pub async fn manage_client_connections(servers: &[Server]) {
};
// If reaches here, recreate the connection.
if let Err(e) = spawn_client_connection(
name.clone(),
server_id.clone(),
address,
private_key.clone(),
@@ -111,7 +109,6 @@ pub async fn manage_client_connections(servers: &[Server]) {
// Assumes address already wss formatted
pub async fn spawn_client_connection(
label: String,
server_id: String,
address: String,
private_key: String,
@@ -119,8 +116,11 @@ pub async fn spawn_client_connection(
) -> anyhow::Result<()> {
let url = ::url::Url::parse(&address)
.context("Failed to parse server address")?;
let host: Vec<u8> =
url.host().context("url has no host")?.to_string().into();
let mut host = url.host().context("url has no host")?.to_string();
if let Some(port) = url.port() {
host.push(':');
host.push_str(&port.to_string());
}
let handler = MessageHandler::new(&server_id).await;
@@ -162,10 +162,9 @@ pub async fn spawn_client_connection(
};
let handler = super::WebsocketHandler {
label: &label,
socket,
connection_identifiers: ConnectionIdentifiers {
host: &host,
host: host.as_bytes(),
accept: accept.as_bytes(),
query: &[],
},

View File

@@ -5,7 +5,8 @@ use bytes::Bytes;
use cache::CloneCache;
use periphery_client::{PeripheryConnection, all_server_channels};
use tokio::sync::mpsc::Sender;
use tracing::{info, warn};
use tokio_util::sync::CancellationToken;
use tracing::warn;
use transport::{
auth::{ConnectionIdentifiers, LoginFlow, PublicKeyValidator},
bytes::id_from_transport_bytes,
@@ -21,7 +22,6 @@ pub mod client;
pub mod server;
pub struct WebsocketHandler<'a, W> {
pub label: &'a str,
pub socket: W,
pub connection_identifiers: ConnectionIdentifiers<'a>,
pub private_key: &'a str,
@@ -34,7 +34,6 @@ pub struct WebsocketHandler<'a, W> {
impl<W: Websocket> WebsocketHandler<'_, W> {
async fn handle<L: LoginFlow>(self) -> anyhow::Result<()> {
let WebsocketHandler {
label,
mut socket,
connection_identifiers,
private_key,
@@ -54,7 +53,7 @@ impl<W: Websocket> WebsocketHandler<'_, W> {
)
.await?;
info!("PERIPHERY: Authenticated connection to {label} established");
let handler_cancel = CancellationToken::new();
connection.set_connected(true);
connection.clear_error().await;
@@ -66,6 +65,7 @@ impl<W: Websocket> WebsocketHandler<'_, W> {
let next = tokio::select! {
next = write_receiver.recv() => next,
_ = connection.cancel.cancelled() => break,
_ = handler_cancel.cancelled() => break,
};
let message = match next {
@@ -77,14 +77,14 @@ impl<W: Websocket> WebsocketHandler<'_, W> {
match ws_write.send(message).await {
Ok(_) => write_receiver.clear_buffer(),
Err(e) => {
warn!("Failed to send request to {label} | {e:#}");
connection.set_error(e.into()).await;
break;
}
}
}
// Cancel again if not already
let _ = ws_write.close(None).await;
connection.cancel();
handler_cancel.cancel();
};
let handle_reads = async {
@@ -92,32 +92,29 @@ impl<W: Websocket> WebsocketHandler<'_, W> {
let next = tokio::select! {
next = ws_read.recv() => next,
_ = connection.cancel.cancelled() => break,
_ = handler_cancel.cancelled() => break,
};
match next {
Ok(WebsocketMessage::Binary(bytes)) => {
handler.handle_incoming_bytes(bytes).await
}
Ok(WebsocketMessage::Close(frame)) => {
warn!(
"Connection to {label} broken with frame: {frame:?}"
);
Ok(WebsocketMessage::Close(_))
| Ok(WebsocketMessage::Closed) => {
connection.set_error(anyhow!("Connection closed")).await;
break;
}
Ok(WebsocketMessage::Closed) => {}
Err(e) => {
warn!("Connection to {label} broken with error: {e:?}");
break;
connection.set_error(e.into()).await;
}
};
}
// Cancel again if not already
connection.cancel();
handler_cancel.cancel();
};
tokio::join!(forward_writes, handle_reads);
warn!("PERIPHERY: Disconnnected from {label}");
connection.set_connected(false);
Ok(())

View File

@@ -32,6 +32,18 @@ pub async fn handler(
.await
.status_code(StatusCode::BAD_REQUEST)?;
if !server.config.enabled {
return Err(anyhow!("Server is Disabled."))
.status_code(StatusCode::BAD_REQUEST);
}
if !server.config.address.is_empty() {
return Err(anyhow!(
"Server is configured to use a Core -> Periphery connection."
))
.status_code(StatusCode::BAD_REQUEST);
}
let connections = periphery_connections();
// Ensure connected server can't get bumped off the connection.
@@ -68,7 +80,6 @@ pub async fn handler(
Ok(ws.on_upgrade(|socket| async move {
let query = format!("server={}", urlencoding::encode(&_server));
let handler = super::WebsocketHandler {
label: &server.name,
socket: AxumWebsocket(socket),
connection_identifiers: identifiers.build(query.as_bytes()),
private_key: if server.config.private_key.is_empty() {

View File

@@ -56,7 +56,6 @@ pub async fn get_builder_periphery(
&config.address,
|server_id, address| async move {
spawn_client_connection(
format!("Builder {}", builder.name),
server_id,
address,
config.private_key,
@@ -124,7 +123,6 @@ async fn get_aws_builder(
&periphery_address,
|server_id, address| async move {
spawn_client_connection(
instance_name,
server_id,
address,
config.private_key,

View File

@@ -3,7 +3,7 @@
#[macro_use]
extern crate tracing;
use std::{net::SocketAddr, str::FromStr, time::Duration};
use std::{net::SocketAddr, str::FromStr};
use anyhow::Context;
use axum::Router;
@@ -83,24 +83,6 @@ async fn app() -> anyhow::Result<()> {
schedule::spawn_schedule_executor();
helpers::prune::spawn_prune_loop();
// TODO: Remove
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
for (server_id, cache) in
all_server_channels().get_entries().await
{
let channels = cache.get_keys().await;
if !channels.is_empty() {
println!(
"CHANNELS: [{server_id}] [{}] {channels:?}",
channels.len()
);
}
}
}
});
// Setup static frontend services
let frontend_path = &config.frontend_path;
let frontend_index =

View File

@@ -142,13 +142,10 @@ impl Resolve<super::Args> for ConnectContainerExec {
impl Resolve<super::Args> for DisconnectTerminal {
#[instrument(name = "DisconnectTerminal", level = "debug")]
async fn resolve(self, _: &super::Args) -> serror::Result<NoData> {
// TODO: Remove logs
info!("Disconnect called for {}", self.id);
if let Some((_, cancel)) =
terminal_channels().remove(&self.id).await
{
cancel.cancel();
info!("Cancel called for {}", self.id);
}
Ok(NoData {})
}

View File

@@ -27,7 +27,7 @@ pub fn periphery_config() -> &'static PeripheryConfig {
let args = CliArgs::parse();
let config_paths =
args.config_path.unwrap_or(env.periphery_config_paths);
println!("{config_paths:?}");
let config = if config_paths.is_empty() {
println!(
"{}: No config paths found, using default config",

View File

@@ -98,6 +98,7 @@ async fn connect_websocket(
let accept = response
.headers_mut()
.remove("sec-websocket-accept")
.context("sec-websocket-accept")?;
.context("sec-websocket-accept")
.context("Headers do not contain Sec-Websocket-Accept")?;
Ok((TungsteniteWebsocket(ws), accept))
}

View File

@@ -41,7 +41,7 @@ pub async fn run() -> anyhow::Result<()> {
.install_default()
.expect("failed to install default rustls CryptoProvider");
crate::helpers::ensure_ssl_certs().await;
info!("Komodo Periphery starting on https://{}", socket_addr);
info!("Komodo Periphery starting on wss://{}", socket_addr);
let ssl_config = RustlsConfig::from_pem_file(
config.ssl_cert_file(),
config.ssl_key_file(),
@@ -53,7 +53,7 @@ pub async fn run() -> anyhow::Result<()> {
.await?
} else {
info!("🔓 Periphery SSL Disabled");
info!("Komodo Periphery starting on http://{}", socket_addr);
info!("Komodo Periphery starting on ws://{}", socket_addr);
axum_server::bind(socket_addr).serve(app).await?
}

View File

@@ -64,11 +64,10 @@ pub type _PartialServerConfig = PartialServerConfig;
#[partial_derive(Serialize, Deserialize, Debug, Clone, Default)]
#[partial(skip_serializing_none, from, diff)]
pub struct ServerConfig {
/// The http address of the periphery client.
/// Default: http://localhost:8120
#[serde(default = "default_address")]
#[builder(default = "default_address()")]
#[partial_default(default_address())]
/// The ws/s address of the periphery client.
/// If unset, Server expects Periphery -> Core connection.
#[serde(default)]
#[builder(default)]
pub address: String,
/// The address to use with links for containers on the server.
@@ -216,10 +215,6 @@ impl ServerConfig {
}
}
fn default_address() -> String {
String::from("https://periphery:8120")
}
fn default_enabled() -> bool {
false
}
@@ -263,7 +258,7 @@ fn default_disk_critical() -> f64 {
impl Default for ServerConfig {
fn default() -> Self {
Self {
address: default_address(),
address: Default::default(),
external_address: Default::default(),
enabled: default_enabled(),
ignore_mounts: Default::default(),

View File

@@ -23,4 +23,4 @@ axum.workspace = true
rand.workspace = true
sha1.workspace = true
sha2.workspace = true
uuid.workspace = true
uuid.workspace = true

View File

@@ -8,7 +8,12 @@ cmd = "KOMODO_CONFIG_PATH=.dev/core.config.toml cargo run -p komodo_core --relea
[dev-periphery]
alias = "dp"
description = "runs periphery --release pointing to .dev/periphery.config.toml"
cmd = "PERIPHERY_CONFIG_PATH=.dev/periphery.config.toml cargo run -p komodo_periphery --release"
cmd = "cargo run -p komodo_periphery --release -- -c .dev/periphery.config.toml"
[dev-periphery-outbound]
alias = "dpo"
description = "runs periphery --release pointing to .dev/periphery.config.toml and .dev/outbound.periphery.config.toml"
cmd = "cargo run -p komodo_periphery --release -- -c .dev/periphery.config.toml -c .dev/outbound.periphery.config.toml"
[yarn-install]
description = "downloads latest javacript dependencies for client and frontend"