clean up rust client websocket subscription

This commit is contained in:
mbecker20
2025-01-29 23:59:26 -08:00
parent c718ee0d2c
commit a3e4bd5cf2
5 changed files with 50 additions and 7 deletions

26
Cargo.lock generated
View File

@@ -39,6 +39,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "alerter"
version = "1.17.0"
dependencies = [
"anyhow",
"axum 0.8.1",
"dotenvy",
"envy",
"komodo_client",
"logger",
"serde",
"tokio",
"tracing",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
@@ -5269,6 +5284,17 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "update_logger"
version = "1.17.0"
dependencies = [
"anyhow",
"komodo_client",
"logger",
"tokio",
"tracing",
]
[[package]]
name = "url"
version = "2.5.4"

View File

@@ -3,6 +3,7 @@ resolver = "2"
members = [
"bin/*",
"lib/*",
"example/*",
"client/core/rs",
"client/periphery/rs",
]

View File

@@ -18,13 +18,13 @@ blocking = ["reqwest/blocking"]
[dependencies]
# mogh
mongo_indexed = { workspace = true, optional = true }
serror = { workspace = true, features = ["axum"]}
derive_default_builder.workspace = true
derive_empty_traits.workspace = true
async_timing_util.workspace = true
partial_derive2.workspace = true
derive_variants.workspace = true
resolver_api.workspace = true
serror.workspace = true
# external
tokio-tungstenite.workspace = true
derive_builder.workspace = true

View File

@@ -57,15 +57,32 @@ pub enum UpdateWsError {
const MAX_SHORT_RETRY_COUNT: usize = 5;
impl KomodoClient {
/// Subscribes to the Komodo Core update websocket,
/// and forwards the updates over a channel.
/// Handles reconnection internally.
///
/// ```
/// let (mut rx, _) = komodo.subscribe_to_updates()?;
/// loop {
/// let update = match rx.recv().await {
/// Ok(msg) => msg,
/// Err(e) => {
/// error!("🚨 recv error | {e:?}");
/// break;
/// }
/// };
/// // Handle the update
/// info!("Got update: {update:?}");
/// }
/// ```
pub fn subscribe_to_updates(
&self,
capacity: usize,
retry_cooldown_secs: u64,
// retry_cooldown_secs: u64,
) -> anyhow::Result<(
broadcast::Receiver<UpdateWsMessage>,
CancellationToken,
)> {
let (tx, rx) = broadcast::channel(capacity);
let (tx, rx) = broadcast::channel(128);
let cancel = CancellationToken::new();
let cancel_clone = cancel.clone();
let address =
@@ -262,8 +279,7 @@ impl KomodoClient {
}
}.instrument(span).await;
tokio::time::sleep(Duration::from_secs(retry_cooldown_secs))
.await;
tokio::time::sleep(Duration::from_secs(3)).await;
}
});

View File

@@ -20,7 +20,7 @@ async fn app() -> anyhow::Result<()> {
let komodo =
KomodoClient::new_from_env()?.with_healthcheck().await?;
let (mut rx, _) = komodo.subscribe_to_updates(1000, 5)?;
let (mut rx, _) = komodo.subscribe_to_updates()?;
loop {
let update = match rx.recv().await {