This commit is contained in:
mbecker20
2025-09-14 12:39:50 -07:00
parent cea7c5fc5e
commit d1d2227d36
9 changed files with 148 additions and 55 deletions

11
Cargo.lock generated
View File

@@ -2809,6 +2809,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"transport",
"uuid",
]
@@ -3647,6 +3648,7 @@ version = "1.19.5"
dependencies = [
"anyhow",
"cache",
"futures-util",
"komodo_client",
"reqwest",
"resolver_api",
@@ -3657,7 +3659,9 @@ dependencies = [
"serror",
"tokio",
"tokio-tungstenite 0.27.0",
"tokio-util",
"tracing",
"transport",
"uuid",
]
@@ -5504,6 +5508,13 @@ dependencies = [
"tracing-serde",
]
[[package]]
name = "transport"
version = "1.19.3"
dependencies = [
"tokio",
]
[[package]]
name = "try-lock"
version = "0.2.5"

View File

@@ -26,6 +26,7 @@ environment_file = { path = "lib/environment_file" }
environment = { path = "lib/environment" }
interpolate = { path = "lib/interpolate" }
formatting = { path = "lib/formatting" }
transport = { path = "lib/transport" }
database = { path = "lib/database" }
response = { path = "lib/response" }
command = { path = "lib/command" }

View File

@@ -21,6 +21,7 @@ environment_file.workspace = true
environment.workspace = true
interpolate.workspace = true
formatting.workspace = true
transport.workspace = true
response.workspace = true
command.workspace = true
config.workspace = true

View File

@@ -13,10 +13,8 @@ use periphery_client::message::{
use resolver_api::Resolve;
use response::JsonBytes;
use serror::{AddStatusCode, serialize_error_bytes};
use tokio::sync::{
Mutex,
mpsc::{Receiver, Sender, channel},
};
use tokio::sync::{Mutex, mpsc::Sender};
use transport::{BufferedReceiver, buffered_channel};
use crate::api::PeripheryRequest;
@@ -69,7 +67,7 @@ pub async fn inbound_connection(
match ws_write.send(msg).await {
// Clears the stored message from receiver buffer.
// TODO: Move after response ack from Core.
Ok(_) => response_receiver.confirm_receipt(),
Ok(_) => response_receiver.clear_buffer(),
Err(e) => {
warn!("Failed to send response | {e:?}");
let _ = ws_write.close().await;
@@ -165,49 +163,11 @@ const RESPONSE_BUFFER_MAX_LEN: usize = 1_024;
/// Must call in startup sequence
pub fn init_response_channel() {
let (response_sender, receiver) =
channel::<Vec<u8>>(RESPONSE_BUFFER_MAX_LEN);
let (sender, receiver) = buffered_channel(RESPONSE_BUFFER_MAX_LEN);
RESPONSE_SENDER
.set(response_sender)
.set(sender)
.expect("response_sender initialized more than once");
RESPONSE_RECEIVER
.set(Mutex::new(BufferedReceiver::new(receiver)))
.set(Mutex::new(receiver))
.expect("response_receiver initialized more than once");
}
/// Wrapper around channel receiver to control when
/// the latest message is dropped,
/// in case it must be re-transmitted.
#[derive(Debug)]
struct BufferedReceiver {
receiver: Receiver<Vec<u8>>,
buffer: Option<Vec<u8>>,
}
impl BufferedReceiver {
fn new(receiver: Receiver<Vec<u8>>) -> BufferedReceiver {
BufferedReceiver {
receiver,
buffer: None,
}
}
/// If 'next: Some(bytes)':
/// - Immediately returns borrow of next.
/// Else:
/// - Wait for next item
/// - store in 'next'
/// - return borrow of next.
async fn recv(&mut self) -> Option<&[u8]> {
if self.buffer.is_none() {
self.buffer = Some(self.receiver.recv().await?);
}
self.buffer.as_deref()
}
/// Clears buffer.
/// Should be called after transmission confirmed.
fn confirm_receipt(&mut self) {
self.buffer = None;
}
}

View File

@@ -12,12 +12,15 @@ repository.workspace = true
[dependencies]
# local
komodo_client.workspace = true
transport.workspace = true
cache.workspace = true
# mogh
resolver_api.workspace = true
serror.workspace = true
# external
tokio-tungstenite.workspace = true
futures-util.workspace = true
tokio-util.workspace = true
serde_json.workspace = true
serde_qs.workspace = true
reqwest.workspace = true

View File

@@ -1,29 +1,89 @@
use std::sync::OnceLock;
use cache::CloneCache;
use futures_util::StreamExt;
use tokio::sync::{broadcast, mpsc};
use tokio_util::sync::CancellationToken;
use transport::{BufferedReceiver, buffered_channel};
#[derive(Debug)]
struct ConnectionChannel {
request_sender: mpsc::Sender<Vec<u8>>,
response_receiver: broadcast::Receiver<Vec<u8>>,
cancel: CancellationToken,
}
impl ConnectionChannel {
fn new() -> (
ConnectionChannel,
BufferedReceiver,
broadcast::Sender<Vec<u8>>,
CancellationToken,
) {
let (request_sender, request_receiver) = buffered_channel(1000);
let (response_sender, response_receiver) =
broadcast::channel(1000);
let cancel = CancellationToken::new();
(
ConnectionChannel {
request_sender,
response_receiver,
cancel: cancel.clone(),
},
request_receiver,
response_sender,
cancel,
)
}
}
type ConnectionCache = CloneCache<String, ()>;
// fn connections() -> &'static () {
// static CONNECTIONS: OnceLock<CloneCache<>> = OnceLock::new();
// CONNECTIONS.get_or_init(|| ())
// }
impl Clone for ConnectionChannel {
fn clone(&self) -> Self {
Self {
request_sender: self.request_sender.clone(),
response_receiver: self.response_receiver.resubscribe(),
cancel: self.cancel.clone(),
}
}
}
type ConnectionCache = CloneCache<String, ConnectionChannel>;
fn connections() -> &'static ConnectionCache {
static CONNECTIONS: OnceLock<ConnectionCache> = OnceLock::new();
CONNECTIONS.get_or_init(Default::default)
}
// Assumes address already wss formatted
fn spawn_connection(address: String) -> anyhow::Result<()> {
async fn spawn_connection(address: String) -> anyhow::Result<()> {
let (channel, request_receiver, response_sender, cancel) =
ConnectionChannel::new();
if let Some(existing) =
connections().insert(address.clone(), channel).await
{
existing.cancel.cancel();
}
tokio::spawn(async move {
// Outer connection loop
loop {
let socket = match crate::ws::connect_websocket(&address).await {
let socket = match crate::ws::connect_websocket(&address).await
{
Ok(socket) => socket,
Err(e) => {
// TODO: handle connect error
return;
}
};
let (mut ws_read, mut ws_write) = socket.split();
let forward_requests = async {
loop {
// match request_receiver.recv().await {
// None => break,
// }
}
};
}
});

View File

@@ -85,12 +85,12 @@ impl<K: PartialEq + Eq + Hash + std::fmt::Debug + Clone, T: Clone>
cache.values().cloned().collect()
}
pub async fn insert<Key>(&self, key: Key, val: T)
pub async fn insert<Key>(&self, key: Key, val: T) -> Option<T>
where
T: std::fmt::Debug,
Key: Into<K> + std::fmt::Debug,
{
self.cache.write().await.insert(key.into(), val);
self.cache.write().await.insert(key.into(), val)
}
pub async fn remove(&self, key: &K) -> Option<T> {

11
lib/transport/Cargo.toml Normal file
View File

@@ -0,0 +1,11 @@
[package]
name = "transport"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
repository.workspace = true
homepage.workspace = true
[dependencies]
tokio.workspace = true

46
lib/transport/src/lib.rs Normal file
View File

@@ -0,0 +1,46 @@
use tokio::sync::mpsc;
/// Create a buffered channel
pub fn buffered_channel(
buffer: usize,
) -> (mpsc::Sender<Vec<u8>>, BufferedReceiver) {
let (sender, receiver) = mpsc::channel(buffer);
(sender, BufferedReceiver::new(receiver))
}
/// Wrapper around channel receiver to control when
/// the latest message is dropped,
/// in case it must be re-transmitted.
#[derive(Debug)]
pub struct BufferedReceiver {
receiver: mpsc::Receiver<Vec<u8>>,
buffer: Option<Vec<u8>>,
}
impl BufferedReceiver {
pub fn new(receiver: mpsc::Receiver<Vec<u8>>) -> BufferedReceiver {
BufferedReceiver {
receiver,
buffer: None,
}
}
/// If 'next: Some(bytes)':
/// - Immediately returns borrow of next.
/// Else:
/// - Wait for next item
/// - store in 'next'
/// - return borrow of next.
pub async fn recv(&mut self) -> Option<&[u8]> {
if self.buffer.is_none() {
self.buffer = Some(self.receiver.recv().await?);
}
self.buffer.as_deref()
}
/// Clears buffer.
/// Should be called after transmission confirmed.
pub fn clear_buffer(&mut self) {
self.buffer = None;
}
}