clean up websocket handlers with many params

This commit is contained in:
mbecker20
2025-09-21 12:09:47 -07:00
parent 996d4aa129
commit 6700700a80
6 changed files with 190 additions and 168 deletions

View File

@@ -68,15 +68,14 @@ pub async fn handler(
query,
};
// TODO: source the pk
if let Err(e) = super::handle_websocket::<ClientLoginFlow>(
let handler = super::WebsocketHandler {
socket,
connection_identifiers,
&mut write_receiver,
|| already_logged_login_error = false,
)
.await
{
write_receiver: &mut write_receiver,
on_login_success: || already_logged_login_error = false,
};
if let Err(e) = handler.handle::<ClientLoginFlow>().await {
if !already_logged_login_error {
warn!("Failed to login | {e:#}");
already_logged_login_error = true;

View File

@@ -60,74 +60,85 @@ pub fn init_response_channel() {
.expect("response_receiver initialized more than once");
}
async fn handle_websocket<L: LoginFlow>(
mut socket: impl Websocket,
connection_identifiers: ConnectionIdentifiers<'_>,
write_receiver: &mut BufferedReceiver<Bytes>,
mut on_login_success: impl FnMut(),
) -> anyhow::Result<()> {
L::login(
&mut socket,
connection_identifiers,
&periphery_config().private_key,
&CorePublicKeyValidator,
)
.await?;
on_login_success();
pub struct WebsocketHandler<'a, W, S> {
pub socket: W,
pub connection_identifiers: ConnectionIdentifiers<'a>,
pub write_receiver: &'a mut BufferedReceiver<Bytes>,
pub on_login_success: S,
}
info!("Logged in to Core connection websocket");
impl<W: Websocket, S: FnMut()> WebsocketHandler<'_, W, S> {
async fn handle<L: LoginFlow>(self) -> anyhow::Result<()> {
let WebsocketHandler {
mut socket,
connection_identifiers,
write_receiver,
mut on_login_success,
} = self;
let (mut ws_write, mut ws_read) = socket.split();
L::login(
&mut socket,
connection_identifiers,
&periphery_config().private_key,
&CorePublicKeyValidator,
)
.await?;
on_login_success();
let forward_writes = async {
loop {
let msg = match write_receiver.recv().await {
// Sender Dropped (shouldn't happen, it is static).
None => break,
// This has to copy the bytes to follow ownership rules.
Some(msg) => Bytes::copy_from_slice(msg),
};
match ws_write.send(msg).await {
// Clears the stored message from receiver buffer.
// TODO: Move after response ack.
Ok(_) => write_receiver.clear_buffer(),
Err(e) => {
warn!("Failed to send response | {e:?}");
let _ = ws_write.close(None).await;
break;
info!("Logged in to Core connection websocket");
let (mut ws_write, mut ws_read) = socket.split();
let forward_writes = async {
loop {
let msg = match write_receiver.recv().await {
// Sender Dropped (shouldn't happen, it is static).
None => break,
// This has to copy the bytes to follow ownership rules.
Some(msg) => Bytes::copy_from_slice(msg),
};
match ws_write.send(msg).await {
// Clears the stored message from receiver buffer.
// TODO: Move after response ack.
Ok(_) => write_receiver.clear_buffer(),
Err(e) => {
warn!("Failed to send response | {e:?}");
let _ = ws_write.close(None).await;
break;
}
}
}
}
};
};
let handle_reads = async {
loop {
match ws_read.recv().await {
Ok(WebsocketMessage::Binary(bytes)) => {
handle_incoming_bytes(bytes).await
}
Ok(WebsocketMessage::Close(frame)) => {
warn!("Connection closed with frame: {frame:?}");
break;
}
Ok(WebsocketMessage::Closed) => {
warn!("Connection already closed");
break;
}
Err(e) => {
error!("Failed to read websocket message | {e:?}");
break;
}
};
}
};
let handle_reads = async {
loop {
match ws_read.recv().await {
Ok(WebsocketMessage::Binary(bytes)) => {
handle_incoming_bytes(bytes).await
}
Ok(WebsocketMessage::Close(frame)) => {
warn!("Connection closed with frame: {frame:?}");
break;
}
Ok(WebsocketMessage::Closed) => {
warn!("Connection already closed");
break;
}
Err(e) => {
error!("Failed to read websocket message | {e:?}");
break;
}
};
}
};
tokio::select! {
_ = forward_writes => {},
_ = handle_reads => {},
};
tokio::select! {
_ = forward_writes => {},
_ = handle_reads => {},
};
Ok(())
Ok(())
}
}
pub struct CorePublicKeyValidator;

View File

@@ -73,17 +73,13 @@ async fn handler(
.status_code(StatusCode::UNAUTHORIZED)?;
Ok(ws.on_upgrade(|socket| async move {
let socket = AxumWebsocket(socket);
// TODO: source the pk
if let Err(e) = super::handle_websocket::<ServerLoginFlow>(
socket,
identifiers.build(&[]),
&mut write_receiver,
|| {},
)
.await
{
let handler = super::WebsocketHandler {
socket: AxumWebsocket(socket),
connection_identifiers: identifiers.build(&[]),
write_receiver: &mut write_receiver,
on_login_success: || {},
};
if let Err(e) = handler.handle::<ServerLoginFlow>().await {
warn!("Core failed to login to connection | {e:#}");
return;
}

View File

@@ -146,21 +146,21 @@ pub async fn spawn_client_connection(
info!("PERIPHERY: Connected to {address}");
if let Err(e) = super::handle_websocket::<ClientLoginFlow>(
&address,
let handler = super::WebsocketHandler {
label: &address,
socket,
ConnectionIdentifiers {
connection_identifiers: ConnectionIdentifiers {
host: &host,
accept: accept.as_bytes(),
query: &[],
},
&private_key,
&mut write_receiver,
&connection,
&handler,
)
.await
{
private_key: &private_key,
write_receiver: &mut write_receiver,
connection: &connection,
handler: &handler,
};
if let Err(e) = handler.handle::<ClientLoginFlow>().await {
if connection.cancel.is_cancelled() {
break;
}

View File

@@ -27,88 +27,104 @@ use crate::all_server_channels;
pub mod client;
pub mod server;
async fn handle_websocket<L: LoginFlow>(
label: &str,
mut socket: impl Websocket,
connection_identifiers: ConnectionIdentifiers<'_>,
private_key: &str,
write_receiver: &mut BufferedReceiver<Bytes>,
connection: &PeripheryConnection,
handler: &MessageHandler,
) -> anyhow::Result<()> {
L::login(
&mut socket,
connection_identifiers,
private_key,
&PeripheryPublicKeyValidator,
)
.await?;
pub struct WebsocketHandler<'a, W> {
pub label: &'a str,
pub socket: W,
pub connection_identifiers: ConnectionIdentifiers<'a>,
pub private_key: &'a str,
pub write_receiver: &'a mut BufferedReceiver<Bytes>,
pub connection: &'a PeripheryConnection,
pub handler: &'a MessageHandler,
}
info!("PERIPHERY: Logged into {label}");
impl<W: Websocket> WebsocketHandler<'_, W> {
async fn handle<L: LoginFlow>(self) -> anyhow::Result<()> {
let WebsocketHandler {
label,
mut socket,
connection_identifiers,
private_key,
write_receiver,
connection,
handler,
} = self;
connection.set_connected(true);
connection.clear_error().await;
L::login(
&mut socket,
connection_identifiers,
private_key,
&PeripheryPublicKeyValidator,
)
.await?;
let (mut ws_write, mut ws_read) = socket.split();
info!("PERIPHERY: Logged into {label}");
let forward_writes = async {
loop {
let next = tokio::select! {
next = write_receiver.recv() => next,
_ = connection.cancel.cancelled() => break,
};
connection.set_connected(true);
connection.clear_error().await;
let message = match next {
Some(request) => Bytes::copy_from_slice(request),
// Sender Dropped (shouldn't happen, a reference is held on 'connection').
None => break,
};
let (mut ws_write, mut ws_read) = socket.split();
match ws_write.send(message).await {
Ok(_) => write_receiver.clear_buffer(),
Err(e) => {
warn!("Failed to send request to {label} | {e:#}");
break;
let forward_writes = async {
loop {
let next = tokio::select! {
next = write_receiver.recv() => next,
_ = connection.cancel.cancelled() => break,
};
let message = match next {
Some(request) => Bytes::copy_from_slice(request),
// Sender Dropped (shouldn't happen, a reference is held on 'connection').
None => break,
};
match ws_write.send(message).await {
Ok(_) => write_receiver.clear_buffer(),
Err(e) => {
warn!("Failed to send request to {label} | {e:#}");
break;
}
}
}
}
// Cancel again if not already
let _ = ws_write.close(None).await;
connection.cancel();
};
// Cancel again if not already
let _ = ws_write.close(None).await;
connection.cancel();
};
let handle_reads = async {
loop {
let next = tokio::select! {
next = ws_read.recv() => next,
_ = connection.cancel.cancelled() => break,
};
let handle_reads = async {
loop {
let next = tokio::select! {
next = ws_read.recv() => next,
_ = connection.cancel.cancelled() => break,
};
match next {
Ok(WebsocketMessage::Binary(bytes)) => {
handler.handle_incoming_bytes(bytes).await
}
Ok(WebsocketMessage::Close(frame)) => {
warn!("Connection to {label} broken with frame: {frame:?}");
break;
}
Ok(WebsocketMessage::Closed) => {}
Err(e) => {
warn!("Connection to {label} broken with error: {e:?}");
break;
}
};
}
// Cancel again if not already
connection.cancel();
};
match next {
Ok(WebsocketMessage::Binary(bytes)) => {
handler.handle_incoming_bytes(bytes).await
}
Ok(WebsocketMessage::Close(frame)) => {
warn!(
"Connection to {label} broken with frame: {frame:?}"
);
break;
}
Ok(WebsocketMessage::Closed) => {}
Err(e) => {
warn!("Connection to {label} broken with error: {e:?}");
break;
}
};
}
// Cancel again if not already
connection.cancel();
};
tokio::join!(forward_writes, handle_reads);
tokio::join!(forward_writes, handle_reads);
warn!("PERIPHERY: Disconnnected from {label}");
connection.set_connected(false);
warn!("PERIPHERY: Disconnnected from {label}");
connection.set_connected(false);
Ok(())
Ok(())
}
}
pub struct PeripheryPublicKeyValidator;

View File

@@ -37,17 +37,17 @@ pub async fn handler(
}
Ok(ws.on_upgrade(|socket| async move {
if let Err(e) = super::handle_websocket::<ServerLoginFlow>(
&server_id,
AxumWebsocket(socket),
identifiers.build(query.as_bytes()),
&private_key,
&mut write_receiver,
&connection,
&handler,
)
.await
{
let handler = super::WebsocketHandler {
label: &server_id,
socket: AxumWebsocket(socket),
connection_identifiers: identifiers.build(query.as_bytes()),
private_key: &private_key,
write_receiver: &mut write_receiver,
connection: &connection,
handler: &handler,
};
if let Err(e) = handler.handle::<ServerLoginFlow>().await {
warn!("Server {server_id} | Client failed to login | {e:#}");
connection.set_error(e).await;
return;