From d1d2227d3619819c09df638506516915e7604c8d Mon Sep 17 00:00:00 2001 From: mbecker20 Date: Sun, 14 Sep 2025 12:39:50 -0700 Subject: [PATCH] prog --- Cargo.lock | 11 ++++ Cargo.toml | 1 + bin/periphery/Cargo.toml | 1 + bin/periphery/src/connection.rs | 52 +++--------------- client/periphery/rs/Cargo.toml | 3 ++ client/periphery/rs/src/connections.rs | 74 +++++++++++++++++++++++--- lib/cache/src/lib.rs | 4 +- lib/transport/Cargo.toml | 11 ++++ lib/transport/src/lib.rs | 46 ++++++++++++++++ 9 files changed, 148 insertions(+), 55 deletions(-) create mode 100644 lib/transport/Cargo.toml create mode 100644 lib/transport/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 48405a7f4..db68bdf93 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2809,6 +2809,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tracing", + "transport", "uuid", ] @@ -3647,6 +3648,7 @@ version = "1.19.5" dependencies = [ "anyhow", "cache", + "futures-util", "komodo_client", "reqwest", "resolver_api", @@ -3657,7 +3659,9 @@ dependencies = [ "serror", "tokio", "tokio-tungstenite 0.27.0", + "tokio-util", "tracing", + "transport", "uuid", ] @@ -5504,6 +5508,13 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "transport" +version = "1.19.3" +dependencies = [ + "tokio", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index 8a947a26a..9670191de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ environment_file = { path = "lib/environment_file" } environment = { path = "lib/environment" } interpolate = { path = "lib/interpolate" } formatting = { path = "lib/formatting" } +transport = { path = "lib/transport" } database = { path = "lib/database" } response = { path = "lib/response" } command = { path = "lib/command" } diff --git a/bin/periphery/Cargo.toml b/bin/periphery/Cargo.toml index 34402303f..fafeb39f1 100644 --- a/bin/periphery/Cargo.toml +++ b/bin/periphery/Cargo.toml @@ -21,6 +21,7 @@ environment_file.workspace = true environment.workspace = true interpolate.workspace = true formatting.workspace = true +transport.workspace = true response.workspace = true command.workspace = true config.workspace = true diff --git a/bin/periphery/src/connection.rs b/bin/periphery/src/connection.rs index b2c0491c7..605009356 100644 --- a/bin/periphery/src/connection.rs +++ b/bin/periphery/src/connection.rs @@ -13,10 +13,8 @@ use periphery_client::message::{ use resolver_api::Resolve; use response::JsonBytes; use serror::{AddStatusCode, serialize_error_bytes}; -use tokio::sync::{ - Mutex, - mpsc::{Receiver, Sender, channel}, -}; +use tokio::sync::{Mutex, mpsc::Sender}; +use transport::{BufferedReceiver, buffered_channel}; use crate::api::PeripheryRequest; @@ -69,7 +67,7 @@ pub async fn inbound_connection( match ws_write.send(msg).await { // Clears the stored message from receiver buffer. // TODO: Move after response ack from Core. - Ok(_) => response_receiver.confirm_receipt(), + Ok(_) => response_receiver.clear_buffer(), Err(e) => { warn!("Failed to send response | {e:?}"); let _ = ws_write.close().await; @@ -165,49 +163,11 @@ const RESPONSE_BUFFER_MAX_LEN: usize = 1_024; /// Must call in startup sequence pub fn init_response_channel() { - let (response_sender, receiver) = - channel::>(RESPONSE_BUFFER_MAX_LEN); + let (sender, receiver) = buffered_channel(RESPONSE_BUFFER_MAX_LEN); RESPONSE_SENDER - .set(response_sender) + .set(sender) .expect("response_sender initialized more than once"); RESPONSE_RECEIVER - .set(Mutex::new(BufferedReceiver::new(receiver))) + .set(Mutex::new(receiver)) .expect("response_receiver initialized more than once"); } - -/// Wrapper around channel receiver to control when -/// the latest message is dropped, -/// in case it must be re-transmitted. -#[derive(Debug)] -struct BufferedReceiver { - receiver: Receiver>, - buffer: Option>, -} - -impl BufferedReceiver { - fn new(receiver: Receiver>) -> BufferedReceiver { - BufferedReceiver { - receiver, - buffer: None, - } - } - - /// If 'next: Some(bytes)': - /// - Immediately returns borrow of next. - /// Else: - /// - Wait for next item - /// - store in 'next' - /// - return borrow of next. - async fn recv(&mut self) -> Option<&[u8]> { - if self.buffer.is_none() { - self.buffer = Some(self.receiver.recv().await?); - } - self.buffer.as_deref() - } - - /// Clears buffer. - /// Should be called after transmission confirmed. - fn confirm_receipt(&mut self) { - self.buffer = None; - } -} diff --git a/client/periphery/rs/Cargo.toml b/client/periphery/rs/Cargo.toml index 78b8f3151..c6fbd6f80 100644 --- a/client/periphery/rs/Cargo.toml +++ b/client/periphery/rs/Cargo.toml @@ -12,12 +12,15 @@ repository.workspace = true [dependencies] # local komodo_client.workspace = true +transport.workspace = true cache.workspace = true # mogh resolver_api.workspace = true serror.workspace = true # external tokio-tungstenite.workspace = true +futures-util.workspace = true +tokio-util.workspace = true serde_json.workspace = true serde_qs.workspace = true reqwest.workspace = true diff --git a/client/periphery/rs/src/connections.rs b/client/periphery/rs/src/connections.rs index 0334fa16a..c239ac798 100644 --- a/client/periphery/rs/src/connections.rs +++ b/client/periphery/rs/src/connections.rs @@ -1,29 +1,89 @@ use std::sync::OnceLock; use cache::CloneCache; +use futures_util::StreamExt; use tokio::sync::{broadcast, mpsc}; +use tokio_util::sync::CancellationToken; +use transport::{BufferedReceiver, buffered_channel}; +#[derive(Debug)] +struct ConnectionChannel { + request_sender: mpsc::Sender>, + response_receiver: broadcast::Receiver>, + cancel: CancellationToken, +} +impl ConnectionChannel { + fn new() -> ( + ConnectionChannel, + BufferedReceiver, + broadcast::Sender>, + CancellationToken, + ) { + let (request_sender, request_receiver) = buffered_channel(1000); + let (response_sender, response_receiver) = + broadcast::channel(1000); + let cancel = CancellationToken::new(); + ( + ConnectionChannel { + request_sender, + response_receiver, + cancel: cancel.clone(), + }, + request_receiver, + response_sender, + cancel, + ) + } +} -type ConnectionCache = CloneCache; -// fn connections() -> &'static () { -// static CONNECTIONS: OnceLock> = OnceLock::new(); -// CONNECTIONS.get_or_init(|| ()) -// } +impl Clone for ConnectionChannel { + fn clone(&self) -> Self { + Self { + request_sender: self.request_sender.clone(), + response_receiver: self.response_receiver.resubscribe(), + cancel: self.cancel.clone(), + } + } +} + +type ConnectionCache = CloneCache; +fn connections() -> &'static ConnectionCache { + static CONNECTIONS: OnceLock = OnceLock::new(); + CONNECTIONS.get_or_init(Default::default) +} // Assumes address already wss formatted -fn spawn_connection(address: String) -> anyhow::Result<()> { +async fn spawn_connection(address: String) -> anyhow::Result<()> { + let (channel, request_receiver, response_sender, cancel) = + ConnectionChannel::new(); + if let Some(existing) = + connections().insert(address.clone(), channel).await + { + existing.cancel.cancel(); + } + tokio::spawn(async move { // Outer connection loop loop { - let socket = match crate::ws::connect_websocket(&address).await { + let socket = match crate::ws::connect_websocket(&address).await + { Ok(socket) => socket, Err(e) => { // TODO: handle connect error return; } }; + let (mut ws_read, mut ws_write) = socket.split(); + let forward_requests = async { + loop { + // match request_receiver.recv().await { + // None => break, + + // } + } + }; } }); diff --git a/lib/cache/src/lib.rs b/lib/cache/src/lib.rs index f04bc1605..e1a3ea9af 100644 --- a/lib/cache/src/lib.rs +++ b/lib/cache/src/lib.rs @@ -85,12 +85,12 @@ impl cache.values().cloned().collect() } - pub async fn insert(&self, key: Key, val: T) + pub async fn insert(&self, key: Key, val: T) -> Option where T: std::fmt::Debug, Key: Into + std::fmt::Debug, { - self.cache.write().await.insert(key.into(), val); + self.cache.write().await.insert(key.into(), val) } pub async fn remove(&self, key: &K) -> Option { diff --git a/lib/transport/Cargo.toml b/lib/transport/Cargo.toml new file mode 100644 index 000000000..115863e8f --- /dev/null +++ b/lib/transport/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "transport" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true +repository.workspace = true +homepage.workspace = true + +[dependencies] +tokio.workspace = true \ No newline at end of file diff --git a/lib/transport/src/lib.rs b/lib/transport/src/lib.rs new file mode 100644 index 000000000..77b1d97e6 --- /dev/null +++ b/lib/transport/src/lib.rs @@ -0,0 +1,46 @@ +use tokio::sync::mpsc; + +/// Create a buffered channel +pub fn buffered_channel( + buffer: usize, +) -> (mpsc::Sender>, BufferedReceiver) { + let (sender, receiver) = mpsc::channel(buffer); + (sender, BufferedReceiver::new(receiver)) +} + +/// Wrapper around channel receiver to control when +/// the latest message is dropped, +/// in case it must be re-transmitted. +#[derive(Debug)] +pub struct BufferedReceiver { + receiver: mpsc::Receiver>, + buffer: Option>, +} + +impl BufferedReceiver { + pub fn new(receiver: mpsc::Receiver>) -> BufferedReceiver { + BufferedReceiver { + receiver, + buffer: None, + } + } + + /// If 'next: Some(bytes)': + /// - Immediately returns borrow of next. + /// Else: + /// - Wait for next item + /// - store in 'next' + /// - return borrow of next. + pub async fn recv(&mut self) -> Option<&[u8]> { + if self.buffer.is_none() { + self.buffer = Some(self.receiver.recv().await?); + } + self.buffer.as_deref() + } + + /// Clears buffer. + /// Should be called after transmission confirmed. + pub fn clear_buffer(&mut self) { + self.buffer = None; + } +}