change update send channel to tokio::sync::broadcast

This commit is contained in:
mbecker20
2022-12-24 06:10:13 +00:00
parent 858cead89d
commit 6be463475c
10 changed files with 64 additions and 84 deletions

View File

@@ -2,10 +2,13 @@ use anyhow::{anyhow, Context};
use axum::{routing::post, Extension, Json, Router};
use helpers::handle_anyhow_error;
use mungos::{doc, Deserialize, Document, Serialize};
use types::{Build, Deployment, PermissionLevel, PermissionsTarget, Server, UpdateTarget, Operation, Log, monitor_timestamp, UpdateStatus, Update, Procedure};
use types::{
monitor_timestamp, Build, Deployment, Log, Operation, PermissionLevel, PermissionsTarget,
Procedure, Server, Update, UpdateStatus, UpdateTarget,
};
use typeshare::typeshare;
use crate::{auth::RequestUserExtension, state::StateExtension, response};
use crate::{auth::RequestUserExtension, response, state::StateExtension};
#[typeshare]
#[derive(Serialize, Deserialize)]
@@ -61,7 +64,10 @@ async fn update_permissions(
.find_one_by_id(&permission_update.user_id)
.await
.context("failed at find target user query")?
.ok_or(anyhow!("failed to find a user with id {}", permission_update.user_id))?;
.ok_or(anyhow!(
"failed to find a user with id {}",
permission_update.user_id
))?;
if !target_user.enabled {
return Err(anyhow!("target user not enabled"));
}
@@ -183,7 +189,9 @@ async fn update_permissions(
)
}
};
update.logs.push(Log::simple("modify permissions", log_text));
update
.logs
.push(Log::simple("modify permissions", log_text));
update.end_ts = Some(monitor_timestamp());
update.id = state.add_update(update.clone()).await?;
Ok(update)
@@ -211,16 +219,15 @@ async fn modify_user_enabled(
.users
.update_one::<Document>(&user_id, mungos::Update::Set(doc! { "enabled": enabled }))
.await?;
let update_type = if enabled {
"enabled"
} else {
"disabled"
};
let update_type = if enabled { "enabled" } else { "disabled" };
let ts = monitor_timestamp();
let mut update = Update {
target: UpdateTarget::System,
operation: Operation::ModifyUserEnabled,
logs: vec![Log::simple("modify user permissions", format!("{update_type} {} (id: {})", user.username, user.id))],
logs: vec![Log::simple(
"modify user permissions",
format!("{update_type} {} (id: {})", user.username, user.id),
)],
start_ts: ts.clone(),
end_ts: Some(ts),
status: UpdateStatus::Complete,

View File

@@ -385,14 +385,26 @@ impl State {
Ok(containers)
}
async fn get_github_accounts(&self, id: &str, user: &RequestUser) -> anyhow::Result<Vec<String>> {
let server = self.get_server_check_permissions(id, user, PermissionLevel::Read).await?;
async fn get_github_accounts(
&self,
id: &str,
user: &RequestUser,
) -> anyhow::Result<Vec<String>> {
let server = self
.get_server_check_permissions(id, user, PermissionLevel::Read)
.await?;
let github_accounts = self.periphery.get_github_accounts(&server).await?;
Ok(github_accounts)
}
async fn get_docker_accounts(&self, id: &str, user: &RequestUser) -> anyhow::Result<Vec<String>> {
let server = self.get_server_check_permissions(id, user, PermissionLevel::Read).await?;
async fn get_docker_accounts(
&self,
id: &str,
user: &RequestUser,
) -> anyhow::Result<Vec<String>> {
let server = self
.get_server_check_permissions(id, user, PermissionLevel::Read)
.await?;
let docker_accounts = self.periphery.get_docker_accounts(&server).await?;
Ok(docker_accounts)
}

View File

@@ -11,12 +11,11 @@ use axum::{
};
use db::DbClient;
use futures_util::{SinkExt, StreamExt};
use mungos::Serialize;
use serde_json::json;
use tokio::{
select,
sync::{
watch::{self, Receiver, Sender},
broadcast::{self, Receiver, Sender},
Mutex,
},
};
@@ -38,7 +37,7 @@ pub struct UpdateWsChannel {
impl UpdateWsChannel {
pub fn new() -> UpdateWsChannel {
let (sender, reciever) = watch::channel(Default::default());
let (sender, reciever) = broadcast::channel(16);
UpdateWsChannel {
sender: Mutex::new(sender),
reciever,
@@ -46,42 +45,12 @@ impl UpdateWsChannel {
}
}
#[derive(Serialize)]
struct UpdateMsg {
#[serde(rename = "type")]
msg_type: MsgType,
update: Update,
}
#[derive(Serialize)]
#[serde(rename_all = "UPPERCASE")]
enum MsgType {
// Login,
// Close,
Update,
}
impl UpdateMsg {
fn from_update(update: Update) -> Message {
let msg = UpdateMsg {
msg_type: MsgType::Update,
update,
};
Message::Text(serde_json::to_string(&msg).unwrap())
}
}
// pub fn make_update_ws_sender_reciver() -> (UpdateWsSenderExtension, UpdateWsRecieverExtension) {
// let (sender, reciever) = watch::channel(Default::default());
// (Extension(Arc::new(Mutex::new(sender))), Extension(reciever))
// }
pub async fn ws_handler(
Extension(jwt_client): JwtExtension,
Extension(state): StateExtension,
ws: WebSocketUpgrade,
) -> impl IntoResponse {
let mut reciever = state.update.reciever.clone();
let mut reciever = state.update.reciever.resubscribe();
ws.on_upgrade(|socket| async move {
let login_res = login(socket, &jwt_client, &state).await;
if login_res.is_none() {
@@ -90,14 +59,13 @@ pub async fn ws_handler(
let (socket, user_id) = login_res.unwrap();
let (ws_sender, mut ws_reciever) = socket.split();
let ws_sender = Arc::new(Mutex::new(ws_sender));
// let ws_sender_clone = ws_sender.clone();
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
tokio::spawn(async move {
loop {
select! {
let update = select! {
_ = cancel_clone.cancelled() => break,
_ = reciever.changed() => {}
update = reciever.recv() => {update.expect("failed to recv update msg")}
};
let user = state.db.users.find_one_by_id(&user_id).await;
if user.is_err()
@@ -113,13 +81,12 @@ pub async fn ws_handler(
return;
}
let user = user.unwrap().unwrap(); // already handle cases where this panics in the above early return
let update = reciever.borrow().to_owned();
match user_can_see_update(&user, &user_id, &update.target, &state.db).await {
Ok(_) => {
let _ = ws_sender
.lock()
.await
.send(UpdateMsg::from_update(update))
.send(Message::Text(serde_json::to_string(&update).unwrap()))
.await;
}
Err(_) => {
@@ -157,11 +124,7 @@ async fn login(
Ok(jwt) => match jwt {
Message::Text(jwt) => match jwt_client.auth_jwt_check_enabled(&jwt, state).await {
Ok(user) => {
let _ = socket
.send(Message::Text(
json!({ "type": "LOGIN", "success": true }).to_string(),
))
.await;
let _ = socket.send(Message::Text("LOGGED_IN".to_string())).await;
Some((socket, user.id))
}
Err(e) => {

View File

@@ -8,7 +8,7 @@ import {
useServerStats,
useUpdates,
} from "./hooks";
import socket from "./socket";
import connectToWs from "./ws";
import { useUser } from "./UserProvider";
export type State = {
@@ -21,7 +21,7 @@ export type State = {
const context = createContext<
State & {
ws: ReturnType<typeof socket>;
ws: ReturnType<typeof connectToWs>;
logout: () => void;
}
>();
@@ -37,7 +37,7 @@ export const AppStateProvider: ParentComponent = (p) => {
updates: useUpdates(),
};
const ws = socket(state);
const ws = connectToWs(state);
useWindowKeyDown((e) => {
if (e.key === "H" && e.shiftKey) {
@@ -63,7 +63,7 @@ export const AppStateProvider: ParentComponent = (p) => {
export function useAppState() {
return useContext(context) as State & {
ws: ReturnType<typeof socket>;
ws: ReturnType<typeof connectToWs>;
logout: () => void;
};
}

View File

@@ -4,7 +4,7 @@ import { createSignal } from "solid-js";
import ReconnectingWebSocket from "reconnecting-websocket";
import { Operation, Update, UpdateStatus, User } from "../types";
function socket(state: State) {
function connectToWs(state: State) {
const ws = new ReconnectingWebSocket(WS_URL);
const [isOpen, setOpen] = createSignal(false);
@@ -16,8 +16,8 @@ function socket(state: State) {
});
ws.addEventListener("message", ({ data }) => {
if (data === "PONG") {
// console.log("pong");
if (data === "LOGGED_IN") {
// console.log("logged in to ws");
return;
}
const update = JSON.parse(data) as Update;
@@ -133,4 +133,4 @@ async function handleMessage(
}
}
export default socket;
export default connectToWs;

View File

@@ -23,7 +23,11 @@ impl MonitorClient {
.await
}
pub async fn modify_user_enabled(&self, user_id: &str, enabled: bool) -> anyhow::Result<Update> {
pub async fn modify_user_enabled(
&self,
user_id: &str,
enabled: bool,
) -> anyhow::Result<Update> {
self.post(
"/api/permissions/update",
json!({

View File

@@ -33,10 +33,7 @@ impl MonitorClient {
.await
}
pub async fn get_server_github_accounts(
&self,
server_id: &str,
) -> anyhow::Result<Vec<String>> {
pub async fn get_server_github_accounts(&self, server_id: &str) -> anyhow::Result<Vec<String>> {
self.get(
&format!("/api/server/{server_id}/github_accounts"),
Option::<()>::None,
@@ -44,10 +41,7 @@ impl MonitorClient {
.await
}
pub async fn get_server_docker_accounts(
&self,
server_id: &str,
) -> anyhow::Result<Vec<String>> {
pub async fn get_server_docker_accounts(&self, server_id: &str) -> anyhow::Result<Vec<String>> {
self.get(
&format!("/api/server/{server_id}/docker_accounts"),
Option::<()>::None,

View File

@@ -641,7 +641,7 @@ fn default_repo_dir() -> String {
}
fn default_stats_refresh_interval() -> Timelength {
Timelength::FiveSeconds
Timelength::OneSecond
}
#[typeshare]
@@ -915,7 +915,7 @@ pub enum PermissionsTarget {
Server,
Deployment,
Build,
Procedure
Procedure,
}
#[typeshare]

View File

@@ -62,7 +62,7 @@ impl StatsClient {
mem_total_gb: self.sys.total_memory() as f64 / BYTES_PER_GB,
disk: self.get_disk_usage(),
networks: self.get_networks(),
polling_rate: self.polling_rate,
polling_rate: self.polling_rate,
}
}

View File

@@ -22,16 +22,16 @@ async fn main() -> anyhow::Result<()> {
// let server_stats = get_server_stats(&monitor).await?;
// println!("server stats:\n{server_stats:#?}\n");
// let (update, container) = deploy_mongo(&monitor).await?;
// println!(
// "mongo deploy update:\n{update:#?}\n\ncontainer: {:#?}\n",
// container.container
// );
let (update, container) = deploy_mongo(&monitor).await?;
println!(
"mongo deploy update:\n{update:#?}\n\ncontainer: {:#?}\n",
container.container
);
// let update = test_build(&monitor).await?;
// println!("build update:\n{update:#?}");
test_updates(&monitor).await.unwrap();
// test_updates(&monitor).await.unwrap();
let end_ts = unix_timestamp_ms();
let finished_in = (end_ts - start_ts) as f64 / 1000.0;