diff --git a/Cargo.lock b/Cargo.lock index e23b96b0b..1895febdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -67,16 +67,15 @@ dependencies = [ [[package]] name = "anstream" -version = "0.3.2" +version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ca84f3628370c59db74ee214b3263d58f9aadd9b4fe7e711fd87dc452b7f163" +checksum = "b1f58811cfac344940f1a400b6e6231ce35171f614f26439e80f8c1465c5cc0c" dependencies = [ "anstyle", "anstyle-parse", "anstyle-query", "anstyle-wincon", "colorchoice", - "is-terminal", "utf8parse", ] @@ -106,9 +105,9 @@ dependencies = [ [[package]] name = "anstyle-wincon" -version = "1.0.1" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180abfa45703aebe0093f79badacc01b8fd4ea2e35118747e5811127f926e188" +checksum = "58f54d10c6dfa51283a066ceab3ec1ab78d13fae00aa49243a45e4571fb79dfd" dependencies = [ "anstyle", "windows-sys 0.48.0", @@ -163,9 +162,9 @@ checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" [[package]] name = "aws-config" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "de3d533e0263bf453cc80af4c8bcc4d64e2aca293bd16f81633a36f1bf4a97cb" +checksum = "fc6b3804dca60326e07205179847f17a4fce45af3a1106939177ad41ac08a6de" dependencies = [ "aws-credential-types", "aws-http", @@ -193,9 +192,9 @@ dependencies = [ [[package]] name = "aws-credential-types" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4834ba01c5ad1ed9740aa222de62190e3c565d11ab7e72cc68314a258994567" +checksum = "70a66ac8ef5fa9cf01c2d999f39d16812e90ec1467bd382cbbb74ba23ea86201" dependencies = [ "aws-smithy-async", "aws-smithy-types", @@ -207,9 +206,9 @@ dependencies = [ [[package]] name = "aws-http" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "72badf9de83cc7d66b21b004f09241836823b8302afb25a24708769e576a8d8f" +checksum = "3e626370f9ba806ae4c439e49675fd871f5767b093075cdf4fef16cac42ba900" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -226,9 +225,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf832f522111225c02547e1e1c28137e840e4b082399d93a236e4b29193a4667" +checksum = "07ac5cf0ff19c1bca0cea7932e11b239d1025a45696a4f44f72ea86e2b8bdd07" dependencies = [ "aws-credential-types", "aws-http", @@ -247,9 +246,9 @@ dependencies = [ [[package]] name = "aws-sdk-ec2" -version = "0.29.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c58f098f12b70166afd023949291df62f7f716cb5866ac4256178cd3321d1b1b" +checksum = "dcb3e58549904567dcbfadb1e450b5a0f46f35300dd78c87a31ca4761d1481f6" dependencies = [ "aws-credential-types", "aws-http", @@ -273,9 +272,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "0.29.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f41bf2c28d32dbb9894a8fcfcb148265d034d3f4a170552a47553a09de890895" +checksum = "903f888ff190e64f6f5c83fb0f8d54f9c20481f1dc26359bb8896f5d99908949" dependencies = [ "aws-credential-types", "aws-http", @@ -297,9 +296,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "0.29.0" +version = "0.30.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79e21aa1a5b0853969a1ef96ccfaa8ff5d57c761549786a4d5f86c1902b2586a" +checksum = "a47ad6bf01afc00423d781d464220bf69fb6a674ad6629cbbcb06d88cdc2be82" dependencies = [ "aws-credential-types", "aws-http", @@ -321,9 +320,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2cb40a93429794065f41f0581734fc56a345f6a38d8e2e3c25c7448d930cd132" +checksum = "b7b28f4910bb956b7ab320b62e98096402354eca976c587d1eeccd523d9bac03" dependencies = [ "aws-smithy-http", "form_urlencoded", @@ -340,9 +339,9 @@ dependencies = [ [[package]] name = "aws-smithy-async" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ee6d17d487c8b579423067718b3580c0908d0f01d7461813f94ec4323bad623" +checksum = "2cdb73f85528b9d19c23a496034ac53703955a59323d581c06aa27b4e4e247af" dependencies = [ "futures-util", "pin-project-lite", @@ -352,9 +351,9 @@ dependencies = [ [[package]] name = "aws-smithy-client" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bdbe0a3ad15283cc5f863a68cb6adc8e256e7c109c43c01bdd09be407219a1e9" +checksum = "c27b2756264c82f830a91cb4d2d485b2d19ad5bea476d9a966e03d27f27ba59a" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -376,9 +375,9 @@ dependencies = [ [[package]] name = "aws-smithy-http" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "34dc313472d727f5ef44fdda93e668ebfe17380c99dee512c403e3ca51863bb9" +checksum = "54cdcf365d8eee60686885f750a34c190e513677db58bbc466c44c588abf4199" dependencies = [ "aws-smithy-types", "bytes", @@ -398,9 +397,9 @@ dependencies = [ [[package]] name = "aws-smithy-http-tower" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1dd50fca5a4ea4ec3771689ee93bf06b32de02a80af01ed93a8f8a4ed90e8483" +checksum = "822de399d0ce62829a69dfa8c5cd08efdbe61a7426b953e2268f8b8b52a607bd" dependencies = [ "aws-smithy-http", "aws-smithy-types", @@ -414,18 +413,18 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3591dd7c2fe01ab8025e4847a0a0f6d0c2b2269714688ffb856f9cf6c6d465cf" +checksum = "4fb1e7ab8fa7ad10c193af7ae56d2420989e9f4758bf03601a342573333ea34f" dependencies = [ "aws-smithy-types", ] [[package]] name = "aws-smithy-query" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dbabb1145e65dd57ae72d91a2619d3f5fba40b68a5f40ba009c30571dfd60aff" +checksum = "28556a3902091c1f768a34f6c998028921bdab8d47d92586f363f14a4a32d047" dependencies = [ "aws-smithy-types", "urlencoding", @@ -433,9 +432,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3687fb838d4ad1c883b62eb59115bc9fb02c4f308aac49a7df89627067f6eb0d" +checksum = "745e096b3553e7e0f40622aa04971ce52765af82bebdeeac53aa6fc82fe801e6" dependencies = [ "aws-smithy-async", "aws-smithy-client", @@ -455,9 +454,9 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5cfbf1e5c2108b41f5ca607cde40dd5109fecc448f5d30c8e614b61f36dce704" +checksum = "93d0ae0c9cfd57944e9711ea610b48a963fb174a53aabacc08c5794a594b1d02" dependencies = [ "aws-smithy-async", "aws-smithy-http", @@ -470,9 +469,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eed0a94eefd845a2a78677f1b72f02fa75802d38f7f59be675add140279aa8bf" +checksum = "d90dbc8da2f6be461fa3c1906b20af8f79d14968fe47f2b7d29d086f62a51728" dependencies = [ "base64-simd", "itoa", @@ -484,18 +483,18 @@ dependencies = [ [[package]] name = "aws-smithy-xml" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c88052c812f696143ad7ba729c63535209ff0e0f49e31a6d2b1205208ea6ea79" +checksum = "e01d2dedcdd8023043716cfeeb3c6c59f2d447fce365d8e194838891794b23b6" dependencies = [ "xmlparser", ] [[package]] name = "aws-types" -version = "0.56.0" +version = "0.56.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bceb8cf724ad057ad7f327d0d256d7147b3eac777b39849a26189e003dc9782" +checksum = "85aa0451bf8af1bf22a4f028d5d28054507a14be43cb8ac0597a8471fba9edfe" dependencies = [ "aws-credential-types", "aws-smithy-async", @@ -784,9 +783,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.3.23" +version = "4.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03aef18ddf7d879c15ce20f04826ef8418101c7e528014c3eeea13321047dca3" +checksum = "7c8d502cbaec4595d2e7d5f61e318f05417bd2b66fdc3809498f0d3fdf0bea27" dependencies = [ "clap_builder", "clap_derive", @@ -795,9 +794,9 @@ dependencies = [ [[package]] name = "clap_builder" -version = "4.3.23" +version = "4.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8ce6fffb678c9b80a70b6b6de0aad31df727623a70fd9a842c30cd573e2fa98" +checksum = "5891c7bc0edb3e1c2204fc5e94009affabeb1821c9e5fdc3959536c5c0bb984d" dependencies = [ "anstream", "anstyle", @@ -807,9 +806,9 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.3.12" +version = "4.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "54a9bb5758fc5dfe728d1019941681eccaf0cf8a4189b692a0ee2f2ecf90a050" +checksum = "c9fd1a5729c4548118d7d70ff234a44868d00489a4b6597b0b020918a0e91a1a" dependencies = [ "heck", "proc-macro2", @@ -1659,7 +1658,7 @@ dependencies = [ "socket2 0.5.3", "widestring", "windows-sys 0.48.0", - "winreg 0.50.0", + "winreg", ] [[package]] @@ -1668,18 +1667,6 @@ version = "2.7.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "12b6ee2129af8d4fb011108c73d99a1b83a85977f23b82460c0ae2e25bb4b57f" -[[package]] -name = "is-terminal" -version = "0.4.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "adcf93614601c8129ddf72e2d5633df827ba6551541c6d8c59520a371475be1f" -dependencies = [ - "hermit-abi 0.3.1", - "io-lifetimes", - "rustix", - "windows-sys 0.48.0", -] - [[package]] name = "itoa" version = "1.0.6" @@ -1946,11 +1933,17 @@ version = "1.0.1" dependencies = [ "anyhow", "envy", + "futures", "monitor_types", "reqwest", "resolver_api", "serde", "serde_json", + "serror", + "thiserror", + "tokio", + "tokio-tungstenite", + "tokio-util", ] [[package]] @@ -2447,9 +2440,9 @@ checksum = "436b050e76ed2903236f032a59761c1eb99e1b0aead2c257922771dab1fc8c78" [[package]] name = "reqwest" -version = "0.11.18" +version = "0.11.20" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" +checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" dependencies = [ "base64 0.21.2", "bytes", @@ -2479,7 +2472,7 @@ dependencies = [ "wasm-bindgen", "wasm-bindgen-futures", "web-sys", - "winreg 0.10.1", + "winreg", ] [[package]] @@ -2726,9 +2719,9 @@ checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" [[package]] name = "serde" -version = "1.0.183" +version = "1.0.188" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32ac8da02677876d532745a130fc9d8e6edfa81a269b107c5b00829b91d8eb3c" +checksum = "cf9e0fcba69a370eed61bcf2b728575f726b50b55cba78064753d708ddc7549e" dependencies = [ "serde_derive", ] @@ -2753,9 +2746,9 @@ dependencies = [ [[package]] name = "serde_derive" -version = "1.0.183" +version = "1.0.188" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" +checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2" dependencies = [ "proc-macro2", "quote", @@ -3173,18 +3166,18 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "978c9a314bd8dc99be594bc3c175faaa9794be04a5a5e153caba6915336cebac" +checksum = "97a802ec30afc17eee47b2855fc72e0c4cd62be9b4efe6591edde0ec5bd68d8f" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.40" +version = "1.0.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9456a42c5b0d803c8cd86e73dd7cc9edd429499f37a3550d286d5e86720569f" +checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" dependencies = [ "proc-macro2", "quote", @@ -3610,6 +3603,18 @@ version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a156c684c91ea7d62626509bce3cb4e1d9ed5c4d978f7b4352658f96a4c26b4a" +[[package]] +name = "update_logger" +version = "1.0.1" +dependencies = [ + "anyhow", + "log", + "logger", + "monitor_client", + "termination_signal", + "tokio", +] + [[package]] name = "url" version = "2.4.0" @@ -3954,15 +3959,6 @@ dependencies = [ "memchr", ] -[[package]] -name = "winreg" -version = "0.10.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "80d0f4e272c85def139476380b12f9ac60926689dd2e01d4923222f40580869d" -dependencies = [ - "winapi", -] - [[package]] name = "winreg" version = "0.50.0" diff --git a/Cargo.toml b/Cargo.toml index d9844cd64..b21a4ff66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,17 +35,19 @@ tokio = { version = "1.32.0", features = ["full"] } axum = { version = "0.6.20", features = ["ws", "json", "headers"] } tower = { version = "0.4.13", features = ["timeout"] } tower-http = { version = "0.4.3", features = ["fs", "cors"] } -reqwest = { version = "0.11.18", features = ["json"] } -clap = { version = "4.3.21", features = ["derive"] } +reqwest = { version = "0.11.20", features = ["json"] } +clap = { version = "4.4.0", features = ["derive"] } uuid = { version = "1.4.1", features = ["v4", "fast-rng", "serde"] } -serde = { version = "1.0.183", features = ["derive"] } +serde = { version = "1.0.188", features = ["derive"] } serde_json = "1.0.105" +tokio-tungstenite = "0.20.0" tokio-util = "0.7.8" futures = "0.3.28" futures-util = "0.3.28" dotenv = "0.15.0" envy = "0.4.2" anyhow = "1.0.75" +thiserror = "1.0.47" log = "0.4.20" simple_logger = "4.2.0" bollard = "0.14.0" @@ -62,8 +64,8 @@ hmac = "0.12.1" sha2 = "0.10.7" bcrypt = "0.15.0" hex = "0.4.3" -aws-config = "0.56.0" -aws-sdk-ec2 = "0.29.0" +aws-config = "0.56.1" +aws-sdk-ec2 = "0.30.0" proc-macro2 = "1.0.66" quote = "1.0.33" syn = "2.0.29" diff --git a/bin/alert_logger/src/main.rs b/bin/alert_logger/src/main.rs index 1ae3a9d20..aca786b59 100644 --- a/bin/alert_logger/src/main.rs +++ b/bin/alert_logger/src/main.rs @@ -13,10 +13,7 @@ async fn app() -> anyhow::Result<()> { let socket_addr = SocketAddr::from_str("0.0.0.0:7000").unwrap(); - info!( - "v {} | {socket_addr}", - env!("CARGO_PKG_VERSION") - ); + info!("v {} | {socket_addr}", env!("CARGO_PKG_VERSION")); let app = Router::new().route( "/", diff --git a/bin/core/src/requests/read/mod.rs b/bin/core/src/requests/read/mod.rs index de51d29a2..07412026f 100644 --- a/bin/core/src/requests/read/mod.rs +++ b/bin/core/src/requests/read/mod.rs @@ -1,10 +1,7 @@ use std::time::Instant; use async_trait::async_trait; -use axum::{ - headers::ContentType, middleware, routing::post, Extension, Json, Router, - TypedHeader, -}; +use axum::{headers::ContentType, middleware, routing::post, Extension, Json, Router, TypedHeader}; use monitor_types::requests::read::*; use resolver_api::{derive::Resolver, Resolve, ResolveToString, Resolver}; use serde::{Deserialize, Serialize}; diff --git a/bin/core/src/ws.rs b/bin/core/src/ws.rs index bb019cc22..113085a1a 100644 --- a/bin/core/src/ws.rs +++ b/bin/core/src/ws.rs @@ -48,6 +48,7 @@ async fn ws_handler(state: StateExtension, ws: WebSocketUpgrade) -> impl IntoRes let (mut ws_sender, mut ws_reciever) = socket.split(); let cancel = CancellationToken::new(); let cancel_clone = cancel.clone(); + tokio::spawn(async move { loop { let update = select! { @@ -69,10 +70,7 @@ async fn ws_handler(state: StateExtension, ws: WebSocketUpgrade) -> impl IntoRes let res = state .user_can_see_update(&user, &user.id, &update.target) .await; - if let Err(_e) = res { - // handle - return; - } else { + if res.is_ok() { let _ = ws_sender .send(Message::Text(serde_json::to_string(&update).unwrap())) .await; diff --git a/bin/periphery/src/helpers/mod.rs b/bin/periphery/src/helpers/mod.rs index 001f7e9a7..18795afbb 100644 --- a/bin/periphery/src/helpers/mod.rs +++ b/bin/periphery/src/helpers/mod.rs @@ -1,6 +1,6 @@ use anyhow::anyhow; use async_timing_util::unix_timestamp_ms; -use axum::{http::StatusCode, TypedHeader, headers::ContentType}; +use axum::{headers::ContentType, http::StatusCode, TypedHeader}; use monitor_types::entities::update::Log; use run_command::{async_run_command, CommandOutput}; use serror::serialize_error_pretty; diff --git a/bin/update_logger/Cargo.toml b/bin/update_logger/Cargo.toml new file mode 100644 index 000000000..792435645 --- /dev/null +++ b/bin/update_logger/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "update_logger" +version.workspace = true +edition.workspace = true +authors.workspace = true +license.workspace = true + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +# local +monitor_client.workspace = true +logger.workspace = true +# mogh +termination_signal.workspace = true +# external +tokio.workspace = true +log.workspace = true +anyhow.workspace = true \ No newline at end of file diff --git a/bin/update_logger/src/main.rs b/bin/update_logger/src/main.rs new file mode 100644 index 000000000..95eff5471 --- /dev/null +++ b/bin/update_logger/src/main.rs @@ -0,0 +1,40 @@ +#[macro_use] +extern crate log; + +use monitor_client::MonitorClient; +use termination_signal::tokio::immediate_term_handle; + +async fn app() -> anyhow::Result<()> { + logger::init(log::LevelFilter::Info)?; + + info!("v {}", env!("CARGO_PKG_VERSION")); + + let monitor = MonitorClient::new_from_env().await?; + + let (mut rx, _) = monitor.subscribe_to_updates(1000); + + loop { + let msg = rx.recv().await; + if let Err(e) = msg { + error!("🚨 recv error | {e:#?}"); + break; + } + info!("{msg:#?}") + } + + Ok(()) +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let term_signal = immediate_term_handle()?; + + let app = tokio::spawn(app()); + + tokio::select! { + res = app => return res?, + _ = term_signal => {}, + } + + Ok(()) +} diff --git a/client/rs/Cargo.toml b/client/rs/Cargo.toml index 0410bdd1f..0c1c3ca9f 100644 --- a/client/rs/Cargo.toml +++ b/client/rs/Cargo.toml @@ -8,10 +8,19 @@ license.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +# local monitor_types.workspace = true +# mogh +serror.workspace = true +resolver_api.workspace = true +# external reqwest.workspace = true anyhow.workspace = true +thiserror.workspace = true serde.workspace = true serde_json.workspace = true -resolver_api.workspace = true -envy.workspace = true \ No newline at end of file +envy.workspace = true +tokio.workspace = true +tokio-util.workspace = true +tokio-tungstenite.workspace = true +futures.workspace = true \ No newline at end of file diff --git a/client/rs/src/lib.rs b/client/rs/src/lib.rs index 21e95e9fd..873af2065 100644 --- a/client/rs/src/lib.rs +++ b/client/rs/src/lib.rs @@ -2,10 +2,10 @@ use anyhow::{anyhow, Context}; use monitor_types::requests::auth::{ self, CreateLocalUserResponse, LoginLocalUserResponse, LoginWithSecretResponse, }; -use reqwest::StatusCode; -use resolver_api::HasResponse; -use serde::{de::DeserializeOwned, Deserialize, Serialize}; -use serde_json::json; +use serde::Deserialize; + +mod request; +mod subscribe; #[derive(Deserialize)] struct MonitorEnv { @@ -16,10 +16,18 @@ struct MonitorEnv { monitor_secret: Option, } +#[derive(Clone)] pub struct MonitorClient { reqwest: reqwest::Client, address: String, jwt: String, + creds: Option, +} + +#[derive(Clone)] +struct RefreshTokenCreds { + username: String, + secret: String, } impl MonitorClient { @@ -28,6 +36,7 @@ impl MonitorClient { reqwest: Default::default(), address: address.into(), jwt: token.into(), + creds: None, } } @@ -40,6 +49,7 @@ impl MonitorClient { reqwest: Default::default(), address: address.into(), jwt: Default::default(), + creds: None, }; let LoginLocalUserResponse { jwt } = client @@ -63,6 +73,7 @@ impl MonitorClient { reqwest: Default::default(), address: address.into(), jwt: Default::default(), + creds: None, }; let CreateLocalUserResponse { jwt } = client @@ -86,16 +97,14 @@ impl MonitorClient { reqwest: Default::default(), address: address.into(), jwt: Default::default(), - }; - - let LoginWithSecretResponse { jwt } = client - .auth(auth::LoginWithSecret { + creds: RefreshTokenCreds { username: username.into(), secret: secret.into(), - }) - .await?; + } + .into(), + }; - client.jwt = jwt; + client.refresh_jwt().await?; Ok(client) } @@ -120,80 +129,24 @@ impl MonitorClient { } } - pub async fn auth(&self, request: T) -> anyhow::Result { - let req_type = T::req_type(); - self.post( - "/auth", - json!({ - "type": req_type, - "params": request - }), - ) - .await - } - - pub async fn read(&self, request: T) -> anyhow::Result { - let req_type = T::req_type(); - self.post( - "/read", - json!({ - "type": req_type, - "params": request - }), - ) - .await - } - - pub async fn write(&self, request: T) -> anyhow::Result { - let req_type = T::req_type(); - self.post( - "/write", - json!({ - "type": req_type, - "params": request - }), - ) - .await - } - - pub async fn execute(&self, request: T) -> anyhow::Result { - let req_type = T::req_type(); - self.post( - "/execute", - json!({ - "type": req_type, - "params": request - }), - ) - .await - } - - async fn post( - &self, - endpoint: &str, - body: impl Into>, - ) -> anyhow::Result { - let req = self - .reqwest - .post(format!("{}{endpoint}", self.address)) - .header("Authorization", format!("Bearer {}", self.jwt)); - let req = if let Some(body) = body.into() { - req.header("Content-Type", "application/json").json(&body) - } else { - req - }; - let res = req.send().await.context("failed to reach monitor api")?; - let status = res.status(); - if status == StatusCode::OK { - match res.json().await { - Ok(res) => Ok(res), - Err(e) => Err(anyhow!("{status}: {e:#?}")), - } - } else { - match res.text().await { - Ok(res) => Err(anyhow!("{status}: {res}")), - Err(e) => Err(anyhow!("{status}: {e:#?}")), - } + pub async fn refresh_jwt(&mut self) -> anyhow::Result<()> { + if self.creds.is_none() { + return Err(anyhow!( + "only clients initialized using the secret login method can refresh their jwt" + )); } + + let creds = self.creds.clone().unwrap(); + + let LoginWithSecretResponse { jwt } = self + .auth(auth::LoginWithSecret { + username: creds.username, + secret: creds.secret, + }) + .await?; + + self.jwt = jwt; + + Ok(()) } } diff --git a/client/rs/src/request.rs b/client/rs/src/request.rs new file mode 100644 index 000000000..2049ddb73 --- /dev/null +++ b/client/rs/src/request.rs @@ -0,0 +1,87 @@ +use anyhow::{anyhow, Context}; +use reqwest::StatusCode; +use resolver_api::HasResponse; +use serde::{de::DeserializeOwned, Serialize}; +use serde_json::json; +use serror::deserialize_error; + +use crate::MonitorClient; + +impl MonitorClient { + pub async fn auth(&self, request: T) -> anyhow::Result { + let req_type = T::req_type(); + self.post( + "/auth", + json!({ + "type": req_type, + "params": request + }), + ) + .await + } + + pub async fn read(&self, request: T) -> anyhow::Result { + let req_type = T::req_type(); + self.post( + "/read", + json!({ + "type": req_type, + "params": request + }), + ) + .await + } + + pub async fn write(&self, request: T) -> anyhow::Result { + let req_type = T::req_type(); + self.post( + "/write", + json!({ + "type": req_type, + "params": request + }), + ) + .await + } + + pub async fn execute(&self, request: T) -> anyhow::Result { + let req_type = T::req_type(); + self.post( + "/execute", + json!({ + "type": req_type, + "params": request + }), + ) + .await + } + + async fn post( + &self, + endpoint: &str, + body: impl Into>, + ) -> anyhow::Result { + let req = self + .reqwest + .post(format!("{}{endpoint}", self.address)) + .header("Authorization", format!("Bearer {}", self.jwt)); + let req = if let Some(body) = body.into() { + req.header("Content-Type", "application/json").json(&body) + } else { + req + }; + let res = req.send().await.context("failed to reach monitor api")?; + let status = res.status(); + if status == StatusCode::OK { + match res.json().await { + Ok(res) => Ok(res), + Err(e) => Err(anyhow!("{status} | {e:#?}")), + } + } else { + match res.text().await { + Ok(res) => Err(deserialize_error(res).context(status)), + Err(e) => Err(anyhow!("{status} | {e:#?}")), + } + } + } +} diff --git a/client/rs/src/subscribe.rs b/client/rs/src/subscribe.rs new file mode 100644 index 000000000..9dd5f3eed --- /dev/null +++ b/client/rs/src/subscribe.rs @@ -0,0 +1,152 @@ +use anyhow::Context; +use futures::{SinkExt, TryStreamExt}; +use monitor_types::entities::update::UpdateListItem; +use serror::serialize_error_pretty; +use thiserror::Error; +use tokio::sync::broadcast; +use tokio_tungstenite::{connect_async, tungstenite::Message}; +use tokio_util::sync::CancellationToken; + +use crate::MonitorClient; + +#[derive(Debug, Clone)] +pub enum UpdateWsMessage { + Update(UpdateListItem), + Error(UpdateWsError), + Disconnected, + Reconnected, +} + +#[derive(Error, Debug, Clone)] +pub enum UpdateWsError { + #[error("failed to connect | {0}")] + ConnectionError(String), + #[error("failed to login | {0}")] + LoginError(String), + #[error("failed to recieve message | {0}")] + MessageError(String), + #[error("did not recognize message | {0}")] + MessageUnrecognized(String), +} + +impl MonitorClient { + pub fn subscribe_to_updates( + &self, + capacity: usize, + ) -> (broadcast::Receiver, CancellationToken) { + let (tx, rx) = broadcast::channel(capacity); + let cancel = CancellationToken::new(); + let cancel_clone = cancel.clone(); + let address = format!("{}/ws/update", self.address.replacen("http", "ws", 1)); + let mut client = self.clone(); + + tokio::spawn(async move { + loop { + if cancel.is_cancelled() { + break; + } + + if client.creds.is_some() { + let res = client.refresh_jwt().await; + if let Err(e) = res { + let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError( + serialize_error_pretty(e), + ))); + } + } + + let res = connect_async(&address) + .await + .context("failed to connect to websocket endpoint"); + if let Err(e) = res { + let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::ConnectionError( + serialize_error_pretty(e), + ))); + return; + } + let (mut ws, _) = res.unwrap(); + + // ================== + // SEND LOGIN MSG + // ================== + let login_send_res = ws + .send(Message::Text(client.jwt.clone())) + .await + .context("failed to send login message"); + + if let Err(e) = login_send_res { + let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError( + serialize_error_pretty(e), + ))); + return; + } + + // ================== + // HANDLE LOGIN RES + // ================== + match ws.try_next().await { + Ok(Some(Message::Text(msg))) => { + if msg != "LOGGED_IN" { + let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError(msg))); + return; + } + } + Ok(Some(msg)) => { + let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError( + format!("{msg:#?}"), + ))); + return; + } + Ok(None) => { + let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError( + String::from("got None message after login message"), + ))); + return; + } + Err(e) => { + let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::LoginError( + format!("failed to recieve message | {e:#?}"), + ))); + return; + } + } + + // ================== + // HANLDE MSGS + // ================== + loop { + match ws.try_next().await.context("failed to recieve message") { + Ok(Some(Message::Text(msg))) => { + match serde_json::from_str::(&msg) { + Ok(msg) => { + let _ = tx.send(UpdateWsMessage::Update(msg)); + } + Err(_) => { + let _ = tx.send(UpdateWsMessage::Error( + UpdateWsError::MessageUnrecognized(msg), + )); + } + } + } + Ok(Some(Message::Close(_))) => { + let _ = tx.send(UpdateWsMessage::Disconnected); + break; + } + Err(e) => { + let _ = tx.send(UpdateWsMessage::Error(UpdateWsError::MessageError( + serialize_error_pretty(e), + ))); + let _ = tx.send(UpdateWsMessage::Disconnected); + break; + } + Ok(_) => { + // ignore + } + } + } + } + }); + + (rx, cancel_clone) + } +} diff --git a/lib/types/src/requests/read/alert.rs b/lib/types/src/requests/read/alert.rs index 65d060eee..0d7c90513 100644 --- a/lib/types/src/requests/read/alert.rs +++ b/lib/types/src/requests/read/alert.rs @@ -2,7 +2,7 @@ use resolver_api::derive::Request; use serde::{Deserialize, Serialize}; use typeshare::typeshare; -use crate::{entities::alert::Alert, I64, U64, MongoDocument}; +use crate::{entities::alert::Alert, MongoDocument, I64, U64}; #[typeshare] #[derive(Serialize, Deserialize, Debug, Clone, Request)]