forked from github-starred/komodo
refactor Periphery onboarding
This commit is contained in:
@@ -57,7 +57,7 @@ pub async fn handler(
|
||||
);
|
||||
}
|
||||
|
||||
// Handle regular / creation flow.
|
||||
// Handle connection vs. onboarding flow.
|
||||
match Server::coll()
|
||||
.find_one(id_or_name_filter(&server_query))
|
||||
.await
|
||||
@@ -68,7 +68,7 @@ pub async fn handler(
|
||||
.await
|
||||
}
|
||||
None if ObjectId::from_str(&server_query).is_err() => {
|
||||
create_server_handler(server_query, identifiers, ws).await
|
||||
onboard_server_handler(server_query, identifiers, ws).await
|
||||
}
|
||||
None => Err(
|
||||
anyhow!("Must provide name based Server specifier for onboarding flow, name cannot be valid ObjectId (hex)")
|
||||
@@ -142,7 +142,7 @@ async fn existing_server_handler(
|
||||
}))
|
||||
}
|
||||
|
||||
async fn create_server_handler(
|
||||
async fn onboard_server_handler(
|
||||
server_query: String,
|
||||
identifiers: HeaderConnectionIdentifiers,
|
||||
ws: WebSocketUpgrade,
|
||||
@@ -174,7 +174,7 @@ async fn create_server_handler(
|
||||
}
|
||||
};
|
||||
|
||||
// TODO: Get actual public key from Periphery, maybe freshly generated
|
||||
// Post onboarding login 1: Receive public key
|
||||
let periphery_public_key = match socket
|
||||
.recv_bytes_with_timeout(Duration::from_secs(2))
|
||||
.await
|
||||
@@ -187,6 +187,7 @@ async fn create_server_handler(
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let config = ServerConfig {
|
||||
periphery_public_key,
|
||||
enabled: true,
|
||||
|
||||
@@ -9,8 +9,8 @@ use tokio_tungstenite::tungstenite;
|
||||
use transport::{
|
||||
MessageState,
|
||||
auth::{
|
||||
AddressConnectionIdentifiers, ClientLoginFlow, LoginFlow,
|
||||
LoginFlowArgs,
|
||||
AddressConnectionIdentifiers, ClientLoginFlow,
|
||||
ConnectionIdentifiers, LoginFlow, LoginFlowArgs,
|
||||
},
|
||||
fix_ws_address,
|
||||
websocket::{Websocket, tungstenite::TungsteniteWebsocket},
|
||||
@@ -35,8 +35,7 @@ pub async fn handler(
|
||||
|
||||
let mut already_logged_connection_error = false;
|
||||
let mut already_logged_login_error = false;
|
||||
let mut already_logged_creation_error = false;
|
||||
let mut already_logged_dne_error = false;
|
||||
let mut already_logged_onboarding_error = false;
|
||||
|
||||
let args = Arc::new(Args {
|
||||
core: identifiers.host().to_string(),
|
||||
@@ -58,7 +57,7 @@ pub async fn handler(
|
||||
// If error transitions from login to connection,
|
||||
// set to false to see login error after reconnect.
|
||||
already_logged_login_error = false;
|
||||
already_logged_creation_error = false;
|
||||
already_logged_onboarding_error = false;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(
|
||||
CONNECTION_RETRY_SECONDS,
|
||||
@@ -68,7 +67,7 @@ pub async fn handler(
|
||||
}
|
||||
};
|
||||
|
||||
// Receive whether to use Server connection flow vs Server creation flow.
|
||||
// Receive whether to use Server connection flow vs Server onboarding flow.
|
||||
|
||||
let flow_bytes = match socket
|
||||
.recv_bytes_with_timeout(Duration::from_secs(2))
|
||||
@@ -83,7 +82,7 @@ pub async fn handler(
|
||||
// If error transitions from login to connection,
|
||||
// set to false to see login error after reconnect.
|
||||
already_logged_login_error = false;
|
||||
already_logged_creation_error = false;
|
||||
already_logged_onboarding_error = false;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(
|
||||
CONNECTION_RETRY_SECONDS,
|
||||
@@ -130,14 +129,12 @@ pub async fn handler(
|
||||
}
|
||||
// Creation
|
||||
&[1] => {
|
||||
let Some(onboarding_key) =
|
||||
periphery_config().onboarding_key.as_deref()
|
||||
else {
|
||||
if !already_logged_dne_error {
|
||||
error!(
|
||||
"Server {connect_as} does not exist, and no PERIPHERY_ONBOARDING_KEY is provided."
|
||||
);
|
||||
already_logged_dne_error = true;
|
||||
if let Err(e) =
|
||||
handle_onboarding(connect_as, socket, identifiers).await
|
||||
{
|
||||
if !already_logged_onboarding_error {
|
||||
error!("{e:#}");
|
||||
already_logged_onboarding_error = true;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(
|
||||
CONNECTION_RETRY_SECONDS,
|
||||
@@ -145,74 +142,6 @@ pub async fn handler(
|
||||
.await;
|
||||
continue;
|
||||
};
|
||||
if let Err(e) = ClientLoginFlow::login(LoginFlowArgs {
|
||||
private_key: onboarding_key,
|
||||
identifiers,
|
||||
public_key_validator: CorePublicKeyValidator,
|
||||
socket: &mut socket,
|
||||
})
|
||||
.await
|
||||
{
|
||||
if !already_logged_creation_error {
|
||||
warn!("Failed to onboard Server | {e:#}");
|
||||
already_logged_creation_error = true;
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(
|
||||
CONNECTION_RETRY_SECONDS,
|
||||
))
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
|
||||
already_logged_creation_error = false;
|
||||
|
||||
if let Err(e) = socket
|
||||
.send(Bytes::from_static(periphery_public_key().as_bytes()))
|
||||
.await
|
||||
.context("Failed to send public key bytes")
|
||||
{
|
||||
warn!("Failed to onboard Server | {e:#}");
|
||||
tokio::time::sleep(Duration::from_secs(
|
||||
CONNECTION_RETRY_SECONDS,
|
||||
))
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
|
||||
let res = match socket
|
||||
.recv_bytes_with_timeout(Duration::from_secs(2))
|
||||
.await
|
||||
.context("Failed to receive Server creation result")
|
||||
{
|
||||
Ok(res) => res,
|
||||
Err(e) => {
|
||||
warn!("Failed to onboard Server | {e:#}");
|
||||
tokio::time::sleep(Duration::from_secs(
|
||||
CONNECTION_RETRY_SECONDS,
|
||||
))
|
||||
.await;
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
match res.last().map(|byte| MessageState::from_byte(*byte)) {
|
||||
Some(MessageState::Successful) => {
|
||||
info!(
|
||||
"Server onboarding flow for '{connect_as}' successful ✅"
|
||||
);
|
||||
}
|
||||
Some(MessageState::Failed) => {
|
||||
error!(
|
||||
"Server onboarding flow for '{connect_as}' failed | {:#}",
|
||||
deserialize_error_bytes(&res[..(res.len() - 1)])
|
||||
);
|
||||
}
|
||||
other => {
|
||||
warn!(
|
||||
"Got unrecognized onboarding flow response: {other:?}"
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
// Other (error)
|
||||
other => {
|
||||
@@ -232,6 +161,51 @@ pub async fn handler(
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_onboarding(
|
||||
connect_as: &str,
|
||||
mut socket: TungsteniteWebsocket,
|
||||
identifiers: ConnectionIdentifiers<'_>,
|
||||
) -> anyhow::Result<()> {
|
||||
let onboarding_key = periphery_config()
|
||||
.onboarding_key
|
||||
.as_deref()
|
||||
.with_context(|| format!("Server {connect_as} does not exist, and no PERIPHERY_ONBOARDING_KEY is provided."))?;
|
||||
|
||||
ClientLoginFlow::login(LoginFlowArgs {
|
||||
private_key: onboarding_key,
|
||||
identifiers,
|
||||
public_key_validator: CorePublicKeyValidator,
|
||||
socket: &mut socket,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Post onboarding login 1: Send public key
|
||||
socket
|
||||
.send(Bytes::from_static(periphery_public_key().as_bytes()))
|
||||
.await
|
||||
.context("Failed to send public key bytes")?;
|
||||
|
||||
let res = socket
|
||||
.recv_bytes_with_timeout(Duration::from_secs(2))
|
||||
.await
|
||||
.context("Failed to receive Server creation result")?;
|
||||
|
||||
match res.last().map(|byte| MessageState::from_byte(*byte)) {
|
||||
Some(MessageState::Successful) => {
|
||||
info!(
|
||||
"Server onboarding flow for '{connect_as}' successful ✅"
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
Some(MessageState::Failed) => {
|
||||
Err(deserialize_error_bytes(&res[..(res.len() - 1)]))
|
||||
}
|
||||
other => Err(anyhow!(
|
||||
"Got unrecognized onboarding flow response: {other:?}"
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
async fn connect_websocket(
|
||||
url: &str,
|
||||
connect_as: &str,
|
||||
|
||||
Reference in New Issue
Block a user