deploy 2.0.0-dev-85

This commit is contained in:
mbecker20
2025-10-27 20:01:18 -07:00
parent ea6dee4d51
commit bc672d9649
7 changed files with 88 additions and 93 deletions

40
Cargo.lock generated
View File

@@ -902,7 +902,7 @@ dependencies = [
[[package]]
name = "cache"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"tokio",
@@ -1094,7 +1094,7 @@ dependencies = [
[[package]]
name = "command"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"komodo_client",
"shlex",
@@ -1121,7 +1121,7 @@ checksum = "e47641d3deaf41fb1538ac1f54735925e275eaf3bf4d55c81b137fba797e5cbb"
[[package]]
name = "config"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"colored",
"indexmap 2.12.0",
@@ -1443,7 +1443,7 @@ checksum = "2a2330da5de22e8a3cb63252ce2abb30116bf5265e89c0e01bc17015ce30a476"
[[package]]
name = "database"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"async-compression",
@@ -1742,7 +1742,7 @@ dependencies = [
[[package]]
name = "encoding"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"bytes",
@@ -1784,7 +1784,7 @@ dependencies = [
[[package]]
name = "environment"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"formatting",
@@ -1794,7 +1794,7 @@ dependencies = [
[[package]]
name = "environment_file"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"thiserror 2.0.17",
]
@@ -1890,7 +1890,7 @@ dependencies = [
[[package]]
name = "formatting"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"serror",
]
@@ -2056,7 +2056,7 @@ dependencies = [
[[package]]
name = "git"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"cache",
@@ -2688,7 +2688,7 @@ dependencies = [
[[package]]
name = "interpolate"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"komodo_client",
@@ -2810,7 +2810,7 @@ dependencies = [
[[package]]
name = "komodo_cli"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"chrono",
@@ -2838,7 +2838,7 @@ dependencies = [
[[package]]
name = "komodo_client"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"async_timing_util",
@@ -2874,7 +2874,7 @@ dependencies = [
[[package]]
name = "komodo_core"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"arc-swap",
@@ -2948,7 +2948,7 @@ dependencies = [
[[package]]
name = "komodo_periphery"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"arc-swap",
@@ -3068,7 +3068,7 @@ checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
[[package]]
name = "logger"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"komodo_client",
@@ -3360,7 +3360,7 @@ dependencies = [
[[package]]
name = "noise"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"arc-swap",
@@ -3775,7 +3775,7 @@ checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
[[package]]
name = "periphery_client"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"derive_variants",
@@ -4256,7 +4256,7 @@ dependencies = [
[[package]]
name = "response"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"axum",
@@ -4526,7 +4526,7 @@ dependencies = [
[[package]]
name = "secret_file"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"tokio",
]
@@ -5559,7 +5559,7 @@ dependencies = [
[[package]]
name = "transport"
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
dependencies = [
"anyhow",
"axum",

View File

@@ -8,7 +8,7 @@ members = [
]
[workspace.package]
version = "2.0.0-dev-84"
version = "2.0.0-dev-85"
edition = "2024"
authors = ["mbecker20 <becker.maxh@gmail.com>"]
license = "GPL-3.0-or-later"

View File

@@ -209,13 +209,14 @@ async fn handle_terminal_forwarding<
if n == 0 {
break;
}
let bytes = &buf[..n];
// Check for disconnect sequence (alt + q)
if buf[..n] == [197, 147] {
if bytes == [197, 147] {
break;
}
// Forward bytes
if write_tx
.send(TerminalStdinMessage::Forward(buf.to_vec()))
.send(TerminalStdinMessage::Forward(bytes.to_vec()))
.await
.is_err()
{

View File

@@ -81,52 +81,36 @@ async fn forward_ws_channel(
loop {
let client_recv_res = tokio::select! {
res = client_receive.next() => res,
_ = cancel.cancelled() => {
let _ = periphery_sender
.send_terminal(
channel,
Err(anyhow!("Client disconnected")),
)
.await;
break;
}
_ = cancel.cancelled() => break,
};
match client_recv_res {
Some(Ok(ws::Message::Binary(bytes))) => {
if let Err(_e) = periphery_sender
.send_terminal(channel, Ok(bytes.into()))
.await
{
cancel.cancel();
break;
};
}
let bytes = match client_recv_res {
Some(Ok(ws::Message::Binary(bytes))) => bytes.into(),
Some(Ok(ws::Message::Text(text))) => {
let bytes: Bytes = text.into();
if let Err(_e) = periphery_sender
.send_terminal(channel, Ok(bytes.into()))
.await
{
cancel.cancel();
break;
};
bytes.into()
}
Some(Ok(ws::Message::Close(_frame))) => {
cancel.cancel();
break;
}
Some(Err(_e)) => {
cancel.cancel();
break;
}
None => {
cancel.cancel();
break;
}
// Ignore
Some(Ok(_)) => {}
}
Some(Ok(_)) => continue,
};
if let Err(_e) =
periphery_sender.send_terminal(channel, Ok(bytes)).await
{
break;
};
}
cancel.cancel();
let _ = periphery_sender
.send_terminal(channel, Err(anyhow!("Client disconnected")))
.await;
};
let periphery_to_core = async {
@@ -138,26 +122,24 @@ async fn forward_ws_channel(
client_send.send(ws::Message::Binary(bytes.into())).await
{
debug!("{e:?}");
cancel.cancel();
break;
};
}
Ok(Err(e)) => {
let _ = client_send
.send(ws::Message::Text(format!("{e:#}").into()))
.send(ws::Message::text(format!("{e:#}")))
.await;
let _ = client_send.close().await;
cancel.cancel();
break;
}
Err(_) => {
let _ =
client_send.send(ws::Message::text("STREAM EOF")).await;
cancel.cancel();
break;
}
}
}
let _ = client_send.close().await;
cancel.cancel();
};
tokio::join!(core_to_periphery, periphery_to_core);

View File

@@ -249,11 +249,7 @@ impl Resolve<super::Args> for DisconnectTerminal {
self,
args: &super::Args,
) -> anyhow::Result<NoData> {
if let Some(channel) =
terminal_channels().remove(&self.channel).await
{
channel.cancel.cancel();
}
terminal_channels().remove(&self.channel).await;
Ok(NoData {})
}
}
@@ -415,12 +411,7 @@ async fn handle_terminal_forwarding(
}
// Clean up
if let Some(terminal_channel) =
terminal_channels().remove(&channel).await
{
trace!("Cancel called for {channel}");
terminal_channel.cancel.cancel();
}
terminal_channels().remove(&channel).await;
clean_up_terminals().await;
}

View File

@@ -174,7 +174,32 @@ pub fn terminals() -> &'static CloneVecCache<Arc<PeripheryTerminal>> {
TERMINALS.get_or_init(Default::default)
}
pub type TerminalChannels = CloneCache<Uuid, Arc<TerminalChannel>>;
#[derive(Default)]
pub struct TerminalChannels(CloneCache<Uuid, Arc<TerminalChannel>>);
impl TerminalChannels {
pub async fn get(
&self,
channel: &Uuid,
) -> Option<Arc<TerminalChannel>> {
self.0.get(channel).await
}
pub async fn insert(
&self,
channel: Uuid,
terminal: Arc<TerminalChannel>,
) -> Option<Arc<TerminalChannel>> {
self.0.insert(channel, terminal).await
}
pub async fn remove(&self, channel: &Uuid) {
let Some(channel) = self.0.remove(channel).await else {
return;
};
channel.cancel.cancel();
}
}
pub fn terminal_channels() -> &'static TerminalChannels {
static TERMINAL_CHANNELS: OnceLock<TerminalChannels> =

View File

@@ -52,7 +52,7 @@ pub async fn handle_message(message: EncodedTerminalMessage) {
);
return;
}
// Send 'beginn' trigger for Terminal Executions
// Send 'begin' trigger for Terminal Executions
Ok(TerminalStdinMessage::Begin) => {
if let Err(e) = terminal_triggers().send(&channel_id).await {
warn!("{e:#}")
@@ -309,26 +309,27 @@ impl PeripheryTerminal {
break;
}
match child.try_wait() {
Ok(None) => {
// Continue
std::thread::sleep(Duration::from_millis(500));
}
Ok(Some(code)) => {
debug!("child exited with code {code}");
_cancel.cancel();
break;
}
Ok(None) => {
std::thread::sleep(Duration::from_millis(500));
}
Err(e) => {
debug!("failed to wait for child | {e:?}");
_cancel.cancel();
break;
}
}
}
// Cancel if loop broken
_cancel.cancel();
});
// WS (channel) -> STDIN TASK
// Theres only one consumer here, so use mpsc
let (stdin, mut channel_read) = tokio::sync::mpsc::channel(8192);
let (stdin, mut stdin_read) = tokio::sync::mpsc::channel(8192);
let _cancel = cancel.clone();
tokio::task::spawn_blocking(move || {
loop {
@@ -336,13 +337,12 @@ impl PeripheryTerminal {
trace!("terminal write: cancelled from outside");
break;
}
match channel_read.blocking_recv() {
match stdin_read.blocking_recv() {
// Handled in self::handle_message
Some(TerminalStdinMessage::Begin) => {}
Some(TerminalStdinMessage::Forward(bytes)) => {
if let Err(e) = terminal_write.write_all(&bytes) {
debug!("Failed to write to PTY: {e:?}");
_cancel.cancel();
break;
}
}
@@ -354,24 +354,24 @@ impl PeripheryTerminal {
pixel_height: 0,
}) {
debug!("Failed to resize | {e:?}");
_cancel.cancel();
break;
};
}
None => {
debug!("WS -> PTY channel read error: Disconnected");
_cancel.cancel();
break;
}
}
}
// Cancel if loop broken
_cancel.cancel();
});
let history = Arc::new(History::default());
// PTY -> WS (channel) TASK
// Uses broadcast to output to multiple client simultaneously
let (write, stdout) =
let (write_stdout, stdout) =
tokio::sync::broadcast::channel::<Bytes>(8192);
let _cancel = cancel.clone();
let _history = history.clone();
@@ -383,29 +383,25 @@ impl PeripheryTerminal {
break;
}
match terminal_read.read(&mut buf) {
Ok(0) => {
// EOF
trace!("Got PTY read EOF");
_cancel.cancel();
break;
}
Ok(0) => break, // EOF
Ok(n) => {
_history.push(&buf[..n]);
let slice = &buf[..n];
_history.push(slice);
if let Err(e) =
write.send(Bytes::copy_from_slice(&buf[..n]))
write_stdout.send(Bytes::copy_from_slice(slice))
{
debug!("PTY -> WS channel send error: {e:?}");
_cancel.cancel();
break;
}
}
Err(e) => {
debug!("Failed to read for PTY: {e:?}");
_cancel.cancel();
break;
}
}
}
// Cancel if loop broken
_cancel.cancel();
});
trace!("terminal tasks spawned");