db copy / restore uses idempotent upsert

Author: mbecker20
Date: 2025-08-06 21:25:44 -04:00
Parent: 34da2063f9
Commit: 2c328f27e4
2 changed files with 48 additions and 46 deletions
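Context for the change: `insert_many` cannot update documents whose `_id` already exists in the target, so re-running a copy or restore into a non-empty database was not idempotent. The commit switches both paths to bulk upserts keyed on `_id`. Below is a minimal sketch of that pattern, assuming the `BulkUpdate` fields and the `bulk_update_retry_too_big(db, collection, updates, upsert)` call shape shown in the diff; `upsert_batch` is a hypothetical helper, not code from the commit.

    use anyhow::Context;
    use mungos::{
      bulk_update::{BulkUpdate, bulk_update_retry_too_big},
      mongodb::{
        Database,
        bson::{Document, doc},
      },
    };

    // Hypothetical helper distilling the pattern: every source document becomes
    // an upsert keyed on its `_id`, so replaying the same batch converges on the
    // same target state instead of failing on duplicate ids.
    async fn upsert_batch(
      db: &Database,
      collection: &str,
      documents: Vec<Document>,
    ) -> anyhow::Result<()> {
      let updates: Vec<BulkUpdate> = documents
        .into_iter()
        .filter_map(|document| {
          // Documents without an ObjectId `_id` are skipped, as in the diff.
          let id = document.get("_id").and_then(|id| id.as_object_id())?;
          Some(BulkUpdate {
            query: doc! { "_id": id },
            update: doc! { "$set": document },
          })
        })
        .collect();
      // The final `true` requests upsert semantics, matching the calls in the diff.
      bulk_update_retry_too_big(db, collection, &updates, true)
        .await
        .context("Failed to upsert documents")?;
      Ok(())
    }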

File 1 of 2

@@ -2,10 +2,12 @@ use anyhow::Context;
 use futures_util::{
   StreamExt, TryStreamExt, stream::FuturesUnordered,
 };
-use mungos::mongodb::{
-  Database,
-  bson::{Document, RawDocumentBuf},
-  options::InsertManyOptions,
+use mungos::{
+  bulk_update::{BulkUpdate, bulk_update_retry_too_big},
+  mongodb::{
+    Database,
+    bson::{Document, doc},
+  },
 };
use tracing::{error, info};
@@ -17,31 +19,35 @@ pub async fn copy(
     .list_collection_names()
     .await
     .context("Failed to list collections on source db")?.into_iter().map(|collection| {
-      let source = source_db.collection::<RawDocumentBuf>(&collection);
-      let target = target_db.collection::<RawDocumentBuf>(&collection);
+      let source = source_db.collection::<Document>(&collection);
+      let target_db = target_db.clone();
       tokio::spawn(async move {
         let res = async {
-          let mut buffer = Vec::<RawDocumentBuf>::new();
+          let mut buffer = Vec::<BulkUpdate>::new();
+          // The update collection is bigger than others,
+          // can hit the max bson limit on the bulk upsert call without this.
+          let max_buffer = if collection == "Update" {
+            1_000
+          } else {
+            10_000
+          };
           let mut count = 0;
           let mut cursor = source
             .find(Document::new())
             .await
             .context("Failed to query source collection")?;
-          while let Some(doc) = cursor
+          while let Some(document) = cursor
             .try_next()
             .await
             .context("Failed to get next document")?
           {
+            let Some(id) = document.get("_id").and_then(|id| id.as_object_id()) else {
+              continue;
+            };
             count += 1;
-            buffer.push(doc);
-            if buffer.len() >= 20_000 {
-              if let Err(e) = target
-                .insert_many(&buffer)
-                .with_options(
-                  InsertManyOptions::builder().ordered(false).build(),
-                )
-                .await
+            buffer.push(BulkUpdate { query: doc! { "_id": id }, update: doc! { "$set": document } });
+            if buffer.len() >= max_buffer {
+              if let Err(e) = bulk_update_retry_too_big(&target_db, &collection, &buffer, true).await.context("Failed to flush documents")
               {
                 error!("Failed to flush document batch in {collection} collection | {e:#}");
               };
@@ -49,13 +55,8 @@ pub async fn copy(
             }
           }
           if !buffer.is_empty() {
-            target
-              .insert_many(&buffer)
-              .with_options(
-                InsertManyOptions::builder().ordered(false).build(),
-              )
-              .await
-              .context("Failed to flush documents")?;
+            bulk_update_retry_too_big(&target_db, &collection, &buffer, true).await.context("Failed to flush documents")?;
           }
           anyhow::Ok(count)
         }

File 2 of 2

@@ -5,8 +5,12 @@ use async_compression::tokio::bufread::GzipDecoder;
 use futures_util::{
   StreamExt, TryStreamExt, stream::FuturesUnordered,
 };
-use mungos::mongodb::{
-  Database, bson::Document, options::InsertManyOptions,
+use mungos::{
+  bulk_update::{BulkUpdate, bulk_update_retry_too_big},
+  mongodb::{
+    Database,
+    bson::{Document, doc},
+  },
 };
use tokio::io::BufReader;
use tokio_util::codec::{FramedRead, LinesCodec};
@@ -34,16 +38,21 @@ pub async fn restore(
   let mut handles = restore_files
     .into_iter()
     .map(|(collection, restore_file)| {
-      let target =
-        db.collection::<Document>(&collection);
+      let db = db.clone();
       async {
         let col = collection.clone();
         tokio::join!(
           async { col },
           tokio::spawn(async move {
             let res = async {
-              let mut buffer = Vec::<Document>::new();
+              let mut buffer = Vec::<BulkUpdate>::new();
+              // The update collection is bigger than others,
+              // can hit the max bson limit on the bulk upsert call without this.
+              let max_buffer = if collection == "Update" {
+                1_000
+              } else {
+                10_000
+              };
               let mut count = 0;
               let file = tokio::fs::File::open(&restore_file)
@@ -62,7 +71,7 @@ pub async fn restore(
               if line.is_empty() {
                 continue;
               }
-              let doc = match serde_json::from_str::<Document>(&line)
+              let document = match serde_json::from_str::<Document>(&line)
                 .context("Failed to deserialize line")
               {
                 Ok(doc) => doc,
@@ -71,15 +80,13 @@ pub async fn restore(
                   continue;
                 }
               };
+              let Some(id) = document.get("_id").and_then(|id| id.as_object_id()) else {
+                continue;
+              };
               count += 1;
-              buffer.push(doc);
-              if buffer.len() >= 20_000 {
-                if let Err(e) = target
-                  .insert_many(&buffer)
-                  .with_options(
-                    InsertManyOptions::builder().ordered(false).build(),
-                  )
-                  .await
+              buffer.push(BulkUpdate { query: doc! { "_id": id }, update: doc! { "$set": document } });
+              if buffer.len() >= max_buffer {
+                if let Err(e) = bulk_update_retry_too_big(&db, &collection, &buffer, true).await.context("Failed to flush documents")
                 {
                   error!("Failed to flush document batch in {collection} collection | {e:#}");
                 };
@@ -87,13 +94,7 @@ pub async fn restore(
                 }
               }
               if !buffer.is_empty() {
-                target
-                  .insert_many(&buffer)
-                  .with_options(
-                    InsertManyOptions::builder().ordered(false).build(),
-                  )
-                  .await
-                  .context("Failed to flush documents")?;
+                bulk_update_retry_too_big(&db, &collection, &buffer, true).await.context("Failed to flush documents")?;
               }
               anyhow::Ok(count)
             }.await;
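
A note on the buffer sizing shared by both files: batches are flushed once they reach `max_buffer` (1,000 `BulkUpdate`s for the large `Update` collection, 10,000 otherwise), since a single bulk upsert call can otherwise exceed MongoDB's maximum BSON size. A condensed sketch of that loop, assuming `db`, `collection`, and a `Document` cursor are in scope and that the buffer is cleared after each flush (the clearing line falls outside the displayed hunks):

    // Condensed sketch of the batching used by both copy and restore.
    let max_buffer = if collection == "Update" { 1_000 } else { 10_000 };
    let mut buffer = Vec::<BulkUpdate>::new();
    while let Some(document) = cursor.try_next().await? {
      let Some(id) = document.get("_id").and_then(|id| id.as_object_id()) else {
        continue;
      };
      buffer.push(BulkUpdate {
        query: doc! { "_id": id },
        update: doc! { "$set": document },
      });
      if buffer.len() >= max_buffer {
        // Flush a full batch, then reuse the allocation for the next one.
        bulk_update_retry_too_big(&db, &collection, &buffer, true)
          .await
          .context("Failed to flush documents")?;
        buffer.clear();
      }
    }
    if !buffer.is_empty() {
      // Flush whatever remains once the source is exhausted.
      bulk_update_retry_too_big(&db, &collection, &buffer, true)
        .await
        .context("Failed to flush documents")?;
    }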