This commit is contained in:
mbecker20
2025-10-21 02:28:42 -07:00
parent a5b711a348
commit a8c16f64b1
25 changed files with 713 additions and 392 deletions

View File

@@ -21,9 +21,11 @@ config.workspace = true
logger.workspace = true
noise.workspace = true
# external
tokio-tungstenite.workspace = true
futures-util.workspace = true
comfy-table.workspace = true
serde_json.workspace = true
crossterm.workspace = true
serde_qs.workspace = true
wildcard.workspace = true
tracing.workspace = true
@@ -31,6 +33,7 @@ colored.workspace = true
dotenvy.workspace = true
anyhow.workspace = true
chrono.workspace = true
bytes.workspace = true
tokio.workspace = true
serde.workspace = true
clap.workspace = true

View File

@@ -18,6 +18,7 @@ pub mod container;
pub mod database;
pub mod execute;
pub mod list;
pub mod ssh;
pub mod update;
async fn komodo_client() -> anyhow::Result<&'static KomodoClient> {

193
bin/cli/src/command/ssh.rs Normal file
View File

@@ -0,0 +1,193 @@
use anyhow::Context;
use bytes::Bytes;
use colored::Colorize;
use futures_util::{SinkExt, StreamExt};
use komodo_client::{
api::write::{CreateTerminal, TerminalRecreateMode},
entities::config::cli::args::ssh::Ssh,
};
use tokio::io::{AsyncReadExt as _, AsyncWriteExt as _};
use tokio_tungstenite::tungstenite;
pub async fn handle(
Ssh {
server,
terminal,
command,
recreate,
}: &Ssh,
) -> anyhow::Result<()> {
// Need to forward multiple sources into ws write
let (write_tx, mut write_rx) =
tokio::sync::mpsc::channel::<Bytes>(1024);
// ================
// SETUP RESIZING
// ================
// Subscribe to SIGWINCH for resize messages
let mut sigwinch = tokio::signal::unix::signal(
tokio::signal::unix::SignalKind::window_change(),
)
.context("failed to register SIGWINCH handler")?;
// Send first resize messsage, bailing if it fails to get the size.
write_tx.send(resize_message()?).await?;
let forward_resize = async {
while sigwinch.recv().await.is_some() {
if let Ok(resize_message) = resize_message() {
if write_tx.send(resize_message).await.is_err() {
break;
};
}
}
};
let forward_stdin = async {
let mut stdin = tokio::io::stdin();
let mut buf = [0u8; 8192];
loop {
// Read into buffer starting from index 1,
// leaving first byte to represent 'data' message.
let n = match stdin.read(&mut buf[1..]).await {
Ok(0) => break, // EOF
Ok(n) => n,
Err(_) => break,
};
// Check for disconnect sequence (alt + q)
if buf[1..(n + 1)] == [197, 147] {
break;
}
let bytes = Bytes::copy_from_slice(&buf[..(n + 1)]);
if write_tx.send(bytes).await.is_err() {
break;
};
}
};
// =====================
// CONNECT AND FORWARD
// =====================
let client = super::komodo_client().await?;
// Init the terminal if it doesn't exist already.
client
.write(CreateTerminal {
server: server.to_string(),
name: terminal.to_string(),
command: command.clone(),
recreate: if *recreate {
TerminalRecreateMode::Always
} else {
TerminalRecreateMode::DifferentCommand
},
})
.await?;
let (mut ws_write, mut ws_read) = client
.connect_terminal_websocket(server, terminal)
.await?
.split();
let forward_write = async {
while let Some(bytes) = write_rx.recv().await {
if let Err(e) =
ws_write.send(tungstenite::Message::Binary(bytes)).await
{
return Some(e);
};
}
None
};
let forward_read = async {
let mut stdout = tokio::io::stdout();
loop {
match ws_read.next().await {
Some(Ok(tungstenite::Message::Binary(bytes))) => {
if let Err(e) =
tokio::io::copy(&mut bytes.as_ref(), &mut stdout)
.await
.context("Failed to copy bytes to stdout")
{
return Some(e);
}
let _ = stdout.flush().await;
}
Some(Ok(tungstenite::Message::Text(text))) => {
if let Err(e) =
tokio::io::copy(&mut text.as_ref(), &mut stdout)
.await
.context("Failed to copy text to stdout")
{
return Some(e);
}
let _ = stdout.flush().await;
}
Some(Ok(tungstenite::Message::Close(_))) => break,
Some(Err(e)) => {
return Some(
anyhow::Error::from(e).context("Websocket read error"),
);
}
None => break,
_ => {}
}
}
None
};
let guard = RawModeGuard::enable_raw_mode()?;
tokio::select! {
_ = forward_resize => drop(guard),
_ = forward_stdin => drop(guard),
e = forward_write => {
drop(guard);
if let Some(e) = e {
eprintln!("\nFailed to forward stdin | {e:#}");
}
},
e = forward_read => {
drop(guard);
if let Some(e) = e {
eprintln!("\nFailed to forward stdout | {e:#}");
}
},
};
println!("\n\n{} {}", "connection".bold(), "closed".red().bold());
// It doesn't seem to exit by itself after the raw mode stuff.
std::process::exit(0)
}
fn resize_message() -> anyhow::Result<Bytes> {
let (cols, rows) = crossterm::terminal::size()
.context("Failed to get terminal size")?;
let bytes: Vec<u8> =
format!(r#"{{"rows":{rows},"cols":{cols}}}"#).into();
let mut msg = Vec::with_capacity(bytes.len() + 1);
msg.push(0xff); // resize prefix
msg.extend(bytes);
Ok(msg.into())
}
struct RawModeGuard;
impl RawModeGuard {
fn enable_raw_mode() -> anyhow::Result<Self> {
crossterm::terminal::enable_raw_mode()
.context("Failed to enable terminal raw mode")?;
Ok(Self)
}
}
impl Drop for RawModeGuard {
fn drop(&mut self) {
if let Err(e) = crossterm::terminal::disable_raw_mode() {
eprintln!("Failed to disable terminal raw mode | {e:?}");
}
}
}

View File

@@ -41,12 +41,6 @@ async fn app() -> anyhow::Result<()> {
}
Ok(())
}
args::Command::Key { command } => {
noise::key::command::handle(command).await
}
args::Command::Database { command } => {
command::database::handle(command).await
}
args::Command::Container(container) => {
command::container::handle(container).await
}
@@ -60,6 +54,13 @@ async fn app() -> anyhow::Result<()> {
args::Command::Update { command } => {
command::update::handle(command).await
}
args::Command::Ssh(ssh) => command::ssh::handle(ssh).await,
args::Command::Key { command } => {
noise::key::command::handle(command).await
}
args::Command::Database { command } => {
command::database::handle(command).await
}
}
}

