Files
komodo/bin/core/src/helpers/update.rs
Maxwell Becker ec88a6fa5a 1.16.5 (#159)
* repo pull lock

* implement BatchRunAction

* other batch methods
2024-10-27 20:56:56 -07:00

484 lines
13 KiB
Rust

use anyhow::Context;
use komodo_client::entities::{
action::Action,
build::Build,
deployment::Deployment,
komodo_timestamp,
procedure::Procedure,
repo::Repo,
server::Server,
server_template::ServerTemplate,
stack::Stack,
sync::ResourceSync,
update::{Update, UpdateListItem},
user::User,
Operation, ResourceTarget,
};
use mungos::{
by_id::{find_one_by_id, update_one_by_id},
mongodb::bson::to_document,
};
use crate::{
api::execute::ExecuteRequest, resource, state::db_client,
};
use super::channel::update_channel;
pub fn make_update(
target: impl Into<ResourceTarget>,
operation: Operation,
user: &User,
) -> Update {
Update {
start_ts: komodo_timestamp(),
target: target.into(),
operation,
operator: user.id.clone(),
success: true,
..Default::default()
}
}
#[instrument(level = "debug")]
pub async fn add_update(
mut update: Update,
) -> anyhow::Result<String> {
update.id = db_client()
.updates
.insert_one(&update)
.await
.context("failed to insert update into db")?
.inserted_id
.as_object_id()
.context("inserted_id is not object id")?
.to_string();
let id = update.id.clone();
let update = update_list_item(update).await?;
let _ = send_update(update).await;
Ok(id)
}
#[instrument(level = "debug")]
pub async fn add_update_without_send(
update: &Update,
) -> anyhow::Result<String> {
let id = db_client()
.updates
.insert_one(update)
.await
.context("failed to insert update into db")?
.inserted_id
.as_object_id()
.context("inserted_id is not object id")?
.to_string();
Ok(id)
}
#[instrument(level = "debug")]
pub async fn update_update(update: Update) -> anyhow::Result<()> {
update_one_by_id(&db_client().updates, &update.id, mungos::update::Update::Set(to_document(&update)?), None)
.await
.context("failed to update the update on db. the update build process was deleted")?;
let update = update_list_item(update).await?;
let _ = send_update(update).await;
Ok(())
}
#[instrument(level = "debug")]
async fn update_list_item(
update: Update,
) -> anyhow::Result<UpdateListItem> {
let username = if User::is_service_user(&update.operator) {
update.operator.clone()
} else {
find_one_by_id(&db_client().users, &update.operator)
.await
.context("failed to query mongo for user")?
.with_context(|| {
format!("no user found with id {}", update.operator)
})?
.username
};
let update = UpdateListItem {
id: update.id,
operation: update.operation,
start_ts: update.start_ts,
success: update.success,
operator: update.operator,
target: update.target,
status: update.status,
version: update.version,
other_data: update.other_data,
username,
};
Ok(update)
}
#[instrument(level = "debug")]
async fn send_update(update: UpdateListItem) -> anyhow::Result<()> {
update_channel().sender.lock().await.send(update)?;
Ok(())
}
pub async fn init_execution_update(
request: &ExecuteRequest,
user: &User,
) -> anyhow::Result<Update> {
let (operation, target) = match &request {
// Server
ExecuteRequest::StartContainer(data) => (
Operation::StartContainer,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::RestartContainer(data) => (
Operation::RestartContainer,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::PauseContainer(data) => (
Operation::PauseContainer,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::UnpauseContainer(data) => (
Operation::UnpauseContainer,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::StopContainer(data) => (
Operation::StopContainer,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::DestroyContainer(data) => (
Operation::DestroyContainer,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::StartAllContainers(data) => (
Operation::StartAllContainers,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::RestartAllContainers(data) => (
Operation::RestartAllContainers,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::PauseAllContainers(data) => (
Operation::PauseAllContainers,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::UnpauseAllContainers(data) => (
Operation::UnpauseAllContainers,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::StopAllContainers(data) => (
Operation::StopAllContainers,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::PruneContainers(data) => (
Operation::PruneContainers,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::DeleteNetwork(data) => (
Operation::DeleteNetwork,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::PruneNetworks(data) => (
Operation::PruneNetworks,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::DeleteImage(data) => (
Operation::DeleteImage,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::PruneImages(data) => (
Operation::PruneImages,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::DeleteVolume(data) => (
Operation::DeleteVolume,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::PruneVolumes(data) => (
Operation::PruneVolumes,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::PruneDockerBuilders(data) => (
Operation::PruneDockerBuilders,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::PruneBuildx(data) => (
Operation::PruneBuildx,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
ExecuteRequest::PruneSystem(data) => (
Operation::PruneSystem,
ResourceTarget::Server(
resource::get::<Server>(&data.server).await?.id,
),
),
// Deployment
ExecuteRequest::Deploy(data) => (
Operation::Deploy,
ResourceTarget::Deployment(
resource::get::<Deployment>(&data.deployment).await?.id,
),
),
ExecuteRequest::BatchDeploy(_data) => {
return Ok(Default::default())
}
ExecuteRequest::StartDeployment(data) => (
Operation::StartDeployment,
ResourceTarget::Deployment(
resource::get::<Deployment>(&data.deployment).await?.id,
),
),
ExecuteRequest::RestartDeployment(data) => (
Operation::RestartDeployment,
ResourceTarget::Deployment(
resource::get::<Deployment>(&data.deployment).await?.id,
),
),
ExecuteRequest::PauseDeployment(data) => (
Operation::PauseDeployment,
ResourceTarget::Deployment(
resource::get::<Deployment>(&data.deployment).await?.id,
),
),
ExecuteRequest::UnpauseDeployment(data) => (
Operation::UnpauseDeployment,
ResourceTarget::Deployment(
resource::get::<Deployment>(&data.deployment).await?.id,
),
),
ExecuteRequest::StopDeployment(data) => (
Operation::StopDeployment,
ResourceTarget::Deployment(
resource::get::<Deployment>(&data.deployment).await?.id,
),
),
ExecuteRequest::DestroyDeployment(data) => (
Operation::DestroyDeployment,
ResourceTarget::Deployment(
resource::get::<Deployment>(&data.deployment).await?.id,
),
),
// Build
ExecuteRequest::RunBuild(data) => (
Operation::RunBuild,
ResourceTarget::Build(
resource::get::<Build>(&data.build).await?.id,
),
),
ExecuteRequest::BatchRunBuild(_data) => {
return Ok(Default::default())
}
ExecuteRequest::CancelBuild(data) => (
Operation::CancelBuild,
ResourceTarget::Build(
resource::get::<Build>(&data.build).await?.id,
),
),
// Repo
ExecuteRequest::CloneRepo(data) => (
Operation::CloneRepo,
ResourceTarget::Repo(
resource::get::<Repo>(&data.repo).await?.id,
),
),
ExecuteRequest::BatchCloneRepo(_data) => {
return Ok(Default::default())
}
ExecuteRequest::PullRepo(data) => (
Operation::PullRepo,
ResourceTarget::Repo(
resource::get::<Repo>(&data.repo).await?.id,
),
),
ExecuteRequest::BatchPullRepo(_data) => {
return Ok(Default::default())
}
ExecuteRequest::BuildRepo(data) => (
Operation::BuildRepo,
ResourceTarget::Repo(
resource::get::<Repo>(&data.repo).await?.id,
),
),
ExecuteRequest::BatchBuildRepo(_data) => {
return Ok(Default::default())
}
ExecuteRequest::CancelRepoBuild(data) => (
Operation::CancelRepoBuild,
ResourceTarget::Repo(
resource::get::<Repo>(&data.repo).await?.id,
),
),
// Procedure
ExecuteRequest::RunProcedure(data) => (
Operation::RunProcedure,
ResourceTarget::Procedure(
resource::get::<Procedure>(&data.procedure).await?.id,
),
),
ExecuteRequest::BatchRunProcedure(_) => {
return Ok(Default::default())
}
// Action
ExecuteRequest::RunAction(data) => (
Operation::RunAction,
ResourceTarget::Action(
resource::get::<Action>(&data.action).await?.id,
),
),
ExecuteRequest::BatchRunAction(_) => {
return Ok(Default::default())
}
// Server template
ExecuteRequest::LaunchServer(data) => (
Operation::LaunchServer,
ResourceTarget::ServerTemplate(
resource::get::<ServerTemplate>(&data.server_template)
.await?
.id,
),
),
// Resource Sync
ExecuteRequest::RunSync(data) => (
Operation::RunSync,
ResourceTarget::ResourceSync(
resource::get::<ResourceSync>(&data.sync).await?.id,
),
),
// Stack
ExecuteRequest::DeployStack(data) => (
Operation::DeployStack,
ResourceTarget::Stack(
resource::get::<Stack>(&data.stack).await?.id,
),
),
ExecuteRequest::BatchDeployStack(_data) => {
return Ok(Default::default())
}
ExecuteRequest::DeployStackIfChanged(data) => (
Operation::DeployStack,
ResourceTarget::Stack(
resource::get::<Stack>(&data.stack).await?.id,
),
),
ExecuteRequest::BatchDeployStackIfChanged(_data) => {
return Ok(Default::default())
}
ExecuteRequest::StartStack(data) => (
if data.service.is_some() {
Operation::StartStackService
} else {
Operation::StartStack
},
ResourceTarget::Stack(
resource::get::<Stack>(&data.stack).await?.id,
),
),
ExecuteRequest::RestartStack(data) => (
if data.service.is_some() {
Operation::RestartStackService
} else {
Operation::RestartStack
},
ResourceTarget::Stack(
resource::get::<Stack>(&data.stack).await?.id,
),
),
ExecuteRequest::PauseStack(data) => (
if data.service.is_some() {
Operation::PauseStackService
} else {
Operation::PauseStack
},
ResourceTarget::Stack(
resource::get::<Stack>(&data.stack).await?.id,
),
),
ExecuteRequest::UnpauseStack(data) => (
if data.service.is_some() {
Operation::UnpauseStackService
} else {
Operation::UnpauseStack
},
ResourceTarget::Stack(
resource::get::<Stack>(&data.stack).await?.id,
),
),
ExecuteRequest::StopStack(data) => (
if data.service.is_some() {
Operation::StopStackService
} else {
Operation::StopStack
},
ResourceTarget::Stack(
resource::get::<Stack>(&data.stack).await?.id,
),
),
ExecuteRequest::DestroyStack(data) => (
Operation::DestroyStack,
ResourceTarget::Stack(
resource::get::<Stack>(&data.stack).await?.id,
),
),
ExecuteRequest::BatchDestroyStack(_data) => {
return Ok(Default::default())
}
};
let mut update = make_update(target, operation, user);
update.in_progress();
// Hold off on even adding update for DeployStackIfChanged
if !matches!(&request, ExecuteRequest::DeployStackIfChanged(_)) {
// Don't actually send it here, let the handlers send it after they can set action state.
update.id = add_update_without_send(&update).await?;
}
Ok(update)
}