clean up terminal modules

This commit is contained in:
mbecker20
2025-10-27 16:17:20 -07:00
parent 0331780a5f
commit 70fab08520
5 changed files with 172 additions and 356 deletions

View File

@@ -1,25 +1,15 @@
use crate::{
auth::{auth_api_key_check_enabled, auth_jwt_check_enabled},
helpers::query::get_user,
periphery::{PeripheryClient, terminal::ConnectTerminalResponse},
state::periphery_connections,
};
use anyhow::anyhow;
use axum::{
Router,
extract::{
FromRequestParts,
ws::{self, WebSocket},
},
http::request,
extract::ws::{self, WebSocket},
routing::get,
};
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt};
use futures_util::SinkExt;
use komodo_client::{entities::user::User, ws::WsLoginMessage};
use periphery_client::api::terminal::DisconnectTerminal;
use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
mod terminal;
mod update;
@@ -121,142 +111,3 @@ async fn check_user_valid(user_id: &str) -> anyhow::Result<User> {
}
Ok(user)
}
async fn forward_ws_channel(
periphery: PeripheryClient,
client_socket: axum::extract::ws::WebSocket,
ConnectTerminalResponse {
channel,
sender: periphery_sender,
receiver: mut periphery_receiver,
}: ConnectTerminalResponse,
) {
let (mut client_send, mut client_receive) = client_socket.split();
let cancel = CancellationToken::new();
periphery_receiver.set_cancel(cancel.clone());
trace!("starting ws exchange");
let core_to_periphery = async {
loop {
let client_recv_res = tokio::select! {
res = client_receive.next() => res,
_ = cancel.cancelled() => {
let _ = periphery_sender
.send_terminal(
channel,
Err(anyhow!("Client disconnected")),
)
.await;
break;
}
};
match client_recv_res {
Some(Ok(ws::Message::Binary(bytes))) => {
if let Err(_e) = periphery_sender
.send_terminal(channel, Ok(bytes.into()))
.await
{
cancel.cancel();
break;
};
}
Some(Ok(ws::Message::Text(text))) => {
let bytes: Bytes = text.into();
if let Err(_e) = periphery_sender
.send_terminal(channel, Ok(bytes.into()))
.await
{
cancel.cancel();
break;
};
}
Some(Ok(ws::Message::Close(_frame))) => {
cancel.cancel();
break;
}
Some(Err(_e)) => {
cancel.cancel();
break;
}
None => {
cancel.cancel();
break;
}
// Ignore
Some(Ok(_)) => {}
}
}
};
let periphery_to_core = async {
loop {
// Already adheres to cancellation token
match periphery_receiver.recv().await {
Ok(Ok(bytes)) => {
if let Err(e) =
client_send.send(ws::Message::Binary(bytes.into())).await
{
debug!("{e:?}");
cancel.cancel();
break;
};
}
Ok(Err(e)) => {
let _ = client_send
.send(ws::Message::Text(format!("{e:#}").into()))
.await;
let _ = client_send.close().await;
cancel.cancel();
break;
}
Err(_) => {
let _ =
client_send.send(ws::Message::text("STREAM EOF")).await;
cancel.cancel();
break;
}
}
}
};
tokio::join!(core_to_periphery, periphery_to_core);
// Cleanup
if let Err(e) =
periphery.request(DisconnectTerminal { channel }).await
{
warn!(
"Failed to disconnect Periphery terminal forwarding | {e:#}",
)
}
if let Some(connection) =
periphery_connections().get(&periphery.id).await
{
connection.terminals.remove(&channel).await;
}
}
pub struct Qs<T>(pub T);
impl<S, T> FromRequestParts<S> for Qs<T>
where
S: Send + Sync,
T: DeserializeOwned,
{
type Rejection = axum::response::Response;
async fn from_request_parts(
parts: &mut request::Parts,
_state: &S,
) -> Result<Self, Self::Rejection> {
let raw = parts.uri.query().unwrap_or_default();
serde_qs::from_str::<T>(raw).map(Qs).map_err(|e| {
axum::response::IntoResponse::into_response((
axum::http::StatusCode::BAD_REQUEST,
format!("Failed to parse request query: {e}"),
))
})
}
}

View File

