diff --git a/bin/core/src/api/write/build.rs b/bin/core/src/api/write/build.rs index 0ed361546..067d6bb3d 100644 --- a/bin/core/src/api/write/build.rs +++ b/bin/core/src/api/write/build.rs @@ -441,7 +441,6 @@ async fn get_on_host_periphery( &config.address, |server_id, address| async move { spawn_client_connection( - format!("Builder {}", builder.name), server_id, address, config.private_key, diff --git a/bin/core/src/connection/client.rs b/bin/core/src/connection/client.rs index 715a65c8e..2eefc5336 100644 --- a/bin/core/src/connection/client.rs +++ b/bin/core/src/connection/client.rs @@ -30,7 +30,6 @@ pub async fn manage_client_connections(servers: &[Server]) { ( &s.id, ( - &s.name, &s.config.address, &s.config.private_key, &s.config.public_key, @@ -56,7 +55,7 @@ pub async fn manage_client_connections(servers: &[Server]) { // Apply latest connection specs for ( server_id, - (name, address, private_key, expected_public_key), + (address, private_key, expected_public_key), ) in specs { let address = if address.is_empty() { @@ -94,7 +93,6 @@ pub async fn manage_client_connections(servers: &[Server]) { }; // If reaches here, recreate the connection. if let Err(e) = spawn_client_connection( - name.clone(), server_id.clone(), address, private_key.clone(), @@ -111,7 +109,6 @@ pub async fn manage_client_connections(servers: &[Server]) { // Assumes address already wss formatted pub async fn spawn_client_connection( - label: String, server_id: String, address: String, private_key: String, @@ -119,8 +116,11 @@ pub async fn spawn_client_connection( ) -> anyhow::Result<()> { let url = ::url::Url::parse(&address) .context("Failed to parse server address")?; - let host: Vec = - url.host().context("url has no host")?.to_string().into(); + let mut host = url.host().context("url has no host")?.to_string(); + if let Some(port) = url.port() { + host.push(':'); + host.push_str(&port.to_string()); + } let handler = MessageHandler::new(&server_id).await; @@ -162,10 +162,9 @@ pub async fn spawn_client_connection( }; let handler = super::WebsocketHandler { - label: &label, socket, connection_identifiers: ConnectionIdentifiers { - host: &host, + host: host.as_bytes(), accept: accept.as_bytes(), query: &[], }, diff --git a/bin/core/src/connection/mod.rs b/bin/core/src/connection/mod.rs index 9845a1a77..dfd3be85f 100644 --- a/bin/core/src/connection/mod.rs +++ b/bin/core/src/connection/mod.rs @@ -5,7 +5,8 @@ use bytes::Bytes; use cache::CloneCache; use periphery_client::{PeripheryConnection, all_server_channels}; use tokio::sync::mpsc::Sender; -use tracing::{info, warn}; +use tokio_util::sync::CancellationToken; +use tracing::warn; use transport::{ auth::{ConnectionIdentifiers, LoginFlow, PublicKeyValidator}, bytes::id_from_transport_bytes, @@ -21,7 +22,6 @@ pub mod client; pub mod server; pub struct WebsocketHandler<'a, W> { - pub label: &'a str, pub socket: W, pub connection_identifiers: ConnectionIdentifiers<'a>, pub private_key: &'a str, @@ -34,7 +34,6 @@ pub struct WebsocketHandler<'a, W> { impl WebsocketHandler<'_, W> { async fn handle(self) -> anyhow::Result<()> { let WebsocketHandler { - label, mut socket, connection_identifiers, private_key, @@ -54,7 +53,7 @@ impl WebsocketHandler<'_, W> { ) .await?; - info!("PERIPHERY: Authenticated connection to {label} established"); + let handler_cancel = CancellationToken::new(); connection.set_connected(true); connection.clear_error().await; @@ -66,6 +65,7 @@ impl WebsocketHandler<'_, W> { let next = tokio::select! { next = write_receiver.recv() => next, _ = connection.cancel.cancelled() => break, + _ = handler_cancel.cancelled() => break, }; let message = match next { @@ -77,14 +77,14 @@ impl WebsocketHandler<'_, W> { match ws_write.send(message).await { Ok(_) => write_receiver.clear_buffer(), Err(e) => { - warn!("Failed to send request to {label} | {e:#}"); + connection.set_error(e.into()).await; break; } } } // Cancel again if not already let _ = ws_write.close(None).await; - connection.cancel(); + handler_cancel.cancel(); }; let handle_reads = async { @@ -92,32 +92,29 @@ impl WebsocketHandler<'_, W> { let next = tokio::select! { next = ws_read.recv() => next, _ = connection.cancel.cancelled() => break, + _ = handler_cancel.cancelled() => break, }; match next { Ok(WebsocketMessage::Binary(bytes)) => { handler.handle_incoming_bytes(bytes).await } - Ok(WebsocketMessage::Close(frame)) => { - warn!( - "Connection to {label} broken with frame: {frame:?}" - ); + Ok(WebsocketMessage::Close(_)) + | Ok(WebsocketMessage::Closed) => { + connection.set_error(anyhow!("Connection closed")).await; break; } - Ok(WebsocketMessage::Closed) => {} Err(e) => { - warn!("Connection to {label} broken with error: {e:?}"); - break; + connection.set_error(e.into()).await; } }; } // Cancel again if not already - connection.cancel(); + handler_cancel.cancel(); }; tokio::join!(forward_writes, handle_reads); - warn!("PERIPHERY: Disconnnected from {label}"); connection.set_connected(false); Ok(()) diff --git a/bin/core/src/connection/server.rs b/bin/core/src/connection/server.rs index df2d1dff6..086834045 100644 --- a/bin/core/src/connection/server.rs +++ b/bin/core/src/connection/server.rs @@ -32,6 +32,18 @@ pub async fn handler( .await .status_code(StatusCode::BAD_REQUEST)?; + if !server.config.enabled { + return Err(anyhow!("Server is Disabled.")) + .status_code(StatusCode::BAD_REQUEST); + } + + if !server.config.address.is_empty() { + return Err(anyhow!( + "Server is configured to use a Core -> Periphery connection." + )) + .status_code(StatusCode::BAD_REQUEST); + } + let connections = periphery_connections(); // Ensure connected server can't get bumped off the connection. @@ -68,7 +80,6 @@ pub async fn handler( Ok(ws.on_upgrade(|socket| async move { let query = format!("server={}", urlencoding::encode(&_server)); let handler = super::WebsocketHandler { - label: &server.name, socket: AxumWebsocket(socket), connection_identifiers: identifiers.build(query.as_bytes()), private_key: if server.config.private_key.is_empty() { diff --git a/bin/core/src/helpers/builder.rs b/bin/core/src/helpers/builder.rs index 911fde745..ceeb09861 100644 --- a/bin/core/src/helpers/builder.rs +++ b/bin/core/src/helpers/builder.rs @@ -56,7 +56,6 @@ pub async fn get_builder_periphery( &config.address, |server_id, address| async move { spawn_client_connection( - format!("Builder {}", builder.name), server_id, address, config.private_key, @@ -124,7 +123,6 @@ async fn get_aws_builder( &periphery_address, |server_id, address| async move { spawn_client_connection( - instance_name, server_id, address, config.private_key, diff --git a/bin/core/src/main.rs b/bin/core/src/main.rs index aa5cd2c24..435b15013 100644 --- a/bin/core/src/main.rs +++ b/bin/core/src/main.rs @@ -3,7 +3,7 @@ #[macro_use] extern crate tracing; -use std::{net::SocketAddr, str::FromStr, time::Duration}; +use std::{net::SocketAddr, str::FromStr}; use anyhow::Context; use axum::Router; @@ -83,24 +83,6 @@ async fn app() -> anyhow::Result<()> { schedule::spawn_schedule_executor(); helpers::prune::spawn_prune_loop(); - // TODO: Remove - tokio::spawn(async move { - loop { - tokio::time::sleep(Duration::from_secs(5)).await; - for (server_id, cache) in - all_server_channels().get_entries().await - { - let channels = cache.get_keys().await; - if !channels.is_empty() { - println!( - "CHANNELS: [{server_id}] [{}] {channels:?}", - channels.len() - ); - } - } - } - }); - // Setup static frontend services let frontend_path = &config.frontend_path; let frontend_index = diff --git a/bin/periphery/src/api/terminal.rs b/bin/periphery/src/api/terminal.rs index db7f05b51..7e7638aee 100644 --- a/bin/periphery/src/api/terminal.rs +++ b/bin/periphery/src/api/terminal.rs @@ -142,13 +142,10 @@ impl Resolve for ConnectContainerExec { impl Resolve for DisconnectTerminal { #[instrument(name = "DisconnectTerminal", level = "debug")] async fn resolve(self, _: &super::Args) -> serror::Result { - // TODO: Remove logs - info!("Disconnect called for {}", self.id); if let Some((_, cancel)) = terminal_channels().remove(&self.id).await { cancel.cancel(); - info!("Cancel called for {}", self.id); } Ok(NoData {}) } diff --git a/bin/periphery/src/config.rs b/bin/periphery/src/config.rs index a391c1b1c..9f212c1ce 100644 --- a/bin/periphery/src/config.rs +++ b/bin/periphery/src/config.rs @@ -27,7 +27,7 @@ pub fn periphery_config() -> &'static PeripheryConfig { let args = CliArgs::parse(); let config_paths = args.config_path.unwrap_or(env.periphery_config_paths); - + println!("{config_paths:?}"); let config = if config_paths.is_empty() { println!( "{}: No config paths found, using default config", diff --git a/bin/periphery/src/connection/client.rs b/bin/periphery/src/connection/client.rs index 3f2bdb626..02fef1ce1 100644 --- a/bin/periphery/src/connection/client.rs +++ b/bin/periphery/src/connection/client.rs @@ -98,6 +98,7 @@ async fn connect_websocket( let accept = response .headers_mut() .remove("sec-websocket-accept") - .context("sec-websocket-accept")?; + .context("sec-websocket-accept") + .context("Headers do not contain Sec-Websocket-Accept")?; Ok((TungsteniteWebsocket(ws), accept)) } diff --git a/bin/periphery/src/connection/server.rs b/bin/periphery/src/connection/server.rs index 1e18f8f4b..872b68839 100644 --- a/bin/periphery/src/connection/server.rs +++ b/bin/periphery/src/connection/server.rs @@ -41,7 +41,7 @@ pub async fn run() -> anyhow::Result<()> { .install_default() .expect("failed to install default rustls CryptoProvider"); crate::helpers::ensure_ssl_certs().await; - info!("Komodo Periphery starting on https://{}", socket_addr); + info!("Komodo Periphery starting on wss://{}", socket_addr); let ssl_config = RustlsConfig::from_pem_file( config.ssl_cert_file(), config.ssl_key_file(), @@ -53,7 +53,7 @@ pub async fn run() -> anyhow::Result<()> { .await? } else { info!("🔓 Periphery SSL Disabled"); - info!("Komodo Periphery starting on http://{}", socket_addr); + info!("Komodo Periphery starting on ws://{}", socket_addr); axum_server::bind(socket_addr).serve(app).await? } diff --git a/client/core/rs/src/entities/server.rs b/client/core/rs/src/entities/server.rs index 1986377bc..06192bea3 100644 --- a/client/core/rs/src/entities/server.rs +++ b/client/core/rs/src/entities/server.rs @@ -64,11 +64,10 @@ pub type _PartialServerConfig = PartialServerConfig; #[partial_derive(Serialize, Deserialize, Debug, Clone, Default)] #[partial(skip_serializing_none, from, diff)] pub struct ServerConfig { - /// The http address of the periphery client. - /// Default: http://localhost:8120 - #[serde(default = "default_address")] - #[builder(default = "default_address()")] - #[partial_default(default_address())] + /// The ws/s address of the periphery client. + /// If unset, Server expects Periphery -> Core connection. + #[serde(default)] + #[builder(default)] pub address: String, /// The address to use with links for containers on the server. @@ -216,10 +215,6 @@ impl ServerConfig { } } -fn default_address() -> String { - String::from("https://periphery:8120") -} - fn default_enabled() -> bool { false } @@ -263,7 +258,7 @@ fn default_disk_critical() -> f64 { impl Default for ServerConfig { fn default() -> Self { Self { - address: default_address(), + address: Default::default(), external_address: Default::default(), enabled: default_enabled(), ignore_mounts: Default::default(), diff --git a/lib/transport/Cargo.toml b/lib/transport/Cargo.toml index 4c1073ce3..098ecfd46 100644 --- a/lib/transport/Cargo.toml +++ b/lib/transport/Cargo.toml @@ -23,4 +23,4 @@ axum.workspace = true rand.workspace = true sha1.workspace = true sha2.workspace = true -uuid.workspace = true +uuid.workspace = true \ No newline at end of file diff --git a/runfile.toml b/runfile.toml index 0a941ea45..d762435c9 100644 --- a/runfile.toml +++ b/runfile.toml @@ -8,7 +8,12 @@ cmd = "KOMODO_CONFIG_PATH=.dev/core.config.toml cargo run -p komodo_core --relea [dev-periphery] alias = "dp" description = "runs periphery --release pointing to .dev/periphery.config.toml" -cmd = "PERIPHERY_CONFIG_PATH=.dev/periphery.config.toml cargo run -p komodo_periphery --release" +cmd = "cargo run -p komodo_periphery --release -- -c .dev/periphery.config.toml" + +[dev-periphery-outbound] +alias = "dpo" +description = "runs periphery --release pointing to .dev/periphery.config.toml and .dev/outbound.periphery.config.toml" +cmd = "cargo run -p komodo_periphery --release -- -c .dev/periphery.config.toml -c .dev/outbound.periphery.config.toml" [yarn-install] description = "downloads latest javacript dependencies for client and frontend"