diff --git a/bin/core/src/connection/client.rs b/bin/core/src/connection/client.rs index b518c6c88..9e7481789 100644 --- a/bin/core/src/connection/client.rs +++ b/bin/core/src/connection/client.rs @@ -142,7 +142,7 @@ async fn handle_passkey_login( }; socket - .send(LoginMessage::V1Passkey(passkey)) + .send_message(LoginMessage::V1Passkey(passkey)) .await .context("Failed to send Login V1Passkey message")?; diff --git a/bin/core/src/connection/mod.rs b/bin/core/src/connection/mod.rs index d5fe2c2f9..07120acb9 100644 --- a/bin/core/src/connection/mod.rs +++ b/bin/core/src/connection/mod.rs @@ -365,7 +365,7 @@ impl PeripheryConnection { let Ok(message) = receiver.recv().await else { break; }; - match ws_write.send_inner(message.into_bytes()).await { + match ws_write.send(message.into_bytes()).await { Ok(_) => receiver.clear_buffer(), Err(e) => { self.set_error(e).await; @@ -380,7 +380,7 @@ impl PeripheryConnection { let handle_reads = async { loop { - match ws_read.recv_inner().await { + match ws_read.recv().await { Ok(WebsocketMessage::Message(message)) => { self.handle_incoming_message(message).await } diff --git a/bin/core/src/connection/server.rs b/bin/core/src/connection/server.rs index 5f135e494..c5d08433a 100644 --- a/bin/core/src/connection/server.rs +++ b/bin/core/src/connection/server.rs @@ -125,7 +125,7 @@ async fn existing_server_handler( let mut socket = AxumWebsocket(socket); if let Err(e) = socket - .send(LoginMessage::OnboardingFlow(false)) + .send_message(LoginMessage::OnboardingFlow(false)) .await .context("Failed to send Login OnboardingFlow false message") { @@ -158,7 +158,7 @@ async fn onboard_server_handler( format!("server={}", urlencoding::encode(&server_query)); let mut socket = AxumWebsocket(socket); - if let Err(e) = socket.send(LoginMessage::OnboardingFlow(true)).await.context( + if let Err(e) = socket.send_message(LoginMessage::OnboardingFlow(true)).await.context( "Failed to send Login OnboardingFlow true message", ).context("Server onboarding error") { warn!("{e:#}"); @@ -215,7 +215,7 @@ async fn onboard_server_handler( }; if let Err(e) = socket - .send(LoginMessage::Success) + .send_message(LoginMessage::Success) .await .context("Failed to send Login Onboarding Successful message") { diff --git a/bin/periphery/src/connection/client.rs b/bin/periphery/src/connection/client.rs index 39b43e949..e6504245e 100644 --- a/bin/periphery/src/connection/client.rs +++ b/bin/periphery/src/connection/client.rs @@ -161,7 +161,7 @@ async fn handle_onboarding( // Post onboarding login 1: Send public key socket - .send(LoginMessage::PublicKey( + .send_message(LoginMessage::PublicKey( periphery_keys().load().public.clone(), )) .await diff --git a/bin/periphery/src/connection/mod.rs b/bin/periphery/src/connection/mod.rs index cc4ed256b..4fa73244a 100644 --- a/bin/periphery/src/connection/mod.rs +++ b/bin/periphery/src/connection/mod.rs @@ -160,7 +160,7 @@ async fn handle_socket( break; } }; - match ws_write.send_inner(message.into_bytes()).await { + match ws_write.send(message.into_bytes()).await { // Clears the stored message from receiver buffer. Ok(_) => receiver.clear_buffer(), Err(e) => { @@ -174,7 +174,7 @@ async fn handle_socket( let handle_reads = async { loop { - let message = match ws_read.recv().await { + let message = match ws_read.recv_message().await { Ok(res) => res, Err(e) => { warn!("{e:#}"); diff --git a/bin/periphery/src/connection/server.rs b/bin/periphery/src/connection/server.rs index b6685b49b..4fdd3ac43 100644 --- a/bin/periphery/src/connection/server.rs +++ b/bin/periphery/src/connection/server.rs @@ -166,7 +166,7 @@ async fn handle_login( match (&config.core_public_keys, &config.passkeys) { (Some(_), _) | (_, None) => { socket - .send(LoginMessage::V1PasskeyFlow(false)) + .send_message(LoginMessage::V1PasskeyFlow(false)) .await .context("Failed to send Login V1PasskeyFlow message")?; super::handle_login::<_, ServerLoginFlow>(socket, identifiers) @@ -189,7 +189,7 @@ async fn handle_passkey_login( }; let res = async { socket - .send(LoginMessage::V1PasskeyFlow(true)) + .send_message(LoginMessage::V1PasskeyFlow(true)) .await .context("Failed to send login type indicator")?; @@ -204,7 +204,7 @@ async fn handle_passkey_login( .any(|expected_passkey| expected_passkey.as_bytes() == passkey) { socket - .send(LoginMessage::Success) + .send_message(LoginMessage::Success) .await .context("Failed to send login type indicator")?; Ok(()) diff --git a/lib/transport/src/auth.rs b/lib/transport/src/auth.rs index 02482461c..a8ea96f37 100644 --- a/lib/transport/src/auth.rs +++ b/lib/transport/src/auth.rs @@ -55,7 +55,7 @@ impl LoginFlow for ServerLoginFlow { let res = async { socket - .send(LoginMessage::Nonce(nonce)) + .send_message(LoginMessage::Nonce(nonce)) .await .context("Failed to send connection nonce")?; @@ -81,7 +81,7 @@ impl LoginFlow for ServerLoginFlow { .next_message() .context("Failed to write handshake_m2")?; socket - .send(LoginMessage::Handshake(handshake_m2)) + .send_message(LoginMessage::Handshake(handshake_m2)) .await .context("Failed to send handshake_m2")?; @@ -107,7 +107,7 @@ impl LoginFlow for ServerLoginFlow { match res { Ok(res) => { socket - .send(LoginMessage::Success) + .send_message(LoginMessage::Success) .await .context("Failed to send login successful to client")?; Ok(res) @@ -161,7 +161,7 @@ impl LoginFlow for ClientLoginFlow { .next_message() .context("Failed to write handshake m1")?; socket - .send(LoginMessage::Handshake(handshake_m1)) + .send_message(LoginMessage::Handshake(handshake_m1)) .await .context("Failed to send handshake_m1")?; @@ -188,7 +188,7 @@ impl LoginFlow for ClientLoginFlow { .next_message() .context("Failed to write handshake_m3")?; socket - .send(LoginMessage::Handshake(handshake_m3)) + .send_message(LoginMessage::Handshake(handshake_m3)) .await .context("Failed to send handshake_m3")?; diff --git a/lib/transport/src/websocket/axum.rs b/lib/transport/src/websocket/axum.rs index c8d5c8fd1..7fa3c43de 100644 --- a/lib/transport/src/websocket/axum.rs +++ b/lib/transport/src/websocket/axum.rs @@ -25,7 +25,7 @@ impl Websocket for AxumWebsocket { (AxumWebsocketSender(tx), AxumWebsocketReceiver::new(rx)) } - async fn send_inner(&mut self, bytes: Bytes) -> anyhow::Result<()> { + async fn send(&mut self, bytes: Bytes) -> anyhow::Result<()> { self .0 .send(axum::extract::ws::Message::Binary(bytes)) @@ -58,7 +58,7 @@ pub type InnerWebsocketSender = pub struct AxumWebsocketSender(pub InnerWebsocketSender); impl WebsocketSender for AxumWebsocketSender { - async fn send_inner(&mut self, bytes: Bytes) -> anyhow::Result<()> { + async fn send(&mut self, bytes: Bytes) -> anyhow::Result<()> { self .0 .send(axum::extract::ws::Message::Binary(bytes)) @@ -130,7 +130,7 @@ impl WebsocketReceiver for AxumWebsocketReceiver { self.cancel = Some(cancel); } - async fn recv_inner( + async fn recv( &mut self, ) -> anyhow::Result> { let fut = try_next(&mut self.receiver); diff --git a/lib/transport/src/websocket/login.rs b/lib/transport/src/websocket/login.rs index e973ef891..e498a587b 100644 --- a/lib/transport/src/websocket/login.rs +++ b/lib/transport/src/websocket/login.rs @@ -17,7 +17,7 @@ pub trait LoginWebsocketExt: WebsocketExt { ) -> impl Future> + Send { let message = TransportMessage::Login(EncodedLoginMessage::from(e.encode())); - self.send(message) + self.send_message(message) } fn recv_login_message( @@ -25,7 +25,7 @@ pub trait LoginWebsocketExt: WebsocketExt { ) -> impl Future> + Send { async { let TransportMessage::Login(message) = - self.recv().with_timeout(AUTH_TIMEOUT).await? + self.recv_message().with_timeout(AUTH_TIMEOUT).await? else { return Err(anyhow!( "Expected Login message, got other message type" diff --git a/lib/transport/src/websocket/mod.rs b/lib/transport/src/websocket/mod.rs index 7d527806f..a7e37e8c4 100644 --- a/lib/transport/src/websocket/mod.rs +++ b/lib/transport/src/websocket/mod.rs @@ -38,7 +38,7 @@ pub trait Websocket: Send { /// Abstraction over websocket splitting fn split(self) -> (impl WebsocketSender, impl WebsocketReceiver); - fn send_inner( + fn send( &mut self, bytes: Bytes, ) -> impl Future> + Send; @@ -60,15 +60,15 @@ pub trait Websocket: Send { } pub trait WebsocketExt: Websocket { - fn send( + fn send_message( &mut self, message: impl Encode, ) -> impl Future> + Send { - self.send_inner(message.encode().into_bytes()) + self.send(message.encode().into_bytes()) } - /// Looping receiver for websocket messages which only returns on messages. - fn recv( + /// Looping receiver for websocket messages which only returns on TransportMessage. + fn recv_message( &mut self, ) -> MaybeWithTimeout< impl Future> + Send, @@ -92,7 +92,7 @@ impl WebsocketExt for W {} /// Traits for split websocket receiver pub trait WebsocketSender { /// Streamlined sending on bytes - fn send_inner( + fn send( &mut self, bytes: Bytes, ) -> impl Future> + Send; @@ -108,7 +108,7 @@ pub trait WebsocketSenderExt: WebsocketSender + Send { &mut self, message: impl Encode, ) -> impl Future> + Send { - self.send_inner(message.encode().into_vec().into()) + self.send(message.encode().into_vec().into()) } fn send_request<'a, T: Serialize + Send>( @@ -160,7 +160,7 @@ pub trait WebsocketReceiver: Send { /// Looping receiver for websocket messages which only returns /// on significant messages. Must implement cancel support. - fn recv_inner( + fn recv( &mut self, ) -> impl Future< Output = anyhow::Result>, @@ -168,15 +168,15 @@ pub trait WebsocketReceiver: Send { } pub trait WebsocketReceiverExt: WebsocketReceiver { - /// Looping receiver for websocket messages which only returns on messages. - fn recv( + /// Looping receiver for websocket messages which only returns on TransportMessage. + fn recv_message( &mut self, ) -> MaybeWithTimeout< impl Future> + Send, > { MaybeWithTimeout::new(async { match self - .recv_inner() + .recv() .await .context("Failed to read websocket message")? { diff --git a/lib/transport/src/websocket/tungstenite.rs b/lib/transport/src/websocket/tungstenite.rs index 04e3839bd..caace9daf 100644 --- a/lib/transport/src/websocket/tungstenite.rs +++ b/lib/transport/src/websocket/tungstenite.rs @@ -51,7 +51,7 @@ impl Websocket for TungsteniteWebsocket { MaybeWithTimeout::new(try_next(&mut self.0)) } - async fn send_inner(&mut self, bytes: Bytes) -> anyhow::Result<()> { + async fn send(&mut self, bytes: Bytes) -> anyhow::Result<()> { self .0 .send(tungstenite::Message::Binary(bytes)) @@ -76,7 +76,7 @@ pub type InnerWebsocketSender = SplitSink< pub struct TungsteniteWebsocketSender(pub InnerWebsocketSender); impl WebsocketSender for TungsteniteWebsocketSender { - async fn send_inner(&mut self, bytes: Bytes) -> anyhow::Result<()> { + async fn send(&mut self, bytes: Bytes) -> anyhow::Result<()> { self .0 .send(tungstenite::Message::Binary(bytes)) @@ -149,7 +149,7 @@ impl WebsocketReceiver for TungsteniteWebsocketReceiver { self.cancel = Some(cancel); } - async fn recv_inner( + async fn recv( &mut self, ) -> anyhow::Result> { let fut = try_next(&mut self.receiver);