everything over ws working

This commit is contained in:
mbecker20
2025-09-17 02:48:17 -07:00
parent 991c95fff0
commit 230f357b5a
10 changed files with 81 additions and 89 deletions

1
Cargo.lock generated
View File

@@ -2794,7 +2794,6 @@ dependencies = [
"komodo_client",
"logger",
"periphery_client",
"pin-project-lite",
"portable-pty",
"resolver_api",
"response",

View File

@@ -35,7 +35,6 @@ derive_variants.workspace = true
resolver_api.workspace = true
run_command.workspace = true
# external
pin-project-lite.workspace = true
serde_yaml_ng.workspace = true
tokio-stream.workspace = true
portable-pty.workspace = true

View File

@@ -18,7 +18,6 @@ use periphery_client::api::{
};
use resolver_api::Resolve;
use tokio::fs;
use uuid::Uuid;
use crate::{
api::Args, config::periphery_config, docker::docker_login,
@@ -152,9 +151,7 @@ pub async fn pull_or_clone_stack(
let git_token = crate::helpers::git_token(git_token, &args)?;
let req_args = Args {
req_id: Uuid::new_v4(),
};
let req_args = Args;
PullOrCloneRepo {
args,
git_token,

View File

@@ -14,7 +14,6 @@ use periphery_client::api::{
};
use resolver_api::Resolve;
use tokio::fs;
use uuid::Uuid;
use crate::{api::Args, config::periphery_config, helpers};
@@ -152,9 +151,7 @@ async fn write_stack_linked_repo<'a>(
let on_pull = (!repo.config.on_pull.is_none())
.then_some(repo.config.on_pull.clone());
let req_args = Args {
req_id: Uuid::new_v4(),
};
let req_args = Args;
let clone_res = if stack.config.reclone {
CloneRepo {
args,
@@ -240,9 +237,7 @@ async fn write_stack_inline_repo(
let git_token = stack_git_token(git_token, &args, &mut res)?;
let req_args = Args {
req_id: Uuid::new_v4(),
};
let req_args = Args;
let clone_res = if stack.config.reclone {
CloneRepo {
args,

View File

@@ -14,7 +14,6 @@ use periphery_client::api::{
use resolver_api::Resolve;
use response::JsonBytes;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::{config::periphery_config, docker::docker_client};
@@ -31,9 +30,7 @@ mod stats;
mod volume;
#[derive(Debug)]
pub struct Args {
pub req_id: Uuid,
}
pub struct Args;
#[derive(
Serialize, Deserialize, Debug, Clone, Resolve, EnumVariants,

View File

@@ -8,7 +8,10 @@ use komodo_client::{
api::write::TerminalRecreateMode,
entities::{KOMODO_EXIT_CODE, NoData, server::TerminalInfo},
};
use periphery_client::api::terminal::*;
use periphery_client::{
api::terminal::*,
terminal::{END_OF_OUTPUT, START_OF_OUTPUT},
};
use resolver_api::Resolve;
use serror::AddStatusCodeError;
use tokio::sync::mpsc::channel;
@@ -388,7 +391,10 @@ async fn handle_terminal_forwarding(
tokio::join!(ws_read, ws_write);
// Clean up
terminal_channels().remove(&id).await;
if let Some((_, cancel)) = terminal_channels().remove(&id).await {
info!("Cancel called for {id}");
cancel.cancel();
}
clean_up_terminals().await;
}
@@ -452,7 +458,19 @@ async fn forward_execute_command_on_terminal_response(
let ws_sender = ws_sender();
loop {
match stdout.next().await {
Some(Ok(line)) if line.as_str() == END_OF_OUTPUT => break,
Some(Ok(line)) if line.as_str() == END_OF_OUTPUT => {
if let Err(e) = ws_sender
.send(to_transport_bytes(
line.into(),
id,
MessageState::Terminal,
))
.await
{
warn!("Got ws_sender send error on END_OF_OUTPUT | {e:?}");
}
break;
}
Some(Ok(line)) => {
if let Err(e) = ws_sender
.send(to_transport_bytes(

View File

@@ -105,7 +105,7 @@ fn handle_request(req_id: Uuid, bytes: Bytes) {
let resolve_response = async {
let (state, data) =
match request.resolve(&Args { req_id }).await {
match request.resolve(&Args).await {
Ok(JsonBytes::Ok(res)) => (MessageState::Successful, res),
Ok(JsonBytes::Err(e)) => (
MessageState::Failed,

View File

@@ -1,18 +1,14 @@
use std::{
collections::{HashMap, VecDeque},
pin::Pin,
sync::{Arc, OnceLock},
task::Poll,
time::Duration,
};
use anyhow::{Context, anyhow};
use bytes::Bytes;
use futures::Stream;
use komodo_client::{
api::write::TerminalRecreateMode, entities::server::TerminalInfo,
};
use pin_project_lite::pin_project;
use portable_pty::{CommandBuilder, PtySize, native_pty_system};
use tokio::sync::{broadcast, mpsc};
use tokio_util::sync::CancellationToken;
@@ -347,47 +343,3 @@ impl History {
self.buf.read().unwrap().len() as f64 / 1024.0
}
}
/// Execute Sentinels
pub const START_OF_OUTPUT: &str = "__KOMODO_START_OF_OUTPUT__";
pub const END_OF_OUTPUT: &str = "__KOMODO_END_OF_OUTPUT__";
pin_project! {
pub struct TerminalStream<S> { #[pin] pub stdout: S }
}
impl<S> Stream for TerminalStream<S>
where
S:
Stream<Item = Result<String, tokio_util::codec::LinesCodecError>>,
{
// Axum expects a stream of results
type Item = Result<String, String>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> Poll<Option<Self::Item>> {
let this = self.project();
match this.stdout.poll_next(cx) {
Poll::Ready(None) => {
// This is if a None comes in before END_OF_OUTPUT.
// This probably means the terminal has exited early,
// and needs to be cleaned up
tokio::spawn(async move { clean_up_terminals().await });
Poll::Ready(None)
}
Poll::Ready(Some(line)) => {
match line {
Ok(line) if line.as_str() == END_OF_OUTPUT => {
// Stop the stream on end sentinel
Poll::Ready(None)
}
Ok(line) => Poll::Ready(Some(Ok(line + "\n"))),
Err(e) => Poll::Ready(Some(Err(format!("{e:?}")))),
}
}
Poll::Pending => Poll::Pending,
}
}
}

View File

@@ -4,16 +4,16 @@ use bytes::Bytes;
use cache::CloneCache;
use resolver_api::HasResponse;
use serde::{Serialize, de::DeserializeOwned};
use tokio::sync::mpsc::Sender;
use uuid::Uuid;
pub mod api;
pub mod connection;
pub mod terminal;
mod request;
mod terminal;
pub use request::request;
use tokio::sync::mpsc::Sender;
use uuid::Uuid;
// Server id => Channel sender map
pub type ResponseChannels =

View File

@@ -1,11 +1,13 @@
use std::{
pin::Pin,
sync::Arc,
task::{self, Poll},
};
use anyhow::Context;
use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use cache::CloneCache;
use futures_util::Stream;
use tokio::sync::mpsc::{Receiver, Sender, channel};
use transport::bytes::data_from_transport_bytes;
use uuid::Uuid;
@@ -109,10 +111,11 @@ impl PeripheryClient {
response_channels.insert(id, response_sender).await;
let stream = ReceiverStream(response_receiever)
.map(|bytes| data_from_transport_bytes(bytes));
Ok(stream)
Ok(ReceiverStream {
id,
channels: response_channels,
receiver: response_receiever,
})
}
/// Executes command on specified container,
@@ -134,9 +137,7 @@ impl PeripheryClient {
container: String,
shell: String,
command: String,
) -> anyhow::Result<
impl Stream<Item = anyhow::Result<Bytes>> + 'static,
> {
) -> anyhow::Result<ReceiverStream> {
tracing::trace!(
"sending request | type: ExecuteContainerExec | container: {container} | shell: {shell} | command: {command}",
);
@@ -158,21 +159,55 @@ impl PeripheryClient {
response_channels.insert(id, response_sender).await;
let stream = ReceiverStream(response_receiever)
.map(|bytes| data_from_transport_bytes(bytes));
Ok(stream)
Ok(ReceiverStream {
id,
channels: response_channels,
receiver: response_receiever,
})
}
}
pub struct ReceiverStream<T>(Receiver<T>);
/// Execute Sentinels
pub const START_OF_OUTPUT: &str = "__KOMODO_START_OF_OUTPUT__";
pub const END_OF_OUTPUT: &str = "__KOMODO_END_OF_OUTPUT__";
impl<T> Stream for ReceiverStream<T> {
type Item = T;
pub struct ReceiverStream {
id: Uuid,
channels: Arc<CloneCache<Uuid, Sender<Bytes>>>,
receiver: Receiver<Bytes>,
}
impl Stream for ReceiverStream {
type Item = anyhow::Result<Bytes>;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<T>> {
self.0.poll_recv(cx)
) -> Poll<Option<Self::Item>> {
match self.receiver.poll_recv(cx).map(|bytes| {
bytes.map(|bytes| data_from_transport_bytes(bytes))
}) {
Poll::Ready(Some(Ok(bytes))) if &bytes == END_OF_OUTPUT => {
self.cleanup();
Poll::Ready(None)
}
Poll::Ready(Some(Ok(bytes))) => Poll::Ready(Some(Ok(bytes))),
Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
Poll::Ready(None) => {
self.cleanup();
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}
impl ReceiverStream {
fn cleanup(&self) {
// Not the prettiest but it should be fine
let channels = self.channels.clone();
let id = self.id;
tokio::spawn(async move {
channels.remove(&id).await;
});
}
}