@@ -1,184 +0,0 @@
use axum::{
extract::{Query, WebSocketUpgrade, ws::Message},
response::IntoResponse,
};
use futures_util::SinkExt;
use komodo_client::{
api::terminal::{ConnectStackAttachQuery, ConnectStackExecQuery},
entities::{
permission::PermissionLevel, server::Server, stack::Stack,
terminal::TerminalTarget,
},
};
use crate::{
permission::get_check_permissions, resource::get,
state::stack_status_cache,
};
#[instrument("ConnectStackExec", skip(ws))]
pub async fn exec(
Query(ConnectStackExecQuery {
stack,
service,
terminal,
init,
}): Query<ConnectStackExecQuery>,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
ws.on_upgrade(async move |socket| {
let Some((mut client_socket, target, server, container)) =
login_get_target_server_container(socket, &stack, service)
.await
else {
return;
};
let (periphery, response) =
match super::setup_container_exec_terminal(
target, &server, container, terminal, init,
)
.await
{
Ok(response) => response,
Err(e) => {
let _ = client_socket
.send(Message::text(format!("ERROR: {e:#}")))
.await;
let _ = client_socket.close().await;
return;
}
};
super::forward_ws_channel(periphery, client_socket, response)
.await
})
}
#[instrument("ConnectStackAttach", skip(ws))]
pub async fn attach(
Query(ConnectStackAttachQuery {
stack,
service,
terminal,
}): Query<ConnectStackAttachQuery>,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
ws.on_upgrade(async move |socket| {
let Some((mut client_socket, target, server, container)) =
login_get_target_server_container(socket, &stack, service)
.await
else {
return;
};
let (periphery, response) =
match super::setup_container_attach_terminal(
target, &server, container, terminal,
)
.await
{
Ok(response) => response,
Err(e) => {
let _ = client_socket
.send(Message::text(format!("ERROR: {e:#}")))
.await;
let _ = client_socket.close().await;
return;
}
};
super::forward_ws_channel(periphery, client_socket, response)
.await
})
}
async fn login_get_target_server_container(
socket: axum::extract::ws::WebSocket,
stack: &str,
service: String,
) -> Option<(
axum::extract::ws::WebSocket,
TerminalTarget,
Server,
String,
)> {
let (mut client_socket, user) =
super::user_ws_login(socket).await?;
let stack = match get_check_permissions::<Stack>(
stack,
&user,
PermissionLevel::Read.terminal(),
)
.await
{
Ok(stack) => stack,
Err(e) => {
debug!("could not get stack | {e:#}");
let _ = client_socket
.send(Message::text(format!("ERROR: {e:#}")))
.await;
let _ = client_socket.close().await;
return None;
}
};
let server = match get::<Server>(&stack.config.server_id).await {
Ok(server) => server,
Err(e) => {
debug!("could not get server | {e:#}");
let _ = client_socket
.send(Message::text(format!("ERROR: {e:#}")))
.await;
let _ = client_socket.close().await;
return None;
}
};
let Some(status) = stack_status_cache().get(&stack.id).await else {
debug!("could not get stack status");
let _ = client_socket
.send(Message::text(String::from(
"ERROR: could not get stack status",
)))
.await;
let _ = client_socket.close().await;
return None;
};
let container = match status
.curr
.services
.iter()
.find(|s| s.service == service)
.map(|s| s.container.as_ref())
{
Some(Some(container)) => container.name.clone(),
Some(None) => {
let _ = client_socket
.send(Message::text(format!(
"ERROR: Service {service} container could not be found"
)))
.await;
let _ = client_socket.close().await;
return None;
}
None => {
let _ = client_socket
.send(Message::text(format!(
"ERROR: Service {service} could not be found"
)))
.await;
let _ = client_socket.close().await;
return None;
}
};
let target = TerminalTarget::Stack {
stack: stack.id,
service: Some(service),
};
Some((client_socket, target, server, container))
}

View File

