retrieve update status
This commit is contained in:
parent
0cd9e62fc6
commit
54861335a0
|
@ -1188,6 +1188,22 @@ dependencies = [
|
|||
"unicode-segmentation",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heed"
|
||||
version = "0.10.6"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"heed-traits 0.7.0",
|
||||
"heed-types 0.7.2",
|
||||
"libc",
|
||||
"lmdb-rkv-sys",
|
||||
"once_cell",
|
||||
"page_size",
|
||||
"synchronoise",
|
||||
"url",
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heed"
|
||||
version = "0.10.6"
|
||||
|
@ -1195,8 +1211,8 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "afcc6c911acaadad3ebe9f1ef1707d80bd71c92037566f47b6238a03b60adf1a"
|
||||
dependencies = [
|
||||
"byteorder",
|
||||
"heed-traits",
|
||||
"heed-types",
|
||||
"heed-traits 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"heed-types 0.7.2 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"libc",
|
||||
"lmdb-rkv-sys",
|
||||
"once_cell",
|
||||
|
@ -1207,12 +1223,27 @@ dependencies = [
|
|||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heed-traits"
|
||||
version = "0.7.0"
|
||||
|
||||
[[package]]
|
||||
name = "heed-traits"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b328f6260a7e51bdb0ca6b68e6ea27ee3d11fba5dee930896ee7ff6ad5fc072c"
|
||||
|
||||
[[package]]
|
||||
name = "heed-types"
|
||||
version = "0.7.2"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"heed-traits 0.7.0",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"zerocopy",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "heed-types"
|
||||
version = "0.7.2"
|
||||
|
@ -1220,7 +1251,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "e628efb08beaee58355f80dc4adba79d644940ea9eef60175ea17dc218aab405"
|
||||
dependencies = [
|
||||
"bincode",
|
||||
"heed-traits",
|
||||
"heed-traits 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"zerocopy",
|
||||
|
@ -1615,7 +1646,7 @@ dependencies = [
|
|||
"futures",
|
||||
"futures-util",
|
||||
"grenad",
|
||||
"heed",
|
||||
"heed 0.10.6 (registry+https://github.com/rust-lang/crates.io-index)",
|
||||
"http",
|
||||
"indexmap",
|
||||
"jemallocator",
|
||||
|
@ -1699,6 +1730,7 @@ dependencies = [
|
|||
"bstr",
|
||||
"byte-unit",
|
||||
"byteorder",
|
||||
"chrono",
|
||||
"crossbeam-channel",
|
||||
"csv",
|
||||
"either",
|
||||
|
@ -1706,7 +1738,7 @@ dependencies = [
|
|||
"fst",
|
||||
"fxhash",
|
||||
"grenad",
|
||||
"heed",
|
||||
"heed 0.10.6",
|
||||
"human_format",
|
||||
"itertools",
|
||||
"jemallocator",
|
||||
|
@ -1728,6 +1760,7 @@ dependencies = [
|
|||
"roaring",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_millis",
|
||||
"slice-group-by",
|
||||
"smallstr",
|
||||
"smallvec",
|
||||
|
@ -2594,6 +2627,15 @@ dependencies = [
|
|||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_millis"
|
||||
version = "0.1.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e6e2dc780ca5ee2c369d1d01d100270203c4ff923d2a4264812d723766434d00"
|
||||
dependencies = [
|
||||
"serde",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "serde_qs"
|
||||
version = "0.8.2"
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
mod search;
|
||||
mod updates;
|
||||
|
||||
pub use search::{SearchQuery, SearchResult};
|
||||
|
||||
use std::fs::create_dir_all;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
|
||||
use milli::Index;
|
||||
use sha2::Digest;
|
||||
|
||||
use crate::option::Opt;
|
||||
use crate::updates::UpdateQueue;
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Data {
|
||||
inner: Arc<DataInner>,
|
||||
}
|
||||
|
||||
impl Deref for Data {
|
||||
type Target = DataInner;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DataInner {
|
||||
pub indexes: Arc<Index>,
|
||||
pub update_queue: Arc<UpdateQueue>,
|
||||
api_keys: ApiKeys,
|
||||
options: Opt,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ApiKeys {
|
||||
pub public: Option<String>,
|
||||
pub private: Option<String>,
|
||||
pub master: Option<String>,
|
||||
}
|
||||
|
||||
impl ApiKeys {
|
||||
pub fn generate_missing_api_keys(&mut self) {
|
||||
if let Some(master_key) = &self.master {
|
||||
if self.private.is_none() {
|
||||
let key = format!("{}-private", master_key);
|
||||
let sha = sha2::Sha256::digest(key.as_bytes());
|
||||
self.private = Some(format!("{:x}", sha));
|
||||
}
|
||||
if self.public.is_none() {
|
||||
let key = format!("{}-public", master_key);
|
||||
let sha = sha2::Sha256::digest(key.as_bytes());
|
||||
self.public = Some(format!("{:x}", sha));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Data {
|
||||
pub fn new(options: Opt) -> anyhow::Result<Data> {
|
||||
let db_size = options.max_mdb_size.get_bytes() as usize;
|
||||
let path = options.db_path.join("main");
|
||||
create_dir_all(&path)?;
|
||||
let indexes = Index::new(&path, Some(db_size))?;
|
||||
let indexes = Arc::new(indexes);
|
||||
|
||||
let update_queue = Arc::new(UpdateQueue::new(&options, indexes.clone())?);
|
||||
|
||||
let mut api_keys = ApiKeys {
|
||||
master: options.clone().master_key,
|
||||
private: None,
|
||||
public: None,
|
||||
};
|
||||
|
||||
api_keys.generate_missing_api_keys();
|
||||
|
||||
let inner = DataInner { indexes, options, update_queue, api_keys };
|
||||
let inner = Arc::new(inner);
|
||||
|
||||
Ok(Data { inner })
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn http_payload_size_limit(&self) -> usize {
|
||||
self.options.http_payload_size_limit.get_bytes() as usize
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn api_keys(&self) -> &ApiKeys {
|
||||
&self.api_keys
|
||||
}
|
||||
}
|
|
@ -1,28 +1,20 @@
|
|||
use std::borrow::Cow;
|
||||
use std::collections::HashSet;
|
||||
use std::fs::create_dir_all;
|
||||
use std::mem;
|
||||
use std::ops::Deref;
|
||||
use std::sync::Arc;
|
||||
use std::time::Instant;
|
||||
|
||||
use async_compression::tokio_02::write::GzipEncoder;
|
||||
use futures_util::stream::StreamExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use milli::{Index, SearchResult as Results, obkv_to_json};
|
||||
use milli::update::{IndexDocumentsMethod, UpdateFormat};
|
||||
use sha2::Digest;
|
||||
use serde_json::{Value, Map};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use milli::{SearchResult as Results, obkv_to_json};
|
||||
use meilisearch_tokenizer::{Analyzer, AnalyzerConfig};
|
||||
|
||||
use crate::option::Opt;
|
||||
use crate::updates::{UpdateQueue, UpdateMeta, UpdateStatus, UpdateMetaProgress};
|
||||
use super::Data;
|
||||
|
||||
const DEFAULT_SEARCH_LIMIT: usize = 20;
|
||||
|
||||
#[derive(Deserialize)]
|
||||
#[serde(rename_all = "camelCase", deny_unknown_fields)]
|
||||
#[allow(dead_code)]
|
||||
pub struct SearchQuery {
|
||||
q: Option<String>,
|
||||
offset: Option<usize>,
|
||||
|
@ -48,176 +40,6 @@ pub struct SearchResult {
|
|||
processing_time_ms: u128,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct Data {
|
||||
inner: Arc<DataInner>,
|
||||
}
|
||||
|
||||
impl Deref for Data {
|
||||
type Target = DataInner;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct DataInner {
|
||||
pub indexes: Arc<Index>,
|
||||
pub update_queue: Arc<UpdateQueue>,
|
||||
api_keys: ApiKeys,
|
||||
options: Opt,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct ApiKeys {
|
||||
pub public: Option<String>,
|
||||
pub private: Option<String>,
|
||||
pub master: Option<String>,
|
||||
}
|
||||
|
||||
impl ApiKeys {
|
||||
pub fn generate_missing_api_keys(&mut self) {
|
||||
if let Some(master_key) = &self.master {
|
||||
if self.private.is_none() {
|
||||
let key = format!("{}-private", master_key);
|
||||
let sha = sha2::Sha256::digest(key.as_bytes());
|
||||
self.private = Some(format!("{:x}", sha));
|
||||
}
|
||||
if self.public.is_none() {
|
||||
let key = format!("{}-public", master_key);
|
||||
let sha = sha2::Sha256::digest(key.as_bytes());
|
||||
self.public = Some(format!("{:x}", sha));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Data {
|
||||
pub fn new(options: Opt) -> anyhow::Result<Data> {
|
||||
let db_size = options.max_mdb_size.get_bytes() as usize;
|
||||
let path = options.db_path.join("main");
|
||||
create_dir_all(&path)?;
|
||||
let indexes = Index::new(&path, Some(db_size))?;
|
||||
let indexes = Arc::new(indexes);
|
||||
|
||||
let update_queue = Arc::new(UpdateQueue::new(&options, indexes.clone())?);
|
||||
|
||||
let mut api_keys = ApiKeys {
|
||||
master: options.clone().master_key,
|
||||
private: None,
|
||||
public: None,
|
||||
};
|
||||
|
||||
api_keys.generate_missing_api_keys();
|
||||
|
||||
let inner = DataInner { indexes, options, update_queue, api_keys };
|
||||
let inner = Arc::new(inner);
|
||||
|
||||
Ok(Data { inner })
|
||||
}
|
||||
|
||||
pub async fn add_documents<B, E, S>(
|
||||
&self,
|
||||
_index: S,
|
||||
method: IndexDocumentsMethod,
|
||||
format: UpdateFormat,
|
||||
mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin,
|
||||
) -> anyhow::Result<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>
|
||||
where
|
||||
B: Deref<Target = [u8]>,
|
||||
E: std::error::Error + Send + Sync + 'static,
|
||||
S: AsRef<str>,
|
||||
{
|
||||
let file = tokio::task::spawn_blocking(tempfile::tempfile).await?;
|
||||
let file = tokio::fs::File::from_std(file?);
|
||||
let mut encoder = GzipEncoder::new(file);
|
||||
|
||||
while let Some(result) = stream.next().await {
|
||||
let bytes = &*result?;
|
||||
encoder.write_all(&bytes[..]).await?;
|
||||
}
|
||||
|
||||
encoder.shutdown().await?;
|
||||
let mut file = encoder.into_inner();
|
||||
file.sync_all().await?;
|
||||
let file = file.into_std().await;
|
||||
let mmap = unsafe { memmap::Mmap::map(&file)? };
|
||||
|
||||
let meta = UpdateMeta::DocumentsAddition { method, format };
|
||||
|
||||
let queue = self.update_queue.clone();
|
||||
let meta_cloned = meta.clone();
|
||||
let update_id = tokio::task::spawn_blocking(move || queue.register_update(&meta_cloned, &mmap[..])).await??;
|
||||
|
||||
Ok(UpdateStatus::Pending { update_id, meta })
|
||||
}
|
||||
|
||||
pub fn search<S: AsRef<str>>(&self, _index: S, search_query: SearchQuery) -> anyhow::Result<SearchResult> {
|
||||
let start = Instant::now();
|
||||
let index = &self.indexes;
|
||||
let rtxn = index.read_txn()?;
|
||||
|
||||
let mut search = index.search(&rtxn);
|
||||
if let Some(query) = &search_query.q {
|
||||
search.query(query);
|
||||
}
|
||||
|
||||
if let Some(offset) = search_query.offset {
|
||||
search.offset(offset);
|
||||
}
|
||||
|
||||
let limit = search_query.limit.unwrap_or(DEFAULT_SEARCH_LIMIT);
|
||||
search.limit(limit);
|
||||
|
||||
let Results { found_words, documents_ids, nb_hits, .. } = search.execute().unwrap();
|
||||
|
||||
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
|
||||
|
||||
let displayed_fields = match index.displayed_fields(&rtxn).unwrap() {
|
||||
Some(fields) => Cow::Borrowed(fields),
|
||||
None => Cow::Owned(fields_ids_map.iter().map(|(id, _)| id).collect()),
|
||||
};
|
||||
|
||||
let attributes_to_highlight = match search_query.attributes_to_highlight {
|
||||
Some(fields) => fields.iter().map(ToOwned::to_owned).collect(),
|
||||
None => HashSet::new(),
|
||||
};
|
||||
|
||||
let stop_words = fst::Set::default();
|
||||
let highlighter = Highlighter::new(&stop_words);
|
||||
let mut documents = Vec::new();
|
||||
for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() {
|
||||
let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
|
||||
highlighter.highlight_record(&mut object, &found_words, &attributes_to_highlight);
|
||||
documents.push(object);
|
||||
}
|
||||
|
||||
let processing_time_ms = start.elapsed().as_millis();
|
||||
|
||||
let result = SearchResult {
|
||||
hits: documents,
|
||||
nb_hits,
|
||||
query: search_query.q.unwrap_or_default(),
|
||||
offset: search_query.offset.unwrap_or(0),
|
||||
limit,
|
||||
processing_time_ms,
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn http_payload_size_limit(&self) -> usize {
|
||||
self.options.http_payload_size_limit.get_bytes() as usize
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn api_keys(&self) -> &ApiKeys {
|
||||
&self.api_keys
|
||||
}
|
||||
}
|
||||
|
||||
struct Highlighter<'a, A> {
|
||||
analyzer: Analyzer<'a, A>,
|
||||
}
|
||||
|
@ -276,3 +98,59 @@ impl<'a, A: AsRef<[u8]>> Highlighter<'a, A> {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Data {
|
||||
pub fn search<S: AsRef<str>>(&self, _index: S, search_query: SearchQuery) -> anyhow::Result<SearchResult> {
|
||||
let start = Instant::now();
|
||||
let index = &self.indexes;
|
||||
let rtxn = index.read_txn()?;
|
||||
|
||||
let mut search = index.search(&rtxn);
|
||||
if let Some(query) = &search_query.q {
|
||||
search.query(query);
|
||||
}
|
||||
|
||||
if let Some(offset) = search_query.offset {
|
||||
search.offset(offset);
|
||||
}
|
||||
|
||||
let limit = search_query.limit.unwrap_or(DEFAULT_SEARCH_LIMIT);
|
||||
search.limit(limit);
|
||||
|
||||
let Results { found_words, documents_ids, nb_hits, .. } = search.execute().unwrap();
|
||||
|
||||
let fields_ids_map = index.fields_ids_map(&rtxn).unwrap();
|
||||
|
||||
let displayed_fields = match index.displayed_fields(&rtxn).unwrap() {
|
||||
Some(fields) => Cow::Borrowed(fields),
|
||||
None => Cow::Owned(fields_ids_map.iter().map(|(id, _)| id).collect()),
|
||||
};
|
||||
|
||||
let attributes_to_highlight = match search_query.attributes_to_highlight {
|
||||
Some(fields) => fields.iter().map(ToOwned::to_owned).collect(),
|
||||
None => HashSet::new(),
|
||||
};
|
||||
|
||||
let stop_words = fst::Set::default();
|
||||
let highlighter = Highlighter::new(&stop_words);
|
||||
let mut documents = Vec::new();
|
||||
for (_id, obkv) in index.documents(&rtxn, documents_ids).unwrap() {
|
||||
let mut object = obkv_to_json(&displayed_fields, &fields_ids_map, obkv).unwrap();
|
||||
highlighter.highlight_record(&mut object, &found_words, &attributes_to_highlight);
|
||||
documents.push(object);
|
||||
}
|
||||
|
||||
let processing_time_ms = start.elapsed().as_millis();
|
||||
|
||||
let result = SearchResult {
|
||||
hits: documents,
|
||||
nb_hits,
|
||||
query: search_query.q.unwrap_or_default(),
|
||||
offset: search_query.offset.unwrap_or(0),
|
||||
limit,
|
||||
processing_time_ms,
|
||||
};
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
use std::ops::Deref;
|
||||
|
||||
use async_compression::tokio_02::write::GzipEncoder;
|
||||
use futures_util::stream::StreamExt;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use milli::update::{IndexDocumentsMethod, UpdateFormat};
|
||||
use milli::update_store::UpdateStatus;
|
||||
|
||||
use super::Data;
|
||||
use crate::updates::UpdateMeta;
|
||||
|
||||
impl Data {
|
||||
pub async fn add_documents<B, E, S>(
|
||||
&self,
|
||||
_index: S,
|
||||
method: IndexDocumentsMethod,
|
||||
format: UpdateFormat,
|
||||
mut stream: impl futures::Stream<Item=Result<B, E>> + Unpin,
|
||||
) -> anyhow::Result<UpdateStatus<UpdateMeta, String, String>>
|
||||
where
|
||||
B: Deref<Target = [u8]>,
|
||||
E: std::error::Error + Send + Sync + 'static,
|
||||
S: AsRef<str>,
|
||||
{
|
||||
let file = tokio::task::spawn_blocking(tempfile::tempfile).await?;
|
||||
let file = tokio::fs::File::from_std(file?);
|
||||
let mut encoder = GzipEncoder::new(file);
|
||||
|
||||
while let Some(result) = stream.next().await {
|
||||
let bytes = &*result?;
|
||||
encoder.write_all(&bytes[..]).await?;
|
||||
}
|
||||
|
||||
encoder.shutdown().await?;
|
||||
let mut file = encoder.into_inner();
|
||||
file.sync_all().await?;
|
||||
let file = file.into_std().await;
|
||||
let mmap = unsafe { memmap::Mmap::map(&file)? };
|
||||
|
||||
let meta = UpdateMeta::DocumentsAddition { method, format };
|
||||
|
||||
let queue = self.update_queue.clone();
|
||||
let update = tokio::task::spawn_blocking(move || queue.register_update(meta, &mmap[..])).await??;
|
||||
|
||||
Ok(update.into())
|
||||
}
|
||||
|
||||
|
||||
#[inline]
|
||||
pub fn get_update_status(&self, _index: &str, uid: u64) -> anyhow::Result<Option<UpdateStatus<UpdateMeta, String, String>>> {
|
||||
self.update_queue.get_update_status(uid)
|
||||
}
|
||||
}
|
|
@ -1,6 +1,7 @@
|
|||
use actix_web::{delete, get, post, put};
|
||||
use actix_web::{web, HttpResponse};
|
||||
use chrono::{DateTime, Utc};
|
||||
use log::error;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::Data;
|
||||
|
@ -93,8 +94,8 @@ async fn delete_index(
|
|||
|
||||
#[derive(Deserialize)]
|
||||
struct UpdateParam {
|
||||
_index_uid: String,
|
||||
_update_id: u64,
|
||||
index_uid: String,
|
||||
update_id: u64,
|
||||
}
|
||||
|
||||
#[get(
|
||||
|
@ -102,10 +103,23 @@ struct UpdateParam {
|
|||
wrap = "Authentication::Private"
|
||||
)]
|
||||
async fn get_update_status(
|
||||
_data: web::Data<Data>,
|
||||
_path: web::Path<UpdateParam>,
|
||||
data: web::Data<Data>,
|
||||
path: web::Path<UpdateParam>,
|
||||
) -> Result<HttpResponse, ResponseError> {
|
||||
todo!()
|
||||
let result = data.get_update_status(&path.index_uid, path.update_id);
|
||||
match result {
|
||||
Ok(Some(meta)) => {
|
||||
let json = serde_json::to_string(&meta).unwrap();
|
||||
Ok(HttpResponse::Ok().body(json))
|
||||
}
|
||||
Ok(None) => {
|
||||
todo!()
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{}", e);
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[get("/indexes/{index_uid}/updates", wrap = "Authentication::Private")]
|
||||
|
|
|
@ -8,14 +8,15 @@ use std::ops::Deref;
|
|||
use std::fs::create_dir_all;
|
||||
|
||||
use anyhow::Result;
|
||||
use byte_unit::Byte;
|
||||
use flate2::read::GzDecoder;
|
||||
use grenad::CompressionType;
|
||||
use byte_unit::Byte;
|
||||
use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod, UpdateIndexingStep::*};
|
||||
use milli::{UpdateStore, UpdateHandler as Handler, Index};
|
||||
use log::info;
|
||||
use milli::Index;
|
||||
use milli::update::{UpdateBuilder, UpdateFormat, IndexDocumentsMethod };
|
||||
use milli::update_store::{UpdateStore, UpdateHandler as Handler, UpdateStatus, Processing, Processed, Failed};
|
||||
use rayon::ThreadPool;
|
||||
use serde::{Serialize, Deserialize};
|
||||
use tokio::sync::broadcast;
|
||||
use structopt::StructOpt;
|
||||
|
||||
use crate::option::Opt;
|
||||
|
@ -40,23 +41,13 @@ pub enum UpdateMetaProgress {
|
|||
},
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
#[serde(tag = "type")]
|
||||
#[allow(dead_code)]
|
||||
pub enum UpdateStatus<M, P, N> {
|
||||
Pending { update_id: u64, meta: M },
|
||||
Progressing { update_id: u64, meta: P },
|
||||
Processed { update_id: u64, meta: N },
|
||||
Aborted { update_id: u64, meta: M },
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct UpdateQueue {
|
||||
inner: Arc<UpdateStore<UpdateMeta, String>>,
|
||||
inner: Arc<UpdateStore<UpdateMeta, String, String>>,
|
||||
}
|
||||
|
||||
impl Deref for UpdateQueue {
|
||||
type Target = Arc<UpdateStore<UpdateMeta, String>>;
|
||||
type Target = Arc<UpdateStore<UpdateMeta, String, String>>;
|
||||
|
||||
fn deref(&self) -> &Self::Target {
|
||||
&self.inner
|
||||
|
@ -115,8 +106,6 @@ pub struct IndexerOpts {
|
|||
pub indexing_jobs: Option<usize>,
|
||||
}
|
||||
|
||||
type UpdateSender = broadcast::Sender<UpdateStatus<UpdateMeta, UpdateMetaProgress, String>>;
|
||||
|
||||
struct UpdateHandler {
|
||||
indexes: Arc<Index>,
|
||||
max_nb_chunks: Option<usize>,
|
||||
|
@ -127,14 +116,12 @@ struct UpdateHandler {
|
|||
linked_hash_map_size: usize,
|
||||
chunk_compression_type: CompressionType,
|
||||
chunk_fusing_shrink_size: u64,
|
||||
update_status_sender: UpdateSender,
|
||||
}
|
||||
|
||||
impl UpdateHandler {
|
||||
fn new(
|
||||
opt: &IndexerOpts,
|
||||
indexes: Arc<Index>,
|
||||
update_status_sender: UpdateSender,
|
||||
) -> Result<Self> {
|
||||
let thread_pool = rayon::ThreadPoolBuilder::new()
|
||||
.num_threads(opt.indexing_jobs.unwrap_or(0))
|
||||
|
@ -149,7 +136,6 @@ impl UpdateHandler {
|
|||
linked_hash_map_size: opt.linked_hash_map_size,
|
||||
chunk_compression_type: opt.chunk_compression_type,
|
||||
chunk_fusing_shrink_size: opt.chunk_fusing_shrink_size.get_bytes(),
|
||||
update_status_sender,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -191,23 +177,7 @@ impl UpdateHandler {
|
|||
Box::new(content) as Box<dyn io::Read>
|
||||
};
|
||||
|
||||
let result = builder.execute(reader, |indexing_step, update_id| {
|
||||
let (current, total) = match indexing_step {
|
||||
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
|
||||
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
||||
IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
||||
MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)),
|
||||
};
|
||||
let _ = self.update_status_sender.send(UpdateStatus::Progressing {
|
||||
update_id,
|
||||
meta: UpdateMetaProgress::DocumentsAddition {
|
||||
step: indexing_step.step(),
|
||||
total_steps: indexing_step.number_of_steps(),
|
||||
current,
|
||||
total,
|
||||
}
|
||||
});
|
||||
});
|
||||
let result = builder.execute(reader, |indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
|
||||
|
||||
match result {
|
||||
Ok(()) => wtxn.commit().map_err(Into::into),
|
||||
|
@ -226,57 +196,41 @@ impl UpdateHandler {
|
|||
}
|
||||
}
|
||||
|
||||
fn update_settings(&self, settings: Settings, update_builder: UpdateBuilder) -> Result<()> {
|
||||
fn update_settings(&self, settings: &Settings, update_builder: UpdateBuilder) -> Result<()> {
|
||||
// We must use the write transaction of the update here.
|
||||
let mut wtxn = self.indexes.write_txn()?;
|
||||
let mut builder = update_builder.settings(&mut wtxn, &self.indexes);
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(names) = settings.searchable_attributes {
|
||||
if let Some(ref names) = settings.searchable_attributes {
|
||||
match names {
|
||||
Some(names) => builder.set_searchable_fields(names),
|
||||
Some(names) => builder.set_searchable_fields(&names),
|
||||
None => builder.reset_searchable_fields(),
|
||||
}
|
||||
}
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(names) = settings.displayed_attributes {
|
||||
if let Some(ref names) = settings.displayed_attributes {
|
||||
match names {
|
||||
Some(names) => builder.set_displayed_fields(names),
|
||||
Some(names) => builder.set_displayed_fields(&names),
|
||||
None => builder.reset_displayed_fields(),
|
||||
}
|
||||
}
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(facet_types) = settings.faceted_attributes {
|
||||
builder.set_faceted_fields(facet_types);
|
||||
if let Some(ref facet_types) = settings.faceted_attributes {
|
||||
builder.set_faceted_fields(&facet_types);
|
||||
}
|
||||
|
||||
// We transpose the settings JSON struct into a real setting update.
|
||||
if let Some(criteria) = settings.criteria {
|
||||
if let Some(ref criteria) = settings.criteria {
|
||||
match criteria {
|
||||
Some(criteria) => builder.set_criteria(criteria),
|
||||
Some(criteria) => builder.set_criteria(&criteria),
|
||||
None => builder.reset_criteria(),
|
||||
}
|
||||
}
|
||||
|
||||
let result = builder.execute(|indexing_step, update_id| {
|
||||
let (current, total) = match indexing_step {
|
||||
TransformFromUserIntoGenericFormat { documents_seen } => (documents_seen, None),
|
||||
ComputeIdsAndMergeDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
||||
IndexDocuments { documents_seen, total_documents } => (documents_seen, Some(total_documents)),
|
||||
MergeDataIntoFinalDatabase { databases_seen, total_databases } => (databases_seen, Some(total_databases)),
|
||||
};
|
||||
let _ = self.update_status_sender.send(UpdateStatus::Progressing {
|
||||
update_id,
|
||||
meta: UpdateMetaProgress::DocumentsAddition {
|
||||
step: indexing_step.step(),
|
||||
total_steps: indexing_step.number_of_steps(),
|
||||
current,
|
||||
total,
|
||||
}
|
||||
});
|
||||
});
|
||||
let result = builder.execute(|indexing_step, update_id| info!("update {}: {:?}", update_id, indexing_step));
|
||||
|
||||
match result {
|
||||
Ok(_count) => wtxn.commit().map_err(Into::into),
|
||||
|
@ -284,7 +238,7 @@ impl UpdateHandler {
|
|||
}
|
||||
}
|
||||
|
||||
fn update_facets(&self, levels: Facets, update_builder: UpdateBuilder) -> Result<()> {
|
||||
fn update_facets(&self, levels: &Facets, update_builder: UpdateBuilder) -> Result<()> {
|
||||
// We must use the write transaction of the update here.
|
||||
let mut wtxn = self.indexes.write_txn()?;
|
||||
let mut builder = update_builder.facets(&mut wtxn, &self.indexes);
|
||||
|
@ -301,28 +255,30 @@ impl UpdateHandler {
|
|||
}
|
||||
}
|
||||
|
||||
impl Handler<UpdateMeta, String> for UpdateHandler {
|
||||
fn handle_update(&mut self, update_id: u64, meta: UpdateMeta, content: &[u8]) -> heed::Result<String> {
|
||||
impl Handler<UpdateMeta, String, String> for UpdateHandler {
|
||||
fn handle_update(
|
||||
&mut self,
|
||||
update_id: u64,
|
||||
meta: Processing<UpdateMeta>,
|
||||
content: &[u8]
|
||||
) -> Result<Processed<UpdateMeta, String>, Failed<UpdateMeta, String>> {
|
||||
use UpdateMeta::*;
|
||||
|
||||
let update_builder = self.update_buidler(update_id);
|
||||
|
||||
let result: anyhow::Result<()> = match meta {
|
||||
DocumentsAddition { method, format } => {
|
||||
self.update_documents(format, method, content, update_builder)
|
||||
},
|
||||
let result: anyhow::Result<()> = match meta.meta() {
|
||||
DocumentsAddition { method, format } => self.update_documents(*format, *method, content, update_builder),
|
||||
ClearDocuments => self.clear_documents(update_builder),
|
||||
Settings(settings) => self.update_settings(settings, update_builder),
|
||||
Facets(levels) => self.update_facets(levels, update_builder),
|
||||
};
|
||||
|
||||
let meta = match result {
|
||||
let new_meta = match result {
|
||||
Ok(()) => format!("valid update content"),
|
||||
Err(e) => format!("error while processing update content: {:?}", e),
|
||||
};
|
||||
|
||||
let processed = UpdateStatus::Processed { update_id, meta: meta.clone() };
|
||||
let _ = self.update_status_sender.send(processed);
|
||||
let meta = meta.process(new_meta);
|
||||
|
||||
Ok(meta)
|
||||
}
|
||||
|
@ -333,8 +289,7 @@ impl UpdateQueue {
|
|||
opt: &Opt,
|
||||
indexes: Arc<Index>,
|
||||
) -> Result<Self> {
|
||||
let (sender, _) = broadcast::channel(100);
|
||||
let handler = UpdateHandler::new(&opt.indexer_options, indexes, sender)?;
|
||||
let handler = UpdateHandler::new(&opt.indexer_options, indexes)?;
|
||||
let size = opt.max_udb_size.get_bytes() as usize;
|
||||
let path = opt.db_path.join("updates.mdb");
|
||||
create_dir_all(&path)?;
|
||||
|
@ -345,4 +300,9 @@ impl UpdateQueue {
|
|||
)?;
|
||||
Ok(Self { inner })
|
||||
}
|
||||
|
||||
#[inline]
|
||||
pub fn get_update_status(&self, update_id: u64) -> Result<Option<UpdateStatus<UpdateMeta, String, String>>> {
|
||||
Ok(self.inner.meta(update_id)?)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue