From afa3ae0cbd9c7223d4068dd438d043a43d0d4fae Mon Sep 17 00:00:00 2001 From: Tamo Date: Thu, 19 Sep 2024 17:42:52 +0200 Subject: [PATCH 01/11] WIP --- milli/src/update/index_documents/mod.rs | 17 ++----- .../src/update/index_documents/typed_chunk.rs | 16 ++---- milli/src/vector/mod.rs | 51 +++++++++++-------- 3 files changed, 38 insertions(+), 46 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index 326dd842d..b03ab259a 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -689,9 +689,8 @@ where key: None, }, )?; - let first_id = crate::vector::arroy_db_range_for_embedder(index).next().unwrap(); let reader = - ArroyWrapper::new(self.index.vector_arroy, first_id, action.was_quantized); + ArroyWrapper::new(self.index.vector_arroy, index, action.was_quantized); let dim = reader.dimensions(self.wtxn)?; dimension.insert(name.to_string(), dim); } @@ -713,17 +712,11 @@ where let is_quantizing = embedder_config.map_or(false, |action| action.is_being_quantized); pool.install(|| { - for k in crate::vector::arroy_db_range_for_embedder(embedder_index) { - let mut writer = ArroyWrapper::new(vector_arroy, k, was_quantized); - if is_quantizing { - writer.quantize(wtxn, k, dimension)?; - } - if writer.need_build(wtxn, dimension)? { - writer.build(wtxn, &mut rng, dimension)?; - } else if writer.is_empty(wtxn, dimension)? { - break; - } + let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized); + if is_quantizing { + writer.quantize(wtxn, dimension)?; } + writer.build(wtxn, &mut rng, dimension)?; Result::Ok(()) }) .map_err(InternalError::from)??; diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index 97a4bf712..e340137e2 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -673,22 +673,14 @@ pub(crate) fn write_typed_chunk_into_index( .get(&embedder_name) .map_or(false, |conf| conf.2); // FIXME: allow customizing distance - let writers: Vec<_> = crate::vector::arroy_db_range_for_embedder(embedder_index) - .map(|k| ArroyWrapper::new(index.vector_arroy, k, binary_quantized)) - .collect(); + let writer = ArroyWrapper::new(index.vector_arroy, embedder_index, binary_quantized); // remove vectors for docids we want them removed let merger = remove_vectors_builder.build(); let mut iter = merger.into_stream_merger_iter()?; while let Some((key, _)) = iter.next()? { let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); - - for writer in &writers { - // Uses invariant: vectors are packed in the first writers. - if !writer.del_item(wtxn, expected_dimension, docid)? { - break; - } - } + writer.del_item(wtxn, expected_dimension, docid)?; } // add generated embeddings @@ -716,9 +708,7 @@ pub(crate) fn write_typed_chunk_into_index( embeddings.embedding_count(), ))); } - for (embedding, writer) in embeddings.iter().zip(&writers) { - writer.add_item(wtxn, expected_dimension, docid, embedding)?; - } + writer.add_items(wtxn, expected_dimension, docid, embeddings)?; } // perform the manual diff diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index d52e68bbe..644826dcd 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -32,60 +32,69 @@ pub const REQUEST_PARALLELISM: usize = 40; pub struct ArroyWrapper { quantized: bool, - index: u16, + index: u8, database: arroy::Database, } impl ArroyWrapper { - pub fn new(database: arroy::Database, index: u16, quantized: bool) -> Self { + pub fn new(database: arroy::Database, index: u8, quantized: bool) -> Self { Self { database, index, quantized } } - pub fn index(&self) -> u16 { + pub fn index(&self) -> u8 { self.index } pub fn dimensions(&self, rtxn: &RoTxn) -> Result { + let first_id = arroy_db_range_for_embedder(self.index).next().unwrap(); if self.quantized { - Ok(arroy::Reader::open(rtxn, self.index, self.quantized_db())?.dimensions()) + Ok(arroy::Reader::open(rtxn, first_id, self.quantized_db())?.dimensions()) } else { - Ok(arroy::Reader::open(rtxn, self.index, self.angular_db())?.dimensions()) + Ok(arroy::Reader::open(rtxn, first_id, self.angular_db())?.dimensions()) } } - pub fn quantize( - &mut self, - wtxn: &mut RwTxn, - index: u16, - dimension: usize, - ) -> Result<(), arroy::Error> { + pub fn quantize(&mut self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> { if !self.quantized { - let writer = arroy::Writer::new(self.angular_db(), index, dimension); - writer.prepare_changing_distance::(wtxn)?; + for index in arroy_db_range_for_embedder(self.index) { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + writer.prepare_changing_distance::(wtxn)?; + } self.quantized = true; } Ok(()) } + // TODO: We can stop early when we find an empty DB pub fn need_build(&self, rtxn: &RoTxn, dimension: usize) -> Result { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).need_build(rtxn) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).need_build(rtxn) + for index in arroy_db_range_for_embedder(self.index) { + let need_build = if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension).need_build(rtxn) + } else { + arroy::Writer::new(self.angular_db(), index, dimension).need_build(rtxn) + }; + if need_build? { + return Ok(true); + } } + Ok(false) } + /// TODO: We should early exit when it doesn't need to be built pub fn build( &self, wtxn: &mut RwTxn, rng: &mut R, dimension: usize, ) -> Result<(), arroy::Error> { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).build(wtxn, rng, None) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).build(wtxn, rng, None) + for index in arroy_db_range_for_embedder(self.index) { + if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension).build(wtxn, rng, None)? + } else { + arroy::Writer::new(self.angular_db(), index, dimension).build(wtxn, rng, None)? + } } + Ok(()) } pub fn add_item( From 6ba4baecbf47e39339c22c67b60a5d0953f53fc5 Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 23 Sep 2024 15:15:26 +0200 Subject: [PATCH 02/11] first ugly step --- milli/src/search/similar.rs | 26 +- .../src/update/index_documents/typed_chunk.rs | 45 +--- milli/src/vector/mod.rs | 232 ++++++++++++++---- 3 files changed, 203 insertions(+), 100 deletions(-) diff --git a/milli/src/search/similar.rs b/milli/src/search/similar.rs index 0cb8d723d..e408c94b1 100644 --- a/milli/src/search/similar.rs +++ b/milli/src/search/similar.rs @@ -4,7 +4,7 @@ use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use crate::score_details::{self, ScoreDetails}; -use crate::vector::Embedder; +use crate::vector::{ArroyWrapper, Embedder}; use crate::{filtered_universe, DocumentId, Filter, Index, Result, SearchResult}; pub struct Similar<'a> { @@ -71,23 +71,13 @@ impl<'a> Similar<'a> { .get(self.rtxn, &self.embedder_name)? .ok_or_else(|| crate::UserError::InvalidEmbedder(self.embedder_name.to_owned()))?; - let mut results = Vec::new(); - - for reader in self.index.arroy_readers(self.rtxn, embedder_index, self.quantized) { - let nns_by_item = reader?.nns_by_item( - self.rtxn, - self.id, - self.limit + self.offset + 1, - Some(&universe), - )?; - if let Some(mut nns_by_item) = nns_by_item { - results.append(&mut nns_by_item); - } else { - break; - } - } - - results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + let reader = ArroyWrapper::new(self.index.vector_arroy, embedder_index, self.quantized); + let results = reader.nns_by_item( + self.rtxn, + self.id, + self.limit + self.offset + 1, + Some(&universe), + )?; let mut documents_ids = Vec::with_capacity(self.limit); let mut document_scores = Vec::with_capacity(self.limit); diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index e340137e2..e118420d8 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -680,7 +680,7 @@ pub(crate) fn write_typed_chunk_into_index( let mut iter = merger.into_stream_merger_iter()?; while let Some((key, _)) = iter.next()? { let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); - writer.del_item(wtxn, expected_dimension, docid)?; + writer.del_item_raw(wtxn, expected_dimension, docid)?; } // add generated embeddings @@ -708,7 +708,7 @@ pub(crate) fn write_typed_chunk_into_index( embeddings.embedding_count(), ))); } - writer.add_items(wtxn, expected_dimension, docid, embeddings)?; + writer.add_items(wtxn, docid, &embeddings)?; } // perform the manual diff @@ -723,51 +723,14 @@ pub(crate) fn write_typed_chunk_into_index( if let Some(value) = vector_deladd_obkv.get(DelAdd::Deletion) { let vector: Vec = pod_collect_to_vec(value); - let mut deleted_index = None; - for (index, writer) in writers.iter().enumerate() { - let Some(candidate) = writer.item_vector(wtxn, docid)? else { - // uses invariant: vectors are packed in the first writers. - break; - }; - if candidate == vector { - writer.del_item(wtxn, expected_dimension, docid)?; - deleted_index = Some(index); - } - } - - // 🥲 enforce invariant: vectors are packed in the first writers. - if let Some(deleted_index) = deleted_index { - let mut last_index_with_a_vector = None; - for (index, writer) in writers.iter().enumerate().skip(deleted_index) { - let Some(candidate) = writer.item_vector(wtxn, docid)? else { - break; - }; - last_index_with_a_vector = Some((index, candidate)); - } - if let Some((last_index, vector)) = last_index_with_a_vector { - // unwrap: computed the index from the list of writers - let writer = writers.get(last_index).unwrap(); - writer.del_item(wtxn, expected_dimension, docid)?; - writers.get(deleted_index).unwrap().add_item( - wtxn, - expected_dimension, - docid, - &vector, - )?; - } - } + writer.del_item(wtxn, docid, &vector)?; } if let Some(value) = vector_deladd_obkv.get(DelAdd::Addition) { let vector = pod_collect_to_vec(value); // overflow was detected during vector extraction. - for writer in &writers { - if !writer.contains_item(wtxn, expected_dimension, docid)? { - writer.add_item(wtxn, expected_dimension, docid, &vector)?; - break; - } - } + writer.add_item(wtxn, docid, &vector)?; } } diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index 644826dcd..54765cfef 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -97,49 +97,165 @@ impl ArroyWrapper { Ok(()) } + pub fn add_items( + &self, + wtxn: &mut RwTxn, + item_id: arroy::ItemId, + embeddings: &Embeddings, + ) -> Result<(), arroy::Error> { + let dimension = embeddings.dimension(); + for (index, vector) in arroy_db_range_for_embedder(self.index).zip(embeddings.iter()) { + if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } else { + arroy::Writer::new(self.angular_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } + } + Ok(()) + } + pub fn add_item( &self, wtxn: &mut RwTxn, - dimension: usize, item_id: arroy::ItemId, vector: &[f32], ) -> Result<(), arroy::Error> { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension) - .add_item(wtxn, item_id, vector) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension) - .add_item(wtxn, item_id, vector) + let dimension = vector.len(); + + for index in arroy_db_range_for_embedder(self.index) { + if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + if !writer.contains_item(wtxn, item_id)? { + writer.add_item(wtxn, item_id, &vector)?; + break; + } + } else { + arroy::Writer::new(self.angular_db(), index, dimension) + .add_item(wtxn, item_id, vector)? + } } + + Ok(()) } - pub fn del_item( + pub fn del_item_raw( &self, wtxn: &mut RwTxn, dimension: usize, item_id: arroy::ItemId, ) -> Result { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).del_item(wtxn, item_id) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).del_item(wtxn, item_id) + for index in arroy_db_range_for_embedder(self.index) { + if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + if writer.del_item(wtxn, item_id)? { + return Ok(true); + } + } else { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + if writer.del_item(wtxn, item_id)? { + return Ok(true); + } + } } + + Ok(false) + } + + pub fn del_item( + &self, + wtxn: &mut RwTxn, + itemid: arroy::ItemId, + vector: &[f32], + ) -> Result { + let dimension = vector.len(); + let mut deleted_index = None; + + for index in arroy_db_range_for_embedder(self.index) { + if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + let Some(candidate) = writer.item_vector(wtxn, itemid)? else { + // uses invariant: vectors are packed in the first writers. + break; + }; + if candidate == vector { + writer.del_item(wtxn, itemid)?; + deleted_index = Some(index); + } + } else { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + let Some(candidate) = writer.item_vector(wtxn, itemid)? else { + // uses invariant: vectors are packed in the first writers. + break; + }; + if candidate == vector { + writer.del_item(wtxn, itemid)?; + deleted_index = Some(index); + } + } + } + + // 🥲 enforce invariant: vectors are packed in the first writers. + if let Some(deleted_index) = deleted_index { + let mut last_index_with_a_vector = None; + for index in arroy_db_range_for_embedder(self.index).skip(deleted_index as usize) { + if self.quantized { + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + let Some(candidate) = writer.item_vector(wtxn, itemid)? else { + break; + }; + last_index_with_a_vector = Some((index, candidate)); + } else { + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + let Some(candidate) = writer.item_vector(wtxn, itemid)? else { + break; + }; + last_index_with_a_vector = Some((index, candidate)); + } + } + if let Some((last_index, vector)) = last_index_with_a_vector { + if self.quantized { + // unwrap: computed the index from the list of writers + let writer = arroy::Writer::new(self.quantized_db(), last_index, dimension); + writer.del_item(wtxn, itemid)?; + let writer = arroy::Writer::new(self.quantized_db(), deleted_index, dimension); + writer.add_item(wtxn, itemid, &vector)?; + } else { + // unwrap: computed the index from the list of writers + let writer = arroy::Writer::new(self.angular_db(), last_index, dimension); + writer.del_item(wtxn, itemid)?; + let writer = arroy::Writer::new(self.angular_db(), deleted_index, dimension); + writer.add_item(wtxn, itemid, &vector)?; + } + } + } + Ok(deleted_index.is_some()) } pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).clear(wtxn) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).clear(wtxn) + for index in arroy_db_range_for_embedder(self.index) { + if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension).clear(wtxn)?; + } else { + arroy::Writer::new(self.angular_db(), index, dimension).clear(wtxn)?; + } } + Ok(()) } pub fn is_empty(&self, rtxn: &RoTxn, dimension: usize) -> Result { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).is_empty(rtxn) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).is_empty(rtxn) + for index in arroy_db_range_for_embedder(self.index) { + let empty = if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension).is_empty(rtxn)? + } else { + arroy::Writer::new(self.angular_db(), index, dimension).is_empty(rtxn)? + }; + if !empty { + return Ok(false); + } } + Ok(true) } pub fn contains_item( @@ -148,11 +264,18 @@ impl ArroyWrapper { dimension: usize, item: arroy::ItemId, ) -> Result { - if self.quantized { - arroy::Writer::new(self.quantized_db(), self.index, dimension).contains_item(rtxn, item) - } else { - arroy::Writer::new(self.angular_db(), self.index, dimension).contains_item(rtxn, item) + for index in arroy_db_range_for_embedder(self.index) { + let contains = if self.quantized { + arroy::Writer::new(self.quantized_db(), index, dimension) + .contains_item(rtxn, item)? + } else { + arroy::Writer::new(self.angular_db(), index, dimension).contains_item(rtxn, item)? + }; + if contains { + return Ok(contains); + } } + Ok(false) } pub fn nns_by_item( @@ -161,14 +284,26 @@ impl ArroyWrapper { item: ItemId, limit: usize, filter: Option<&RoaringBitmap>, - ) -> Result>, arroy::Error> { - if self.quantized { - arroy::Reader::open(rtxn, self.index, self.quantized_db())? - .nns_by_item(rtxn, item, limit, None, None, filter) - } else { - arroy::Reader::open(rtxn, self.index, self.angular_db())? - .nns_by_item(rtxn, item, limit, None, None, filter) + ) -> Result, arroy::Error> { + let mut results = Vec::new(); + + for index in arroy_db_range_for_embedder(self.index) { + let ret = if self.quantized { + arroy::Reader::open(rtxn, index, self.quantized_db())? + .nns_by_item(rtxn, item, limit, None, None, filter)? + } else { + arroy::Reader::open(rtxn, index, self.angular_db())? + .nns_by_item(rtxn, item, limit, None, None, filter)? + }; + if let Some(mut ret) = ret { + results.append(&mut ret); + } else { + break; + } } + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + + Ok(results) } pub fn nns_by_vector( @@ -178,21 +313,36 @@ impl ArroyWrapper { limit: usize, filter: Option<&RoaringBitmap>, ) -> Result, arroy::Error> { - if self.quantized { - arroy::Reader::open(txn, self.index, self.quantized_db())? - .nns_by_vector(txn, item, limit, None, None, filter) - } else { - arroy::Reader::open(txn, self.index, self.angular_db())? - .nns_by_vector(txn, item, limit, None, None, filter) + let mut results = Vec::new(); + + for index in arroy_db_range_for_embedder(self.index) { + let mut ret = if self.quantized { + arroy::Reader::open(txn, index, self.quantized_db())? + .nns_by_vector(txn, item, limit, None, None, filter)? + } else { + arroy::Reader::open(txn, index, self.angular_db())? + .nns_by_vector(txn, item, limit, None, None, filter)? + }; + results.append(&mut ret); } + + results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + + Ok(results) } pub fn item_vector(&self, rtxn: &RoTxn, docid: u32) -> Result>, arroy::Error> { - if self.quantized { - arroy::Reader::open(rtxn, self.index, self.quantized_db())?.item_vector(rtxn, docid) - } else { - arroy::Reader::open(rtxn, self.index, self.angular_db())?.item_vector(rtxn, docid) + for index in arroy_db_range_for_embedder(self.index) { + let ret = if self.quantized { + arroy::Reader::open(rtxn, index, self.quantized_db())?.item_vector(rtxn, docid)? + } else { + arroy::Reader::open(rtxn, index, self.angular_db())?.item_vector(rtxn, docid)? + }; + if ret.is_some() { + return Ok(ret); + } } + Ok(None) } fn angular_db(&self) -> arroy::Database { From 1e4d4e69c4cebee8f09d905c5cc8130b08214f04 Mon Sep 17 00:00:00 2001 From: Tamo Date: Mon, 23 Sep 2024 18:56:15 +0200 Subject: [PATCH 03/11] finish the arroywrapper --- milli/src/index.rs | 29 +-- milli/src/search/new/vector_sort.rs | 12 +- milli/src/search/similar.rs | 1 - milli/src/update/index_documents/transform.rs | 63 ++---- milli/src/vector/mod.rs | 211 +++++++++++------- 5 files changed, 155 insertions(+), 161 deletions(-) diff --git a/milli/src/index.rs b/milli/src/index.rs index c47896df7..5b7a9c58c 100644 --- a/milli/src/index.rs +++ b/milli/src/index.rs @@ -1610,24 +1610,6 @@ impl Index { .unwrap_or_default()) } - pub fn arroy_readers<'a>( - &'a self, - rtxn: &'a RoTxn<'a>, - embedder_id: u8, - quantized: bool, - ) -> impl Iterator> + 'a { - crate::vector::arroy_db_range_for_embedder(embedder_id).map_while(move |k| { - let reader = ArroyWrapper::new(self.vector_arroy, k, quantized); - // Here we don't care about the dimensions, but we want to know if we can read - // in the database or if its metadata are missing because there is no document with that many vectors. - match reader.dimensions(rtxn) { - Ok(_) => Some(Ok(reader)), - Err(arroy::Error::MissingMetadata(_)) => None, - Err(e) => Some(Err(e.into())), - } - }) - } - pub(crate) fn put_search_cutoff(&self, wtxn: &mut RwTxn<'_>, cutoff: u64) -> heed::Result<()> { self.main.remap_types::().put(wtxn, main_key::SEARCH_CUTOFF, &cutoff) } @@ -1649,14 +1631,9 @@ impl Index { let embedding_configs = self.embedding_configs(rtxn)?; for config in embedding_configs { let embedder_id = self.embedder_category_id.get(rtxn, &config.name)?.unwrap(); - let embeddings = self - .arroy_readers(rtxn, embedder_id, config.config.quantized()) - .map_while(|reader| { - reader - .and_then(|r| r.item_vector(rtxn, docid).map_err(|e| e.into())) - .transpose() - }) - .collect::>>()?; + let reader = + ArroyWrapper::new(self.vector_arroy, embedder_id, config.config.quantized()); + let embeddings = reader.item_vectors(rtxn, docid)?; res.insert(config.name.to_owned(), embeddings); } Ok(res) diff --git a/milli/src/search/new/vector_sort.rs b/milli/src/search/new/vector_sort.rs index de1dacbe7..90377c09c 100644 --- a/milli/src/search/new/vector_sort.rs +++ b/milli/src/search/new/vector_sort.rs @@ -1,11 +1,10 @@ use std::iter::FromIterator; -use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use super::ranking_rules::{RankingRule, RankingRuleOutput, RankingRuleQueryTrait}; use crate::score_details::{self, ScoreDetails}; -use crate::vector::{DistributionShift, Embedder}; +use crate::vector::{ArroyWrapper, DistributionShift, Embedder}; use crate::{DocumentId, Result, SearchContext, SearchLogger}; pub struct VectorSort { @@ -53,14 +52,9 @@ impl VectorSort { vector_candidates: &RoaringBitmap, ) -> Result<()> { let target = &self.target; - let mut results = Vec::new(); - for reader in ctx.index.arroy_readers(ctx.txn, self.embedder_index, self.quantized) { - let nns_by_vector = - reader?.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?; - results.extend(nns_by_vector.into_iter()); - } - results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); + let reader = ArroyWrapper::new(ctx.index.vector_arroy, self.embedder_index, self.quantized); + let results = reader.nns_by_vector(ctx.txn, target, self.limit, Some(vector_candidates))?; self.cached_sorted_docids = results.into_iter(); Ok(()) diff --git a/milli/src/search/similar.rs b/milli/src/search/similar.rs index e408c94b1..5547d800e 100644 --- a/milli/src/search/similar.rs +++ b/milli/src/search/similar.rs @@ -1,6 +1,5 @@ use std::sync::Arc; -use ordered_float::OrderedFloat; use roaring::RoaringBitmap; use crate::score_details::{self, ScoreDetails}; diff --git a/milli/src/update/index_documents/transform.rs b/milli/src/update/index_documents/transform.rs index bb2cfe56c..763f30d0f 100644 --- a/milli/src/update/index_documents/transform.rs +++ b/milli/src/update/index_documents/transform.rs @@ -990,27 +990,24 @@ impl<'a, 'i> Transform<'a, 'i> { None }; - let readers: Result, &RoaringBitmap)>> = settings_diff + let readers: BTreeMap<&str, (ArroyWrapper, &RoaringBitmap)> = settings_diff .embedding_config_updates .iter() .filter_map(|(name, action)| { if let Some(WriteBackToDocuments { embedder_id, user_provided }) = action.write_back() { - let readers: Result> = self - .index - .arroy_readers(wtxn, *embedder_id, action.was_quantized) - .collect(); - match readers { - Ok(readers) => Some(Ok((name.as_str(), (readers, user_provided)))), - Err(error) => Some(Err(error)), - } + let reader = ArroyWrapper::new( + self.index.vector_arroy, + *embedder_id, + action.was_quantized, + ); + Some((name.as_str(), (reader, user_provided))) } else { None } }) .collect(); - let readers = readers?; let old_vectors_fid = settings_diff .old @@ -1048,34 +1045,24 @@ impl<'a, 'i> Transform<'a, 'i> { arroy::Error, > = readers .iter() - .filter_map(|(name, (readers, user_provided))| { + .filter_map(|(name, (reader, user_provided))| { if !user_provided.contains(docid) { return None; } - let mut vectors = Vec::new(); - for reader in readers { - let Some(vector) = reader.item_vector(wtxn, docid).transpose() else { - break; - }; - - match vector { - Ok(vector) => vectors.push(vector), - Err(error) => return Some(Err(error)), - } + match reader.item_vectors(wtxn, docid) { + Ok(vectors) if vectors.is_empty() => None, + Ok(vectors) => Some(Ok(( + name.to_string(), + serde_json::to_value(ExplicitVectors { + embeddings: Some( + VectorOrArrayOfVectors::from_array_of_vectors(vectors), + ), + regenerate: false, + }) + .unwrap(), + ))), + Err(e) => Some(Err(e)), } - if vectors.is_empty() { - return None; - } - Some(Ok(( - name.to_string(), - serde_json::to_value(ExplicitVectors { - embeddings: Some(VectorOrArrayOfVectors::from_array_of_vectors( - vectors, - )), - regenerate: false, - }) - .unwrap(), - ))) }) .collect(); @@ -1104,11 +1091,9 @@ impl<'a, 'i> Transform<'a, 'i> { } // delete all vectors from the embedders that need removal - for (_, (readers, _)) in readers { - for reader in readers { - let dimensions = reader.dimensions(wtxn)?; - reader.clear(wtxn, dimensions)?; - } + for (_, (reader, _)) in readers { + let dimensions = reader.dimensions(wtxn)?; + reader.clear(wtxn, dimensions)?; } let grenad_params = GrenadParameters { diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index 54765cfef..b5b6cd953 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -45,6 +45,20 @@ impl ArroyWrapper { self.index } + fn readers<'a, D: arroy::Distance>( + &'a self, + rtxn: &'a RoTxn<'a>, + db: arroy::Database, + ) -> impl Iterator, arroy::Error>> + 'a { + arroy_db_range_for_embedder(self.index).map_while(move |index| { + match arroy::Reader::open(rtxn, index, db) { + Ok(reader) => Some(Ok(reader)), + Err(arroy::Error::MissingMetadata(_)) => None, + Err(e) => Some(Err(e)), + } + }) + } + pub fn dimensions(&self, rtxn: &RoTxn) -> Result { let first_id = arroy_db_range_for_embedder(self.index).next().unwrap(); if self.quantized { @@ -97,6 +111,7 @@ impl ArroyWrapper { Ok(()) } + /// Overwrite all the embeddings associated to the index and item id. pub fn add_items( &self, wtxn: &mut RwTxn, @@ -116,30 +131,41 @@ impl ArroyWrapper { Ok(()) } + /// Add one document int for this index where we can find an empty spot. pub fn add_item( &self, wtxn: &mut RwTxn, item_id: arroy::ItemId, vector: &[f32], + ) -> Result<(), arroy::Error> { + if self.quantized { + self._add_item(wtxn, self.quantized_db(), item_id, vector) + } else { + self._add_item(wtxn, self.angular_db(), item_id, vector) + } + } + + fn _add_item( + &self, + wtxn: &mut RwTxn, + db: arroy::Database, + item_id: arroy::ItemId, + vector: &[f32], ) -> Result<(), arroy::Error> { let dimension = vector.len(); for index in arroy_db_range_for_embedder(self.index) { - if self.quantized { - let writer = arroy::Writer::new(self.quantized_db(), index, dimension); - if !writer.contains_item(wtxn, item_id)? { - writer.add_item(wtxn, item_id, &vector)?; - break; - } - } else { - arroy::Writer::new(self.angular_db(), index, dimension) - .add_item(wtxn, item_id, vector)? + let writer = arroy::Writer::new(db, index, dimension); + if !writer.contains_item(wtxn, item_id)? { + writer.add_item(wtxn, item_id, vector)?; + break; } } - Ok(()) } + /// Delete an item from the index. It **does not** take care of fixing the hole + /// made after deleting the item. pub fn del_item_raw( &self, wtxn: &mut RwTxn, @@ -163,36 +189,39 @@ impl ArroyWrapper { Ok(false) } + /// Delete one item. pub fn del_item( &self, wtxn: &mut RwTxn, - itemid: arroy::ItemId, + item_id: arroy::ItemId, + vector: &[f32], + ) -> Result { + if self.quantized { + self._del_item(wtxn, self.quantized_db(), item_id, vector) + } else { + self._del_item(wtxn, self.angular_db(), item_id, vector) + } + } + + fn _del_item( + &self, + wtxn: &mut RwTxn, + db: arroy::Database, + item_id: arroy::ItemId, vector: &[f32], ) -> Result { let dimension = vector.len(); let mut deleted_index = None; for index in arroy_db_range_for_embedder(self.index) { - if self.quantized { - let writer = arroy::Writer::new(self.quantized_db(), index, dimension); - let Some(candidate) = writer.item_vector(wtxn, itemid)? else { - // uses invariant: vectors are packed in the first writers. - break; - }; - if candidate == vector { - writer.del_item(wtxn, itemid)?; - deleted_index = Some(index); - } - } else { - let writer = arroy::Writer::new(self.angular_db(), index, dimension); - let Some(candidate) = writer.item_vector(wtxn, itemid)? else { - // uses invariant: vectors are packed in the first writers. - break; - }; - if candidate == vector { - writer.del_item(wtxn, itemid)?; - deleted_index = Some(index); - } + let writer = arroy::Writer::new(db, index, dimension); + let Some(candidate) = writer.item_vector(wtxn, item_id)? else { + // uses invariant: vectors are packed in the first writers. + break; + }; + if candidate == vector { + writer.del_item(wtxn, item_id)?; + deleted_index = Some(index); } } @@ -200,34 +229,18 @@ impl ArroyWrapper { if let Some(deleted_index) = deleted_index { let mut last_index_with_a_vector = None; for index in arroy_db_range_for_embedder(self.index).skip(deleted_index as usize) { - if self.quantized { - let writer = arroy::Writer::new(self.quantized_db(), index, dimension); - let Some(candidate) = writer.item_vector(wtxn, itemid)? else { - break; - }; - last_index_with_a_vector = Some((index, candidate)); - } else { - let writer = arroy::Writer::new(self.angular_db(), index, dimension); - let Some(candidate) = writer.item_vector(wtxn, itemid)? else { - break; - }; - last_index_with_a_vector = Some((index, candidate)); - } + let writer = arroy::Writer::new(db, index, dimension); + let Some(candidate) = writer.item_vector(wtxn, item_id)? else { + break; + }; + last_index_with_a_vector = Some((index, candidate)); } if let Some((last_index, vector)) = last_index_with_a_vector { - if self.quantized { - // unwrap: computed the index from the list of writers - let writer = arroy::Writer::new(self.quantized_db(), last_index, dimension); - writer.del_item(wtxn, itemid)?; - let writer = arroy::Writer::new(self.quantized_db(), deleted_index, dimension); - writer.add_item(wtxn, itemid, &vector)?; - } else { - // unwrap: computed the index from the list of writers - let writer = arroy::Writer::new(self.angular_db(), last_index, dimension); - writer.del_item(wtxn, itemid)?; - let writer = arroy::Writer::new(self.angular_db(), deleted_index, dimension); - writer.add_item(wtxn, itemid, &vector)?; - } + // unwrap: computed the index from the list of writers + let writer = arroy::Writer::new(db, last_index, dimension); + writer.del_item(wtxn, item_id)?; + let writer = arroy::Writer::new(db, deleted_index, dimension); + writer.add_item(wtxn, item_id, &vector)?; } } Ok(deleted_index.is_some()) @@ -284,17 +297,26 @@ impl ArroyWrapper { item: ItemId, limit: usize, filter: Option<&RoaringBitmap>, + ) -> Result, arroy::Error> { + if self.quantized { + self._nns_by_item(rtxn, self.quantized_db(), item, limit, filter) + } else { + self._nns_by_item(rtxn, self.angular_db(), item, limit, filter) + } + } + + fn _nns_by_item( + &self, + rtxn: &RoTxn, + db: arroy::Database, + item: ItemId, + limit: usize, + filter: Option<&RoaringBitmap>, ) -> Result, arroy::Error> { let mut results = Vec::new(); - for index in arroy_db_range_for_embedder(self.index) { - let ret = if self.quantized { - arroy::Reader::open(rtxn, index, self.quantized_db())? - .nns_by_item(rtxn, item, limit, None, None, filter)? - } else { - arroy::Reader::open(rtxn, index, self.angular_db())? - .nns_by_item(rtxn, item, limit, None, None, filter)? - }; + for reader in self.readers(rtxn, db) { + let ret = reader?.nns_by_item(rtxn, item, limit, None, None, filter)?; if let Some(mut ret) = ret { results.append(&mut ret); } else { @@ -302,27 +324,35 @@ impl ArroyWrapper { } } results.sort_unstable_by_key(|(_, distance)| OrderedFloat(*distance)); - Ok(results) } pub fn nns_by_vector( &self, - txn: &RoTxn, - item: &[f32], + rtxn: &RoTxn, + vector: &[f32], + limit: usize, + filter: Option<&RoaringBitmap>, + ) -> Result, arroy::Error> { + if self.quantized { + self._nns_by_vector(rtxn, self.quantized_db(), vector, limit, filter) + } else { + self._nns_by_vector(rtxn, self.angular_db(), vector, limit, filter) + } + } + + fn _nns_by_vector( + &self, + rtxn: &RoTxn, + db: arroy::Database, + vector: &[f32], limit: usize, filter: Option<&RoaringBitmap>, ) -> Result, arroy::Error> { let mut results = Vec::new(); - for index in arroy_db_range_for_embedder(self.index) { - let mut ret = if self.quantized { - arroy::Reader::open(txn, index, self.quantized_db())? - .nns_by_vector(txn, item, limit, None, None, filter)? - } else { - arroy::Reader::open(txn, index, self.angular_db())? - .nns_by_vector(txn, item, limit, None, None, filter)? - }; + for reader in self.readers(rtxn, db) { + let mut ret = reader?.nns_by_vector(rtxn, vector, limit, None, None, filter)?; results.append(&mut ret); } @@ -331,18 +361,27 @@ impl ArroyWrapper { Ok(results) } - pub fn item_vector(&self, rtxn: &RoTxn, docid: u32) -> Result>, arroy::Error> { - for index in arroy_db_range_for_embedder(self.index) { - let ret = if self.quantized { - arroy::Reader::open(rtxn, index, self.quantized_db())?.item_vector(rtxn, docid)? - } else { - arroy::Reader::open(rtxn, index, self.angular_db())?.item_vector(rtxn, docid)? - }; - if ret.is_some() { - return Ok(ret); + pub fn item_vectors(&self, rtxn: &RoTxn, item_id: u32) -> Result>, arroy::Error> { + let mut vectors = Vec::new(); + + if self.quantized { + for reader in self.readers(rtxn, self.quantized_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } else { + break; + } + } + } else { + for reader in self.readers(rtxn, self.angular_db()) { + if let Some(vec) = reader?.item_vector(rtxn, item_id)? { + vectors.push(vec); + } else { + break; + } } } - Ok(None) + Ok(vectors) } fn angular_db(&self) -> arroy::Database { From 79d8a7a51a13fc089c3ebe58721302c856191d8d Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 24 Sep 2024 10:36:28 +0200 Subject: [PATCH 04/11] rename the embedder index for clarity --- milli/src/vector/mod.rs | 42 ++++++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 17 deletions(-) diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index b5b6cd953..2da8ecd57 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -32,17 +32,21 @@ pub const REQUEST_PARALLELISM: usize = 40; pub struct ArroyWrapper { quantized: bool, - index: u8, + embedder_index: u8, database: arroy::Database, } impl ArroyWrapper { - pub fn new(database: arroy::Database, index: u8, quantized: bool) -> Self { - Self { database, index, quantized } + pub fn new( + database: arroy::Database, + embedder_index: u8, + quantized: bool, + ) -> Self { + Self { database, embedder_index, quantized } } pub fn index(&self) -> u8 { - self.index + self.embedder_index } fn readers<'a, D: arroy::Distance>( @@ -50,7 +54,7 @@ impl ArroyWrapper { rtxn: &'a RoTxn<'a>, db: arroy::Database, ) -> impl Iterator, arroy::Error>> + 'a { - arroy_db_range_for_embedder(self.index).map_while(move |index| { + arroy_db_range_for_embedder(self.embedder_index).map_while(move |index| { match arroy::Reader::open(rtxn, index, db) { Ok(reader) => Some(Ok(reader)), Err(arroy::Error::MissingMetadata(_)) => None, @@ -60,7 +64,7 @@ impl ArroyWrapper { } pub fn dimensions(&self, rtxn: &RoTxn) -> Result { - let first_id = arroy_db_range_for_embedder(self.index).next().unwrap(); + let first_id = arroy_db_range_for_embedder(self.embedder_index).next().unwrap(); if self.quantized { Ok(arroy::Reader::open(rtxn, first_id, self.quantized_db())?.dimensions()) } else { @@ -70,7 +74,7 @@ impl ArroyWrapper { pub fn quantize(&mut self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> { if !self.quantized { - for index in arroy_db_range_for_embedder(self.index) { + for index in arroy_db_range_for_embedder(self.embedder_index) { let writer = arroy::Writer::new(self.angular_db(), index, dimension); writer.prepare_changing_distance::(wtxn)?; } @@ -81,7 +85,7 @@ impl ArroyWrapper { // TODO: We can stop early when we find an empty DB pub fn need_build(&self, rtxn: &RoTxn, dimension: usize) -> Result { - for index in arroy_db_range_for_embedder(self.index) { + for index in arroy_db_range_for_embedder(self.embedder_index) { let need_build = if self.quantized { arroy::Writer::new(self.quantized_db(), index, dimension).need_build(rtxn) } else { @@ -101,7 +105,7 @@ impl ArroyWrapper { rng: &mut R, dimension: usize, ) -> Result<(), arroy::Error> { - for index in arroy_db_range_for_embedder(self.index) { + for index in arroy_db_range_for_embedder(self.embedder_index) { if self.quantized { arroy::Writer::new(self.quantized_db(), index, dimension).build(wtxn, rng, None)? } else { @@ -119,7 +123,9 @@ impl ArroyWrapper { embeddings: &Embeddings, ) -> Result<(), arroy::Error> { let dimension = embeddings.dimension(); - for (index, vector) in arroy_db_range_for_embedder(self.index).zip(embeddings.iter()) { + for (index, vector) in + arroy_db_range_for_embedder(self.embedder_index).zip(embeddings.iter()) + { if self.quantized { arroy::Writer::new(self.quantized_db(), index, dimension) .add_item(wtxn, item_id, vector)? @@ -154,7 +160,7 @@ impl ArroyWrapper { ) -> Result<(), arroy::Error> { let dimension = vector.len(); - for index in arroy_db_range_for_embedder(self.index) { + for index in arroy_db_range_for_embedder(self.embedder_index) { let writer = arroy::Writer::new(db, index, dimension); if !writer.contains_item(wtxn, item_id)? { writer.add_item(wtxn, item_id, vector)?; @@ -172,7 +178,7 @@ impl ArroyWrapper { dimension: usize, item_id: arroy::ItemId, ) -> Result { - for index in arroy_db_range_for_embedder(self.index) { + for index in arroy_db_range_for_embedder(self.embedder_index) { if self.quantized { let writer = arroy::Writer::new(self.quantized_db(), index, dimension); if writer.del_item(wtxn, item_id)? { @@ -213,7 +219,7 @@ impl ArroyWrapper { let dimension = vector.len(); let mut deleted_index = None; - for index in arroy_db_range_for_embedder(self.index) { + for index in arroy_db_range_for_embedder(self.embedder_index) { let writer = arroy::Writer::new(db, index, dimension); let Some(candidate) = writer.item_vector(wtxn, item_id)? else { // uses invariant: vectors are packed in the first writers. @@ -228,7 +234,9 @@ impl ArroyWrapper { // 🥲 enforce invariant: vectors are packed in the first writers. if let Some(deleted_index) = deleted_index { let mut last_index_with_a_vector = None; - for index in arroy_db_range_for_embedder(self.index).skip(deleted_index as usize) { + for index in + arroy_db_range_for_embedder(self.embedder_index).skip(deleted_index as usize) + { let writer = arroy::Writer::new(db, index, dimension); let Some(candidate) = writer.item_vector(wtxn, item_id)? else { break; @@ -247,7 +255,7 @@ impl ArroyWrapper { } pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> { - for index in arroy_db_range_for_embedder(self.index) { + for index in arroy_db_range_for_embedder(self.embedder_index) { if self.quantized { arroy::Writer::new(self.quantized_db(), index, dimension).clear(wtxn)?; } else { @@ -258,7 +266,7 @@ impl ArroyWrapper { } pub fn is_empty(&self, rtxn: &RoTxn, dimension: usize) -> Result { - for index in arroy_db_range_for_embedder(self.index) { + for index in arroy_db_range_for_embedder(self.embedder_index) { let empty = if self.quantized { arroy::Writer::new(self.quantized_db(), index, dimension).is_empty(rtxn)? } else { @@ -277,7 +285,7 @@ impl ArroyWrapper { dimension: usize, item: arroy::ItemId, ) -> Result { - for index in arroy_db_range_for_embedder(self.index) { + for index in arroy_db_range_for_embedder(self.embedder_index) { let contains = if self.quantized { arroy::Writer::new(self.quantized_db(), index, dimension) .contains_item(rtxn, item)? From f2d187ba3e779c0644ad0e1dbf3174dea2614d35 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 24 Sep 2024 10:39:40 +0200 Subject: [PATCH 05/11] rename the index method to embedder_index --- milli/src/vector/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index 2da8ecd57..ca607c892 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -45,7 +45,7 @@ impl ArroyWrapper { Self { database, embedder_index, quantized } } - pub fn index(&self) -> u8 { + pub fn embedder_index(&self) -> u8 { self.embedder_index } From fd8447c5214b62b724f18ec5de9b92fa34537462 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 24 Sep 2024 10:52:05 +0200 Subject: [PATCH 06/11] fix the del items thing --- milli/src/update/index_documents/typed_chunk.rs | 2 +- milli/src/vector/mod.rs | 17 ++++++++--------- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/milli/src/update/index_documents/typed_chunk.rs b/milli/src/update/index_documents/typed_chunk.rs index e118420d8..20e70b2a6 100644 --- a/milli/src/update/index_documents/typed_chunk.rs +++ b/milli/src/update/index_documents/typed_chunk.rs @@ -680,7 +680,7 @@ pub(crate) fn write_typed_chunk_into_index( let mut iter = merger.into_stream_merger_iter()?; while let Some((key, _)) = iter.next()? { let docid = key.try_into().map(DocumentId::from_be_bytes).unwrap(); - writer.del_item_raw(wtxn, expected_dimension, docid)?; + writer.del_items(wtxn, expected_dimension, docid)?; } // add generated embeddings diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index ca607c892..4b322ddf4 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -170,29 +170,28 @@ impl ArroyWrapper { Ok(()) } - /// Delete an item from the index. It **does not** take care of fixing the hole - /// made after deleting the item. - pub fn del_item_raw( + /// Delete all embeddings from a specific `item_id` + pub fn del_items( &self, wtxn: &mut RwTxn, dimension: usize, item_id: arroy::ItemId, - ) -> Result { + ) -> Result<(), arroy::Error> { for index in arroy_db_range_for_embedder(self.embedder_index) { if self.quantized { let writer = arroy::Writer::new(self.quantized_db(), index, dimension); - if writer.del_item(wtxn, item_id)? { - return Ok(true); + if !writer.del_item(wtxn, item_id)? { + break; } } else { let writer = arroy::Writer::new(self.angular_db(), index, dimension); - if writer.del_item(wtxn, item_id)? { - return Ok(true); + if !writer.del_item(wtxn, item_id)? { + break; } } } - Ok(false) + Ok(()) } /// Delete one item. From b8a74e04647af60a396539b6ba3b47d19771cc49 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 24 Sep 2024 10:59:15 +0200 Subject: [PATCH 07/11] fix comments --- milli/src/vector/mod.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index 4b322ddf4..8341ab923 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -115,7 +115,10 @@ impl ArroyWrapper { Ok(()) } - /// Overwrite all the embeddings associated to the index and item id. + /// Overwrite all the embeddings associated with the index and item ID. + /// /!\ It won't remove embeddings after the last passed embedding, which can leave stale embeddings. + /// You should call `del_items` on the `item_id` before calling this method. + /// /!\ Cannot insert more than u8::MAX embeddings; after inserting u8::MAX embeddings, all the remaining ones will be silently ignored. pub fn add_items( &self, wtxn: &mut RwTxn, @@ -243,7 +246,6 @@ impl ArroyWrapper { last_index_with_a_vector = Some((index, candidate)); } if let Some((last_index, vector)) = last_index_with_a_vector { - // unwrap: computed the index from the list of writers let writer = arroy::Writer::new(db, last_index, dimension); writer.del_item(wtxn, item_id)?; let writer = arroy::Writer::new(db, deleted_index, dimension); From 645a55317af91f37d68d26527568032016bf5393 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 24 Sep 2024 14:54:24 +0200 Subject: [PATCH 08/11] merge the build and quantize method --- milli/src/update/index_documents/mod.rs | 5 +-- milli/src/vector/mod.rs | 43 ++++++++++++++----------- 2 files changed, 25 insertions(+), 23 deletions(-) diff --git a/milli/src/update/index_documents/mod.rs b/milli/src/update/index_documents/mod.rs index b03ab259a..e164a0817 100644 --- a/milli/src/update/index_documents/mod.rs +++ b/milli/src/update/index_documents/mod.rs @@ -713,10 +713,7 @@ where pool.install(|| { let mut writer = ArroyWrapper::new(vector_arroy, embedder_index, was_quantized); - if is_quantizing { - writer.quantize(wtxn, dimension)?; - } - writer.build(wtxn, &mut rng, dimension)?; + writer.build_and_quantize(wtxn, &mut rng, dimension, is_quantizing)?; Result::Ok(()) }) .map_err(InternalError::from)??; diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index 8341ab923..a33f76559 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -98,18 +98,37 @@ impl ArroyWrapper { Ok(false) } - /// TODO: We should early exit when it doesn't need to be built - pub fn build( - &self, + pub fn build_and_quantize( + &mut self, wtxn: &mut RwTxn, rng: &mut R, dimension: usize, + quantizing: bool, ) -> Result<(), arroy::Error> { for index in arroy_db_range_for_embedder(self.embedder_index) { if self.quantized { - arroy::Writer::new(self.quantized_db(), index, dimension).build(wtxn, rng, None)? + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + if writer.need_build(wtxn)? { + writer.build(wtxn, rng, None)? + } else if writer.is_empty(wtxn)? { + break; + } } else { - arroy::Writer::new(self.angular_db(), index, dimension).build(wtxn, rng, None)? + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + // If we are quantizing the databases, we can't know from meilisearch + // if the db was empty but still contained the wrong metadata, thus we need + // to quantize everything and can't stop early. Since this operation can + // only happens once in the life of an embedder, it's not very performances + // sensitive. + if quantizing && !self.quantized { + let writer = + writer.prepare_changing_distance::(wtxn)?; + writer.build(wtxn, rng, None)? + } else if writer.need_build(wtxn)? { + writer.build(wtxn, rng, None)? + } else if writer.is_empty(wtxn)? { + break; + } } } Ok(()) @@ -266,20 +285,6 @@ impl ArroyWrapper { Ok(()) } - pub fn is_empty(&self, rtxn: &RoTxn, dimension: usize) -> Result { - for index in arroy_db_range_for_embedder(self.embedder_index) { - let empty = if self.quantized { - arroy::Writer::new(self.quantized_db(), index, dimension).is_empty(rtxn)? - } else { - arroy::Writer::new(self.angular_db(), index, dimension).is_empty(rtxn)? - }; - if !empty { - return Ok(false); - } - } - Ok(true) - } - pub fn contains_item( &self, rtxn: &RoTxn, From 8b4e2c7b1798e58a71dfb0538dbc980155b688cc Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 24 Sep 2024 15:00:25 +0200 Subject: [PATCH 09/11] Remove now unused method --- milli/src/vector/mod.rs | 26 -------------------------- 1 file changed, 26 deletions(-) diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index a33f76559..39655e72a 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -72,32 +72,6 @@ impl ArroyWrapper { } } - pub fn quantize(&mut self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> { - if !self.quantized { - for index in arroy_db_range_for_embedder(self.embedder_index) { - let writer = arroy::Writer::new(self.angular_db(), index, dimension); - writer.prepare_changing_distance::(wtxn)?; - } - self.quantized = true; - } - Ok(()) - } - - // TODO: We can stop early when we find an empty DB - pub fn need_build(&self, rtxn: &RoTxn, dimension: usize) -> Result { - for index in arroy_db_range_for_embedder(self.embedder_index) { - let need_build = if self.quantized { - arroy::Writer::new(self.quantized_db(), index, dimension).need_build(rtxn) - } else { - arroy::Writer::new(self.angular_db(), index, dimension).need_build(rtxn) - }; - if need_build? { - return Ok(true); - } - } - Ok(false) - } - pub fn build_and_quantize( &mut self, wtxn: &mut RwTxn, From 7f048b9732a048624bbe4beacb2e93f59c6d510d Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 24 Sep 2024 15:02:38 +0200 Subject: [PATCH 10/11] early exit in the clear and contains --- milli/src/vector/mod.rs | 25 ++++++++++++++++++++----- 1 file changed, 20 insertions(+), 5 deletions(-) diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index 39655e72a..d5b80db83 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -251,9 +251,17 @@ impl ArroyWrapper { pub fn clear(&self, wtxn: &mut RwTxn, dimension: usize) -> Result<(), arroy::Error> { for index in arroy_db_range_for_embedder(self.embedder_index) { if self.quantized { - arroy::Writer::new(self.quantized_db(), index, dimension).clear(wtxn)?; + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + if writer.is_empty(wtxn)? { + break; + } + writer.clear(wtxn)?; } else { - arroy::Writer::new(self.angular_db(), index, dimension).clear(wtxn)?; + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + if writer.is_empty(wtxn)? { + break; + } + writer.clear(wtxn)?; } } Ok(()) @@ -267,10 +275,17 @@ impl ArroyWrapper { ) -> Result { for index in arroy_db_range_for_embedder(self.embedder_index) { let contains = if self.quantized { - arroy::Writer::new(self.quantized_db(), index, dimension) - .contains_item(rtxn, item)? + let writer = arroy::Writer::new(self.quantized_db(), index, dimension); + if writer.is_empty(rtxn)? { + break; + } + writer.contains_item(rtxn, item)? } else { - arroy::Writer::new(self.angular_db(), index, dimension).contains_item(rtxn, item)? + let writer = arroy::Writer::new(self.angular_db(), index, dimension); + if writer.is_empty(rtxn)? { + break; + } + writer.contains_item(rtxn, item)? }; if contains { return Ok(contains); From b31e9bea26c098750dece8fb38eb2f57d6c254b5 Mon Sep 17 00:00:00 2001 From: Tamo Date: Tue, 24 Sep 2024 16:33:17 +0200 Subject: [PATCH 11/11] while retrieving the readers on an arroywrapper, stops at the first empty reader --- milli/src/vector/mod.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/milli/src/vector/mod.rs b/milli/src/vector/mod.rs index d5b80db83..b6d6510af 100644 --- a/milli/src/vector/mod.rs +++ b/milli/src/vector/mod.rs @@ -56,7 +56,11 @@ impl ArroyWrapper { ) -> impl Iterator, arroy::Error>> + 'a { arroy_db_range_for_embedder(self.embedder_index).map_while(move |index| { match arroy::Reader::open(rtxn, index, db) { - Ok(reader) => Some(Ok(reader)), + Ok(reader) => match reader.is_empty(rtxn) { + Ok(false) => Some(Ok(reader)), + Ok(true) => None, + Err(e) => Some(Err(e)), + }, Err(arroy::Error::MissingMetadata(_)) => None, Err(e) => Some(Err(e)), }