execute basically working, still need to clear the response channel upon completion

This commit is contained in:
mbecker20
2025-09-17 01:57:00 -07:00
parent f6243fe6b1
commit 991c95fff0
18 changed files with 340 additions and 319 deletions

2
Cargo.lock generated
View File

@@ -3650,8 +3650,8 @@ dependencies = [
"anyhow",
"bytes",
"cache",
"futures-util",
"komodo_client",
"reqwest",
"resolver_api",
"serde",
"serde_json",

View File

@@ -69,7 +69,7 @@ axum = { version = "0.8.4", features = ["ws", "json", "macros"] }
# SER/DE
ipnetwork = { version = "0.21.1", features = ["serde"] }
indexmap = { version = "2.11.1", features = ["serde"] }
indexmap = { version = "2.11.3", features = ["serde"] }
serde = { version = "1.0.219", features = ["derive"] }
strum = { version = "0.27.2", features = ["derive"] }
bson = { version = "2.15.0" } # must keep in sync with mongodb version

View File

@@ -54,34 +54,19 @@ async fn execute_terminal_inner(
) -> serror::Result<axum::body::Body> {
info!("/terminal/execute request | user: {}", user.username);
let res = async {
let server = get_check_permissions::<Server>(
&server,
&user,
PermissionLevel::Read.terminal(),
)
.await?;
let server = get_check_permissions::<Server>(
&server,
&user,
PermissionLevel::Read.terminal(),
)
.await?;
let periphery = periphery_client(&server)?;
let stream = periphery_client(&server)?
.execute_terminal(terminal, command)
.await
.context("Failed to execute command on periphery")?;
let stream = periphery
.execute_terminal(terminal, command)
.await
.context("Failed to execute command on periphery")?;
anyhow::Ok(stream)
}
.await;
let stream = match res {
Ok(stream) => stream,
Err(e) => {
warn!("/terminal/execute request {req_id} error: {e:#}");
return Err(e.into());
}
};
Ok(axum::body::Body::from_stream(stream.into_line_stream()))
Ok(axum::body::Body::from_stream(stream))
}
// ======================
@@ -112,43 +97,25 @@ async fn execute_container_exec_inner(
}: ExecuteContainerExecBody,
user: User,
) -> serror::Result<axum::body::Body> {
info!(
"/terminal/execute/container request | user: {}",
user.username
);
info!("ExecuteContainerExec request | user: {}", user.username);
let res = async {
let server = get_check_permissions::<Server>(
&server,
&user,
PermissionLevel::Read.terminal(),
)
.await?;
let server = get_check_permissions::<Server>(
&server,
&user,
PermissionLevel::Read.terminal(),
)
.await?;
let periphery = periphery_client(&server)?;
let periphery = periphery_client(&server)?;
let stream = periphery
.execute_container_exec(container, shell, command)
.await
.context(
"Failed to execute container exec command on periphery",
)?;
let stream = periphery
.execute_container_exec(container, shell, command)
.await
.context(
"Failed to execute container exec command on periphery",
)?;
anyhow::Ok(stream)
}
.await;
let stream = match res {
Ok(stream) => stream,
Err(e) => {
warn!(
"/terminal/execute/container request {req_id} error: {e:#}"
);
return Err(e.into());
}
};
Ok(axum::body::Body::from_stream(stream.into_line_stream()))
Ok(axum::body::Body::from_stream(stream))
}
// =======================
@@ -178,45 +145,27 @@ async fn execute_deployment_exec_inner(
}: ExecuteDeploymentExecBody,
user: User,
) -> serror::Result<axum::body::Body> {
info!(
"/terminal/execute/deployment request | user: {}",
user.username
);
info!("ExecuteDeploymentExec request | user: {}", user.username);
let res = async {
let deployment = get_check_permissions::<Deployment>(
&deployment,
&user,
PermissionLevel::Read.terminal(),
)
.await?;
let deployment = get_check_permissions::<Deployment>(
&deployment,
&user,
PermissionLevel::Read.terminal(),
)
.await?;
let server = get::<Server>(&deployment.config.server_id).await?;
let server = get::<Server>(&deployment.config.server_id).await?;
let periphery = periphery_client(&server)?;
let periphery = periphery_client(&server)?;
let stream = periphery
.execute_container_exec(deployment.name, shell, command)
.await
.context(
"Failed to execute container exec command on periphery",
)?;
let stream = periphery
.execute_container_exec(deployment.name, shell, command)
.await
.context(
"Failed to execute container exec command on periphery",
)?;
anyhow::Ok(stream)
}
.await;
let stream = match res {
Ok(stream) => stream,
Err(e) => {
warn!(
"/terminal/execute/deployment request {req_id} error: {e:#}"
);
return Err(e.into());
}
};
Ok(axum::body::Body::from_stream(stream.into_line_stream()))
Ok(axum::body::Body::from_stream(stream))
}
// ==================
@@ -247,53 +196,40 @@ async fn execute_stack_exec_inner(
}: ExecuteStackExecBody,
user: User,
) -> serror::Result<axum::body::Body> {
info!("/terminal/execute/stack request | user: {}", user.username);
info!("ExecuteStackExec request | user: {}", user.username);
let res = async {
let stack = get_check_permissions::<Stack>(
&stack,
&user,
PermissionLevel::Read.terminal(),
)
.await?;
let stack = get_check_permissions::<Stack>(
&stack,
&user,
PermissionLevel::Read.terminal(),
)
.await?;
let server = get::<Server>(&stack.config.server_id).await?;
let server = get::<Server>(&stack.config.server_id).await?;
let container = stack_status_cache()
.get(&stack.id)
.await
.context("could not get stack status")?
.curr
.services
.iter()
.find(|s| s.service == service)
.context("could not find service")?
.container
.as_ref()
.context("could not find service container")?
.name
.clone();
let container = stack_status_cache()
.get(&stack.id)
.await
.context("could not get stack status")?
.curr
.services
.iter()
.find(|s| s.service == service)
.context("could not find service")?
.container
.as_ref()
.context("could not find service container")?
.name
.clone();
let periphery = periphery_client(&server)?;
let periphery = periphery_client(&server)?;
let stream = periphery
.execute_container_exec(container, shell, command)
.await
.context(
"Failed to execute container exec command on periphery",
)?;
let stream = periphery
.execute_container_exec(container, shell, command)
.await
.context(
"Failed to execute container exec command on periphery",
)?;
anyhow::Ok(stream)
}
.await;
let stream = match res {
Ok(stream) => stream,
Err(e) => {
warn!("/terminal/execute/stack request {req_id} error: {e:#}");
return Err(e.into());
}
};
Ok(axum::body::Body::from_stream(stream.into_line_stream()))
Ok(axum::body::Body::from_stream(stream))
}

