replace unwraps with correct error
This commit is contained in:
parent
56766cffc3
commit
3747f5bdd8
|
@ -1,27 +1,51 @@
|
|||
use std::fs::{File, create_dir_all};
|
||||
use std::io::{BufReader, BufWriter, Write};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::fs::{create_dir_all, File};
|
||||
use std::io::{self, BufReader, BufWriter, Write};
|
||||
use std::ops::{Deref, DerefMut};
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use milli::documents::DocumentBatchReader;
|
||||
use serde_json::Map;
|
||||
use tempfile::NamedTempFile;
|
||||
use tempfile::{NamedTempFile, PersistError};
|
||||
use uuid::Uuid;
|
||||
|
||||
const UPDATE_FILES_PATH: &str = "updates/updates_files";
|
||||
|
||||
use crate::document_formats::read_jsonl;
|
||||
|
||||
use super::error::Result;
|
||||
|
||||
pub struct UpdateFile {
|
||||
path: PathBuf,
|
||||
file: NamedTempFile,
|
||||
}
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
#[error("Error while persisting update to disk: {0}")]
|
||||
pub struct UpdateFileStoreError(Box<dyn std::error::Error + Sync + Send + 'static>);
|
||||
|
||||
type Result<T> = std::result::Result<T, UpdateFileStoreError>;
|
||||
|
||||
macro_rules! into_update_store_error {
|
||||
($($other:path),*) => {
|
||||
$(
|
||||
impl From<$other> for UpdateFileStoreError {
|
||||
fn from(other: $other) -> Self {
|
||||
Self(Box::new(other))
|
||||
}
|
||||
}
|
||||
)*
|
||||
};
|
||||
}
|
||||
|
||||
into_update_store_error!(
|
||||
PersistError,
|
||||
io::Error,
|
||||
serde_json::Error,
|
||||
milli::documents::Error
|
||||
);
|
||||
|
||||
impl UpdateFile {
|
||||
pub fn persist(self) {
|
||||
self.file.persist(&self.path).unwrap();
|
||||
pub fn persist(self) -> Result<()> {
|
||||
self.file.persist(&self.path)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -49,15 +73,17 @@ impl UpdateFileStore {
|
|||
let src_update_files_path = src.as_ref().join(UPDATE_FILES_PATH);
|
||||
let dst_update_files_path = dst.as_ref().join(UPDATE_FILES_PATH);
|
||||
|
||||
create_dir_all(&dst_update_files_path).unwrap();
|
||||
create_dir_all(&dst_update_files_path)?;
|
||||
|
||||
let entries = std::fs::read_dir(src_update_files_path).unwrap();
|
||||
let entries = std::fs::read_dir(src_update_files_path)?;
|
||||
|
||||
for entry in entries {
|
||||
let entry = entry.unwrap();
|
||||
let update_file = BufReader::new(File::open(entry.path()).unwrap());
|
||||
let entry = entry?;
|
||||
let update_file = BufReader::new(File::open(entry.path())?);
|
||||
let file_uuid = entry.file_name();
|
||||
let file_uuid = file_uuid.to_str().ok_or_else(|| anyhow::anyhow!("invalid update file name"))?;
|
||||
let file_uuid = file_uuid
|
||||
.to_str()
|
||||
.ok_or_else(|| anyhow::anyhow!("invalid update file name"))?;
|
||||
let dst_path = dst_update_files_path.join(file_uuid);
|
||||
let dst_file = BufWriter::new(File::create(dst_path)?);
|
||||
read_jsonl(update_file, dst_file)?;
|
||||
|
@ -68,7 +94,7 @@ impl UpdateFileStore {
|
|||
|
||||
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
|
||||
let path = path.as_ref().join(UPDATE_FILES_PATH);
|
||||
std::fs::create_dir_all(&path).unwrap();
|
||||
std::fs::create_dir_all(&path)?;
|
||||
Ok(Self { path })
|
||||
}
|
||||
|
||||
|
@ -76,7 +102,7 @@ impl UpdateFileStore {
|
|||
///
|
||||
/// A call to persist is needed to persist in the database.
|
||||
pub fn new_update(&self) -> Result<(Uuid, UpdateFile)> {
|
||||
let file = NamedTempFile::new().unwrap();
|
||||
let file = NamedTempFile::new()?;
|
||||
let uuid = Uuid::new_v4();
|
||||
let path = self.path.join(uuid.to_string());
|
||||
let update_file = UpdateFile { file, path };
|
||||
|
@ -87,7 +113,7 @@ impl UpdateFileStore {
|
|||
/// Returns a the file corresponding to the requested uuid.
|
||||
pub fn get_update(&self, uuid: Uuid) -> Result<File> {
|
||||
let path = self.path.join(uuid.to_string());
|
||||
let file = File::open(path).unwrap();
|
||||
let file = File::open(path)?;
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
|
@ -95,9 +121,9 @@ impl UpdateFileStore {
|
|||
pub fn snapshot(&self, uuid: Uuid, dst: impl AsRef<Path>) -> Result<()> {
|
||||
let src = self.path.join(uuid.to_string());
|
||||
let mut dst = dst.as_ref().join(UPDATE_FILES_PATH);
|
||||
std::fs::create_dir_all(&dst).unwrap();
|
||||
std::fs::create_dir_all(&dst)?;
|
||||
dst.push(uuid.to_string());
|
||||
std::fs::copy(src, dst).unwrap();
|
||||
std::fs::copy(src, dst)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
|
@ -106,29 +132,30 @@ impl UpdateFileStore {
|
|||
let uuid_string = uuid.to_string();
|
||||
let update_file_path = self.path.join(&uuid_string);
|
||||
let mut dst = dump_path.as_ref().join(UPDATE_FILES_PATH);
|
||||
std::fs::create_dir_all(&dst).unwrap();
|
||||
std::fs::create_dir_all(&dst)?;
|
||||
dst.push(&uuid_string);
|
||||
|
||||
let update_file = File::open(update_file_path).unwrap();
|
||||
let mut dst_file = NamedTempFile::new().unwrap();
|
||||
let mut document_reader = DocumentBatchReader::from_reader(update_file).unwrap();
|
||||
let update_file = File::open(update_file_path)?;
|
||||
let mut dst_file = NamedTempFile::new()?;
|
||||
let mut document_reader = DocumentBatchReader::from_reader(update_file)?;
|
||||
|
||||
let mut document_buffer = Map::new();
|
||||
// TODO: we need to find a way to do this more efficiently. (create a custom serializer to
|
||||
// jsonl for example...)
|
||||
while let Some((index, document)) = document_reader.next_document_with_index().unwrap() {
|
||||
while let Some((index, document)) = document_reader.next_document_with_index()? {
|
||||
for (field_id, content) in document.iter() {
|
||||
let field_name = index.get_by_left(&field_id).unwrap();
|
||||
let content = serde_json::from_slice(content).unwrap();
|
||||
document_buffer.insert(field_name.to_string(), content);
|
||||
if let Some(field_name) = index.get_by_left(&field_id) {
|
||||
let content = serde_json::from_slice(content)?;
|
||||
document_buffer.insert(field_name.to_string(), content);
|
||||
}
|
||||
}
|
||||
|
||||
serde_json::to_writer(&mut dst_file, &document_buffer).unwrap();
|
||||
dst_file.write(b"\n").unwrap();
|
||||
serde_json::to_writer(&mut dst_file, &document_buffer)?;
|
||||
dst_file.write(b"\n")?;
|
||||
document_buffer.clear();
|
||||
}
|
||||
|
||||
dst_file.persist(dst).unwrap();
|
||||
dst_file.persist(dst)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -3,6 +3,8 @@ use std::error::Error;
|
|||
|
||||
use meilisearch_error::{Code, ErrorCode};
|
||||
|
||||
use crate::index_controller::update_file_store::UpdateFileStoreError;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, UpdateLoopError>;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
|
@ -42,7 +44,8 @@ internal_error!(
|
|||
UpdateLoopError: heed::Error,
|
||||
std::io::Error,
|
||||
serde_json::Error,
|
||||
tokio::task::JoinError
|
||||
tokio::task::JoinError,
|
||||
UpdateFileStoreError
|
||||
);
|
||||
|
||||
impl ErrorCode for UpdateLoopError {
|
||||
|
|
|
@ -231,7 +231,7 @@ impl UpdateLoop {
|
|||
builder.add_documents(documents).unwrap();
|
||||
builder.finish().unwrap();
|
||||
|
||||
file.persist();
|
||||
file.persist()?;
|
||||
|
||||
Ok(uuid)
|
||||
})
|
||||
|
|
Loading…
Reference in New Issue