diff --git a/bin/core/src/connection/server.rs b/bin/core/src/connection/server.rs index fb15d6912..9edd10498 100644 --- a/bin/core/src/connection/server.rs +++ b/bin/core/src/connection/server.rs @@ -57,7 +57,7 @@ pub async fn handler( ); } - // Handle regular / creation flow. + // Handle connection vs. onboarding flow. match Server::coll() .find_one(id_or_name_filter(&server_query)) .await @@ -68,7 +68,7 @@ pub async fn handler( .await } None if ObjectId::from_str(&server_query).is_err() => { - create_server_handler(server_query, identifiers, ws).await + onboard_server_handler(server_query, identifiers, ws).await } None => Err( anyhow!("Must provide name based Server specifier for onboarding flow, name cannot be valid ObjectId (hex)") @@ -142,7 +142,7 @@ async fn existing_server_handler( })) } -async fn create_server_handler( +async fn onboard_server_handler( server_query: String, identifiers: HeaderConnectionIdentifiers, ws: WebSocketUpgrade, @@ -174,7 +174,7 @@ async fn create_server_handler( } }; - // TODO: Get actual public key from Periphery, maybe freshly generated + // Post onboarding login 1: Receive public key let periphery_public_key = match socket .recv_bytes_with_timeout(Duration::from_secs(2)) .await @@ -187,6 +187,7 @@ async fn create_server_handler( return; } }; + let config = ServerConfig { periphery_public_key, enabled: true, diff --git a/bin/periphery/src/connection/client.rs b/bin/periphery/src/connection/client.rs index 94a76580a..571fc870d 100644 --- a/bin/periphery/src/connection/client.rs +++ b/bin/periphery/src/connection/client.rs @@ -9,8 +9,8 @@ use tokio_tungstenite::tungstenite; use transport::{ MessageState, auth::{ - AddressConnectionIdentifiers, ClientLoginFlow, LoginFlow, - LoginFlowArgs, + AddressConnectionIdentifiers, ClientLoginFlow, + ConnectionIdentifiers, LoginFlow, LoginFlowArgs, }, fix_ws_address, websocket::{Websocket, tungstenite::TungsteniteWebsocket}, @@ -35,8 +35,7 @@ pub async fn handler( let mut already_logged_connection_error = false; let mut already_logged_login_error = false; - let mut already_logged_creation_error = false; - let mut already_logged_dne_error = false; + let mut already_logged_onboarding_error = false; let args = Arc::new(Args { core: identifiers.host().to_string(), @@ -58,7 +57,7 @@ pub async fn handler( // If error transitions from login to connection, // set to false to see login error after reconnect. already_logged_login_error = false; - already_logged_creation_error = false; + already_logged_onboarding_error = false; } tokio::time::sleep(Duration::from_secs( CONNECTION_RETRY_SECONDS, @@ -68,7 +67,7 @@ pub async fn handler( } }; - // Receive whether to use Server connection flow vs Server creation flow. + // Receive whether to use Server connection flow vs Server onboarding flow. let flow_bytes = match socket .recv_bytes_with_timeout(Duration::from_secs(2)) @@ -83,7 +82,7 @@ pub async fn handler( // If error transitions from login to connection, // set to false to see login error after reconnect. already_logged_login_error = false; - already_logged_creation_error = false; + already_logged_onboarding_error = false; } tokio::time::sleep(Duration::from_secs( CONNECTION_RETRY_SECONDS, @@ -130,14 +129,12 @@ pub async fn handler( } // Creation &[1] => { - let Some(onboarding_key) = - periphery_config().onboarding_key.as_deref() - else { - if !already_logged_dne_error { - error!( - "Server {connect_as} does not exist, and no PERIPHERY_ONBOARDING_KEY is provided." - ); - already_logged_dne_error = true; + if let Err(e) = + handle_onboarding(connect_as, socket, identifiers).await + { + if !already_logged_onboarding_error { + error!("{e:#}"); + already_logged_onboarding_error = true; } tokio::time::sleep(Duration::from_secs( CONNECTION_RETRY_SECONDS, @@ -145,74 +142,6 @@ pub async fn handler( .await; continue; }; - if let Err(e) = ClientLoginFlow::login(LoginFlowArgs { - private_key: onboarding_key, - identifiers, - public_key_validator: CorePublicKeyValidator, - socket: &mut socket, - }) - .await - { - if !already_logged_creation_error { - warn!("Failed to onboard Server | {e:#}"); - already_logged_creation_error = true; - } - tokio::time::sleep(Duration::from_secs( - CONNECTION_RETRY_SECONDS, - )) - .await; - continue; - } - - already_logged_creation_error = false; - - if let Err(e) = socket - .send(Bytes::from_static(periphery_public_key().as_bytes())) - .await - .context("Failed to send public key bytes") - { - warn!("Failed to onboard Server | {e:#}"); - tokio::time::sleep(Duration::from_secs( - CONNECTION_RETRY_SECONDS, - )) - .await; - continue; - } - - let res = match socket - .recv_bytes_with_timeout(Duration::from_secs(2)) - .await - .context("Failed to receive Server creation result") - { - Ok(res) => res, - Err(e) => { - warn!("Failed to onboard Server | {e:#}"); - tokio::time::sleep(Duration::from_secs( - CONNECTION_RETRY_SECONDS, - )) - .await; - continue; - } - }; - - match res.last().map(|byte| MessageState::from_byte(*byte)) { - Some(MessageState::Successful) => { - info!( - "Server onboarding flow for '{connect_as}' successful ✅" - ); - } - Some(MessageState::Failed) => { - error!( - "Server onboarding flow for '{connect_as}' failed | {:#}", - deserialize_error_bytes(&res[..(res.len() - 1)]) - ); - } - other => { - warn!( - "Got unrecognized onboarding flow response: {other:?}" - ) - } - } } // Other (error) other => { @@ -232,6 +161,51 @@ pub async fn handler( } } +async fn handle_onboarding( + connect_as: &str, + mut socket: TungsteniteWebsocket, + identifiers: ConnectionIdentifiers<'_>, +) -> anyhow::Result<()> { + let onboarding_key = periphery_config() + .onboarding_key + .as_deref() + .with_context(|| format!("Server {connect_as} does not exist, and no PERIPHERY_ONBOARDING_KEY is provided."))?; + + ClientLoginFlow::login(LoginFlowArgs { + private_key: onboarding_key, + identifiers, + public_key_validator: CorePublicKeyValidator, + socket: &mut socket, + }) + .await?; + + // Post onboarding login 1: Send public key + socket + .send(Bytes::from_static(periphery_public_key().as_bytes())) + .await + .context("Failed to send public key bytes")?; + + let res = socket + .recv_bytes_with_timeout(Duration::from_secs(2)) + .await + .context("Failed to receive Server creation result")?; + + match res.last().map(|byte| MessageState::from_byte(*byte)) { + Some(MessageState::Successful) => { + info!( + "Server onboarding flow for '{connect_as}' successful ✅" + ); + Ok(()) + } + Some(MessageState::Failed) => { + Err(deserialize_error_bytes(&res[..(res.len() - 1)])) + } + other => Err(anyhow!( + "Got unrecognized onboarding flow response: {other:?}" + )), + } +} + async fn connect_websocket( url: &str, connect_as: &str,