From a491f206c94d64672b4225cf1b11102658eacb49 Mon Sep 17 00:00:00 2001 From: mbecker20 Date: Mon, 2 Jan 2023 07:16:55 +0000 Subject: [PATCH] create channel to handle stats stream in client --- Cargo.lock | 1 + lib/monitor_client/Cargo.toml | 1 + lib/monitor_client/src/server.rs | 28 ++++++++++++++++++++++++---- tests/src/tests.rs | 5 ++--- 4 files changed, 28 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7b91b82dd..a8aec2947 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1379,6 +1379,7 @@ dependencies = [ "serde_json", "tokio", "tokio-tungstenite 0.18.0", + "tokio-util", ] [[package]] diff --git a/lib/monitor_client/Cargo.toml b/lib/monitor_client/Cargo.toml index 5521773d8..7c094607f 100644 --- a/lib/monitor_client/Cargo.toml +++ b/lib/monitor_client/Cargo.toml @@ -14,6 +14,7 @@ monitor_types = { path = "../types" } reqwest = { version = "0.11", features = ["json"] } tokio-tungstenite = { version = "0.18", features=["native-tls"] } tokio = "1.23" +tokio-util = "0.7" anyhow = "1.0" serde = "1.0" serde_json = "1.0" diff --git a/lib/monitor_client/src/server.rs b/lib/monitor_client/src/server.rs index 056105c5c..fd83bbe98 100644 --- a/lib/monitor_client/src/server.rs +++ b/lib/monitor_client/src/server.rs @@ -5,8 +5,9 @@ use monitor_types::{ SystemStats, SystemStatsQuery, }; use serde_json::{json, Value}; -use tokio::net::TcpStream; -use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream}; +use tokio::{task::JoinHandle, sync::broadcast::{Receiver, self}}; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tokio_util::sync::CancellationToken; use crate::MonitorClient; @@ -95,7 +96,7 @@ impl MonitorClient { &self, server_id: &str, query: impl Into>, - ) -> anyhow::Result>> { + ) -> anyhow::Result<(Receiver, JoinHandle>, CancellationToken)> { let query = query.into().unwrap_or_default(); let endpoint = format!( "{}/ws/stats/{server_id}?networks={}&components={}&processes={}", @@ -109,7 +110,26 @@ impl MonitorClient { let msg = socket.next().await; if let Some(Ok(Message::Text(msg))) = &msg { if msg.as_str() == "LOGGED_IN" { - Ok(socket) + let cancel = CancellationToken::new(); + let cancel_clone = cancel.clone(); + let (sender, receiver) = broadcast::channel(100); + let handle = tokio::spawn(async move { + loop { + let stats = tokio::select! { + _ = cancel_clone.cancelled() => { + let _ = socket.close(None).await; + break; + }, + stats = socket.next() => stats, + }; + if let Some(Ok(Message::Text(stats))) = stats { + let stats: SystemStats = serde_json::from_str(&stats).context("failed to parse msg as SystemStats")?; + sender.send(stats).context("failed to send stats through broadcast channel")?; + } + } + Ok(()) + }); + Ok((receiver, handle, cancel)) } else { Err(anyhow!("failed to log in")) } diff --git a/tests/src/tests.rs b/tests/src/tests.rs index 5da749f9c..a06117870 100644 --- a/tests/src/tests.rs +++ b/tests/src/tests.rs @@ -62,9 +62,8 @@ pub async fn subscribe_to_server_stats(monitor: &MonitorClient) -> anyhow::Resul .await .context("failed at list servers")?; let server = &servers.get(0).ok_or(anyhow!("no servers"))?.server; - let mut stats_stream = monitor.subscribe_to_stats_ws(&server.id, None).await?; - while let Some(Ok(Message::Text(msg))) = stats_stream.next().await { - let stats: SystemStats = serde_json::from_str(&msg)?; + let (mut recv, _, _) = monitor.subscribe_to_stats_ws(&server.id, None).await?; + while let Ok(stats) = recv.recv().await { println!("{stats:#?}"); } Ok(())