update procedure

This commit is contained in:
mbecker20
2024-03-27 03:59:51 -07:00
parent fd12921f6d
commit 851c1f450c
4 changed files with 149 additions and 213 deletions

View File

@@ -12,10 +12,8 @@ use tokio::sync::Mutex;
use crate::{
helpers::{
add_update, make_update,
procedure::{execute_procedure, make_procedure_map},
resource::StateResource,
update_update,
add_update, make_update, procedure::execute_procedure,
resource::StateResource, update_update,
},
state::State,
};
@@ -34,8 +32,6 @@ impl Resolve<RunProcedure, User> for State {
)
.await?;
let map = make_procedure_map(&procedure).await?;
let mut update =
make_update(&procedure, Operation::StopContainer, &user);
update.in_progress();
@@ -48,7 +44,7 @@ impl Resolve<RunProcedure, User> for State {
let update = Mutex::new(update);
let res = execute_procedure(&procedure, &map, &update).await;
let res = execute_procedure(&procedure, &update).await;
let mut update = update.into_inner();

View File

@@ -1,25 +1,20 @@
use std::{collections::HashMap, str::FromStr, time::Duration};
use std::time::Duration;
use anyhow::{anyhow, Context, Ok};
use async_recursion::async_recursion;
use futures::future::join_all;
use monitor_client::{
api::execute::Execution,
entities::{
monitor_timestamp,
procedure::{EnabledId, Procedure, ProcedureConfig},
procedure::{EnabledExecution, Procedure, ProcedureConfig},
update::Update,
user::User,
},
};
use mungos::{
find::find_collect,
mongodb::bson::{doc, oid::ObjectId},
};
use resolver_api::Resolve;
use tokio::sync::Mutex;
use crate::{db::db_client, state::State};
use crate::state::State;
use super::update_update;
@@ -36,47 +31,15 @@ async fn add_line_to_update(update: &Mutex<Update>, line: &str) {
};
}
#[async_recursion]
pub async fn execute_procedure(
procedure: &Procedure,
map: &HashMap<String, Procedure>,
update: &Mutex<Update>,
) -> anyhow::Result<()> {
let start_ts = monitor_timestamp();
use ProcedureConfig::*;
match &procedure.config {
Execution(execution) => {
add_line_to_update(
update,
&format!("executing: {} ({})", procedure.name, procedure.id),
)
.await;
execute_execution(execution.to_owned()).await.with_context(
|| {
let time = Duration::from_millis(
(monitor_timestamp() - start_ts) as u64,
);
format!(
"failed execution after {time:?}. {} ({})",
procedure.name, procedure.id
)
},
)?;
let time = Duration::from_millis(
(monitor_timestamp() - start_ts) as u64,
);
add_line_to_update(
update,
&format!(
"finished execution in {time:?}: {} ({}) ✅",
procedure.name, procedure.id
),
)
.await;
Ok(())
}
Sequence(ids) => {
Sequence(executions) => {
add_line_to_update(
update,
&format!(
@@ -85,7 +48,7 @@ pub async fn execute_procedure(
),
)
.await;
execute_sequence(&filter_list_by_enabled(ids), map, update)
execute_sequence(filter_list_by_enabled(executions))
.await
.with_context(|| {
let time = Duration::from_millis(
@@ -109,7 +72,7 @@ pub async fn execute_procedure(
.await;
Ok(())
}
Parallel(ids) => {
Parallel(executions) => {
add_line_to_update(
update,
&format!(
@@ -118,7 +81,7 @@ pub async fn execute_procedure(
),
)
.await;
execute_parallel(&filter_list_by_enabled(ids), map, update)
execute_parallel(filter_list_by_enabled(executions))
.await
.with_context(|| {
let time = Duration::from_millis(
@@ -211,27 +174,21 @@ async fn execute_execution(
}
async fn execute_sequence(
ids: &[String],
map: &HashMap<String, Procedure>,
update: &Mutex<Update>,
executions: Vec<Execution>,
) -> anyhow::Result<()> {
for id in ids {
let procedure = map
.get(id)
.with_context(|| format!("no procedure on map with id {id}"))?;
execute_procedure(procedure, map, update).await?;
for execution in executions {
let fail_log = format!("failed on {execution:?}");
execute_execution(execution).await.context(fail_log)?;
}
Ok(())
}
async fn execute_parallel(
ids: &[String],
map: &HashMap<String, Procedure>,
update: &Mutex<Update>,
executions: Vec<Execution>,
) -> anyhow::Result<()> {
let futures = ids.iter().map(|id| async {
let procedure = map.get(id).context("no procedure on map")?;
execute_procedure(procedure, map, update).await
let futures = executions.into_iter().map(|execution| async {
let fail_log = format!("failed on {execution:?}");
execute_execution(execution).await.context(fail_log)
});
join_all(futures)
.await
@@ -240,73 +197,12 @@ async fn execute_parallel(
Ok(())
}
pub async fn make_procedure_map(
procedure: &Procedure,
) -> anyhow::Result<HashMap<String, Procedure>> {
let map = Default::default();
make_procedure_map_rec(procedure, &map).await?;
Ok(map.into_inner())
}
#[async_recursion]
async fn make_procedure_map_rec(
procedure: &Procedure,
map: &Mutex<HashMap<String, Procedure>>,
) -> anyhow::Result<()> {
use ProcedureConfig::*;
let more = match &procedure.config {
Execution(_) => return Ok(()),
Sequence(sequence) => sequence,
Parallel(parallel) => parallel,
};
let more_ids = more
.iter()
.filter(|id| id.enabled)
.map(|id| {
ObjectId::from_str(&id.id).context("id is not ObjectId")
})
.collect::<anyhow::Result<Vec<_>>>()?;
let procedures = find_collect(
&db_client().await.procedures,
doc! { "_id": { "$in": &more_ids } },
None,
)
.await
.context("failed to find procedures from db")?
.into_iter()
.map(|proc| (proc.id.clone(), proc))
.collect::<HashMap<_, _>>();
// make sure we aren't missing any procedures
for EnabledId { id, enabled } in more {
if !enabled {
continue;
}
if !procedures.contains_key(id) {
return Err(anyhow!("did not find a procedure with id {id}",));
}
}
let futures = procedures.values().map(|procedure| async {
make_procedure_map_rec(procedure, map).await
});
join_all(futures)
.await
.into_iter()
.collect::<anyhow::Result<Vec<_>>>()?;
map.lock().await.extend(procedures);
Ok(())
}
fn filter_list_by_enabled(list: &[EnabledId]) -> Vec<String> {
fn filter_list_by_enabled(
list: &[EnabledExecution],
) -> Vec<Execution> {
list
.iter()
.filter(|item| item.enabled)
.map(|item| item.id.clone())
.map(|item| item.execution.clone())
.collect()
}

View File

@@ -29,8 +29,8 @@ pub struct ProcedureListItemInfo {
/// Allows to enable / disabled procedures in the sequence / parallel vec on the fly
#[typeshare]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct EnabledId {
pub id: String,
pub struct EnabledExecution {
pub execution: Execution,
pub enabled: bool,
}
@@ -48,11 +48,8 @@ pub struct EnabledId {
)]
#[serde(tag = "type", content = "data")]
pub enum ProcedureConfig {
Execution(Execution),
/// Vec<ProcedureId>
Sequence(Vec<EnabledId>),
/// Vec<ProdecureId>
Parallel(Vec<EnabledId>),
Sequence(Vec<EnabledExecution>),
Parallel(Vec<EnabledExecution>),
}
#[typeshare]

View File

@@ -137,11 +137,11 @@ export type ListBuildersResponse = BuilderListItem[];
export type DeploymentImage =
| { type: "Image", params: {
image: string;
image?: string;
}}
| { type: "Build", params: {
build_id: string;
version: Version;
build_id?: string;
version?: Version;
}};
export enum TerminationSignal {
@@ -317,11 +317,8 @@ export type ListApiKeysResponse = ApiKey[];
export type GetUsersResponse = User[];
export type ProcedureConfig =
| { type: "Execution", data: Execution }
/** Vec<ProcedureId> */
| { type: "Sequence", data: EnabledId[] }
/** Vec<ProdecureId> */
| { type: "Parallel", data: EnabledId[] };
| { type: "Sequence", data: EnabledExecution[] }
| { type: "Parallel", data: EnabledExecution[] };
export type Procedure = Resource<ProcedureConfig, undefined>;
@@ -344,8 +341,8 @@ export interface ProcedureActionState {
export type GetProcedureActionStateResponse = ProcedureActionState;
export interface RepoConfig {
server_id: string;
repo: string;
server_id?: string;
repo?: string;
branch: string;
github_account?: string;
on_clone?: SystemCommand;
@@ -853,38 +850,45 @@ export interface ExchangeForJwtResponse {
}
export interface RunBuild {
build_id: string;
/** Can be id or name */
build: string;
}
export interface CancelBuild {
build_id: string;
/** Can be id or name */
build: string;
}
export interface CancelBuildResponse {
}
export interface Deploy {
deployment_id: string;
/** Name or id */
deployment: string;
stop_signal?: TerminationSignal;
stop_time?: number;
}
export interface StartContainer {
deployment_id: string;
/** Name or id */
deployment: string;
}
export interface StopContainer {
deployment_id: string;
/** Name or id */
deployment: string;
signal?: TerminationSignal;
time?: number;
}
export interface StopAllContainers {
server_id: string;
/** Name or id */
server: string;
}
export interface RemoveContainer {
deployment_id: string;
/** Name or id */
deployment: string;
signal?: TerminationSignal;
time?: number;
}
@@ -893,27 +897,33 @@ export interface None {
}
export interface RunProcedure {
procedure_id: string;
/** Id or name */
procedure: string;
}
export interface CloneRepo {
id: string;
/** Id or name */
repo: string;
}
export interface PullRepo {
id: string;
/** Id or name */
repo: string;
}
export interface PruneDockerNetworks {
server_id: string;
/** Id or name */
server: string;
}
export interface PruneDockerImages {
server_id: string;
/** Id or name */
server: string;
}
export interface PruneDockerContainers {
server_id: string;
/** Id or name */
server: string;
}
export interface ListAlerts {
@@ -992,7 +1002,8 @@ export interface ListAlertsResponse {
}
export interface GetAlerter {
id: string;
/** Id or name */
alerter: string;
}
export interface ListAlerters {
@@ -1007,7 +1018,8 @@ export interface GetAlertersSummaryResponse {
}
export interface GetBuild {
id: string;
/** Id or name */
build: string;
}
export interface ListBuilds {
@@ -1015,7 +1027,8 @@ export interface ListBuilds {
}
export interface GetBuildActionState {
id: string;
/** Id or name */
build: string;
}
export interface GetBuildsSummary {
@@ -1042,7 +1055,8 @@ export interface GetBuildMonthlyStatsResponse {
}
export interface GetBuildVersions {
id: string;
/** Id or name */
build: string;
page?: number;
major?: number;
minor?: number;
@@ -1053,7 +1067,8 @@ export interface ListDockerOrganizations {
}
export interface GetBuilder {
id: string;
/** Id or name */
builder: string;
}
export interface ListBuilders {
@@ -1068,7 +1083,8 @@ export interface GetBuildersSummaryResponse {
}
export interface GetBuilderAvailableAccounts {
id: string;
/** Id or name */
builder: string;
}
export interface GetBuilderAvailableAccountsResponse {
@@ -1077,7 +1093,8 @@ export interface GetBuilderAvailableAccountsResponse {
}
export interface GetDeployment {
id: string;
/** Id or name */
deployment: string;
}
export interface ListDeployments {
@@ -1085,7 +1102,8 @@ export interface ListDeployments {
}
export interface GetDeploymentStatus {
id: string;
/** Id or name */
deployment: string;
}
export interface GetDeploymentStatusResponse {
@@ -1094,12 +1112,14 @@ export interface GetDeploymentStatusResponse {
}
export interface GetLog {
deployment_id: string;
/** Id or name */
deployment: string;
tail: U64;
}
export interface GetDeployedVersion {
deployment_id: string;
/** Id or name */
deployment: string;
}
export interface GetDeployedVersionResponse {
@@ -1107,11 +1127,13 @@ export interface GetDeployedVersionResponse {
}
export interface GetDeploymentStats {
id: string;
/** Id or name */
deployment: string;
}
export interface GetDeploymentActionState {
id: string;
/** Id or name */
deployment: string;
}
export interface GetDeploymentsSummary {
@@ -1158,7 +1180,8 @@ export interface GetCoreInfoResponse {
}
export interface GetProcedure {
id: string;
/** Id or name */
procedure: string;
}
export interface ListProcedures {
@@ -1177,11 +1200,13 @@ export interface GetProceduresSummaryResponse {
}
export interface GetProcedureActionState {
id: string;
/** Id or name */
procedure: string;
}
export interface GetRepo {
id: string;
/** Id or name */
repo: string;
}
export interface ListRepos {
@@ -1189,7 +1214,8 @@ export interface ListRepos {
}
export interface GetRepoActionState {
id: string;
/** Id or name */
repo: string;
}
export interface GetReposSummary {
@@ -1213,7 +1239,8 @@ export interface FindResourcesResponse {
}
export interface GetServer {
id: string;
/** Id or name */
server: string;
}
export interface ListServers {
@@ -1221,7 +1248,8 @@ export interface ListServers {
}
export interface GetServerStatus {
id: string;
/** Id or name */
server: string;
}
export interface GetServerStatusResponse {
@@ -1229,11 +1257,13 @@ export interface GetServerStatusResponse {
}
export interface GetServerActionState {
id: string;
/** Id or name */
server: string;
}
export interface GetPeripheryVersion {
server_id: string;
/** Id or name */
server: string;
}
export interface GetPeripheryVersionResponse {
@@ -1241,43 +1271,53 @@ export interface GetPeripheryVersionResponse {
}
export interface GetSystemInformation {
server_id: string;
/** Id or name */
server: string;
}
export interface GetAllSystemStats {
server_id: string;
/** Id or name */
server: string;
}
export interface GetBasicSystemStats {
server_id: string;
/** Id or name */
server: string;
}
export interface GetCpuUsage {
server_id: string;
/** Id or name */
server: string;
}
export interface GetDiskUsage {
server_id: string;
/** Id or name */
server: string;
}
export interface GetNetworkUsage {
server_id: string;
/** Id or name */
server: string;
}
export interface GetSystemProcesses {
server_id: string;
/** Id or name */
server: string;
}
export interface GetSystemComponents {
server_id: string;
/** Id or name */
server: string;
}
export interface GetDockerNetworks {
server_id: string;
/** Id or name */
server: string;
}
export interface GetHistoricalServerStats {
server_id: string;
/** Id or name */
server: string;
interval: Timelength;
page?: number;
}
@@ -1300,11 +1340,13 @@ export interface GetHistoricalServerStatsResponse {
}
export interface GetDockerImages {
server_id: string;
/** Id or name */
server: string;
}
export interface GetDockerContainers {
server_id: string;
/** Id or name */
server: string;
}
export interface GetServersSummary {
@@ -1318,7 +1360,8 @@ export interface GetServersSummaryResponse {
}
export interface GetAvailableAccounts {
server_id: string;
/** Id or name */
server: string;
}
export interface GetAvailableAccountsResponse {
@@ -1327,11 +1370,13 @@ export interface GetAvailableAccountsResponse {
}
export interface GetAvailableSecrets {
server_id: string;
/** Id or name */
server: string;
}
export interface GetTag {
id: string;
/** Id or name */
tag: string;
}
export interface ListTags {
@@ -1673,6 +1718,8 @@ export interface AwsBuilderConfig {
key_pair_name: string;
assign_public_ip: boolean;
use_public_ip: boolean;
/** The port periphery will be running on */
port: number;
github_accounts?: string[];
docker_accounts?: string[];
}
@@ -1693,9 +1740,25 @@ export interface Permission {
level?: PermissionLevel;
}
export type Execution =
/** For new executions upon instantiation */
| { type: "None", params: None }
| { type: "RunProcedure", params: RunProcedure }
| { type: "RunBuild", params: RunBuild }
| { type: "Deploy", params: Deploy }
| { type: "StartContainer", params: StartContainer }
| { type: "StopContainer", params: StopContainer }
| { type: "StopAllContainers", params: StopAllContainers }
| { type: "RemoveContainer", params: RemoveContainer }
| { type: "CloneRepo", params: CloneRepo }
| { type: "PullRepo", params: PullRepo }
| { type: "PruneDockerNetworks", params: PruneDockerNetworks }
| { type: "PruneDockerImages", params: PruneDockerImages }
| { type: "PruneDockerContainers", params: PruneDockerContainers };
/** Allows to enable / disabled procedures in the sequence / parallel vec on the fly */
export interface EnabledId {
id: string;
export interface EnabledExecution {
execution: Execution;
enabled: boolean;
}
@@ -1842,22 +1905,6 @@ export type WriteRequest =
| { type: "RenameTag", params: RenameTag }
| { type: "UpdateTagsOnResource", params: UpdateTagsOnResource };
export type Execution =
/** For new executions upon instantiation */
| { type: "None", params: None }
| { type: "RunProcedure", params: RunProcedure }
| { type: "RunBuild", params: RunBuild }
| { type: "Deploy", params: Deploy }
| { type: "StartContainer", params: StartContainer }
| { type: "StopContainer", params: StopContainer }
| { type: "StopAllContainers", params: StopAllContainers }
| { type: "RemoveContainer", params: RemoveContainer }
| { type: "CloneRepo", params: CloneRepo }
| { type: "PullRepo", params: PullRepo }
| { type: "PruneDockerNetworks", params: PruneDockerNetworks }
| { type: "PruneDockerImages", params: PruneDockerImages }
| { type: "PruneDockerContainers", params: PruneDockerContainers };
export type WsLoginMessage =
| { type: "Jwt", params: {
jwt: string;