@@ -1,21 +1,27 @@
use anyhow::anyhow;
use axum::{
extract::{WebSocketUpgrade, ws::Message},
extract::{FromRequestParts, WebSocketUpgrade, ws},
http::request,
response::IntoResponse,
};
use futures_util::SinkExt;
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt as _};
use komodo_client::{
api::terminal::ConnectTerminalQuery, entities::user::User,
};
use periphery_client::api::terminal::DisconnectTerminal;
use serde::de::DeserializeOwned;
use tokio_util::sync::CancellationToken;
use crate::{
helpers::terminal::setup_target_for_user,
periphery::{PeripheryClient, terminal::ConnectTerminalResponse},
ws::forward_ws_channel,
state::periphery_connections,
};
#[instrument("ConnectTerminal", skip(ws))]
pub async fn handler(
super::Qs(query): super::Qs<ConnectTerminalQuery>,
Qs(query): Qs<ConnectTerminalQuery>,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
ws.on_upgrade(|socket| async move {
@@ -30,7 +36,7 @@ pub async fn handler(
Ok(response) => response,
Err(e) => {
let _ = client_socket
.send(Message::text(format!("ERROR: {e:#}")))
.send(ws::Message::text(format!("ERROR: {e:#}")))
.await;
let _ = client_socket.close().await;
return;
@@ -54,3 +60,142 @@ async fn setup_forwarding(
let response = periphery.connect_terminal(terminal, target).await?;
Ok((periphery, response))
}
async fn forward_ws_channel(
periphery: PeripheryClient,
client_socket: axum::extract::ws::WebSocket,
ConnectTerminalResponse {
channel,
sender: periphery_sender,
receiver: mut periphery_receiver,
}: ConnectTerminalResponse,
) {
let (mut client_send, mut client_receive) = client_socket.split();
let cancel = CancellationToken::new();
periphery_receiver.set_cancel(cancel.clone());
trace!("starting ws exchange");
let core_to_periphery = async {
loop {
let client_recv_res = tokio::select! {
res = client_receive.next() => res,
_ = cancel.cancelled() => {
let _ = periphery_sender
.send_terminal(
channel,
Err(anyhow!("Client disconnected")),
)
.await;
break;
}
};
match client_recv_res {
Some(Ok(ws::Message::Binary(bytes))) => {
if let Err(_e) = periphery_sender
.send_terminal(channel, Ok(bytes.into()))
.await
{
cancel.cancel();
break;
};
}
Some(Ok(ws::Message::Text(text))) => {
let bytes: Bytes = text.into();
if let Err(_e) = periphery_sender
.send_terminal(channel, Ok(bytes.into()))
.await
{
cancel.cancel();
break;
};
}
Some(Ok(ws::Message::Close(_frame))) => {
cancel.cancel();
break;
}
Some(Err(_e)) => {
cancel.cancel();
break;
}
None => {
cancel.cancel();
break;
}
// Ignore
Some(Ok(_)) => {}
}
}
};
let periphery_to_core = async {
loop {
// Already adheres to cancellation token
match periphery_receiver.recv().await {
Ok(Ok(bytes)) => {
if let Err(e) =
client_send.send(ws::Message::Binary(bytes.into())).await
{
debug!("{e:?}");
cancel.cancel();
break;
};
}
Ok(Err(e)) => {
let _ = client_send
.send(ws::Message::Text(format!("{e:#}").into()))
.await;
let _ = client_send.close().await;
cancel.cancel();
break;
}
Err(_) => {
let _ =
client_send.send(ws::Message::text("STREAM EOF")).await;
cancel.cancel();
break;
}
}
}
};
tokio::join!(core_to_periphery, periphery_to_core);
// Cleanup
if let Err(e) =
periphery.request(DisconnectTerminal { channel }).await
{
warn!(
"Failed to disconnect Periphery terminal forwarding | {e:#}",
)
}
if let Some(connection) =
periphery_connections().get(&periphery.id).await
{
connection.terminals.remove(&channel).await;
}
}
pub struct Qs<T>(pub T);
impl<S, T> FromRequestParts<S> for Qs<T>
where
S: Send + Sync,
T: DeserializeOwned,
{
type Rejection = axum::response::Response;
async fn from_request_parts(
parts: &mut request::Parts,
_state: &S,
) -> Result<Self, Self::Rejection> {
let raw = parts.uri.query().unwrap_or_default();
serde_qs::from_str::<T>(raw).map(Qs).map_err(|e| {
axum::response::IntoResponse::into_response((
axum::http::StatusCode::BAD_REQUEST,
format!("Failed to parse request query: {e}"),
))
})
}
}

View File

@@ -9,11 +9,9 @@ use tokio_tungstenite::{
};
use typeshare::typeshare;
use crate::{
KomodoClient, api::terminal::ConnectTerminalQuery,
entities::terminal::TerminalRecreateMode,
};
use crate::{KomodoClient, entities::terminal::TerminalRecreateMode};
pub mod terminal;
pub mod update;
#[typeshare]
@@ -37,18 +35,6 @@ impl WsLoginMessage {
}
impl KomodoClient {
pub async fn connect_terminal_websocket(
&self,
query: &ConnectTerminalQuery,
) -> anyhow::Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
self
.connect_login_user_websocket(
"/terminal",
Some(&serde_qs::to_string(query)?),
)
.await
}
pub async fn connect_container_exec_websocket(
&self,
server: &str,

View File

@@ -0,0 +1,18 @@
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use crate::{KomodoClient, api::terminal::ConnectTerminalQuery};
impl KomodoClient {
pub async fn connect_terminal_websocket(
&self,
query: &ConnectTerminalQuery,
) -> anyhow::Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
self
.connect_login_user_websocket(
"/terminal",
Some(&serde_qs::to_string(query)?),
)
.await
}
}