back to LinesCodec

This commit is contained in:
mbecker20
2025-08-01 05:05:38 -04:00
parent 178203c07c
commit 4729dc2e3c
2 changed files with 10 additions and 11 deletions

View File

@@ -58,14 +58,14 @@ pub async fn main() -> anyhow::Result<()> {
.context("Failed to get next document")?
{
count += 1;
let json = match serde_json::to_string(&doc).context("Failed to serialize document to json") {
Ok(json) => json,
let str = match serde_json::to_string(&doc).context("Failed to serialize document") {
Ok(str) => str,
Err(e) => {
warn!("{e:#}");
continue
}
};
if let Err(e) = writer.send(json)
if let Err(e) = writer.send(str)
.await
.context("Failed to write document to file")
{

View File

@@ -8,6 +8,7 @@ use futures_util::{
use mungos::mongodb::{bson::Document, options::InsertManyOptions};
use serde::Deserialize;
use tokio::io::BufReader;
use tokio_util::codec::{FramedRead, LinesCodec};
#[derive(Deserialize)]
struct Env {
@@ -57,25 +58,24 @@ pub async fn main() -> anyhow::Result<()> {
.await
.with_context(|| format!("Failed to open file {restore_file:?}"))?;
let mut reader = tokio_util::codec::FramedRead::new(
let mut reader = FramedRead::new(
GzipDecoder::new(BufReader::new(file)),
tokio_util::codec::LinesCodec::new()
LinesCodec::new()
);
while let Some(line) = reader.try_next()
.await
.context("Failed to get next line")?
{
let line = line.trim();
if line.is_empty() {
continue;
}
let doc = match serde_json::from_str::<Document>(line)
.context("Failed to deserialize line to document")
let doc = match serde_json::from_str::<Document>(&line)
.context("Failed to deserialize line")
{
Ok(doc) => doc,
Err(e) => {
warn!("{e:#} | {line}");
warn!("{e:#}");
continue;
}
};
@@ -198,8 +198,7 @@ async fn get_restore_files(
let Some(file_name) = file_name.to_str() else {
continue;
};
let Some(collection) = file_name.strip_suffix(".gz")
else {
let Some(collection) = file_name.strip_suffix(".gz") else {
continue;
};
restore_files.push((