fix-snapshot

This commit is contained in:
Marin Postma 2021-05-11 12:18:10 +02:00
parent ceb8d6e1c9
commit 1b671d4302
No known key found for this signature in database
GPG Key ID: D5241F0C0C865F30
4 changed files with 38 additions and 26 deletions

View File

@ -1,6 +1,7 @@
name: Rust name: Rust
on: on:
workflow_dispatch:
pull_request: pull_request:
push: push:
# trying and staging branches are for Bors config # trying and staging branches are for Bors config

View File

@ -3,17 +3,15 @@ use std::io::SeekFrom;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::Arc; use std::sync::Arc;
use futures::StreamExt;
use log::info; use log::info;
use oxidized_json_checker::JsonChecker; use oxidized_json_checker::JsonChecker;
use tokio::fs; use tokio::fs;
use tokio::io::AsyncWriteExt; use tokio::io::AsyncWriteExt;
use tokio::runtime::Handle;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo}; use super::{PayloadData, Result, UpdateError, UpdateMsg, UpdateStore, UpdateStoreInfo};
use crate::index_controller::index_actor::{IndexActorHandle, CONCURRENT_INDEX_MSG}; use crate::index_controller::index_actor::{IndexActorHandle};
use crate::index_controller::{UpdateMeta, UpdateStatus}; use crate::index_controller::{UpdateMeta, UpdateStatus};
pub struct UpdateActor<D, I> { pub struct UpdateActor<D, I> {
@ -207,25 +205,8 @@ where
async fn handle_snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> { async fn handle_snapshot(&self, uuids: HashSet<Uuid>, path: PathBuf) -> Result<()> {
let index_handle = self.index_handle.clone(); let index_handle = self.index_handle.clone();
let update_store = self.store.clone(); let update_store = self.store.clone();
tokio::task::spawn_blocking(move || -> anyhow::Result<()> {
update_store.snapshot(&uuids, &path)?;
// Perform the snapshot of each index concurently. Only a third of the capabilities of tokio::task::spawn_blocking(move || update_store.snapshot(&uuids, &path, index_handle))
// the index actor at a time not to put too much pressure on the index actor
let path = &path;
let handle = &index_handle;
let mut stream = futures::stream::iter(uuids.iter())
.map(|&uuid| handle.snapshot(uuid, path.clone()))
.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
Handle::current().block_on(async {
while let Some(res) = stream.next().await {
res?;
}
Ok(())
})
})
.await .await
.map_err(|e| UpdateError::Error(e.into()))? .map_err(|e| UpdateError::Error(e.into()))?
.map_err(|e| UpdateError::Error(e.into()))?; .map_err(|e| UpdateError::Error(e.into()))?;

View File

@ -8,6 +8,7 @@ use std::sync::Arc;
use anyhow::Context; use anyhow::Context;
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use futures::StreamExt;
use heed::types::{ByteSlice, OwnedType, SerdeJson}; use heed::types::{ByteSlice, OwnedType, SerdeJson};
use heed::zerocopy::U64; use heed::zerocopy::U64;
use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions}; use heed::{BytesDecode, BytesEncode, CompactionOption, Database, Env, EnvOpenOptions};
@ -17,8 +18,11 @@ use tokio::sync::mpsc;
use uuid::Uuid; use uuid::Uuid;
use super::UpdateMeta; use super::UpdateMeta;
use crate::helpers::EnvSizer; use crate::index_controller::{updates::*, IndexActorHandle};
use crate::index_controller::{IndexActorHandle, updates::*}; use crate::{
helpers::EnvSizer,
index_controller::index_actor::{IndexResult, CONCURRENT_INDEX_MSG},
};
#[allow(clippy::upper_case_acronyms)] #[allow(clippy::upper_case_acronyms)]
type BEU64 = U64<heed::byteorder::BE>; type BEU64 = U64<heed::byteorder::BE>;
@ -202,7 +206,14 @@ impl UpdateStore {
.try_send(()) .try_send(())
.expect("Failed to init update store"); .expect("Failed to init update store");
let update_store = Arc::new(UpdateStore { env, pending_queue, next_update_id, updates, state, notification_sender }); let update_store = Arc::new(UpdateStore {
env,
pending_queue,
next_update_id,
updates,
state,
notification_sender,
});
// We need a weak reference so we can take ownership on the arc later when we // We need a weak reference so we can take ownership on the arc later when we
// want to close the index. // want to close the index.
@ -464,7 +475,12 @@ impl UpdateStore {
Ok(()) Ok(())
} }
pub fn snapshot(&self, uuids: &HashSet<Uuid>, path: impl AsRef<Path>) -> anyhow::Result<()> { pub fn snapshot(
&self,
uuids: &HashSet<Uuid>,
path: impl AsRef<Path>,
handle: impl IndexActorHandle + Clone,
) -> anyhow::Result<()> {
let state_lock = self.state.write(); let state_lock = self.state.write();
state_lock.swap(State::Snapshoting); state_lock.swap(State::Snapshoting);
@ -496,6 +512,21 @@ impl UpdateStore {
} }
} }
let path = &path.as_ref().to_path_buf();
let handle = &handle;
// Perform the snapshot of each index concurently. Only a third of the capabilities of
// the index actor at a time not to put too much pressure on the index actor
let mut stream = futures::stream::iter(uuids.iter())
.map(move |uuid| handle.snapshot(*uuid, path.clone()))
.buffer_unordered(CONCURRENT_INDEX_MSG / 3);
Handle::current().block_on(async {
while let Some(res) = stream.next().await {
res?;
}
Ok(()) as IndexResult<()>
})?;
Ok(()) Ok(())
} }

View File

@ -7,7 +7,6 @@ use tokio::time::sleep;
use meilisearch_http::Opt; use meilisearch_http::Opt;
#[ignore]
#[actix_rt::test] #[actix_rt::test]
async fn perform_snapshot() { async fn perform_snapshot() {
let temp = tempfile::tempdir_in(".").unwrap(); let temp = tempfile::tempdir_in(".").unwrap();