diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index 5cfea80ce..8af54dd9b 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -1,29 +1,24 @@
-use std::cell::RefCell;
-use std::sync::atomic::{self, AtomicBool, AtomicUsize};
+use std::sync::atomic::AtomicBool;
 use std::sync::RwLock;
 use std::thread::{self, Builder};
 
 use big_s::S;
-use bumpalo::Bump;
+
 pub use document_changes::{extract, DocumentChanges, IndexingContext};
-use document_changes::{DocumentChangeContext, Extractor};
-use bumparaw_collections::RawMap;
 pub use document_deletion::DocumentDeletion;
 pub use document_operation::{DocumentOperation, PayloadStats};
 use hashbrown::HashMap;
-use heed::{RoTxn, RwTxn};
+use heed::RwTxn;
 pub use partial_dump::PartialDump;
 pub use update_by_function::UpdateByFunction;
 use write::{build_vectors, update_index, write_to_db};
-use zstd::dict::{DecoderDictionary, EncoderDictionary};
+use zstd::dict::DecoderDictionary;
 
-use super::document::Document as _;
 use super::extract::*;
-use super::ref_cell_ext::RefCellExt as _;
 use super::steps::IndexingStep;
-use super::thread_local::{FullySend, MostlySend, ThreadLocal};
+use super::thread_local::ThreadLocal;
-use super::{channel::*, DocumentChange};
+use super::channel::*;
 use crate::documents::PrimaryKey;
 use crate::fields_ids_map::metadata::{FieldIdMapWithMetadata, MetadataBuilder};
@@ -134,7 +129,7 @@ where
         })
         .unwrap()?;
 
-    let mut index_embeddings = index.embedding_configs(wtxn)?;
+    let index_embeddings = index.embedding_configs(wtxn)?;
     let mut field_distribution = index.field_distribution(wtxn)?;
     let mut document_ids = index.documents_ids(wtxn)?;
 
@@ -235,137 +230,3 @@ where
 
     Ok(())
 }
-
-/// The compression level to use when compressing documents.
-const DOCUMENT_COMPRESSION_LEVEL: i32 = 19;
-/// The sample size used to generate the document compression dictionary.
-const DOCUMENT_COMPRESSION_SAMPLE_SIZE: usize = 10_000;
-/// The maximum size the document compression dictionary can be.
-const DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE: usize = 64_000;
-/// The maximum number of documents we accept to compress if they
-/// weren't already compressed in the database. If this threshold
-/// is reached we do not generate a dictionary and continue as is.
-const DOCUMENT_COMPRESSION_COMPRESS_LIMIT: u64 = 5_000_000;
-
-/// A function dedicated to use the existing or generate an appropriate
-/// document compression dictionay based on the documents available in
-/// the database and the ones in the payload.
-///
-/// If there are too many documents already in the database and no
-/// compression dictionary we prefer not to generate a dictionary to avoid
-/// compressing all of the documents and potentially blow up disk space.
-fn compute_document_compression_dictionary<'pl, 'extractor, DC, MSP>(
-    index: &Index,
-    rtxn: &RoTxn<'_>,
-    document_changes: &DC,
-    indexing_context: IndexingContext<MSP>,
-    extractor_allocs: &'extractor mut ThreadLocal<FullySend<Bump>>,
-) -> Result<Option<EncoderDictionary<'static>>>
-where
-    DC: DocumentChanges<'pl>,
-    MSP: Fn() -> bool + Sync,
-{
-    match index.document_compression_raw_dictionary(rtxn)? {
-        Some(dict) => Ok(Some(EncoderDictionary::copy(dict, DOCUMENT_COMPRESSION_LEVEL))),
-        None if index.number_of_documents(rtxn)? >= DOCUMENT_COMPRESSION_COMPRESS_LIMIT => Ok(None),
-        None => {
-            let datastore = ThreadLocal::with_capacity(rayon::current_num_threads());
-            let extractor = CompressorExtractor {
-                total_documents_to_extract: DOCUMENT_COMPRESSION_SAMPLE_SIZE,
-                extracted_documents_count: AtomicUsize::new(0),
-            };
-
-            todo!("collect the documents samples from the database first (or after)");
-
-            // This extraction only takes care about documents replacement
-            // and not update (merges). The merged documents are ignore as
-            // we will only use the previous version of them in the database.
-            extract(
-                document_changes,
-                &extractor,
-                indexing_context,
-                extractor_allocs,
-                &datastore,
-                IndexingStep::PreparingCompressionDictionary,
-            )?;
-
-            let mut sample_data = Vec::new();
-            let mut sample_sizes = Vec::new();
-            for data in datastore {
-                let CompressorExtractorData { buffer, must_stop: _ } = data.into_inner();
-                let mut subsample_size = 0;
-                for subsample in buffer {
-                    sample_data.extend_from_slice(subsample);
-                    subsample_size += subsample.len();
-                }
-                sample_sizes.push(subsample_size);
-            }
-
-            let dictionary = zstd::dict::from_continuous(
-                &sample_data,
-                &sample_sizes,
-                DOCUMENT_COMPRESSION_DICTIONARY_MAX_SIZE,
-            )?;
-
-            Ok(Some(EncoderDictionary::copy(&dictionary, DOCUMENT_COMPRESSION_LEVEL)))
-        }
-    }
-}
-
-struct CompressorExtractor {
-    total_documents_to_extract: usize,
-    extracted_documents_count: AtomicUsize,
-}
-
-#[derive(Default)]
-struct CompressorExtractorData<'extractor> {
-    buffer: Vec<&'extractor [u8]>,
-    /// We extracted the expected count of documents, we can skip everything now.
-    must_stop: bool,
-}
-
-unsafe impl<'extractor> MostlySend for RefCell<CompressorExtractorData<'extractor>> {}
-
-impl<'extractor> Extractor<'extractor> for CompressorExtractor {
-    type Data = RefCell<CompressorExtractorData<'extractor>>;
-
-    fn init_data<'doc>(
-        &'doc self,
-        _extractor_alloc: &'extractor bumpalo::Bump,
-    ) -> crate::Result<Self::Data> {
-        Ok(RefCell::new(CompressorExtractorData::default()))
-    }
-
-    fn process<'doc>(
-        &'doc self,
-        changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
-        context: &'doc DocumentChangeContext<'_, 'extractor, '_, '_, Self::Data>,
-    ) -> crate::Result<()> {
-        let mut data = context.data.borrow_mut_or_yield();
-
-        for change in changes {
-            if data.must_stop {
-                return Ok(());
-            }
-
-            let change = change?;
-            match change {
-                DocumentChange::Deletion(_) => (),
-                DocumentChange::Update(_) => (),
-                DocumentChange::Insertion(insertion) => {
-                    for result in insertion.inserted().iter_top_level_fields() {
-                        let (_field_name, raw_value) = result?;
-                        let bytes = raw_value.get().as_bytes();
-                        data.buffer.push(context.extractor_alloc.alloc_slice_copy(bytes));
-                    }
-
-                    let previous_count =
-                        self.extracted_documents_count.fetch_add(1, atomic::Ordering::SeqCst);
-                    data.must_stop = previous_count >= self.total_documents_to_extract;
-                }
-            }
-        }
-
-        Ok(())
-    }
-}
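Note on the removed code: `compute_document_compression_dictionary` trained a zstd dictionary from sampled documents and wrapped it in an `EncoderDictionary`. The standalone sketch below is not Meilisearch code; it shows the same train/compress round trip using the zstd crate's bulk `Compressor`/`Decompressor` instead of the prepared `EncoderDictionary`/`DecoderDictionary` handles the indexer works with, and the generated sample documents, 16 KiB dictionary cap, and printed sizes are illustrative assumptions only.

// Sketch: train a zstd dictionary from concatenated document samples,
// then compress and decompress one document with it.
fn main() -> std::io::Result<()> {
    // Illustrative stand-ins for the sampled documents; dictionary training
    // needs a reasonably large and varied sample set to succeed.
    let docs: Vec<Vec<u8>> = (0..2_000u32)
        .map(|i| {
            format!(r#"{{"id":{i},"title":"product number {i}","price":{}}}"#, 10 + i % 90)
                .into_bytes()
        })
        .collect();

    // `from_continuous` takes all samples laid out back to back plus the size of each one,
    // which is exactly the layout the removed extractor was building.
    let mut sample_data = Vec::new();
    let mut sample_sizes = Vec::new();
    for doc in &docs {
        sample_data.extend_from_slice(doc);
        sample_sizes.push(doc.len());
    }

    // Cap the dictionary at an illustrative 16 KiB (the diff caps it at 64_000 bytes).
    let dictionary = zstd::dict::from_continuous(&sample_data, &sample_sizes, 16_384)?;

    // Compress one document with the trained dictionary, level 19 as in the removed constant.
    let mut compressor = zstd::bulk::Compressor::with_dictionary(19, &dictionary)?;
    let compressed = compressor.compress(&docs[0])?;

    // Decompress it again with the same dictionary and check the round trip.
    let mut decompressor = zstd::bulk::Decompressor::with_dictionary(&dictionary)?;
    let roundtrip = decompressor.decompress(&compressed, docs[0].len())?;
    assert_eq!(roundtrip, docs[0]);

    println!("{} bytes compressed to {} bytes", docs[0].len(), compressed.len());
    Ok(())
}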