View File

@@ -257,7 +257,8 @@ impl<'a> From<&'a OwnedPeripheryConnectionArgs>
pub type ResponseChannels =
CloneCache<Uuid, Sender<EncodedResponse<EncodedJsonMessage>>>;
pub type TerminalChannels = CloneCache<Uuid, Sender<Vec<u8>>>;
pub type TerminalChannels =
CloneCache<Uuid, Sender<anyhow::Result<Vec<u8>>>>;
#[derive(Debug)]
pub struct PeripheryConnection {

View File

@@ -5,7 +5,6 @@ use std::{
};
use anyhow::Context;
use bytes::Bytes;
use cache::CloneCache;
use futures_util::Stream;
use komodo_client::api::write::TerminalRecreateMode;
@@ -31,7 +30,7 @@ impl PeripheryClient {
) -> anyhow::Result<(
Uuid,
Sender<EncodedTransportMessage>,
Receiver<Vec<u8>>,
Receiver<anyhow::Result<Vec<u8>>>,
)> {
tracing::trace!(
"request | type: ConnectTerminal | terminal name: {terminal}",
@@ -52,7 +51,7 @@ impl PeripheryClient {
connection
.sender
.send_terminal(channel_id, Bytes::new())
.send_terminal(channel_id, Ok(Vec::with_capacity(17))) // 16 bytes uuid + 1 EncodedResponse
.await
.context(
"Failed to send TerminalTrigger to begin forwarding.",
@@ -70,7 +69,7 @@ impl PeripheryClient {
) -> anyhow::Result<(
Uuid,
Sender<EncodedTransportMessage>,
Receiver<Vec<u8>>,
Receiver<anyhow::Result<Vec<u8>>>,
)> {
tracing::trace!(
"request | type: ConnectContainerExec | container name: {container} | shell: {shell}",
@@ -95,7 +94,7 @@ impl PeripheryClient {
connection
.sender
.send_terminal(channel_id, Bytes::new())
.send_terminal(channel_id, Ok(Vec::with_capacity(17)))
.await
.context(
"Failed to send TerminalTrigger to begin forwarding.",
@@ -112,7 +111,7 @@ impl PeripheryClient {
) -> anyhow::Result<(
Uuid,
Sender<EncodedTransportMessage>,
Receiver<Vec<u8>>,
Receiver<anyhow::Result<Vec<u8>>>,
)> {
tracing::trace!(
"request | type: ConnectContainerAttach | container name: {container}",
@@ -136,7 +135,7 @@ impl PeripheryClient {
connection
.sender
.send_terminal(channel, Bytes::new())
.send_terminal(channel, Ok(Vec::with_capacity(17)))
.await
.context(
"Failed to send TerminalTrigger to begin forwarding.",
@@ -188,7 +187,7 @@ impl PeripheryClient {
connection
.sender
.send_terminal(channel_id, Bytes::new())
.send_terminal(channel_id, Ok(Vec::with_capacity(17)))
.await
.context(
"Failed to send TerminalTrigger to begin forwarding.",
@@ -251,7 +250,7 @@ impl PeripheryClient {
// This is required to not miss messages.
connection
.sender
.send_terminal(channel_id, Bytes::new())
.send_terminal(channel_id, Ok(Vec::with_capacity(17)))
.await?;
Ok(ReceiverStream {
@@ -264,8 +263,8 @@ impl PeripheryClient {
pub struct ReceiverStream {
channel_id: Uuid,
channels: Arc<CloneCache<Uuid, Sender<Vec<u8>>>>,
receiver: Receiver<Vec<u8>>,
channels: Arc<CloneCache<Uuid, Sender<anyhow::Result<Vec<u8>>>>>,
receiver: Receiver<anyhow::Result<Vec<u8>>>,
}
impl Stream for ReceiverStream {
@@ -275,14 +274,14 @@ impl Stream for ReceiverStream {
cx: &mut task::Context<'_>,
) -> Poll<Option<Self::Item>> {
match self.receiver.poll_recv(cx) {
Poll::Ready(Some(bytes))
Poll::Ready(Some(Ok(bytes)))
if bytes == END_OF_OUTPUT.as_bytes() =>
{
self.cleanup();
Poll::Ready(None)
}
Poll::Ready(Some(bytes)) => Poll::Ready(Some(Ok(bytes))),
// Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(Some(Ok(bytes))) => Poll::Ready(Some(Ok(bytes))),
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => {
self.cleanup();
Poll::Ready(None)

View File

@@ -243,7 +243,7 @@ async fn forward_ws_channel(
client_socket: axum::extract::ws::WebSocket,
periphery_connection_id: Uuid,
periphery_sender: Sender<EncodedTransportMessage>,
mut periphery_receiver: Receiver<Vec<u8>>,
mut periphery_receiver: Receiver<anyhow::Result<Vec<u8>>>,
) {
let (mut client_send, mut client_receive) = client_socket.split();
let cancel = CancellationToken::new();
@@ -264,7 +264,7 @@ async fn forward_ws_channel(
match client_recv_res {
Some(Ok(ws::Message::Binary(bytes))) => {
if let Err(e) = periphery_sender
.send_terminal(periphery_connection_id, bytes)
.send_terminal(periphery_connection_id, Ok(bytes.into()))
.await
{
debug!("Failed to send terminal message | {e:?}",);
@@ -275,7 +275,7 @@ async fn forward_ws_channel(
Some(Ok(ws::Message::Text(text))) => {
let bytes: Bytes = text.into();
if let Err(e) = periphery_sender
.send_terminal(periphery_connection_id, bytes)
.send_terminal(periphery_connection_id, Ok(bytes.into()))
.await
{
debug!("Failed to send terminal message | {e:?}",);
@@ -283,21 +283,38 @@ async fn forward_ws_channel(
break;
};
}
// TODO: Disconnect from periphery when client disconnects
Some(Ok(ws::Message::Close(_frame))) => {
let _ = periphery_sender
.send_terminal(
periphery_connection_id,
Err(anyhow!("Client disconnected")),
)
.await;
cancel.cancel();
break;
}
Some(Err(_e)) => {
let _ = periphery_sender
.send_terminal(
periphery_connection_id,
Err(anyhow!("Client disconnected")),
)
.await;
cancel.cancel();
break;
}
None => {
let _ = periphery_sender
.send_terminal(
periphery_connection_id,
Err(anyhow!("Client disconnected")),
)
.await;
cancel.cancel();
break;
}
// Ignore
Some(Ok(_)) => {}
Some(Err(_e)) => {
cancel.cancel();
break;
}
None => {
cancel.cancel();
break;
}
}
}
};
@@ -306,7 +323,7 @@ async fn forward_ws_channel(
loop {
// Already adheres to cancellation token
match periphery_receiver.recv().await {
Ok(bytes) => {
Ok(Ok(bytes)) => {
if let Err(e) =
client_send.send(ws::Message::Binary(bytes.into())).await
{
@@ -315,6 +332,14 @@ async fn forward_ws_channel(
break;
};
}
Ok(Err(e)) => {
let _ = client_send
.send(ws::Message::Text(format!("{e:#}").into()))
.await;
let _ = client_send.close().await;
cancel.cancel();
break;
}
Err(_) => {
let _ =
client_send.send(ws::Message::text("STREAM EOF")).await;

View File

@@ -1,6 +1,7 @@
use std::{sync::Arc, time::Duration};
use anyhow::{Context, anyhow};
use colored::Colorize;
use futures_util::{Stream, StreamExt, TryStreamExt};
use komodo_client::entities::{
ContainerTerminalMode, KOMODO_EXIT_CODE, NoData,
@@ -181,7 +182,7 @@ impl Resolve<super::Args> for ConnectContainerExec {
// Create (recreate if shell changed)
let terminal = create_terminal(
container.clone(),
format!("docker exec -it {container} {shell}"),
Some(format!("docker exec -it {container} {shell}")),
recreate,
Some((container, ContainerTerminalMode::Exec)),
)
@@ -234,7 +235,7 @@ impl Resolve<super::Args> for ConnectContainerAttach {
// Create (recreate if shell changed)
let terminal = create_terminal(
container.clone(),
format!("docker attach {container} --sig-proxy=false"),
Some(format!("docker attach {container} --sig-proxy=false")),
recreate,
Some((container, ContainerTerminalMode::Attach)),
)
@@ -363,7 +364,7 @@ impl Resolve<super::Args> for ExecuteContainerExec {
let terminal = create_terminal(
container.clone(),
format!("docker exec -it {container} {shell}"),
Some(format!("docker exec -it {container} {shell}")),
recreate,
Some((container, ContainerTerminalMode::Exec)),
)
@@ -445,13 +446,13 @@ async fn handle_terminal_forwarding(
let (a, b) = terminal.history.bytes_parts();
if !a.is_empty() {
sender
.send_terminal(channel, a)
.send_terminal(channel, Ok(a.into()))
.await
.context("Failed to send history part a")?;
}
if !b.is_empty() {
sender
.send_terminal(channel, b)
.send_terminal(channel, Ok(b.into()))
.await
.context("Failed to send history part b")?;
}
@@ -472,32 +473,44 @@ async fn handle_terminal_forwarding(
let res = tokio::select! {
res = stdout.recv() => res,
_ = terminal.cancel.cancelled() => {
trace!("ws write: cancelled from outside");
// let _ = ws_sender.send("PTY KILLED")).await;
// if let Err(e) = ws_write.close().await {
// debug!("Failed to close ws: {e:?}");
// };
let _ = sender.send_terminal(channel, Err(anyhow!(
"\n{} {}",
"pty".bold(),
"exited".red().bold()
))).await;
break
},
_ = cancel.cancelled() => {
// let _ = ws_write.send(Message::Text(Utf8Bytes::from_static("WS KILLED"))).await;
// if let Err(e) = ws_write.close().await {
// debug!("Failed to close ws: {e:?}");
// };
let _ = sender.send_terminal(channel, Err(anyhow!(
"\n{} {}",
"websocket".bold(),
"disconnected".red().bold()
))).await;
break
}
};
let bytes = match res {
Ok(bytes) => bytes,
Err(e) => {
debug!("Terminal receiver failed | {e:?}");
Err(_e) => {
terminal.cancel();
let _ = sender
.send_terminal(
channel,
Err(anyhow!(
"\n{} {}",
"pty".bold(),
"exited".red().bold()
)),
)
.await;
break;
}
};
if let Err(e) = sender.send_terminal(channel, bytes).await {
if let Err(e) =
sender.send_terminal(channel, Ok(bytes.into())).await
{
debug!("Failed to send to WS: {e:?}");
cancel.cancel();
break;
@@ -583,14 +596,17 @@ async fn forward_execute_command_on_terminal_response(
loop {
match stdout.next().await {
Some(Ok(line)) if line.as_str() == END_OF_OUTPUT => {
if let Err(e) = sender.send_terminal(channel, line).await {
if let Err(e) =
sender.send_terminal(channel, Ok(line.into())).await
{
warn!("Got ws_sender send error on END_OF_OUTPUT | {e:?}");
}
break;
}
Some(Ok(line)) => {
if let Err(e) =
sender.send_terminal(channel, line + "\n").await
if let Err(e) = sender
.send_terminal(channel, Ok((line + "\n").into()))
.await
{
warn!("Got ws_sender send error | {e:?}");
break;

View File

@@ -108,6 +108,9 @@ pub fn periphery_config() -> &'static PeripheryConfig {
repo_dir: env.periphery_repo_dir.or(config.repo_dir),
stack_dir: env.periphery_stack_dir.or(config.stack_dir),
build_dir: env.periphery_build_dir.or(config.build_dir),
default_terminal_command: env
.periphery_default_terminal_command
.unwrap_or(config.default_terminal_command),
disable_terminals: env
.periphery_disable_terminals
.unwrap_or(config.disable_terminals),

View File

@@ -12,12 +12,15 @@ use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use tokio::sync::{broadcast, mpsc};
use tokio_util::sync::CancellationToken;
use crate::state::{terminal_channels, terminal_triggers, terminals};
use crate::{
config::periphery_config,
state::{terminal_channels, terminal_triggers, terminals},
};
pub async fn handle_message(message: EncodedTerminalMessage) {
let WithChannel {
channel: channel_id,
mut data,
data,
} = match message.decode() {
Ok(res) => res,
Err(e) => {
@@ -25,6 +28,17 @@ pub async fn handle_message(message: EncodedTerminalMessage) {
return;
}
};
let mut data = match data {
Ok(data) => data,
Err(e) => {
warn!("Recieved Terminal error from Core | {e:#}");
// This means Core should disconnect.
terminal_channels().remove(&channel_id).await;
return;
}
};
let msg = match data.first() {
Some(&0x00) => StdinMsg::Bytes(data.drain(1..).collect()),
Some(&0xFF) => {
@@ -45,11 +59,13 @@ pub async fn handle_message(message: EncodedTerminalMessage) {
return;
}
};
let Some(channel) = terminal_channels().get(&channel_id).await
else {
warn!("No terminal channel for {channel_id}");
return;
};
if let Err(e) = channel.sender.send(msg).await {
warn!("No receiver for {channel_id} | {e:?}");
};
@@ -58,10 +74,13 @@ pub async fn handle_message(message: EncodedTerminalMessage) {
#[instrument("CreateTerminalInner", skip_all, fields(name))]
pub async fn create_terminal(
name: String,
command: String,
command: Option<String>,
recreate: TerminalRecreateMode,
container: Option<(String, ContainerTerminalMode)>,
) -> anyhow::Result<Arc<Terminal>> {
let command = command.unwrap_or_else(|| {
periphery_config().default_terminal_command.clone()
});
trace!(
"CreateTerminal: {name} | command: {command} | recreate: {recreate:?}"
);