monitor client update ws subscription with reconnect

This commit is contained in:
mbecker20
2023-08-29 01:57:42 -04:00
parent 3da88135c2
commit 70a11de316
13 changed files with 438 additions and 188 deletions

160
Cargo.lock generated
View File

@@ -67,16 +67,15 @@ dependencies = [
[[package]]
name = "anstream"
version = "0.3.2"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163"
checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is-terminal",
"utf8parse",
]
@@ -106,9 +105,9 @@ dependencies = [
[[package]]
name = "anstyle-wincon"
version = "1.0.1"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188"
checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd"
dependencies = [
"anstyle",
"windows-sys 0.48.0",
@@ -163,9 +162,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "aws-config"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de3d533e0263bf453cc80af4c8bcc4d64e2aca293bd16f81633a36f1bf4a97cb"
checksum = "fc6b3804dca60326e07205179847f17a4fce45af3a1106939177ad41ac08a6de"
dependencies = [
"aws-credential-types",
"aws-http",
@@ -193,9 +192,9 @@ dependencies = [
[[package]]
name = "aws-credential-types"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4834ba01c5ad1ed9740aa222de62190e3c565d11ab7e72cc68314a258994567"
checksum = "70a66ac8ef5fa9cf01c2d999f39d16812e90ec1467bd382cbbb74ba23ea86201"
dependencies = [
"aws-smithy-async",
"aws-smithy-types",
@@ -207,9 +206,9 @@ dependencies = [
[[package]]
name = "aws-http"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72badf9de83cc7d66b21b004f09241836823b8302afb25a24708769e576a8d8f"
checksum = "3e626370f9ba806ae4c439e49675fd871f5767b093075cdf4fef16cac42ba900"
dependencies = [
"aws-credential-types",
"aws-smithy-http",
@@ -226,9 +225,9 @@ dependencies = [
[[package]]
name = "aws-runtime"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf832f522111225c02547e1e1c28137e840e4b082399d93a236e4b29193a4667"
checksum = "07ac5cf0ff19c1bca0cea7932e11b239d1025a45696a4f44f72ea86e2b8bdd07"
dependencies = [
"aws-credential-types",
"aws-http",
@@ -247,9 +246,9 @@ dependencies = [
[[package]]
name = "aws-sdk-ec2"
version = "0.29.0"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c58f098f12b70166afd023949291df62f7f716cb5866ac4256178cd3321d1b1b"
checksum = "dcb3e58549904567dcbfadb1e450b5a0f46f35300dd78c87a31ca4761d1481f6"
dependencies = [
"aws-credential-types",
"aws-http",
@@ -273,9 +272,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sso"
version = "0.29.0"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f41bf2c28d32dbb9894a8fcfcb148265d034d3f4a170552a47553a09de890895"
checksum = "903f888ff190e64f6f5c83fb0f8d54f9c20481f1dc26359bb8896f5d99908949"
dependencies = [
"aws-credential-types",
"aws-http",
@@ -297,9 +296,9 @@ dependencies = [
[[package]]
name = "aws-sdk-sts"
version = "0.29.0"
version = "0.30.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79e21aa1a5b0853969a1ef96ccfaa8ff5d57c761549786a4d5f86c1902b2586a"
checksum = "a47ad6bf01afc00423d781d464220bf69fb6a674ad6629cbbcb06d88cdc2be82"
dependencies = [
"aws-credential-types",
"aws-http",
@@ -321,9 +320,9 @@ dependencies = [
[[package]]
name = "aws-sigv4"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2cb40a93429794065f41f0581734fc56a345f6a38d8e2e3c25c7448d930cd132"
checksum = "b7b28f4910bb956b7ab320b62e98096402354eca976c587d1eeccd523d9bac03"
dependencies = [
"aws-smithy-http",
"form_urlencoded",
@@ -340,9 +339,9 @@ dependencies = [
[[package]]
name = "aws-smithy-async"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ee6d17d487c8b579423067718b3580c0908d0f01d7461813f94ec4323bad623"
checksum = "2cdb73f85528b9d19c23a496034ac53703955a59323d581c06aa27b4e4e247af"
dependencies = [
"futures-util",
"pin-project-lite",
@@ -352,9 +351,9 @@ dependencies = [
[[package]]
name = "aws-smithy-client"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bdbe0a3ad15283cc5f863a68cb6adc8e256e7c109c43c01bdd09be407219a1e9"
checksum = "c27b2756264c82f830a91cb4d2d485b2d19ad5bea476d9a966e03d27f27ba59a"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
@@ -376,9 +375,9 @@ dependencies = [
[[package]]
name = "aws-smithy-http"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34dc313472d727f5ef44fdda93e668ebfe17380c99dee512c403e3ca51863bb9"
checksum = "54cdcf365d8eee60686885f750a34c190e513677db58bbc466c44c588abf4199"
dependencies = [
"aws-smithy-types",
"bytes",
@@ -398,9 +397,9 @@ dependencies = [
[[package]]
name = "aws-smithy-http-tower"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1dd50fca5a4ea4ec3771689ee93bf06b32de02a80af01ed93a8f8a4ed90e8483"
checksum = "822de399d0ce62829a69dfa8c5cd08efdbe61a7426b953e2268f8b8b52a607bd"
dependencies = [
"aws-smithy-http",
"aws-smithy-types",
@@ -414,18 +413,18 @@ dependencies = [
[[package]]
name = "aws-smithy-json"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3591dd7c2fe01ab8025e4847a0a0f6d0c2b2269714688ffb856f9cf6c6d465cf"
checksum = "4fb1e7ab8fa7ad10c193af7ae56d2420989e9f4758bf03601a342573333ea34f"
dependencies = [
"aws-smithy-types",
]
[[package]]
name = "aws-smithy-query"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbabb1145e65dd57ae72d91a2619d3f5fba40b68a5f40ba009c30571dfd60aff"
checksum = "28556a3902091c1f768a34f6c998028921bdab8d47d92586f363f14a4a32d047"
dependencies = [
"aws-smithy-types",
"urlencoding",
@@ -433,9 +432,9 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3687fb838d4ad1c883b62eb59115bc9fb02c4f308aac49a7df89627067f6eb0d"
checksum = "745e096b3553e7e0f40622aa04971ce52765af82bebdeeac53aa6fc82fe801e6"
dependencies = [
"aws-smithy-async",
"aws-smithy-client",
@@ -455,9 +454,9 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime-api"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5cfbf1e5c2108b41f5ca607cde40dd5109fecc448f5d30c8e614b61f36dce704"
checksum = "93d0ae0c9cfd57944e9711ea610b48a963fb174a53aabacc08c5794a594b1d02"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
@@ -470,9 +469,9 @@ dependencies = [
[[package]]
name = "aws-smithy-types"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eed0a94eefd845a2a78677f1b72f02fa75802d38f7f59be675add140279aa8bf"
checksum = "d90dbc8da2f6be461fa3c1906b20af8f79d14968fe47f2b7d29d086f62a51728"
dependencies = [
"base64-simd",
"itoa",
@@ -484,18 +483,18 @@ dependencies = [
[[package]]
name = "aws-smithy-xml"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c88052c812f696143ad7ba729c63535209ff0e0f49e31a6d2b1205208ea6ea79"
checksum = "e01d2dedcdd8023043716cfeeb3c6c59f2d447fce365d8e194838891794b23b6"
dependencies = [
"xmlparser",
]
[[package]]
name = "aws-types"
version = "0.56.0"
version = "0.56.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6bceb8cf724ad057ad7f327d0d256d7147b3eac777b39849a26189e003dc9782"
checksum = "85aa0451bf8af1bf22a4f028d5d28054507a14be43cb8ac0597a8471fba9edfe"
dependencies = [
"aws-credential-types",
"aws-smithy-async",
@@ -784,9 +783,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.3.23"
version = "4.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03aef18ddf7d879c15ce20f04826ef8418101c7e528014c3eeea13321047dca3"
checksum = "7c8d502cbaec4595d2e7d5f61e318f05417bd2b66fdc3809498f0d3fdf0bea27"
dependencies = [
"clap_builder",
"clap_derive",
@@ -795,9 +794,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.3.23"
version = "4.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8ce6fffb678c9b80a70b6b6de0aad31df727623a70fd9a842c30cd573e2fa98"
checksum = "5891c7bc0edb3e1c2204fc5e94009affabeb1821c9e5fdc3959536c5c0bb984d"
dependencies = [
"anstream",
"anstyle",
@@ -807,9 +806,9 @@ dependencies = [
[[package]]
name = "clap_derive"
version = "4.3.12"
version = "4.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050"
checksum = "c9fd1a5729c4548118d7d70ff234a44868d00489a4b6597b0b020918a0e91a1a"
dependencies = [
"heck",
"proc-macro2",
@@ -1659,7 +1658,7 @@ dependencies = [
"socket2 0.5.3",
"widestring",
"windows-sys 0.48.0",
"winreg 0.50.0",
"winreg",
]
[[package]]
@@ -1668,18 +1667,6 @@ version = "2.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "12b6ee2129af8d4fb011108c73d99a1b83a85977f23b82460c0ae2e25bb4b57f"
[[package]]
name = "is-terminal"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f"
dependencies = [
"hermit-abi 0.3.1",
"io-lifetimes",
"rustix",
"windows-sys 0.48.0",
]
[[package]]
name = "itoa"
version = "1.0.6"
@@ -1946,11 +1933,17 @@ version = "1.0.1"
dependencies = [
"anyhow",
"envy",
"futures",
"monitor_types",
"reqwest",
"resolver_api",
"serde",
"serde_json",
"serror",
"thiserror",
"tokio",
"tokio-tungstenite",
"tokio-util",
]
[[package]]
@@ -2447,9 +2440,9 @@ checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78"
[[package]]
name = "reqwest"
version = "0.11.18"
version = "0.11.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55"
checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1"
dependencies = [
"base64 0.21.2",
"bytes",
@@ -2479,7 +2472,7 @@ dependencies = [
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
"winreg 0.10.1",
"winreg",
]
[[package]]
@@ -2726,9 +2719,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]]
name = "serde"
version = "1.0.183"
version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c"
checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e"
dependencies = [
"serde_derive",
]
@@ -2753,9 +2746,9 @@ dependencies = [
[[package]]
name = "serde_derive"
version = "1.0.183"
version = "1.0.188"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816"
checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
dependencies = [
"proc-macro2",
"quote",
@@ -3173,18 +3166,18 @@ dependencies = [
[[package]]
name = "thiserror"
version = "1.0.40"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac"
checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f"
dependencies = [
"thiserror-impl",
]
[[package]]
name = "thiserror-impl"
version = "1.0.40"
version = "1.0.47"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f"
checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b"
dependencies = [
"proc-macro2",
"quote",
@@ -3610,6 +3603,18 @@ version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a"
[[package]]
name = "update_logger"
version = "1.0.1"
dependencies = [
"anyhow",
"log",
"logger",
"monitor_client",
"termination_signal",
"tokio",
]
[[package]]
name = "url"
version = "2.4.0"
@@ -3954,15 +3959,6 @@ dependencies = [
"memchr",
]
[[package]]
name = "winreg"
version = "0.10.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d"
dependencies = [
"winapi",
]
[[package]]
name = "winreg"
version = "0.50.0"

View File

@@ -35,17 +35,19 @@ tokio = { version = "1.32.0", features = ["full"] }
axum = { version = "0.6.20", features = ["ws", "json", "headers"] }
tower = { version = "0.4.13", features = ["timeout"] }
tower-http = { version = "0.4.3", features = ["fs", "cors"] }
reqwest = { version = "0.11.18", features = ["json"] }
clap = { version = "4.3.21", features = ["derive"] }
reqwest = { version = "0.11.20", features = ["json"] }
clap = { version = "4.4.0", features = ["derive"] }
uuid = { version = "1.4.1", features = ["v4", "fast-rng", "serde"] }
serde = { version = "1.0.183", features = ["derive"] }
serde = { version = "1.0.188", features = ["derive"] }
serde_json = "1.0.105"
tokio-tungstenite = "0.20.0"
tokio-util = "0.7.8"
futures = "0.3.28"
futures-util = "0.3.28"
dotenv = "0.15.0"
envy = "0.4.2"
anyhow = "1.0.75"
thiserror = "1.0.47"
log = "0.4.20"
simple_logger = "4.2.0"
bollard = "0.14.0"
@@ -62,8 +64,8 @@ hmac = "0.12.1"
sha2 = "0.10.7"
bcrypt = "0.15.0"
hex = "0.4.3"
aws-config = "0.56.0"
aws-sdk-ec2 = "0.29.0"
aws-config = "0.56.1"
aws-sdk-ec2 = "0.30.0"
proc-macro2 = "1.0.66"
quote = "1.0.33"
syn = "2.0.29"

View File

@@ -13,10 +13,7 @@ async fn app() -> anyhow::Result<()> {
let socket_addr = SocketAddr::from_str("0.0.0.0:7000").unwrap();
info!(
"v {} | {socket_addr}",
env!("CARGO_PKG_VERSION")
);
info!("v {} | {socket_addr}", env!("CARGO_PKG_VERSION"));
let app = Router::new().route(
"/",

View File

@@ -1,10 +1,7 @@
use std::time::Instant;
use async_trait::async_trait;
use axum::{
headers::ContentType, middleware, routing::post, Extension, Json, Router,
TypedHeader,
};
use axum::{headers::ContentType, middleware, routing::post, Extension, Json, Router, TypedHeader};
use monitor_types::requests::read::*;
use resolver_api::{derive::Resolver, Resolve, ResolveToString, Resolver};
use serde::{Deserialize, Serialize};

View File

@@ -48,6 +48,7 @@ async fn ws_handler(state: StateExtension, ws: WebSocketUpgrade) -> impl IntoRes
let (mut ws_sender, mut ws_reciever) = socket.split();
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
tokio::spawn(async move {
loop {
let update = select! {
@@ -69,10 +70,7 @@ async fn ws_handler(state: StateExtension, ws: WebSocketUpgrade) -> impl IntoRes
let res = state
.user_can_see_update(&user, &user.id, &update.target)
.await;
if let Err(_e) = res {
// handle
return;
} else {
if res.is_ok() {
let _ = ws_sender
.send(Message::Text(serde_json::to_string(&update).unwrap()))
.await;

View File

@@ -1,6 +1,6 @@
use anyhow::anyhow;
use async_timing_util::unix_timestamp_ms;
use axum::{http::StatusCode, TypedHeader, headers::ContentType};
use axum::{headers::ContentType, http::StatusCode, TypedHeader};
use monitor_types::entities::update::Log;
use run_command::{async_run_command, CommandOutput};
use serror::serialize_error_pretty;

View File

@@ -0,0 +1,19 @@
[package]
name = "update_logger"
version.workspace = true
edition.workspace = true
authors.workspace = true
license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# local
monitor_client.workspace = true
logger.workspace = true
# mogh
termination_signal.workspace = true
# external
tokio.workspace = true
log.workspace = true
anyhow.workspace = true

View File

@@ -0,0 +1,40 @@
#[macro_use]
extern crate log;
use monitor_client::MonitorClient;
use termination_signal::tokio::immediate_term_handle;
async fn app() -> anyhow::Result<()> {
logger::init(log::LevelFilter::Info)?;
info!("v {}", env!("CARGO_PKG_VERSION"));
let monitor = MonitorClient::new_from_env().await?;
let (mut rx, _) = monitor.subscribe_to_updates(1000);
loop {
let msg = rx.recv().await;
if let Err(e) = msg {
error!("🚨 recv error | {e:#?}");
break;
}
info!("{msg:#?}")
}
Ok(())
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let term_signal = immediate_term_handle()?;
let app = tokio::spawn(app());
tokio::select! {
res = app => return res?,
_ = term_signal => {},
}
Ok(())
}

View File

@@ -8,10 +8,19 @@ license.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
# local
monitor_types.workspace = true
# mogh
serror.workspace = true
resolver_api.workspace = true
# external
reqwest.workspace = true
anyhow.workspace = true
thiserror.workspace = true
serde.workspace = true
serde_json.workspace = true
resolver_api.workspace = true
envy.workspace = true
envy.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tokio-tungstenite.workspace = true
futures.workspace = true

View File

@@ -2,10 +2,10 @@ use anyhow::{anyhow, Context};
use monitor_types::requests::auth::{
self, CreateLocalUserResponse, LoginLocalUserResponse, LoginWithSecretResponse,
};
use reqwest::StatusCode;
use resolver_api::HasResponse;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::json;
use serde::Deserialize;
mod request;
mod subscribe;
#[derive(Deserialize)]
struct MonitorEnv {
@@ -16,10 +16,18 @@ struct MonitorEnv {
monitor_secret: Option<String>,
}
#[derive(Clone)]
pub struct MonitorClient {
reqwest: reqwest::Client,
address: String,
jwt: String,
creds: Option<RefreshTokenCreds>,
}
#[derive(Clone)]
struct RefreshTokenCreds {
username: String,
secret: String,
}
impl MonitorClient {
@@ -28,6 +36,7 @@ impl MonitorClient {
reqwest: Default::default(),
address: address.into(),
jwt: token.into(),
creds: None,
}
}
@@ -40,6 +49,7 @@ impl MonitorClient {
reqwest: Default::default(),
address: address.into(),
jwt: Default::default(),
creds: None,
};
let LoginLocalUserResponse { jwt } = client
@@ -63,6 +73,7 @@ impl MonitorClient {
reqwest: Default::default(),
address: address.into(),
jwt: Default::default(),
creds: None,
};
let CreateLocalUserResponse { jwt } = client
@@ -86,16 +97,14 @@ impl MonitorClient {
reqwest: Default::default(),
address: address.into(),
jwt: Default::default(),
};
let LoginWithSecretResponse { jwt } = client
.auth(auth::LoginWithSecret {
creds: RefreshTokenCreds {
username: username.into(),
secret: secret.into(),
})
.await?;
}
.into(),
};
client.jwt = jwt;
client.refresh_jwt().await?;
Ok(client)
}
@@ -120,80 +129,24 @@ impl MonitorClient {
}
}
pub async fn auth<T: HasResponse>(&self, request: T) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self.post(
"/auth",
json!({
"type": req_type,
"params": request
}),
)
.await
}
pub async fn read<T: HasResponse>(&self, request: T) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self.post(
"/read",
json!({
"type": req_type,
"params": request
}),
)
.await
}
pub async fn write<T: HasResponse>(&self, request: T) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self.post(
"/write",
json!({
"type": req_type,
"params": request
}),
)
.await
}
pub async fn execute<T: HasResponse>(&self, request: T) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self.post(
"/execute",
json!({
"type": req_type,
"params": request
}),
)
.await
}
async fn post<B: Serialize, R: DeserializeOwned>(
&self,
endpoint: &str,
body: impl Into<Option<B>>,
) -> anyhow::Result<R> {
let req = self
.reqwest
.post(format!("{}{endpoint}", self.address))
.header("Authorization", format!("Bearer {}", self.jwt));
let req = if let Some(body) = body.into() {
req.header("Content-Type", "application/json").json(&body)
} else {
req
};
let res = req.send().await.context("failed to reach monitor api")?;
let status = res.status();
if status == StatusCode::OK {
match res.json().await {
Ok(res) => Ok(res),
Err(e) => Err(anyhow!("{status}: {e:#?}")),
}
} else {
match res.text().await {
Ok(res) => Err(anyhow!("{status}: {res}")),
Err(e) => Err(anyhow!("{status}: {e:#?}")),
}
pub async fn refresh_jwt(&mut self) -> anyhow::Result<()> {
if self.creds.is_none() {
return Err(anyhow!(
"only clients initialized using the secret login method can refresh their jwt"
));
}
let creds = self.creds.clone().unwrap();
let LoginWithSecretResponse { jwt } = self
.auth(auth::LoginWithSecret {
username: creds.username,
secret: creds.secret,
})
.await?;
self.jwt = jwt;
Ok(())
}
}

87
client/rs/src/request.rs Normal file
View File

@@ -0,0 +1,87 @@
use anyhow::{anyhow, Context};
use reqwest::StatusCode;
use resolver_api::HasResponse;
use serde::{de::DeserializeOwned, Serialize};
use serde_json::json;
use serror::deserialize_error;
use crate::MonitorClient;
impl MonitorClient {
pub async fn auth<T: HasResponse>(&self, request: T) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self.post(
"/auth",
json!({
"type": req_type,
"params": request
}),
)
.await
}
pub async fn read<T: HasResponse>(&self, request: T) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self.post(
"/read",
json!({
"type": req_type,
"params": request
}),
)
.await
}
pub async fn write<T: HasResponse>(&self, request: T) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self.post(
"/write",
json!({
"type": req_type,
"params": request
}),
)
.await
}
pub async fn execute<T: HasResponse>(&self, request: T) -> anyhow::Result<T::Response> {
let req_type = T::req_type();
self.post(
"/execute",
json!({
"type": req_type,
"params": request
}),
)
.await
}
async fn post<B: Serialize, R: DeserializeOwned>(
&self,
endpoint: &str,
body: impl Into<Option<B>>,
) -> anyhow::Result<R> {
let req = self
.reqwest
.post(format!("{}{endpoint}", self.address))
.header("Authorization", format!("Bearer {}", self.jwt));
let req = if let Some(body) = body.into() {
req.header("Content-Type", "application/json").json(&body)
} else {
req
};
let res = req.send().await.context("failed to reach monitor api")?;
let status = res.status();
if status == StatusCode::OK {
match res.json().await {
Ok(res) => Ok(res),
Err(e) => Err(anyhow!("{status} | {e:#?}")),
}
} else {
match res.text().await {
Ok(res) => Err(deserialize_error(res).context(status)),
Err(e) => Err(anyhow!("{status} | {e:#?}")),
}
}
}
}

152
client/rs/src/subscribe.rs Normal file
View File

@@ -0,0 +1,152 @@
use anyhow::Context;
use futures::{SinkExt, TryStreamExt};
use monitor_types::entities::update::UpdateListItem;
use serror::serialize_error_pretty;
use thiserror::Error;
use tokio::sync::broadcast;
use tokio_tungstenite::{connect_async, tungstenite::Message};
use tokio_util::sync::CancellationToken;
use crate::MonitorClient;
#[derive(Debug, Clone)]
pub enum UpdateWsMessage {
Update(UpdateListItem),
Error(UpdateWsError),
Disconnected,
Reconnected,
}
#[derive(Error, Debug, Clone)]
pub enum UpdateWsError {
#[error("failed to connect | {0}")]
ConnectionError(String),
#[error("failed to login | {0}")]
LoginError(String),
#[error("failed to recieve message | {0}")]
MessageError(String),
#[error("did not recognize message | {0}")]
MessageUnrecognized(String),
}
impl MonitorClient {
pub fn subscribe_to_updates(
&self,
capacity: usize,
) -> (broadcast::Receiver<UpdateWsMessage>, CancellationToken) {
let (tx, rx) = broadcast::channel(capacity);
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let address = format!("{}/ws/update", self.address.replacen("http", "ws", 1));
let mut client = self.clone();
tokio::spawn(async move {
loop {
if cancel.is_cancelled() {
break;
}
if client.creds.is_some() {
let res = client.refresh_jwt().await;
if let Err(e) = res {
let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError(
serialize_error_pretty(e),
)));
}
}
let res = connect_async(&address)
.await
.context("failed to connect to websocket endpoint");
if let Err(e) = res {
let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::ConnectionError(
serialize_error_pretty(e),
)));
return;
}
let (mut ws, _) = res.unwrap();
// ==================
// SEND LOGIN MSG
// ==================
let login_send_res = ws
.send(Message::Text(client.jwt.clone()))
.await
.context("failed to send login message");
if let Err(e) = login_send_res {
let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError(
serialize_error_pretty(e),
)));
return;
}
// ==================
// HANDLE LOGIN RES
// ==================
match ws.try_next().await {
Ok(Some(Message::Text(msg))) => {
if msg != "LOGGED_IN" {
let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError(msg)));
return;
}
}
Ok(Some(msg)) => {
let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError(
format!("{msg:#?}"),
)));
return;
}
Ok(None) => {
let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError(
String::from("got None message after login message"),
)));
return;
}
Err(e) => {
let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError(
format!("failed to recieve message | {e:#?}"),
)));
return;
}
}
// ==================
// HANLDE MSGS
// ==================
loop {
match ws.try_next().await.context("failed to recieve message") {
Ok(Some(Message::Text(msg))) => {
match serde_json::from_str::<UpdateListItem>(&msg) {
Ok(msg) => {
let _ = tx.send(UpdateWsMessage::Update(msg));
}
Err(_) => {
let _ = tx.send(UpdateWsMessage::Error(
UpdateWsError::MessageUnrecognized(msg),
));
}
}
}
Ok(Some(Message::Close(_))) => {
let _ = tx.send(UpdateWsMessage::Disconnected);
break;
}
Err(e) => {
let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::MessageError(
serialize_error_pretty(e),
)));
let _ = tx.send(UpdateWsMessage::Disconnected);
break;
}
Ok(_) => {
// ignore
}
}
}
}
});
(rx, cancel_clone)
}
}

View File

@@ -2,7 +2,7 @@ use resolver_api::derive::Request;
use serde::{Deserialize, Serialize};
use typeshare::typeshare;
use crate::{entities::alert::Alert, I64, U64, MongoDocument};
use crate::{entities::alert::Alert, MongoDocument, I64, U64};
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone, Request)]