create channel to handle stats stream in client

This commit is contained in:
mbecker20
2023-01-02 07:16:55 +00:00
parent a6d655b42e
commit a491f206c9
4 changed files with 28 additions and 7 deletions

1
Cargo.lock generated
View File

@@ -1379,6 +1379,7 @@ dependencies = [
"serde_json",
"tokio",
"tokio-tungstenite 0.18.0",
"tokio-util",
]
[[package]]

View File

@@ -14,6 +14,7 @@ monitor_types = { path = "../types" }
reqwest = { version = "0.11", features = ["json"] }
tokio-tungstenite = { version = "0.18", features=["native-tls"] }
tokio = "1.23"
tokio-util = "0.7"
anyhow = "1.0"
serde = "1.0"
serde_json = "1.0"

View File

@@ -5,8 +5,9 @@ use monitor_types::{
SystemStats, SystemStatsQuery,
};
use serde_json::{json, Value};
use tokio::net::TcpStream;
use tokio_tungstenite::{connect_async, tungstenite::Message, MaybeTlsStream, WebSocketStream};
use tokio::{task::JoinHandle, sync::broadcast::{Receiver, self}};
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tokio_util::sync::CancellationToken;
use crate::MonitorClient;
@@ -95,7 +96,7 @@ impl MonitorClient {
&self,
server_id: &str,
query: impl Into<Option<SystemStatsQuery>>,
) -> anyhow::Result<WebSocketStream<MaybeTlsStream<TcpStream>>> {
) -> anyhow::Result<(Receiver<SystemStats>, JoinHandle<anyhow::Result<()>>, CancellationToken)> {
let query = query.into().unwrap_or_default();
let endpoint = format!(
"{}/ws/stats/{server_id}?networks={}&components={}&processes={}",
@@ -109,7 +110,26 @@ impl MonitorClient {
let msg = socket.next().await;
if let Some(Ok(Message::Text(msg))) = &msg {
if msg.as_str() == "LOGGED_IN" {
Ok(socket)
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let (sender, receiver) = broadcast::channel(100);
let handle = tokio::spawn(async move {
loop {
let stats = tokio::select! {
_ = cancel_clone.cancelled() => {
let _ = socket.close(None).await;
break;
},
stats = socket.next() => stats,
};
if let Some(Ok(Message::Text(stats))) = stats {
let stats: SystemStats = serde_json::from_str(&stats).context("failed to parse msg as SystemStats")?;
sender.send(stats).context("failed to send stats through broadcast channel")?;
}
}
Ok(())
});
Ok((receiver, handle, cancel))
} else {
Err(anyhow!("failed to log in"))
}

View File

@@ -62,9 +62,8 @@ pub async fn subscribe_to_server_stats(monitor: &MonitorClient) -> anyhow::Resul
.await
.context("failed at list servers")?;
let server = &servers.get(0).ok_or(anyhow!("no servers"))?.server;
let mut stats_stream = monitor.subscribe_to_stats_ws(&server.id, None).await?;
while let Some(Ok(Message::Text(msg))) = stats_stream.next().await {
let stats: SystemStats = serde_json::from_str(&msg)?;
let (mut recv, _, _) = monitor.subscribe_to_stats_ws(&server.id, None).await?;
while let Ok(stats) = recv.recv().await {
println!("{stats:#?}");
}
Ok(())