diff --git a/bin/periphery/src/api/terminal.rs b/bin/periphery/src/api/terminal.rs index 1a9174447..b38025148 100644 --- a/bin/periphery/src/api/terminal.rs +++ b/bin/periphery/src/api/terminal.rs @@ -17,9 +17,7 @@ use transport::{MessageState, bytes::to_transport_bytes}; use uuid::Uuid; use crate::{ - config::periphery_config, - connection::{channels, terminal_channels}, - terminal::*, + config::periphery_config, connection::core_channels, terminal::*, }; // @@ -86,7 +84,7 @@ impl Resolve for ConnectTerminal { } let channel = - channels().get(&args.core).await.with_context(|| { + core_channels().get(&args.core).await.with_context(|| { format!("Failed to find channel for {}", args.core) })?; @@ -117,7 +115,7 @@ impl Resolve for ConnectContainerExec { } let channel = - channels().get(&args.core).await.with_context(|| { + core_channels().get(&args.core).await.with_context(|| { format!("Failed to find channel for {}", args.core) })?; @@ -178,7 +176,7 @@ impl Resolve for ExecuteTerminal { } let channel = - channels().get(&args.core).await.with_context(|| { + core_channels().get(&args.core).await.with_context(|| { format!("Failed to find channel for {}", args.core) })?; @@ -248,7 +246,7 @@ impl Resolve for ExecuteContainerExec { let id = Uuid::new_v4(); let channel = - channels().get(&args.core).await.with_context(|| { + core_channels().get(&args.core).await.with_context(|| { format!("Failed to find channel for {}", args.core) })?; @@ -461,12 +459,6 @@ async fn forward_execute_command_on_terminal_response( None => { clean_up_terminals().await; break; - // return Err( - // anyhow!( - // "Stdout stream terminated before start sentinel received" - // ) - // .into(), - // ); } } } diff --git a/bin/periphery/src/connection/client.rs b/bin/periphery/src/connection/client.rs index 3b62282cc..8707e4eae 100644 --- a/bin/periphery/src/connection/client.rs +++ b/bin/periphery/src/connection/client.rs @@ -9,7 +9,7 @@ use transport::{ websocket::tungstenite::TungsteniteWebsocket, }; -use crate::{api::Args, connection::channels}; +use crate::{api::Args, connection::core_channels}; pub async fn handler( address: &str, @@ -29,7 +29,8 @@ pub async fn handler( core: identifiers.host().to_string(), }); - let channel = channels().get_or_insert_default(&args.core).await; + let channel = + core_channels().get_or_insert_default(&args.core).await; let mut receiver = channel.receiver()?; diff --git a/bin/periphery/src/connection/mod.rs b/bin/periphery/src/connection/mod.rs index b40a2733a..eab68ea9b 100644 --- a/bin/periphery/src/connection/mod.rs +++ b/bin/periphery/src/connection/mod.rs @@ -10,7 +10,6 @@ use resolver_api::Resolve; use response::JsonBytes; use serror::serialize_error_bytes; use tokio::sync::{Mutex, MutexGuard, mpsc::Sender}; -use tokio_util::sync::CancellationToken; use transport::{ MessageState, auth::{ConnectionIdentifiers, LoginFlow, PublicKeyValidator}, @@ -29,7 +28,6 @@ use uuid::Uuid; use crate::{ api::{Args, PeripheryRequest}, config::periphery_config, - terminal::{ResizeDimensions, StdinMsg}, }; pub mod client; @@ -62,11 +60,11 @@ impl Channel { } // Core Address / Host -> Channel -pub type Channels = CloneCache>; +pub type CoreChannels = CloneCache>; -pub fn channels() -> &'static Channels { - static CHANNELS: OnceLock = OnceLock::new(); - CHANNELS.get_or_init(Default::default) +pub fn core_channels() -> &'static CoreChannels { + static CORE_CHANNELS: OnceLock = OnceLock::new(); + CORE_CHANNELS.get_or_init(Default::default) } pub struct CorePublicKeyValidator; @@ -176,11 +174,9 @@ async fn handle_incoming_bytes( sender: &Sender, bytes: Bytes, ) { - // Maybe wrap all of this on tokio spawn let (id, state) = match id_state_from_transport_bytes(&bytes) { Ok(res) => res, Err(e) => { - // TODO: handle: warn!("Failed to parse transport bytes | {e:#}"); return; } @@ -190,7 +186,7 @@ async fn handle_incoming_bytes( handle_request(args.clone(), sender.clone(), id, bytes) } MessageState::Terminal => { - handle_terminal_message(id, bytes).await + crate::terminal::handle_incoming_message(id, bytes).await } // Shouldn't be received by Periphery MessageState::InProgress => {} @@ -266,43 +262,3 @@ fn handle_request( } }); } - -pub type TerminalChannels = - CloneCache, CancellationToken)>; - -pub fn terminal_channels() -> &'static TerminalChannels { - static TERMINAL_CHANNELS: OnceLock = - OnceLock::new(); - TERMINAL_CHANNELS.get_or_init(Default::default) -} - -async fn handle_terminal_message(id: Uuid, bytes: Bytes) { - let Some((channel, _)) = terminal_channels().get(&id).await else { - warn!("No terminal channel for {id}"); - return; - }; - let Ok(data) = data_from_transport_bytes(bytes) else { - warn!("Got terminal message with no data for {id}"); - return; - }; - let msg = match data.first() { - Some(&0x00) => { - StdinMsg::Bytes(Bytes::copy_from_slice(&data[1..])) - } - Some(&0xFF) => { - if let Ok(dimensions) = - serde_json::from_slice::(&data[1..]) - { - StdinMsg::Resize(dimensions) - } else { - return; - } - } - Some(_) => StdinMsg::Bytes(data), - // No data - None => return, - }; - if let Err(e) = channel.send(msg).await { - warn!("No receiver for {id} | {e:?}"); - }; -} diff --git a/bin/periphery/src/connection/server.rs b/bin/periphery/src/connection/server.rs index 9c8e9da42..25d51bcd7 100644 --- a/bin/periphery/src/connection/server.rs +++ b/bin/periphery/src/connection/server.rs @@ -33,7 +33,7 @@ use transport::{ }; use crate::{ - api::Args, config::periphery_config, connection::channels, + api::Args, config::periphery_config, connection::core_channels, }; pub async fn run() -> anyhow::Result<()> { @@ -87,7 +87,7 @@ async fn handler( let args = Arc::new(Args { core }); - let channel = channels().get_or_insert_default(&args.core).await; + let channel = core_channels().get_or_insert_default(&args.core).await; // Ensure the receiver is free before upgrading connection. // Due to ownership, it needs to be re-locked inside the ws handler, diff --git a/bin/periphery/src/terminal.rs b/bin/periphery/src/terminal.rs index 16bc9b66b..cf026d799 100644 --- a/bin/periphery/src/terminal.rs +++ b/bin/periphery/src/terminal.rs @@ -6,12 +6,55 @@ use std::{ use anyhow::{Context, anyhow}; use bytes::Bytes; +use cache::CloneCache; use komodo_client::{ api::write::TerminalRecreateMode, entities::server::TerminalInfo, }; use portable_pty::{CommandBuilder, PtySize, native_pty_system}; use tokio::sync::{broadcast, mpsc}; use tokio_util::sync::CancellationToken; +use transport::bytes::data_from_transport_bytes; +use uuid::Uuid; + +pub type TerminalChannels = + CloneCache, CancellationToken)>; + +pub fn terminal_channels() -> &'static TerminalChannels { + static TERMINAL_CHANNELS: OnceLock = + OnceLock::new(); + TERMINAL_CHANNELS.get_or_init(Default::default) +} + +pub async fn handle_incoming_message(id: Uuid, bytes: Bytes) { + let Some((channel, _)) = terminal_channels().get(&id).await else { + warn!("No terminal channel for {id}"); + return; + }; + let Ok(data) = data_from_transport_bytes(bytes) else { + warn!("Got terminal message with no data for {id}"); + return; + }; + let msg = match data.first() { + Some(&0x00) => { + StdinMsg::Bytes(Bytes::copy_from_slice(&data[1..])) + } + Some(&0xFF) => { + if let Ok(dimensions) = + serde_json::from_slice::(&data[1..]) + { + StdinMsg::Resize(dimensions) + } else { + return; + } + } + Some(_) => StdinMsg::Bytes(data), + // No data + None => return, + }; + if let Err(e) = channel.send(msg).await { + warn!("No receiver for {id} | {e:?}"); + }; +} type PtyName = String; type PtyMap = tokio::sync::RwLock>>; diff --git a/lib/transport/src/channel.rs b/lib/transport/src/channel.rs index cf0ad6482..00de9e2e0 100644 --- a/lib/transport/src/channel.rs +++ b/lib/transport/src/channel.rs @@ -28,12 +28,12 @@ impl BufferedReceiver { } } - /// - If 'next: Some(bytes)': - /// - Immediately returns borrow of next. + /// - If 'buffer: Some(bytes)': + /// - Immediately returns borrow of buffer. /// - Else: - /// - Wait for next item - /// - store in 'next' - /// - return borrow of next. + /// - Wait for next item. + /// - store in buffer. + /// - return borrow of buffer. pub async fn recv(&mut self) -> Option<&::Target> { if self.buffer.is_none() { self.buffer = Some(self.receiver.recv().await?);