util backup / restore

This commit is contained in:
mbecker20
2025-08-01 04:18:03 -04:00
parent e2c52fea6b
commit 03f3b2c80d
11 changed files with 703 additions and 118 deletions

87
Cargo.lock generated
View File

@@ -116,6 +116,19 @@ version = "1.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "69f7f8c3906b62b754cd5326047894316021dcfe5a194c8ea52bdd94934a3457"
[[package]]
name = "async-compression"
version = "0.4.27"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddb939d66e4ae03cee6091612804ba446b12878410cfa17f785f4dd67d4014e8"
dependencies = [
"flate2",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
]
[[package]]
name = "async-recursion"
version = "1.1.1"
@@ -254,9 +267,9 @@ dependencies = [
[[package]]
name = "aws-sdk-ec2"
version = "1.152.0"
version = "1.154.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e169baec1fd3989e1c627e5522d548da98f3cc92c071ce89298b9862f0b0edf7"
checksum = "6de1f8c39aea7cf84fcd71c6e9b0e736033757cf13e403cea81c47860f757761"
dependencies = [
"aws-credential-types",
"aws-runtime",
@@ -416,7 +429,7 @@ dependencies = [
"hyper-util",
"pin-project-lite",
"rustls 0.21.12",
"rustls 0.23.30",
"rustls 0.23.31",
"rustls-native-certs 0.8.1",
"rustls-pki-types",
"tokio",
@@ -648,7 +661,7 @@ dependencies = [
"hyper 1.6.0",
"hyper-util",
"pin-project-lite",
"rustls 0.23.30",
"rustls 0.23.31",
"rustls-pemfile 2.2.0",
"rustls-pki-types",
"tokio",
@@ -983,9 +996,9 @@ dependencies = [
[[package]]
name = "clap"
version = "4.5.41"
version = "4.5.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "be92d32e80243a54711e5d7ce823c35c41c9d929dc4ab58e1276f625841aadf9"
checksum = "ed87a9d530bb41a67537289bafcac159cb3ee28460e0a4571123d2a778a6a882"
dependencies = [
"clap_builder",
"clap_derive",
@@ -993,9 +1006,9 @@ dependencies = [
[[package]]
name = "clap_builder"
version = "4.5.41"
version = "4.5.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "707eab41e9622f9139419d573eca0900137718000c517d47da73045f54331c3d"
checksum = "64f4f3f3c77c94aff3c7e9aac9a2ca1974a5adf392a8bb751e827d6d127ab966"
dependencies = [
"anstream",
"anstyle",
@@ -1121,6 +1134,15 @@ dependencies = [
"libc",
]
[[package]]
name = "crc32fast"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9481c1c90cbf2ac953f07c8d4a58aa3945c425b7185c9154d67a65e4230da511"
dependencies = [
"cfg-if",
]
[[package]]
name = "croner"
version = "2.2.0"
@@ -1602,6 +1624,16 @@ dependencies = [
"winapi",
]
[[package]]
name = "flate2"
version = "1.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4a3d7db9596fecd151c5f638c0ee5d5bd487b6e0ea232e5dc96d5250f6f94b1d"
dependencies = [
"crc32fast",
"miniz_oxide",
]
[[package]]
name = "fnv"
version = "1.0.7"
@@ -2142,7 +2174,7 @@ dependencies = [
"http 1.3.1",
"hyper 1.6.0",
"hyper-util",
"rustls 0.23.30",
"rustls 0.23.31",
"rustls-native-certs 0.8.1",
"rustls-pki-types",
"tokio",
@@ -2614,7 +2646,7 @@ dependencies = [
"reqwest",
"resolver_api",
"response",
"rustls 0.23.30",
"rustls 0.23.31",
"serde",
"serde_json",
"serde_yaml",
@@ -2625,7 +2657,7 @@ dependencies = [
"tokio",
"tokio-tungstenite 0.27.0",
"tokio-util",
"toml 0.9.2",
"toml 0.9.4",
"toml_pretty",
"tower-http",
"tracing",
@@ -2668,7 +2700,7 @@ dependencies = [
"resolver_api",
"response",
"run_command",
"rustls 0.23.30",
"rustls 0.23.31",
"serde",
"serde_json",
"serde_yaml",
@@ -2686,12 +2718,17 @@ name = "komodo_util"
version = "1.18.5-dev-6"
dependencies = [
"anyhow",
"async-compression",
"chrono",
"dotenvy",
"environment_file",
"envy",
"futures-util",
"mungos",
"serde",
"serde_json",
"tokio",
"tokio-util",
"tracing",
"tracing-subscriber",
]
@@ -3532,7 +3569,7 @@ dependencies = [
"komodo_client",
"reqwest",
"resolver_api",
"rustls 0.23.30",
"rustls 0.23.31",
"serde",
"serde_json",
"serde_qs",
@@ -3721,7 +3758,7 @@ dependencies = [
"quinn-proto",
"quinn-udp",
"rustc-hash 2.1.1",
"rustls 0.23.30",
"rustls 0.23.31",
"socket2 0.5.10",
"thiserror 2.0.12",
"tokio",
@@ -3741,7 +3778,7 @@ dependencies = [
"rand 0.9.2",
"ring",
"rustc-hash 2.1.1",
"rustls 0.23.30",
"rustls 0.23.31",
"rustls-pki-types",
"slab",
"thiserror 2.0.12",
@@ -3943,7 +3980,7 @@ dependencies = [
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls 0.23.30",
"rustls 0.23.31",
"rustls-native-certs 0.8.1",
"rustls-pki-types",
"serde",
@@ -4192,9 +4229,9 @@ dependencies = [
[[package]]
name = "rustls"
version = "0.23.30"
version = "0.23.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "069a8df149a16b1a12dcc31497c3396a173844be3cac4bd40c9e7671fef96671"
checksum = "c0ebcbd2f03de0fc1122ad9bb24b127a5a6cd51d72604a3f3c50ac459762b6cc"
dependencies = [
"aws-lc-rs",
"log",
@@ -4477,9 +4514,9 @@ dependencies = [
[[package]]
name = "serde_json"
version = "1.0.141"
version = "1.0.142"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30b9eff21ebe718216c6ec64e1d9ac57087aad11efc64e32002bce4a0d4c03d3"
checksum = "030fedb782600dcbd6f02d479bf0d817ac3bb40d644745b769d6a96bc3afc5a7"
dependencies = [
"indexmap 2.10.0",
"itoa",
@@ -5109,7 +5146,7 @@ version = "0.26.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e727b36a1a0e8b74c376ac2211e40c2c8af09fb4013c60d910495810f008e9b"
dependencies = [
"rustls 0.23.30",
"rustls 0.23.31",
"tokio",
]
@@ -5145,7 +5182,7 @@ checksum = "489a59b6730eda1b0171fcfda8b121f4bee2b35cba8645ca35c5f7ba3eb736c1"
dependencies = [
"futures-util",
"log",
"rustls 0.23.30",
"rustls 0.23.31",
"rustls-native-certs 0.8.1",
"rustls-pki-types",
"tokio",
@@ -5181,9 +5218,9 @@ dependencies = [
[[package]]
name = "toml"
version = "0.9.2"
version = "0.9.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed0aee96c12fa71097902e0bb061a5e1ebd766a6636bb605ba401c45c1650eac"
checksum = "41ae868b5a0f67631c14589f7e250c1ea2c574ee5ba21c6c8dd4b1485705a5a1"
dependencies = [
"indexmap 2.10.0",
"serde",
@@ -5470,7 +5507,7 @@ dependencies = [
"httparse",
"log",
"rand 0.9.2",
"rustls 0.23.30",
"rustls 0.23.31",
"rustls-pki-types",
"sha1",
"thiserror 2.0.12",

View File

@@ -67,10 +67,10 @@ ipnetwork = { version = "0.21.1", features = ["serde"] }
indexmap = { version = "2.10.0", features = ["serde"] }
serde = { version = "1.0.219", features = ["derive"] }
strum = { version = "0.27.2", features = ["derive"] }
serde_json = "1.0.141"
serde_json = "1.0.142"
serde_yaml = "0.9.34"
serde_qs = "0.15.0"
toml = "0.9.2"
toml = "0.9.4"
# ERROR
anyhow = "1.0.98"
@@ -86,7 +86,7 @@ opentelemetry = "0.30.0"
tracing = "0.1.41"
# CONFIG
clap = { version = "4.5.41", features = ["derive"] }
clap = { version = "4.5.42", features = ["derive"] }
dotenvy = "0.15.7"
envy = "0.4.2"
@@ -98,7 +98,7 @@ urlencoding = "2.1.3"
nom_pem = "4.0.0"
bcrypt = "0.17.0"
base64 = "0.22.1"
rustls = "0.23.30"
rustls = "0.23.31"
hmac = "0.12.1"
sha2 = "0.10.9"
rand = "0.9.2"
@@ -111,7 +111,7 @@ sysinfo = "0.36.1"
# CLOUD
aws-config = "1.8.3"
aws-sdk-ec2 = "1.152.0"
aws-sdk-ec2 = "1.154.0"
aws-credential-types = "1.2.4"
## CRON
@@ -121,6 +121,7 @@ chrono = "0.4.41"
croner = "2.2.0"
# MISC
async-compression = { version = "0.4.27", features = ["tokio", "gzip"] }
derive_builder = "0.20.2"
typeshare = "1.0.4"
octorust = "0.10.0"

View File

@@ -1,4 +1,3 @@
use std::net::{SocketAddr, IpAddr};
use anyhow::{Context, anyhow};
use axum::{
Router,
@@ -12,6 +11,7 @@ use axum::{
use derive_variants::ExtractVariant;
use resolver_api::Resolve;
use serror::{AddStatusCode, AddStatusCodeError, Json};
use std::net::{IpAddr, SocketAddr};
use uuid::Uuid;
use crate::config::periphery_config;
@@ -124,23 +124,22 @@ async fn guard_request_by_ip(
.status_code(StatusCode::UNAUTHORIZED)?;
let ip = socket_addr.ip();
let ip_match = periphery_config()
.allowed_ips
.iter()
.any(|net| {
net.contains(ip) ||
match ip {
IpAddr::V4(ipv4) => net.contains(IpAddr::V6(ipv4.to_ipv6_mapped())),
IpAddr::V6(_) => net.contains(ip.to_canonical()),
let ip_match = periphery_config().allowed_ips.iter().any(|net| {
net.contains(ip)
|| match ip {
IpAddr::V4(ipv4) => {
net.contains(IpAddr::V6(ipv4.to_ipv6_mapped()))
}
});
IpAddr::V6(_) => net.contains(ip.to_canonical()),
}
});
if ip_match {
Ok(next.run(req).await)
Ok(next.run(req).await)
} else {
Err(
anyhow!("requesting ip {ip} not allowed")
.status_code(StatusCode::UNAUTHORIZED),
)
Err(
anyhow!("requesting ip {ip} not allowed")
.status_code(StatusCode::UNAUTHORIZED),
)
}
}

View File

@@ -12,12 +12,19 @@ name = "util"
path = "src/main.rs"
[dependencies]
# Local
environment_file.workspace = true
# External
tracing-subscriber.workspace = true
async-compression.workspace = true
futures-util.workspace = true
tokio-util.workspace = true
serde_json.workspace = true
dotenvy.workspace = true
tracing.workspace = true
anyhow.workspace = true
mungos.workspace = true
chrono.workspace = true
tokio.workspace = true
serde.workspace = true
envy.workspace = true

View File

@@ -0,0 +1,219 @@
use std::{path::PathBuf, str::FromStr, time::Duration};
use anyhow::Context;
use async_compression::tokio::write::GzipEncoder;
// use async_compression::tokio::write::GzipEncoder;
use chrono::Local;
use environment_file::maybe_read_item_from_file;
use futures_util::{
SinkExt, StreamExt, TryStreamExt, stream::FuturesUnordered,
};
use mungos::{
init::MongoBuilder,
mongodb::bson::{Document, RawDocumentBuf},
};
use serde::Deserialize;
use tokio::io::{AsyncWriteExt, BufWriter};
use tokio_util::codec::{FramedWrite, LinesCodec};
#[derive(Deserialize)]
struct Env {
/// The root folder to store timestamped backup folders in.
#[serde(default = "default_backup_folder")]
komodo_backup_folder: PathBuf,
/// Give the source database some time to initialize.
/// Default: 0
#[serde(default)]
startup_sleep_seconds: u64,
#[serde(alias = "komodo_mongo_uri")]
komodo_database_uri: Option<String>,
#[serde(alias = "komodo_mongo_uri_file")]
komodo_database_uri_file: Option<PathBuf>,
#[serde(alias = "komodo_mongo_address")]
komodo_database_address: Option<String>,
#[serde(alias = "komodo_mongo_username")]
komodo_database_username: Option<String>,
#[serde(alias = "komodo_mongo_username_file")]
komodo_database_username_file: Option<PathBuf>,
#[serde(alias = "komodo_mongo_password")]
komodo_database_password: Option<String>,
#[serde(alias = "komodo_mongo_password_file")]
komodo_database_password_file: Option<PathBuf>,
#[serde(
default = "default_app_name",
alias = "komodo_mongo_app_name"
)]
komodo_database_app_name: String,
#[serde(
default = "default_db_name",
alias = "komodo_mongo_db_name"
)]
komodo_database_db_name: String,
}
fn default_app_name() -> String {
String::from("komodo-backup")
}
fn default_db_name() -> String {
String::from("komodo")
}
fn default_backup_folder() -> PathBuf {
// SAFE: /backup is a valid path.
PathBuf::from_str("/backup").unwrap()
}
pub async fn main() -> anyhow::Result<()> {
let env = envy::from_env::<Env>()?;
if env.startup_sleep_seconds > 0 {
info!("Sleeping for {} seconds...", env.startup_sleep_seconds);
tokio::time::sleep(Duration::from_secs(
env.startup_sleep_seconds,
))
.await;
}
let now_backup_folder = env
.komodo_backup_folder
.join(Local::now().format("%Y-%m-%d_%H-%M-%S").to_string());
tokio::fs::create_dir_all(&now_backup_folder)
.await
.context("Failed to create backup folder")?;
info!("Backing up to {now_backup_folder:?}");
let mut db_builder = MongoBuilder::default();
if let Some(uri) = maybe_read_item_from_file(
env.komodo_database_uri_file,
env.komodo_database_uri,
) {
db_builder = db_builder.uri(uri);
}
if let Some(address) = env.komodo_database_address {
db_builder = db_builder.address(address);
}
if let Some(username) = maybe_read_item_from_file(
env.komodo_database_username_file,
env.komodo_database_username,
) {
db_builder = db_builder.username(username);
}
if let Some(password) = maybe_read_item_from_file(
env.komodo_database_password_file,
env.komodo_database_password,
) {
db_builder = db_builder.password(password);
}
let source_db = db_builder
.app_name(env.komodo_database_app_name)
.build()
.await
.context("Failed to initialize source database")?
.database(&env.komodo_database_db_name);
let mut handles = source_db
.list_collection_names()
.await
.context("Failed to list collections on source db")?.into_iter().map(|collection| {
let source = source_db.collection::<RawDocumentBuf>(&collection);
let file_path = if collection == "Stats" {
env.komodo_backup_folder.join("Stats.jsonl.gz")
} else {
now_backup_folder.join(format!("{collection}.jsonl.gz"))
};
tokio::spawn(async move {
let res = async {
let mut count = 0;
let _ = tokio::fs::remove_file(&file_path).await;
let file = tokio::fs::File::create(&file_path)
.await
.with_context(|| format!("Failed to create file at {file_path:?}"))?;
let mut writer = FramedWrite::new(
BufWriter::new(GzipEncoder::with_quality(
file, async_compression::Level::Best
)),
LinesCodec::new()
);
let mut cursor = source
.find(Document::new())
.await
.context("Failed to query source collection")?;
while let Some(doc) = cursor
.try_next()
.await
.context("Failed to get next document")?
{
count += 1;
let json = match serde_json::to_string(&doc).context("Failed to serialize document to json") {
Ok(json) => json,
Err(e) => {
warn!("{e:#}");
continue
}
};
if let Err(e) = writer.send(json)
.await
.context("Failed to write document to file")
{
warn!("{e:#}");
}
}
if let Err(e) = <_ as SinkExt<String>>::flush(&mut writer)
.await
.context("Failed to flush writer")
{
error!("{e:#}");
};
if let Err(e) = writer
.into_inner()
.shutdown()
.await
.context("Failed to shutdown writer compression")
{
error!("{e:#}");
}
anyhow::Ok(count)
}
.await;
match res {
Ok(count) => {
if count > 0 {
info!("Finished backing up {collection} collection | Backed up {count}");
}
}
Err(e) => {
error!("Failed to backup {collection} collection | {e:#}")
}
}
})
}).collect::<FuturesUnordered<_>>();
loop {
match handles.next().await {
Some(Ok(())) => {}
Some(Err(e)) => {
error!("{e:#}");
}
None => break,
}
}
info!("Finished backing up database ✅");
Ok(())
}

View File

@@ -1,7 +1,9 @@
use std::time::Duration;
use anyhow::Context;
use futures_util::{TryStreamExt, future::join_all};
use futures_util::{
StreamExt, TryStreamExt, stream::FuturesUnordered,
};
use mungos::{
init::MongoBuilder,
mongodb::{
@@ -26,6 +28,7 @@ struct Env {
#[serde(default = "default_db_name")]
target_db_name: String,
/// Give the target database some time to initialize.
/// Default: 5
#[serde(default = "default_startup_sleep_seconds")]
startup_sleep_seconds: u64,
}
@@ -60,70 +63,75 @@ pub async fn main() -> anyhow::Result<()> {
.context("Invalid SOURCE_URI")?
.database(&env.target_db_name);
let mut handles = Vec::new();
for collection in source_db
let mut handles = source_db
.list_collection_names()
.await
.context("Failed to list collections on source db")?
{
let source = source_db.collection::<RawDocumentBuf>(&collection);
let target = target_db.collection::<RawDocumentBuf>(&collection);
.context("Failed to list collections on source db")?.into_iter().map(|collection| {
let source = source_db.collection::<RawDocumentBuf>(&collection);
let target = target_db.collection::<RawDocumentBuf>(&collection);
handles.push(tokio::spawn(async move {
let res = async {
let mut buffer = Vec::<RawDocumentBuf>::new();
let mut count = 0;
let mut cursor = source
.find(Document::new())
.await
.context("Failed to query source collection")?;
while let Some(doc) = cursor
.try_next()
.await
.context("Failed to get next document")?
{
count += 1;
buffer.push(doc);
if buffer.len() >= 20_000 {
if let Err(e) = target
tokio::spawn(async move {
let res = async {
let mut buffer = Vec::<RawDocumentBuf>::new();
let mut count = 0;
let mut cursor = source
.find(Document::new())
.await
.context("Failed to query source collection")?;
while let Some(doc) = cursor
.try_next()
.await
.context("Failed to get next document")?
{
count += 1;
buffer.push(doc);
if buffer.len() >= 20_000 {
if let Err(e) = target
.insert_many(&buffer)
.with_options(
InsertManyOptions::builder().ordered(false).build(),
)
.await
{
error!("Failed to flush document batch in {collection} collection | {e:#}");
};
buffer.clear();
}
}
if !buffer.is_empty() {
target
.insert_many(&buffer)
.with_options(
InsertManyOptions::builder().ordered(false).build(),
)
.await
{
error!("Failed to flush document batch in {collection} collection | {e:#}");
};
buffer.clear();
.context("Failed to flush documents")?;
}
anyhow::Ok(count)
}
.await;
match res {
Ok(count) => {
if count > 0 {
info!("Finished copying {collection} collection | Copied {count}");
}
}
Err(e) => {
error!("Failed to copy {collection} collection | {e:#}")
}
}
if !buffer.is_empty() {
target
.insert_many(&buffer)
.with_options(
InsertManyOptions::builder().ordered(false).build(),
)
.await
.context("Failed to flush documents")?;
}
anyhow::Ok(count)
}
.await;
match res {
Ok(count) => {
if count > 0 {
info!("Finished copying {collection} collection | Copied {count}");
}
}
Err(e) => {
error!("Failed to copy {collection} collection | {e:#}")
}
}
}));
}
})
}).collect::<FuturesUnordered<_>>();
join_all(handles).await;
loop {
match handles.next().await {
Some(Ok(())) => {}
Some(Err(e)) => {
error!("{e:#}");
}
None => break,
}
}
info!("Finished copying database ✅");

View File

@@ -3,11 +3,15 @@ extern crate tracing;
use serde::Deserialize;
mod backup_database;
mod copy_database;
mod restore_database;
#[derive(Deserialize, Debug, Default)]
enum Mode {
#[default]
BackupDatabase,
RestoreDatabase,
CopyDatabase,
}
@@ -27,6 +31,8 @@ async fn app() -> anyhow::Result<()> {
match env.mode {
Mode::CopyDatabase => copy_database::main().await,
Mode::BackupDatabase => backup_database::main().await,
Mode::RestoreDatabase => restore_database::main().await,
}
}

View File

@@ -0,0 +1,274 @@
use std::{path::PathBuf, str::FromStr};
use anyhow::Context;
use async_compression::tokio::bufread::GzipDecoder;
use environment_file::maybe_read_item_from_file;
use futures_util::{
StreamExt, TryStreamExt, stream::FuturesUnordered,
};
use mungos::{
init::MongoBuilder,
mongodb::{bson::Document, options::InsertManyOptions},
};
use serde::Deserialize;
use tokio::io::BufReader;
#[derive(Deserialize)]
struct Env {
/// The root folder to store timestamped backup folders in.
#[serde(default = "default_backup_folder")]
komodo_backup_folder: PathBuf,
/// A specific dated folder to restore, relative to `KOMODO_BACKUP_FOLDER`.
/// If not provided, will use the most recent folder.
komodo_restore_folder: Option<PathBuf>,
komodo_database_uri: Option<String>,
komodo_database_uri_file: Option<PathBuf>,
komodo_database_address: Option<String>,
komodo_database_username: Option<String>,
komodo_database_username_file: Option<PathBuf>,
komodo_database_password: Option<String>,
komodo_database_password_file: Option<PathBuf>,
#[serde(default = "default_app_name")]
komodo_database_app_name: String,
#[serde(default = "default_db_name")]
komodo_database_db_name: String,
}
fn default_app_name() -> String {
String::from("komodo-restore")
}
fn default_db_name() -> String {
String::from("komodo")
}
fn default_backup_folder() -> PathBuf {
// SAFE: /backup is a valid path.
PathBuf::from_str("/backup").unwrap()
}
pub async fn main() -> anyhow::Result<()> {
let env = envy::from_env::<Env>()?;
let restore_folder =
if let Some(restore_folder) = env.komodo_restore_folder {
env.komodo_backup_folder.join(&restore_folder)
} else {
latest_restore_folder(&env).await?
}
.components()
.collect::<PathBuf>();
info!("Restore folder: {restore_folder:?}");
let mut restore_dir = tokio::fs::read_dir(&restore_folder)
.await
.with_context(|| {
format!("Failed to read restore directory {restore_folder:?}")
})?;
let mut restore_files: Vec<(String, PathBuf)> = vec![(
String::from("Stats"),
env
.komodo_backup_folder
.join("Stats.jsonl.gz")
.components()
.collect(),
)];
loop {
match restore_dir
.next_entry()
.await
.context("Failed to read restore dir entry")
{
Ok(Some(file)) => {
let path = file.path();
let Some(file_name) = path.file_name() else {
continue;
};
let Some(file_name) = file_name.to_str() else {
continue;
};
let Some(collection) = file_name.strip_suffix(".jsonl.gz")
else {
continue;
};
restore_files.push((
collection.to_string(),
path.components().collect(),
));
}
Ok(None) => break,
Err(e) => {
warn!("{e:#}");
continue;
}
}
}
// info!("Restoring: {restore_files:#?}");
let mut db_builder = MongoBuilder::default();
if let Some(uri) = maybe_read_item_from_file(
env.komodo_database_uri_file,
env.komodo_database_uri,
) {
db_builder = db_builder.uri(uri);
}
if let Some(address) = env.komodo_database_address {
db_builder = db_builder.address(address);
}
if let Some(username) = maybe_read_item_from_file(
env.komodo_database_username_file,
env.komodo_database_username,
) {
db_builder = db_builder.username(username);
}
if let Some(password) = maybe_read_item_from_file(
env.komodo_database_password_file,
env.komodo_database_password,
) {
db_builder = db_builder.password(password);
}
let target_db = db_builder
.app_name(env.komodo_database_app_name)
.build()
.await
.context("Failed to initialize target database")?
.database(&env.komodo_database_db_name);
let mut handles = restore_files
.into_iter()
.map(|(collection, restore_file)| {
let target =
target_db.collection::<Document>(&collection);
async {
let col = collection.clone();
tokio::join!(
async { col },
tokio::spawn(async move {
let res = async {
let mut buffer = Vec::<Document>::new();
let mut count = 0;
let file = tokio::fs::File::open(&restore_file)
.await
.with_context(|| format!("Failed to open file {restore_file:?}"))?;
let mut reader = tokio_util::codec::FramedRead::new(
GzipDecoder::new(BufReader::new(file)),
tokio_util::codec::LinesCodec::new()
);
while let Some(line) = reader.try_next()
.await
.context("Failed to get next line")?
{
let line = line.trim();
if line.is_empty() {
continue;
}
let doc = match serde_json::from_str::<Document>(line)
.context("Failed to deserialize line to document")
{
Ok(doc) => doc,
Err(e) => {
warn!("{e:#} | {line}");
continue;
}
};
count += 1;
buffer.push(doc);
if buffer.len() >= 20_000 {
if let Err(e) = target
.insert_many(&buffer)
.with_options(
InsertManyOptions::builder().ordered(false).build(),
)
.await
{
error!("Failed to flush document batch in {collection} collection | {e:#}");
};
buffer.clear();
}
}
if !buffer.is_empty() {
target
.insert_many(&buffer)
.with_options(
InsertManyOptions::builder().ordered(false).build(),
)
.await
.context("Failed to flush documents")?;
}
anyhow::Ok(count)
}.await;
match res {
Ok(count) => {
if count > 0 {
info!("Finished restoring {collection} collection | Restored {count}");
}
}
Err(e) => {
error!("Failed to restore {collection} collection | {e:#}")
}
}
})
)
}
})
.collect::<FuturesUnordered<_>>();
loop {
match handles.next().await {
Some((_collection, Ok(()))) => {
// info!("[{collection}]: finished");
}
Some((collection, Err(e))) => {
error!("[{collection}]: {e:#}");
}
None => break,
}
}
info!("Finished restoring database ✅");
Ok(())
}
async fn latest_restore_folder(env: &Env) -> anyhow::Result<PathBuf> {
let mut max = PathBuf::new();
let mut backups_dir =
tokio::fs::read_dir(&env.komodo_backup_folder)
.await
.context("Failed to read restore directory")?;
loop {
match backups_dir
.next_entry()
.await
.context("Failed to read dir entry")
{
Ok(Some(entry)) => {
let path = entry.path();
if path.is_dir() && path > max {
max = path;
}
}
Ok(None) => break,
Err(e) => {
warn!("{e:#}");
continue;
}
}
}
Ok(max.components().collect())
}

View File

@@ -0,0 +1,49 @@
use clap::Parser;
use derive_empty_traits::EmptyTraits;
use resolver_api::Resolve;
use serde::{Deserialize, Serialize};
use typeshare::typeshare;
use crate::entities::update::Update;
use super::KomodoExecuteRequest;
/// Clears all repos from the Core repo cache. Admin only.
#[typeshare]
#[derive(
Debug,
Clone,
PartialEq,
Serialize,
Deserialize,
Resolve,
EmptyTraits,
Parser,
)]
#[empty_traits(KomodoExecuteRequest)]
#[response(Update)]
#[error(serror::Error)]
pub struct ClearRepoCache {}
/// Backs up the database to compressed jsonl files. Admin only.
///
/// Mount a folder to `/backup`, and Core will use it to create
/// timestamped database dumps, which can be restored using
/// `ghcr.io/moghtech/komodo-util`.
///
/// TODO: Link to docs
#[typeshare]
#[derive(
Debug,
Clone,
PartialEq,
Serialize,
Deserialize,
Resolve,
EmptyTraits,
Parser,
)]
#[empty_traits(KomodoExecuteRequest)]
#[response(Update)]
#[error(serror::Error)]
pub struct BackupDatabase {}

View File

@@ -10,6 +10,7 @@ mod action;
mod alerter;
mod build;
mod deployment;
mod maintenance;
mod procedure;
mod repo;
mod server;
@@ -20,6 +21,7 @@ pub use action::*;
pub use alerter::*;
pub use build::*;
pub use deployment::*;
pub use maintenance::*;
pub use procedure::*;
pub use repo::*;
pub use server::*;
@@ -154,23 +156,6 @@ pub struct Sleep {
pub duration_ms: I64,
}
/// Clears all repos from the Core repo cache. Admin only.
#[typeshare]
#[derive(
Serialize,
Deserialize,
Debug,
Clone,
PartialEq,
Resolve,
EmptyTraits,
Parser,
)]
#[empty_traits(KomodoExecuteRequest)]
#[response(Update)]
#[error(serror::Error)]
pub struct ClearRepoCache {}
#[typeshare]
pub type BatchExecutionResponse = Vec<BatchExecutionResponseItem>;

View File

@@ -3,18 +3,18 @@
mod conversion;
mod environment;
mod file_contents;
mod forgiving_vec;
mod labels;
mod maybe_string_i64;
mod permission;
mod string_list;
mod term_signal_labels;
mod forgiving_vec;
pub use conversion::*;
pub use environment::*;
pub use file_contents::*;
pub use forgiving_vec::*;
pub use labels::*;
pub use maybe_string_i64::*;
pub use string_list::*;
pub use term_signal_labels::*;
pub use forgiving_vec::*;