ResourceSync state resolution refinement

This commit is contained in:
mbecker20
2025-03-04 00:20:23 -08:00
parent aaa528f69d
commit 8701e5363d
7 changed files with 26 additions and 173 deletions

View File

@@ -25,16 +25,13 @@ use komodo_client::{
},
};
use mongo_indexed::doc;
use mungos::{
by_id::update_one_by_id,
mongodb::bson::{oid::ObjectId, to_document},
};
use mungos::{by_id::update_one_by_id, mongodb::bson::oid::ObjectId};
use resolver_api::Resolve;
use crate::{
api::write::WriteArgs,
helpers::{query::get_id_to_tags, update::update_update},
resource::{self, refresh_resource_sync_state_cache},
resource,
state::{action_states, db_client},
sync::{
deploy::{
@@ -609,21 +606,6 @@ impl Resolve<ExecuteArgs> for RunSync {
}
update.finalize();
// Need to manually update the update before cache refresh,
// and before broadcast with add_update.
// The Err case of to_document should be unreachable,
// but will fail to update cache in that case.
if let Ok(update_doc) = to_document(&update) {
let _ = update_one_by_id(
&db.updates,
&update.id,
mungos::update::Update::Set(update_doc),
None,
)
.await;
refresh_resource_sync_state_cache().await;
}
update_update(update.clone()).await?;
Ok(update)

View File

@@ -6,7 +6,6 @@ use komodo_client::{
permission::PermissionLevel,
sync::{
ResourceSync, ResourceSyncActionState, ResourceSyncListItem,
ResourceSyncState,
},
},
};
@@ -16,7 +15,7 @@ use crate::{
config::core_config,
helpers::query::get_all_tags,
resource,
state::{action_states, github_client, resource_sync_state_cache},
state::{action_states, github_client},
};
use super::ReadArgs;
@@ -112,7 +111,6 @@ impl Resolve<ReadArgs> for GetResourceSyncsSummary {
let mut res = GetResourceSyncsSummaryResponse::default();
let cache = resource_sync_state_cache();
let action_states = action_states();
for resource_sync in resource_syncs {
@@ -131,29 +129,15 @@ impl Resolve<ReadArgs> for GetResourceSyncsSummary {
res.failed += 1;
continue;
}
match (
cache.get(&resource_sync.id).await.unwrap_or_default(),
action_states
.resource_sync
.get(&resource_sync.id)
.await
.unwrap_or_default()
.get()?,
) {
(_, action_states) if action_states.syncing => {
res.syncing += 1;
}
(ResourceSyncState::Ok, _) => res.ok += 1,
(ResourceSyncState::Failed, _) => res.failed += 1,
(ResourceSyncState::Unknown, _) => res.unknown += 1,
// will never come off the cache in the building state, since that comes from action states
(ResourceSyncState::Syncing, _) => {
unreachable!()
}
(ResourceSyncState::Pending, _) => {
unreachable!()
}
if action_states
.resource_sync
.get(&resource_sync.id)
.await
.unwrap_or_default()
.get()?
.syncing
{
res.syncing += 1;
}
}

View File

@@ -49,7 +49,7 @@ use crate::{
query::get_id_to_tags,
update::{add_update, make_update, update_update},
},
resource::{self, refresh_resource_sync_state_cache},
resource,
state::{db_client, github_client},
sync::{
deploy::SyncDeployParams, remote::RemoteResources,
@@ -66,12 +66,8 @@ impl Resolve<WriteArgs> for CreateResourceSync {
WriteArgs { user }: &WriteArgs,
) -> serror::Result<ResourceSync> {
Ok(
resource::create::<ResourceSync>(
&self.name,
self.config,
user,
)
.await?,
resource::create::<ResourceSync>(&self.name, self.config, user)
.await?,
)
}
}
@@ -483,21 +479,6 @@ impl Resolve<WriteArgs> for CommitSync {
};
update.finalize();
// Need to manually update the update before cache refresh,
// and before broadcast with add_update.
// The Err case of to_document should be unreachable,
// but will fail to update cache in that case.
if let Ok(update_doc) = to_document(&update) {
let _ = update_one_by_id(
&db_client().updates,
&update.id,
mungos::update::Update::Set(update_doc),
None,
)
.await;
refresh_resource_sync_state_cache().await;
}
update_update(update.clone()).await?;
Ok(update)
@@ -548,11 +529,9 @@ impl Resolve<WriteArgs> for RefreshResourceSyncPending {
sync.info.pending_message = message;
if !sync.info.remote_errors.is_empty() {
return Err(
anyhow!(
"Remote resources have errors. Cannot compute diffs."
),
);
return Err(anyhow!(
"Remote resources have errors. Cannot compute diffs."
));
}
let resources = resources?;

View File

@@ -59,7 +59,6 @@ async fn app() -> anyhow::Result<()> {
resource::spawn_repo_state_refresh_loop();
resource::spawn_procedure_state_refresh_loop();
resource::spawn_action_state_refresh_loop();
resource::spawn_resource_sync_state_refresh_loop();
helpers::prune::spawn_prune_loop();
// Setup static frontend services

View File

@@ -73,10 +73,6 @@ pub use refresh::spawn_resource_refresh_loop;
pub use repo::{
refresh_repo_state_cache, spawn_repo_state_refresh_loop,
};
pub use sync::{
refresh_resource_sync_state_cache,
spawn_resource_sync_state_refresh_loop,
};
/// Implement on each Komodo resource for common methods
pub trait KomodoResource {

View File

@@ -1,5 +1,3 @@
use std::time::Duration;
use anyhow::Context;
use formatting::format_serror;
use komodo_client::{
@@ -19,15 +17,12 @@ use komodo_client::{
},
};
use mongo_indexed::doc;
use mungos::{
find::find_collect,
mongodb::{options::FindOneOptions, Collection},
};
use mungos::mongodb::Collection;
use resolver_api::Resolve;
use crate::{
api::write::WriteArgs,
state::{action_states, db_client, resource_sync_state_cache},
state::{action_states, db_client},
};
impl super::KomodoResource for ResourceSync {
@@ -117,7 +112,6 @@ impl super::KomodoResource for ResourceSync {
format_serror(&e.error.context("The sync pending cache has failed to refresh. This is likely due to a misconfiguration of the sync").into())
);
};
refresh_resource_sync_state_cache().await;
Ok(())
}
@@ -180,35 +174,6 @@ impl super::KomodoResource for ResourceSync {
}
}
pub fn spawn_resource_sync_state_refresh_loop() {
tokio::spawn(async move {
loop {
refresh_resource_sync_state_cache().await;
tokio::time::sleep(Duration::from_secs(60)).await;
}
});
}
pub async fn refresh_resource_sync_state_cache() {
let _ = async {
let resource_syncs =
find_collect(&db_client().resource_syncs, None, None)
.await
.context("failed to get resource_syncs from db")?;
let cache = resource_sync_state_cache();
for resource_sync in resource_syncs {
let state =
get_resource_sync_state_from_db(&resource_sync.id).await;
cache.insert(resource_sync.id, state).await;
}
anyhow::Ok(())
}
.await
.inspect_err(|e| {
error!("failed to refresh resource_sync state cache | {e:#}")
});
}
async fn get_resource_sync_state(
id: &String,
data: &ResourceSyncInfo,
@@ -232,57 +197,15 @@ async fn get_resource_sync_state(
{
return state;
}
if data.pending_error.is_some() {
return ResourceSyncState::Failed;
}
if !data.resource_updates.is_empty()
if data.pending_error.is_some() || !data.remote_errors.is_empty() {
ResourceSyncState::Failed
} else if !data.resource_updates.is_empty()
|| !data.variable_updates.is_empty()
|| !data.user_group_updates.is_empty()
|| data.pending_deploy.to_deploy > 0
{
return ResourceSyncState::Pending;
ResourceSyncState::Pending
} else {
ResourceSyncState::Ok
}
resource_sync_state_cache()
.get(id)
.await
.unwrap_or_default()
}
async fn get_resource_sync_state_from_db(
id: &str,
) -> ResourceSyncState {
async {
let state = db_client()
.updates
.find_one(doc! {
"target.type": "ResourceSync",
"target.id": id,
"$or": [
{ "operation": "RunSync" },
{ "operation": "CommitSync" },
],
})
.with_options(
FindOneOptions::builder()
.sort(doc! { "start_ts": -1 })
.build(),
)
.await?
.map(|u| {
if u.success {
ResourceSyncState::Ok
} else {
ResourceSyncState::Failed
}
})
.unwrap_or(ResourceSyncState::Ok);
anyhow::Ok(state)
}
.await
.inspect_err(|e| {
warn!(
"failed to get resource sync state from db for {id} | {e:#}"
)
})
.unwrap_or(ResourceSyncState::Unknown)
}

View File

@@ -12,7 +12,6 @@ use komodo_client::entities::{
procedure::ProcedureState,
repo::RepoState,
stack::StackState,
sync::ResourceSyncState,
};
use octorust::auth::{
Credentials, InstallationTokenGenerator, JWTCredentials,
@@ -197,12 +196,3 @@ pub fn action_state_cache() -> &'static ActionStateCache {
OnceLock::new();
ACTION_STATE_CACHE.get_or_init(Default::default)
}
pub type ResourceSyncStateCache = Cache<String, ResourceSyncState>;
pub fn resource_sync_state_cache() -> &'static ResourceSyncStateCache
{
static RESOURCE_SYNC_STATE_CACHE: OnceLock<ResourceSyncStateCache> =
OnceLock::new();
RESOURCE_SYNC_STATE_CACHE.get_or_init(Default::default)
}