View File

@@ -428,9 +428,10 @@ async fn get_on_host_periphery(
Err(anyhow!("Files on host doesn't work with AWS builder"))
}
BuilderConfig::Url(config) => {
// TODO: Ensure connection is actually established.
// Builder id no good because it may be active for multiple connections.
let periphery = PeripheryClient::new(
builder.id,
config.address,
config.passkey,
);
periphery.health_check().await?;

View File

@@ -49,7 +49,6 @@ pub async fn get_builder_periphery(
}
let periphery = PeripheryClient::new(
&builder.id,
&config.address,
if config.passkey.is_empty() {
&core_config().passkey
} else {
@@ -114,13 +113,12 @@ async fn get_aws_builder(
update_update(update.clone()).await?;
let protocol = if config.use_https { "https" } else { "http" };
// TODO: Handle ad-hoc (non server) periphery connections. These don't have ids.
let periphery_address =
format!("{protocol}://{ip}:{}", config.port);
let periphery = PeripheryClient::new(
builder_id,
&periphery_address,
&core_config().passkey,
);
let periphery =
PeripheryClient::new(builder_id, &core_config().passkey);
let start_connect_ts = komodo_timestamp();
let mut res = Ok(GetVersionResponse {

View File

@@ -193,7 +193,6 @@ pub fn periphery_client(
let client = PeripheryClient::new(
&server.id,
&server.config.address,
if server.config.passkey.is_empty() {
&core_config().passkey
} else {

View File

@@ -3,11 +3,12 @@
#[macro_use]
extern crate tracing;
use std::{net::SocketAddr, str::FromStr};
use std::{net::SocketAddr, str::FromStr, time::Duration};
use anyhow::Context;
use axum::Router;
use axum_server::{Handle, tls_rustls::RustlsConfig};
use periphery_client::periphery_response_channels;
use tower_http::{
cors::{Any, CorsLayer},
services::{ServeDir, ServeFile},
@@ -79,6 +80,22 @@ async fn app() -> anyhow::Result<()> {
schedule::spawn_schedule_executor();
helpers::prune::spawn_prune_loop();
// TODO: Remove
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
for (server_id, cache) in
periphery_response_channels().get_entries().await
{
let channels = cache.get_keys().await;
println!(
"CHANNELS: [{server_id}] [{}] {channels:?}",
channels.len()
);
}
}
});
// Setup static frontend services
let frontend_path = &config.frontend_path;
let frontend_index =

View File

@@ -16,6 +16,7 @@ use komodo_client::{
};
use periphery_client::{
PeripheryClient, api::terminal::DisconnectTerminal,
periphery_response_channels,
};
use tokio::sync::mpsc::{Receiver, Sender};
use tokio_util::sync::CancellationToken;
@@ -280,6 +281,7 @@ async fn forward_ws_channel(
tokio::join!(core_to_periphery, periphery_to_core);
// Cleanup
if let Err(e) = periphery
.request(DisconnectTerminal {
id: periphery_connection_id,
@@ -290,4 +292,9 @@ async fn forward_ws_channel(
"Failed to disconnect Periphery terminal forwarding | {e:#}",
)
}
if let Some(response_channels) =
periphery_response_channels().get(&periphery.id).await
{
response_channels.remove(&periphery_connection_id).await;
}
}

View File

@@ -30,6 +30,7 @@ mod network;
mod stats;
mod volume;
#[derive(Debug)]
pub struct Args {
pub req_id: Uuid,
}
@@ -145,11 +146,12 @@ pub enum PeripheryRequest {
// Terminal
ListTerminals(ListTerminals),
CreateTerminal(CreateTerminal),
DeleteTerminal(DeleteTerminal),
DeleteAllTerminals(DeleteAllTerminals),
ConnectTerminal(ConnectTerminal),
ConnectContainerExec(ConnectContainerExec),
DisconnectTerminal(DisconnectTerminal),
DeleteTerminal(DeleteTerminal),
DeleteAllTerminals(DeleteAllTerminals),
ExecuteTerminal(ExecuteTerminal),
}
//

View File

@@ -3,16 +3,16 @@ use std::sync::Arc;
use anyhow::{Context, anyhow};
use axum::http::StatusCode;
use bytes::Bytes;
use futures::{StreamExt, TryStreamExt};
use futures::{Stream, StreamExt, TryStreamExt};
use komodo_client::{
api::write::TerminalRecreateMode,
entities::{KOMODO_EXIT_CODE, NoData, server::TerminalInfo},
};
use periphery_client::api::terminal::*;
use resolver_api::Resolve;
use serror::{AddStatusCodeError, Json};
use serror::AddStatusCodeError;
use tokio::sync::mpsc::channel;
use tokio_util::sync::CancellationToken;
use tokio_util::{codec::LinesCodecError, sync::CancellationToken};
use transport::{MessageState, bytes::to_transport_bytes};
use uuid::Uuid;
@@ -22,6 +22,8 @@ use crate::{
terminal::*,
};
//
impl Resolve<super::Args> for ListTerminals {
#[instrument(name = "ListTerminals", level = "debug")]
async fn resolve(
@@ -33,6 +35,8 @@ impl Resolve<super::Args> for ListTerminals {
}
}
//
impl Resolve<super::Args> for CreateTerminal {
#[instrument(name = "CreateTerminal", level = "debug")]
async fn resolve(self, _: &super::Args) -> serror::Result<NoData> {
@@ -49,6 +53,8 @@ impl Resolve<super::Args> for CreateTerminal {
}
}
//
impl Resolve<super::Args> for DeleteTerminal {
#[instrument(name = "DeleteTerminal", level = "debug")]
async fn resolve(self, _: &super::Args) -> serror::Result<NoData> {
@@ -57,6 +63,8 @@ impl Resolve<super::Args> for DeleteTerminal {
}
}
//
impl Resolve<super::Args> for DeleteAllTerminals {
#[instrument(name = "DeleteAllTerminals", level = "debug")]
async fn resolve(self, _: &super::Args) -> serror::Result<NoData> {
@@ -65,6 +73,8 @@ impl Resolve<super::Args> for DeleteAllTerminals {
}
}
//
impl Resolve<super::Args> for ConnectTerminal {
#[instrument(name = "ConnectTerminal", level = "debug")]
async fn resolve(self, _: &super::Args) -> serror::Result<Uuid> {
@@ -75,22 +85,22 @@ impl Resolve<super::Args> for ConnectTerminal {
);
}
let id = Uuid::new_v4();
clean_up_terminals().await;
let terminal = get_terminal(&self.terminal).await?;
let id = Uuid::new_v4();
tokio::spawn(handle_terminal_forwarding(id, terminal));
Ok(id)
}
}
//
impl Resolve<super::Args> for ConnectContainerExec {
#[instrument(name = "ConnectContainerExec", level = "debug")]
async fn resolve(self, _: &super::Args) -> serror::Result<Uuid> {
let id = Uuid::new_v4();
async fn resolve(self, args: &super::Args) -> serror::Result<Uuid> {
if periphery_config().disable_container_exec {
return Err(
anyhow!("Container exec is disabled in the periphery config")
@@ -102,11 +112,11 @@ impl Resolve<super::Args> for ConnectContainerExec {
if container.contains("&&") || shell.contains("&&") {
return Err(
anyhow!(
"The use of '&&' is forbidden in the container name or shell"
)
.into(),
);
anyhow!(
"The use of '&&' is forbidden in the container name or shell"
)
.into(),
);
}
// Create (recreate if shell changed)
let terminal = create_terminal(
@@ -117,12 +127,16 @@ impl Resolve<super::Args> for ConnectContainerExec {
.await
.context("Failed to create terminal for container exec")?;
let id = Uuid::new_v4();
tokio::spawn(handle_terminal_forwarding(id, terminal));
Ok(id)
}
}
//
impl Resolve<super::Args> for DisconnectTerminal {
#[instrument(name = "DisconnectTerminal", level = "debug")]
async fn resolve(self, _: &super::Args) -> serror::Result<NoData> {
@@ -138,6 +152,83 @@ impl Resolve<super::Args> for DisconnectTerminal {
}
}
//
impl Resolve<super::Args> for ExecuteTerminal {
#[instrument(name = "ExecuteTerminal", level = "debug")]
async fn resolve(self, _: &super::Args) -> serror::Result<Uuid> {
if periphery_config().disable_terminals {
return Err(
anyhow!("Terminals are disabled in the periphery config")
.status_code(StatusCode::FORBIDDEN),
);
}
let terminal = get_terminal(&self.terminal).await?;
let stdout =
setup_execute_command_on_terminal(&terminal, &self.command)
.await?;
let id = Uuid::new_v4();
tokio::spawn(forward_execute_command_on_terminal_response(
id, stdout,
));
Ok(id)
}
}
//
impl Resolve<super::Args> for ExecuteContainerExec {
#[instrument(name = "ExecuteContainerExec", level = "debug")]
async fn resolve(self, _: &super::Args) -> serror::Result<Uuid> {
if periphery_config().disable_container_exec {
return Err(
anyhow!("Container exec is disabled in the periphery config")
.into(),
);
}
let Self {
container,
shell,
command,
} = self;
if container.contains("&&") || shell.contains("&&") {
return Err(
anyhow!(
"The use of '&&' is forbidden in the container name or shell"
)
.into(),
);
}
// Create terminal (recreate if shell changed)
let terminal = create_terminal(
container.clone(),
format!("docker exec -it {container} {shell}"),
TerminalRecreateMode::DifferentCommand,
)
.await
.context("Failed to create terminal for container exec")?;
let stdout =
setup_execute_command_on_terminal(&terminal, &command).await?;
let id = Uuid::new_v4();
tokio::spawn(forward_execute_command_on_terminal_response(
id, stdout,
));
Ok(id)
}
}
async fn handle_terminal_forwarding(
id: Uuid,
terminal: Arc<Terminal>,
@@ -179,6 +270,7 @@ async fn handle_terminal_forwarding(
if let Err(e) = init_res {
// TODO: Handle error
warn!("Failed to init terminal | {e:#}");
terminal_channels().remove(&id).await;
return;
}
@@ -295,63 +387,18 @@ async fn handle_terminal_forwarding(
tokio::join!(ws_read, ws_write);
// Clean up
terminal_channels().remove(&id).await;
clean_up_terminals().await;
}
pub async fn execute_terminal(
Json(ExecuteTerminalBody { terminal, command }): Json<
ExecuteTerminalBody,
>,
) -> serror::Result<axum::body::Body> {
if periphery_config().disable_terminals {
return Err(
anyhow!("Terminals are disabled in the periphery config")
.status_code(StatusCode::FORBIDDEN),
);
}
execute_command_on_terminal(&terminal, &command).await
}
pub async fn execute_container_exec(
Json(ExecuteContainerExecBody {
container,
shell,
command,
}): Json<ExecuteContainerExecBody>,
) -> serror::Result<axum::body::Body> {
if periphery_config().disable_container_exec {
return Err(
anyhow!("Container exec is disabled in the periphery config")
.into(),
);
}
if container.contains("&&") || shell.contains("&&") {
return Err(
anyhow!(
"The use of '&&' is forbidden in the container name or shell"
)
.into(),
);
}
// Create terminal (recreate if shell changed)
create_terminal(
container.clone(),
format!("docker exec -it {container} {shell}"),
TerminalRecreateMode::DifferentCommand,
)
.await
.context("Failed to create terminal for container exec")?;
execute_command_on_terminal(&container, &command).await
}
async fn execute_command_on_terminal(
terminal_name: &str,
/// This is run before spawning task handler
async fn setup_execute_command_on_terminal(
terminal: &Terminal,
command: &str,
) -> serror::Result<axum::body::Body> {
let terminal = get_terminal(terminal_name).await?;
) -> serror::Result<
impl Stream<Item = Result<String, LinesCodecError>> + 'static,
> {
// Read the bytes into lines
// This is done to check the lines for the EOF sentinal
let mut stdout = tokio_util::codec::FramedRead::new(
@@ -395,5 +442,44 @@ async fn execute_command_on_terminal(
}
}
Ok(axum::body::Body::from_stream(TerminalStream { stdout }))
Ok(stdout)
}
async fn forward_execute_command_on_terminal_response(
id: Uuid,
mut stdout: impl Stream<Item = Result<String, LinesCodecError>> + Unpin,
) {
let ws_sender = ws_sender();
loop {
match stdout.next().await {
Some(Ok(line)) if line.as_str() == END_OF_OUTPUT => break,
Some(Ok(line)) => {
if let Err(e) = ws_sender
.send(to_transport_bytes(
(line + "\n").into(),
id,
MessageState::Terminal,
))
.await
{
warn!("Got ws_sender send error | {e:?}");
break;
}
}
Some(Err(e)) => {
warn!("Got stdout stream error | {e:?}");
break;
}
None => {
clean_up_terminals().await;
break;
// return Err(
// anyhow!(
// "Stdout stream terminated before start sentinel received"
// )
// .into(),
// );
}
}
}
}

View File

@@ -6,7 +6,7 @@ use axum::{
http::{Request, StatusCode},
middleware::{self, Next},
response::Response,
routing::{get, post},
routing::get,
};
use serror::{AddStatusCode, AddStatusCodeError};
use std::net::{IpAddr, SocketAddr};
@@ -16,50 +16,9 @@ use crate::config::periphery_config;
pub fn router() -> Router {
Router::new()
.route("/", get(crate::connection::inbound_connection))
.nest(
"/terminal/execute",
Router::new()
.route("/", post(crate::api::terminal::execute_terminal))
.route(
"/container",
post(crate::api::terminal::execute_container_exec),
)
.layer(middleware::from_fn(guard_request_by_passkey)),
)
.layer(middleware::from_fn(guard_request_by_ip))
}
async fn guard_request_by_passkey(
req: Request<Body>,
next: Next,
) -> serror::Result<Response> {
if periphery_config().passkeys.is_empty() {
return Ok(next.run(req).await);
}
let Some(req_passkey) = req.headers().get("authorization") else {
return Err(
anyhow!("request was not sent with passkey")
.status_code(StatusCode::UNAUTHORIZED),
);
};
let req_passkey = req_passkey
.to_str()
.context("failed to convert passkey to str")
.status_code(StatusCode::UNAUTHORIZED)?;
if periphery_config()
.passkeys
.iter()
.any(|passkey| passkey == req_passkey)
{
Ok(next.run(req).await)
} else {
Err(
anyhow!("request passkey invalid")
.status_code(StatusCode::UNAUTHORIZED),
)
}
}
async fn guard_request_by_ip(
req: Request<Body>,
next: Next,

View File

@@ -18,8 +18,8 @@ cache.workspace = true
resolver_api.workspace = true
serror.workspace = true
# external
futures-util.workspace = true
serde_json.workspace = true
reqwest.workspace = true
tracing.workspace = true
anyhow.workspace = true
bytes.workspace = true

View File

@@ -82,11 +82,14 @@ pub struct DeleteTerminal {
#[response(NoData)]
#[error(serror::Error)]
pub struct DeleteAllTerminals {}
//
/// Note: The `terminal` must already exist, created by [CreateTerminal].
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ExecuteTerminalBody {
#[derive(Serialize, Deserialize, Debug, Clone, Resolve)]
#[response(Uuid)]
#[error(serror::Error)]
pub struct ExecuteTerminal {
/// Specify the terminal to execute the command on.
pub terminal: String,
/// The command to execute.
@@ -95,8 +98,10 @@ pub struct ExecuteTerminalBody {
//
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ExecuteContainerExecBody {
#[derive(Serialize, Deserialize, Debug, Clone, Resolve)]
#[response(Uuid)]
#[error(serror::Error)]
pub struct ExecuteContainerExec {
/// The name of the container to execute command in.
pub container: String,
/// The shell to start inside container.

View File

@@ -25,32 +25,18 @@ pub fn periphery_response_channels() -> &'static ResponseChannels {
RESPONSE_CHANNELS.get_or_init(Default::default)
}
fn periphery_http_client() -> &'static reqwest::Client {
static PERIPHERY_HTTP_CLIENT: OnceLock<reqwest::Client> =
OnceLock::new();
PERIPHERY_HTTP_CLIENT.get_or_init(|| {
reqwest::Client::builder()
// Use to allow communication with Periphery self-signed certs.
.danger_accept_invalid_certs(true)
.build()
.expect("Failed to build Periphery http client")
})
}
pub struct PeripheryClient {
id: String,
address: String,
pub id: String,
passkey: String,
}
impl PeripheryClient {
pub fn new(
id: impl Into<String>,
address: impl Into<String>,
passkey: impl Into<String>,
) -> PeripheryClient {
PeripheryClient {
id: id.into(),
address: address.into(),
passkey: passkey.into(),
}
}

View File

@@ -78,7 +78,7 @@ where
}
};
let (state, data) = match from_transport_bytes(bytes) {
Ok((_, state, data)) if !data.is_empty() => (state, data),
Ok((data, _, state)) if !data.is_empty() => (state, data),
// TODO: Handle no data cases
Ok(_) => continue,
Err(e) => {

View File

@@ -1,8 +1,13 @@
use std::{
pin::Pin,
task::{self, Poll},
};
use anyhow::Context;
use bytes::Bytes;
use komodo_client::terminal::TerminalStreamResponse;
use reqwest::RequestBuilder;
use futures_util::{Stream, StreamExt};
use tokio::sync::mpsc::{Receiver, Sender, channel};
use transport::bytes::data_from_transport_bytes;
use uuid::Uuid;
use crate::{
@@ -55,7 +60,7 @@ impl PeripheryClient {
let id = self
.request(ConnectContainerExec { container, shell })
.await
.context("Failed to create conntainer exec connection")?;
.context("Failed to create container exec connection")?;
let response_channels = periphery_response_channels()
.get_or_insert_default(&self.id)
@@ -84,15 +89,30 @@ impl PeripheryClient {
&self,
terminal: String,
command: String,
) -> anyhow::Result<TerminalStreamResponse> {
) -> anyhow::Result<
impl Stream<Item = anyhow::Result<Bytes>> + 'static,
> {
tracing::trace!(
"sending request | type: ExecuteTerminal | terminal name: {terminal} | command: {command}",
);
let req = crate::periphery_http_client()
.post(format!("{}/terminal/execute", self.address))
.json(&ExecuteTerminalBody { terminal, command })
.header("authorization", &self.passkey);
terminal_stream_response(req).await
let id = self
.request(ExecuteTerminal { terminal, command })
.await
.context("Failed to create execute terminal connection")?;
let response_channels = periphery_response_channels()
.get_or_insert_default(&self.id)
.await;
let (response_sender, response_receiever) = channel(1000);
response_channels.insert(id, response_sender).await;
let stream = ReceiverStream(response_receiever)
.map(|bytes| data_from_transport_bytes(bytes));
Ok(stream)
}
/// Executes command on specified container,
@@ -114,45 +134,45 @@ impl PeripheryClient {
container: String,
shell: String,
command: String,
) -> anyhow::Result<TerminalStreamResponse> {
) -> anyhow::Result<
impl Stream<Item = anyhow::Result<Bytes>> + 'static,
> {
tracing::trace!(
"sending request | type: ExecuteContainerExec | container: {container} | shell: {shell} | command: {command}",
);
let req = crate::periphery_http_client()
.post(format!("{}/terminal/execute/container", self.address))
.json(&ExecuteContainerExecBody {
let id = self
.request(ExecuteContainerExec {
container,
shell,
command,
})
.header("authorization", &self.passkey);
terminal_stream_response(req).await
}
}
async fn terminal_stream_response(
req: RequestBuilder,
) -> anyhow::Result<TerminalStreamResponse> {
let res =
req.send().await.context("Failed at request to periphery")?;
let status = res.status();
tracing::debug!(
"got response | type: ExecuteTerminal | {status} | response: {res:?}",
);
if status.is_success() {
Ok(TerminalStreamResponse(res))
} else {
tracing::debug!("response is non-200");
let text = res
.text()
.await
.context("Failed to convert response to text")?;
.context("Failed to create execute terminal connection")?;
tracing::debug!("got response text, deserializing error");
let response_channels = periphery_response_channels()
.get_or_insert_default(&self.id)
.await;
let error = serror::deserialize_error(text).context(status);
let (response_sender, response_receiever) = channel(1000);
Err(error)
response_channels.insert(id, response_sender).await;
let stream = ReceiverStream(response_receiever)
.map(|bytes| data_from_transport_bytes(bytes));
Ok(stream)
}
}
pub struct ReceiverStream<T>(Receiver<T>);
impl<T> Stream for ReceiverStream<T> {
type Item = T;
fn poll_next(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Option<T>> {
self.0.poll_recv(cx)
}
}

View File

@@ -81,6 +81,11 @@ impl<K: PartialEq + Eq + Hash + std::fmt::Debug + Clone, T: Clone>
self.cache.read().await.get(key).cloned()
}
pub async fn get_keys(&self) -> Vec<K> {
let cache = self.cache.read().await;
cache.keys().cloned().collect()
}
pub async fn get_values(&self) -> Vec<T> {
let cache = self.cache.read().await;
cache.values().cloned().collect()

View File

@@ -66,11 +66,11 @@ pub fn data_from_transport_bytes(
/// incoming transport bytes.
pub fn from_transport_bytes(
bytes: Bytes,
) -> anyhow::Result<(Uuid, MessageState, Bytes)> {
) -> anyhow::Result<(Bytes, Uuid, MessageState)> {
let (id, state) = id_state_from_transport_bytes(&bytes)?;
let mut res: Vec<u8> = bytes.into();
res.drain((res.len() - 17)..);
Ok((id, state, res.into()))
Ok((res.into(), id, state))
}
impl MessageState {