From b17896d899ecce691c9b7e47b35d6e48fc5a7d32 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Cl=C3=A9ment=20Renault?= <clement@meilisearch.com>
Date: Thu, 7 Nov 2024 15:05:20 +0100
Subject: [PATCH] Finalize the GeoExtractor

---
 Cargo.lock                                    |  10 +
 .../tests/documents/add_documents.rs          |   4 +-
 crates/milli/Cargo.toml                       |   1 +
 .../src/update/index_documents/typed_chunk.rs |   2 +-
 crates/milli/src/update/new/channel.rs        |  63 +++-
 crates/milli/src/update/new/document.rs       |   6 +
 .../milli/src/update/new/extract/documents.rs |   8 +-
 .../new/extract/faceted/extract_facets.rs     |   2 +-
 .../new/extract/faceted/facet_document.rs     |   3 +-
 .../milli/src/update/new/extract/geo/mod.rs   | 302 ++++++++++++++++++
 crates/milli/src/update/new/extract/mod.rs    |   2 +
 .../extract/searchable/extract_word_docids.rs |   2 +-
 .../extract_word_pair_proximity_docids.rs     |   2 +-
 .../src/update/new/extract/vectors/mod.rs     |  22 +-
 .../update/new/indexer/document_changes.rs    |   4 +-
 .../update/new/indexer/document_deletion.rs   |   2 +-
 crates/milli/src/update/new/indexer/mod.rs    |  62 +++-
 .../update/new/indexer/update_by_function.rs  |   2 +-
 crates/milli/src/update/new/merger.rs         |  91 +++---
 19 files changed, 497 insertions(+), 93 deletions(-)
 create mode 100644 crates/milli/src/update/new/extract/geo/mod.rs

diff --git a/Cargo.lock b/Cargo.lock
index 30b1102b5..fd01352a9 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3664,6 +3664,7 @@ dependencies = [
  "time",
  "tokenizers",
  "tracing",
+ "uell",
  "ureq",
  "url",
  "uuid",
@@ -5792,6 +5793,15 @@ version = "0.1.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9"
 
+[[package]]
+name = "uell"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "40de5982e28612e20330e77d81f1559b74f66caf3c7fc10b19ada4843f4b4fd7"
+dependencies = [
+ "bumpalo",
+]
+
 [[package]]
 name = "unescaper"
 version = "0.1.5"
diff --git a/crates/meilisearch/tests/documents/add_documents.rs b/crates/meilisearch/tests/documents/add_documents.rs
index 0209a6d57..8c9601e0f 100644
--- a/crates/meilisearch/tests/documents/add_documents.rs
+++ b/crates/meilisearch/tests/documents/add_documents.rs
@@ -2201,7 +2201,7 @@ async fn add_invalid_geo_and_then_settings() {
     let index = server.index("test");
     index.create(Some("id")).await;
 
-    // _geo is not an object
+    // _geo is not a correct object
     let documents = json!([
         {
             "id": "11",
@@ -2230,7 +2230,7 @@ async fn add_invalid_geo_and_then_settings() {
     }
     "###);
 
-    let (ret, code) = index.update_settings(json!({"sortableAttributes": ["_geo"]})).await;
+    let (ret, code) = index.update_settings(json!({ "sortableAttributes": ["_geo"] })).await;
     snapshot!(code, @"202 Accepted");
     let ret = index.wait_task(ret.uid()).await;
     snapshot!(ret, @r###"
diff --git a/crates/milli/Cargo.toml b/crates/milli/Cargo.toml
index 005393411..622292e8a 100644
--- a/crates/milli/Cargo.toml
+++ b/crates/milli/Cargo.toml
@@ -100,6 +100,7 @@ bumpalo = "3.16.0"
 thread_local = "1.1.8"
 allocator-api2 = "0.2.18"
 rustc-hash = "2.0.0"
+uell = "0.1.0"
 
 [dev-dependencies]
 mimalloc = { version = "0.1.43", default-features = false }
diff --git a/crates/milli/src/update/index_documents/typed_chunk.rs b/crates/milli/src/update/index_documents/typed_chunk.rs
index 2c30220bc..a97569800 100644
--- a/crates/milli/src/update/index_documents/typed_chunk.rs
+++ b/crates/milli/src/update/index_documents/typed_chunk.rs
@@ -737,7 +737,7 @@ pub(crate) fn write_typed_chunk_into_index(
 }
 
 /// Converts the latitude and longitude back to an xyz GeoPoint.
-fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint {
+pub fn extract_geo_point(value: &[u8], docid: DocumentId) -> GeoPoint {
     let (lat, tail) = helpers::try_split_array_at::<u8, 8>(value).unwrap();
     let (lng, _) = helpers::try_split_array_at::<u8, 8>(tail).unwrap();
     let point = [f64::from_ne_bytes(lat), f64::from_ne_bytes(lng)];
diff --git a/crates/milli/src/update/new/channel.rs b/crates/milli/src/update/new/channel.rs
index 5b91ae77f..2027b4db8 100644
--- a/crates/milli/src/update/new/channel.rs
+++ b/crates/milli/src/update/new/channel.rs
@@ -3,9 +3,12 @@ use std::sync::atomic::{AtomicUsize, Ordering};
 
 use crossbeam_channel::{IntoIter, Receiver, SendError, Sender};
 use heed::types::Bytes;
+use memmap2::Mmap;
+use roaring::RoaringBitmap;
 
 use super::extract::FacetKind;
 use super::StdResult;
+use crate::index::main_key::{GEO_FACETED_DOCUMENTS_IDS_KEY, GEO_RTREE_KEY};
 use crate::index::IndexEmbeddingConfig;
 use crate::update::new::KvReaderFieldId;
 use crate::vector::Embedding;
@@ -25,9 +28,9 @@ pub fn extractor_writer_channel(cap: usize) -> (ExtractorSender, WriterReceiver)
     )
 }
 
-pub struct KeyValueEntry {
-    pub key_length: usize,
-    pub data: Box<[u8]>,
+pub enum KeyValueEntry {
+    Small { key_length: usize, data: Box<[u8]> },
+    Large { key_entry: KeyEntry, data: Mmap },
 }
 
 impl KeyValueEntry {
@@ -35,14 +38,25 @@ impl KeyValueEntry {
         let mut data = Vec::with_capacity(key.len() + value.len());
         data.extend_from_slice(key);
         data.extend_from_slice(value);
-        KeyValueEntry { key_length: key.len(), data: data.into_boxed_slice() }
+        KeyValueEntry::Small { key_length: key.len(), data: data.into_boxed_slice() }
     }
+
+    fn from_large_key_value(key: &[u8], value: Mmap) -> Self {
+        KeyValueEntry::Large { key_entry: KeyEntry::from_key(key), data: value }
+    }
+
     pub fn key(&self) -> &[u8] {
-        &self.data[..self.key_length]
+        match self {
+            KeyValueEntry::Small { key_length, data } => &data[..*key_length],
+            KeyValueEntry::Large { key_entry, data: _ } => key_entry.entry(),
+        }
     }
 
     pub fn value(&self) -> &[u8] {
-        &self.data[self.key_length..]
+        match self {
+            KeyValueEntry::Small { key_length, data } => &data[*key_length..],
+            KeyValueEntry::Large { key_entry: _, data } => &data[..],
+        }
     }
 }
 
@@ -97,6 +111,7 @@ pub struct DbOperation {
 
 #[derive(Debug)]
 pub enum Database {
+    Main,
     Documents,
     ExternalDocumentsIds,
     ExactWordDocids,
@@ -115,6 +130,7 @@ pub enum Database {
 impl Database {
     pub fn database(&self, index: &Index) -> heed::Database<Bytes, Bytes> {
         match self {
+            Database::Main => index.main.remap_types(),
             Database::Documents => index.documents.remap_types(),
             Database::ExternalDocumentsIds => index.external_documents_ids.remap_types(),
             Database::ExactWordDocids => index.exact_word_docids.remap_types(),
@@ -207,6 +223,10 @@ impl ExtractorSender {
         EmbeddingSender(&self.sender)
     }
 
+    pub fn geo(&self) -> GeoSender<'_> {
+        GeoSender(&self.sender)
+    }
+
     fn send_delete_vector(&self, docid: DocumentId) -> StdResult<(), SendError<()>> {
         match self
             .sender
@@ -423,3 +443,34 @@ impl EmbeddingSender<'_> {
             .map_err(|_| SendError(()))
     }
 }
+
+pub struct GeoSender<'a>(&'a Sender<WriterOperation>);
+
+impl GeoSender<'_> {
+    pub fn set_rtree(&self, value: Mmap) -> StdResult<(), SendError<()>> {
+        self.0
+            .send(WriterOperation::DbOperation(DbOperation {
+                database: Database::Main,
+                entry: EntryOperation::Write(KeyValueEntry::from_large_key_value(
+                    GEO_RTREE_KEY.as_bytes(),
+                    value,
+                )),
+            }))
+            .map_err(|_| SendError(()))
+    }
+
+    pub fn set_geo_faceted(&self, bitmap: &RoaringBitmap) -> StdResult<(), SendError<()>> {
+        let mut buffer = Vec::new();
+        bitmap.serialize_into(&mut buffer).unwrap();
+
+        self.0
+            .send(WriterOperation::DbOperation(DbOperation {
+                database: Database::Main,
+                entry: EntryOperation::Write(KeyValueEntry::from_small_key_value(
+                    GEO_FACETED_DOCUMENTS_IDS_KEY.as_bytes(),
+                    &buffer,
+                )),
+            }))
+            .map_err(|_| SendError(()))
+    }
+}
diff --git a/crates/milli/src/update/new/document.rs b/crates/milli/src/update/new/document.rs
index 692277597..8d4e3b0a9 100644
--- a/crates/milli/src/update/new/document.rs
+++ b/crates/milli/src/update/new/document.rs
@@ -352,6 +352,11 @@ where
         unordered_field_buffer.push((vectors_fid, &vectors_value));
     }
 
+    if let Some(geo_value) = document.geo_field()? {
+        let fid = fields_ids_map.id_or_insert("_geo").ok_or(UserError::AttributeLimitReached)?;
+        unordered_field_buffer.push((fid, geo_value));
+    }
+
     unordered_field_buffer.sort_by_key(|(fid, _)| *fid);
     for (fid, value) in unordered_field_buffer.iter() {
         writer.insert(*fid, value.get().as_bytes()).unwrap();
@@ -406,6 +411,7 @@ impl<'doc> Versions<'doc> {
     pub fn is_empty(&self) -> bool {
         self.data.is_empty()
     }
+
     pub fn top_level_field(&self, k: &str) -> Option<&'doc RawValue> {
         if k == RESERVED_VECTORS_FIELD_NAME || k == "_geo" {
             return None;
diff --git a/crates/milli/src/update/new/extract/documents.rs b/crates/milli/src/update/new/extract/documents.rs
index 2c93a5def..b76fe207a 100644
--- a/crates/milli/src/update/new/extract/documents.rs
+++ b/crates/milli/src/update/new/extract/documents.rs
@@ -54,7 +54,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
                 DocumentChange::Deletion(deletion) => {
                     let docid = deletion.docid();
                     let content = deletion.current(
-                        &context.txn,
+                        &context.rtxn,
                         context.index,
                         &context.db_fields_ids_map,
                     )?;
@@ -72,7 +72,7 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
                 DocumentChange::Update(update) => {
                     let docid = update.docid();
                     let content =
-                        update.current(&context.txn, context.index, &context.db_fields_ids_map)?;
+                        update.current(&context.rtxn, context.index, &context.db_fields_ids_map)?;
                     for res in content.iter_top_level_fields() {
                         let (f, _) = res?;
                         let entry = document_extractor_data
@@ -92,9 +92,9 @@ impl<'a, 'extractor> Extractor<'extractor> for DocumentsExtractor<'a> {
                     }
 
                     let content =
-                        update.merged(&context.txn, context.index, &context.db_fields_ids_map)?;
+                        update.merged(&context.rtxn, context.index, &context.db_fields_ids_map)?;
                     let vector_content = update.merged_vectors(
-                        &context.txn,
+                        &context.rtxn,
                         context.index,
                         &context.db_fields_ids_map,
                         &context.doc_alloc,
diff --git a/crates/milli/src/update/new/extract/faceted/extract_facets.rs b/crates/milli/src/update/new/extract/faceted/extract_facets.rs
index 11dc8f3c7..d0dc425ae 100644
--- a/crates/milli/src/update/new/extract/faceted/extract_facets.rs
+++ b/crates/milli/src/update/new/extract/faceted/extract_facets.rs
@@ -63,7 +63,7 @@ impl FacetedDocidsExtractor {
         document_change: DocumentChange,
     ) -> Result<()> {
         let index = &context.index;
-        let rtxn = &context.txn;
+        let rtxn = &context.rtxn;
         let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
         let mut cached_sorter = context.data.borrow_mut_or_yield();
         match document_change {
diff --git a/crates/milli/src/update/new/extract/faceted/facet_document.rs b/crates/milli/src/update/new/extract/faceted/facet_document.rs
index cf8984f9c..4308d0aa5 100644
--- a/crates/milli/src/update/new/extract/faceted/facet_document.rs
+++ b/crates/milli/src/update/new/extract/faceted/facet_document.rs
@@ -10,7 +10,8 @@ pub fn extract_document_facets<'doc>(
     field_id_map: &mut GlobalFieldsIdsMap,
     facet_fn: &mut impl FnMut(FieldId, &Value) -> Result<()>,
 ) -> Result<()> {
-    for res in document.iter_top_level_fields() {
+    let geo = document.geo_field().transpose().map(|res| res.map(|rval| ("_geo", rval)));
+    for res in document.iter_top_level_fields().chain(geo) {
         let (field_name, value) = res?;
 
         let mut tokenize_field = |name: &str, value: &Value| match field_id_map.id_or_insert(name) {
diff --git a/crates/milli/src/update/new/extract/geo/mod.rs b/crates/milli/src/update/new/extract/geo/mod.rs
new file mode 100644
index 000000000..180611eee
--- /dev/null
+++ b/crates/milli/src/update/new/extract/geo/mod.rs
@@ -0,0 +1,302 @@
+use std::cell::RefCell;
+use std::fs::File;
+use std::io::{self, BufReader, BufWriter, ErrorKind, Read, Write as _};
+use std::{iter, mem, result};
+
+use bumpalo::Bump;
+use bytemuck::{bytes_of, from_bytes, pod_read_unaligned, Pod, Zeroable};
+use heed::RoTxn;
+use serde_json::value::RawValue;
+use serde_json::Value;
+
+use crate::error::GeoError;
+use crate::update::new::document::Document;
+use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
+use crate::update::new::ref_cell_ext::RefCellExt as _;
+use crate::update::new::DocumentChange;
+use crate::update::GrenadParameters;
+use crate::{lat_lng_to_xyz, DocumentId, GeoPoint, Index, InternalError, Object, Result};
+
+pub struct GeoExtractor {
+    grenad_parameters: GrenadParameters,
+}
+
+impl GeoExtractor {
+    pub fn new(
+        rtxn: &RoTxn,
+        index: &Index,
+        grenad_parameters: GrenadParameters,
+    ) -> Result<Option<Self>> {
+        let is_sortable = index.sortable_fields(rtxn)?.contains("_geo");
+        let is_filterable = index.filterable_fields(rtxn)?.contains("_geo");
+        if is_sortable || is_filterable {
+            Ok(Some(GeoExtractor { grenad_parameters }))
+        } else {
+            Ok(None)
+        }
+    }
+}
+
+#[derive(Pod, Zeroable, Copy, Clone)]
+#[repr(C, packed)]
+pub struct ExtractedGeoPoint {
+    pub docid: DocumentId,
+    pub lat_lng: [f64; 2],
+}
+
+impl From<ExtractedGeoPoint> for GeoPoint {
+    /// Converts the latitude and longitude back to an xyz GeoPoint.
+    fn from(value: ExtractedGeoPoint) -> Self {
+        let [lat, lng] = value.lat_lng;
+        let point = [lat, lng];
+        let xyz_point = lat_lng_to_xyz(&point);
+        GeoPoint::new(xyz_point, (value.docid, point))
+    }
+}
+
+pub struct GeoExtractorData<'extractor> {
+    /// The set of document ids that were removed. If a document sees its geo
+    /// point being updated, we first put it in `removed` and then in `inserted`.
+    removed: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>,
+    inserted: bumpalo::collections::Vec<'extractor, ExtractedGeoPoint>,
+    /// TODO Do the doc
+    spilled_removed: Option<BufWriter<File>>,
+    /// TODO Do the doc
+    spilled_inserted: Option<BufWriter<File>>,
+}
+
+impl<'extractor> GeoExtractorData<'extractor> {
+    pub fn freeze(self) -> Result<FrozenGeoExtractorData<'extractor>> {
+        let GeoExtractorData { removed, inserted, spilled_removed, spilled_inserted } = self;
+
+        Ok(FrozenGeoExtractorData {
+            removed: removed.into_bump_slice(),
+            inserted: inserted.into_bump_slice(),
+            spilled_removed: spilled_removed
+                .map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
+                .transpose()?,
+            spilled_inserted: spilled_inserted
+                .map(|bw| bw.into_inner().map(BufReader::new).map_err(|iie| iie.into_error()))
+                .transpose()?,
+        })
+    }
+}
+
+unsafe impl MostlySend for GeoExtractorData<'_> {}
+
+pub struct FrozenGeoExtractorData<'extractor> {
+    pub removed: &'extractor [ExtractedGeoPoint],
+    pub inserted: &'extractor [ExtractedGeoPoint],
+    pub spilled_removed: Option<BufReader<File>>,
+    pub spilled_inserted: Option<BufReader<File>>,
+}
+
+impl<'extractor> FrozenGeoExtractorData<'extractor> {
+    pub fn iter_and_clear_removed(
+        &mut self,
+    ) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
+        mem::take(&mut self.removed)
+            .iter()
+            .copied()
+            .map(Ok)
+            .chain(iterator_over_spilled_geopoints(&mut self.spilled_removed))
+    }
+
+    pub fn iter_and_clear_inserted(
+        &mut self,
+    ) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
+        mem::take(&mut self.inserted)
+            .iter()
+            .copied()
+            .map(Ok)
+            .chain(iterator_over_spilled_geopoints(&mut self.spilled_inserted))
+    }
+}
+
+fn iterator_over_spilled_geopoints(
+    spilled: &mut Option<BufReader<File>>,
+) -> impl IntoIterator<Item = io::Result<ExtractedGeoPoint>> + '_ {
+    let mut spilled = spilled.take();
+    iter::from_fn(move || match &mut spilled {
+        Some(file) => {
+            let geopoint_bytes = &mut [0u8; mem::size_of::<ExtractedGeoPoint>()];
+            match file.read_exact(geopoint_bytes) {
+                Ok(()) => Some(Ok(pod_read_unaligned(geopoint_bytes))),
+                Err(e) if e.kind() == ErrorKind::UnexpectedEof => None,
+                Err(e) => Some(Err(e)),
+            }
+        }
+        None => None,
+    })
+}
+
+impl<'extractor> Extractor<'extractor> for GeoExtractor {
+    type Data = RefCell<GeoExtractorData<'extractor>>;
+
+    fn init_data<'doc>(&'doc self, extractor_alloc: &'extractor Bump) -> Result<Self::Data> {
+        Ok(RefCell::new(GeoExtractorData {
+            removed: bumpalo::collections::Vec::new_in(extractor_alloc),
+            // inserted: Uell::new_in(extractor_alloc),
+            inserted: bumpalo::collections::Vec::new_in(extractor_alloc),
+            spilled_inserted: None,
+            spilled_removed: None,
+        }))
+    }
+
+    fn process<'doc>(
+        &'doc self,
+        changes: impl Iterator<Item = Result<DocumentChange<'doc>>>,
+        context: &'doc DocumentChangeContext<Self::Data>,
+    ) -> Result<()> {
+        let rtxn = &context.rtxn;
+        let index = context.index;
+        let max_memory = self.grenad_parameters.max_memory;
+        let db_fields_ids_map = context.db_fields_ids_map;
+        let mut data_ref = context.data.borrow_mut_or_yield();
+
+        for change in changes {
+            if max_memory.map_or(false, |mm| context.extractor_alloc.allocated_bytes() >= mm) {
+                // We must spill as we allocated too much memory
+                data_ref.spilled_removed = tempfile::tempfile().map(BufWriter::new).map(Some)?;
+                data_ref.spilled_inserted = tempfile::tempfile().map(BufWriter::new).map(Some)?;
+            }
+
+            match change? {
+                DocumentChange::Deletion(deletion) => {
+                    let docid = deletion.docid();
+                    let external_id = deletion.external_document_id();
+                    let current = deletion.current(rtxn, index, db_fields_ids_map)?;
+                    let current_geo = current
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    if let Some(lat_lng) = current_geo.flatten() {
+                        let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                        match &mut data_ref.spilled_removed {
+                            Some(file) => file.write_all(bytes_of(&geopoint))?,
+                            None => data_ref.removed.push(geopoint),
+                        }
+                    }
+                }
+                DocumentChange::Update(update) => {
+                    let current = update.current(rtxn, index, db_fields_ids_map)?;
+                    let external_id = update.external_document_id();
+                    let docid = update.docid();
+
+                    let current_geo = current
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    let updated_geo = update
+                        .updated()
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    if current_geo != updated_geo {
+                        // If the current and new geo points are different it means that
+                        // we need to replace the current by the new point and therefore
+                        // delete the current point from the RTree.
+                        if let Some(lat_lng) = current_geo.flatten() {
+                            let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                            match &mut data_ref.spilled_removed {
+                                Some(file) => file.write_all(bytes_of(&geopoint))?,
+                                None => data_ref.removed.push(geopoint),
+                            }
+                        }
+
+                        if let Some(lat_lng) = updated_geo.flatten() {
+                            let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                            match &mut data_ref.spilled_inserted {
+                                Some(file) => file.write_all(bytes_of(&geopoint))?,
+                                None => data_ref.inserted.push(geopoint),
+                            }
+                        }
+                    }
+                }
+                DocumentChange::Insertion(insertion) => {
+                    let external_id = insertion.external_document_id();
+                    let docid = insertion.docid();
+
+                    let inserted_geo = insertion
+                        .inserted()
+                        .geo_field()?
+                        .map(|geo| extract_geo_coordinates(external_id, geo))
+                        .transpose()?;
+
+                    if let Some(lat_lng) = inserted_geo.flatten() {
+                        let geopoint = ExtractedGeoPoint { docid, lat_lng };
+                        match &mut data_ref.spilled_inserted {
+                            Some(file) => file.write_all(bytes_of(&geopoint))?,
+                            None => data_ref.inserted.push(geopoint),
+                        }
+                    }
+                }
+            }
+        }
+
+        Ok(())
+    }
+}
+
+/// Extracts and validates the latitude and longitude from a document geo field.
+///
+/// It can be of the form `{ "lat": 0.0, "lng": "1.0" }`.
+fn extract_geo_coordinates(external_id: &str, raw_value: &RawValue) -> Result<Option<[f64; 2]>> {
+    let mut geo = match serde_json::from_str(raw_value.get()).map_err(InternalError::SerdeJson)? {
+        Value::Null => return Ok(None),
+        Value::Object(map) => map,
+        value => {
+            return Err(
+                GeoError::NotAnObject { document_id: Value::from(external_id), value }.into()
+            )
+        }
+    };
+
+    let [lat, lng] = match (geo.remove("lat"), geo.remove("lng")) {
+        (Some(lat), Some(lng)) => [lat, lng],
+        (Some(_), None) => {
+            return Err(GeoError::MissingLatitude { document_id: Value::from(external_id) }.into())
+        }
+        (None, Some(_)) => {
+            return Err(GeoError::MissingLongitude { document_id: Value::from(external_id) }.into())
+        }
+        (None, None) => {
+            return Err(GeoError::MissingLatitudeAndLongitude {
+                document_id: Value::from(external_id),
+            }
+            .into())
+        }
+    };
+
+    let lat = extract_finite_float_from_value(lat)
+        .map_err(|value| GeoError::BadLatitude { document_id: Value::from(external_id), value })?;
+
+    let lng = extract_finite_float_from_value(lng)
+        .map_err(|value| GeoError::BadLongitude { document_id: Value::from(external_id), value })?;
+
+    Ok(Some([lat, lng]))
+}
+
+/// Extracts and validates that a serde JSON Value is actually a finite f64.
+pub fn extract_finite_float_from_value(value: Value) -> result::Result<f64, Value> {
+    let number = match value {
+        Value::Number(ref n) => match n.as_f64() {
+            Some(number) => number,
+            None => return Err(value),
+        },
+        Value::String(ref s) => match s.parse::<f64>() {
+            Ok(number) => number,
+            Err(_) => return Err(value),
+        },
+        value => return Err(value),
+    };
+
+    if number.is_finite() {
+        Ok(number)
+    } else {
+        Err(value)
+    }
+}
diff --git a/crates/milli/src/update/new/extract/mod.rs b/crates/milli/src/update/new/extract/mod.rs
index af6a29d07..14cfa83cb 100644
--- a/crates/milli/src/update/new/extract/mod.rs
+++ b/crates/milli/src/update/new/extract/mod.rs
@@ -1,6 +1,7 @@
 mod cache;
 mod documents;
 mod faceted;
+mod geo;
 mod searchable;
 mod vectors;
 
@@ -8,6 +9,7 @@ use bumpalo::Bump;
 pub use cache::{merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap};
 pub use documents::*;
 pub use faceted::*;
+pub use geo::*;
 pub use searchable::*;
 pub use vectors::EmbeddingExtractor;
 
diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
index 89583bd93..0223895e6 100644
--- a/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
+++ b/crates/milli/src/update/new/extract/searchable/extract_word_docids.rs
@@ -326,7 +326,7 @@ impl WordDocidsExtractors {
         document_change: DocumentChange,
     ) -> Result<()> {
         let index = &context.index;
-        let rtxn = &context.txn;
+        let rtxn = &context.rtxn;
         let mut cached_sorter_ref = context.data.borrow_mut_or_yield();
         let cached_sorter = cached_sorter_ref.as_mut().unwrap();
         let mut new_fields_ids_map = context.new_fields_ids_map.borrow_mut_or_yield();
diff --git a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
index 7f9fff38f..f637cff49 100644
--- a/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
+++ b/crates/milli/src/update/new/extract/searchable/extract_word_pair_proximity_docids.rs
@@ -39,7 +39,7 @@ impl SearchableExtractor for WordPairProximityDocidsExtractor {
         let doc_alloc = &context.doc_alloc;
 
         let index = context.index;
-        let rtxn = &context.txn;
+        let rtxn = &context.rtxn;
 
         let mut key_buffer = bumpalo::collections::Vec::new_in(doc_alloc);
         let mut del_word_pair_proximity = bumpalo::collections::Vec::new_in(doc_alloc);
diff --git a/crates/milli/src/update/new/extract/vectors/mod.rs b/crates/milli/src/update/new/extract/vectors/mod.rs
index 3a73ff82f..2fb717c71 100644
--- a/crates/milli/src/update/new/extract/vectors/mod.rs
+++ b/crates/milli/src/update/new/extract/vectors/mod.rs
@@ -2,13 +2,13 @@ use std::cell::RefCell;
 
 use bumpalo::collections::Vec as BVec;
 use bumpalo::Bump;
-use hashbrown::HashMap;
+use hashbrown::{DefaultHashBuilder, HashMap};
 
 use super::cache::DelAddRoaringBitmap;
 use crate::error::FaultSource;
 use crate::prompt::Prompt;
 use crate::update::new::channel::EmbeddingSender;
-use crate::update::new::indexer::document_changes::{Extractor, MostlySend};
+use crate::update::new::indexer::document_changes::{DocumentChangeContext, Extractor, MostlySend};
 use crate::update::new::vector_document::VectorDocument;
 use crate::update::new::DocumentChange;
 use crate::vector::error::{
@@ -37,7 +37,7 @@ impl<'a> EmbeddingExtractor<'a> {
 }
 
 pub struct EmbeddingExtractorData<'extractor>(
-    pub HashMap<String, DelAddRoaringBitmap, hashbrown::DefaultHashBuilder, &'extractor Bump>,
+    pub HashMap<String, DelAddRoaringBitmap, DefaultHashBuilder, &'extractor Bump>,
 );
 
 unsafe impl MostlySend for EmbeddingExtractorData<'_> {}
@@ -52,9 +52,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
     fn process<'doc>(
         &'doc self,
         changes: impl Iterator<Item = crate::Result<DocumentChange<'doc>>>,
-        context: &'doc crate::update::new::indexer::document_changes::DocumentChangeContext<
-            Self::Data,
-        >,
+        context: &'doc DocumentChangeContext<Self::Data>,
     ) -> crate::Result<()> {
         let embedders = self.embedders.inner_as_ref();
         let mut unused_vectors_distribution =
@@ -63,7 +61,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
         let mut all_chunks = BVec::with_capacity_in(embedders.len(), &context.doc_alloc);
         for (embedder_name, (embedder, prompt, _is_quantized)) in embedders {
             let embedder_id =
-                context.index.embedder_category_id.get(&context.txn, embedder_name)?.ok_or_else(
+                context.index.embedder_category_id.get(&context.rtxn, embedder_name)?.ok_or_else(
                     || InternalError::DatabaseMissingEntry {
                         db_name: "embedder_category_id",
                         key: None,
@@ -95,7 +93,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
                 }
                 DocumentChange::Update(update) => {
                     let old_vectors = update.current_vectors(
-                        &context.txn,
+                        &context.rtxn,
                         context.index,
                         context.db_fields_ids_map,
                         &context.doc_alloc,
@@ -132,7 +130,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
                             } else if new_vectors.regenerate {
                                 let new_rendered = prompt.render_document(
                                     update.current(
-                                        &context.txn,
+                                        &context.rtxn,
                                         context.index,
                                         context.db_fields_ids_map,
                                     )?,
@@ -141,7 +139,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
                                 )?;
                                 let old_rendered = prompt.render_document(
                                     update.merged(
-                                        &context.txn,
+                                        &context.rtxn,
                                         context.index,
                                         context.db_fields_ids_map,
                                     )?,
@@ -160,7 +158,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
                         } else if old_vectors.regenerate {
                             let old_rendered = prompt.render_document(
                                 update.current(
-                                    &context.txn,
+                                    &context.rtxn,
                                     context.index,
                                     context.db_fields_ids_map,
                                 )?,
@@ -169,7 +167,7 @@ impl<'a, 'extractor> Extractor<'extractor> for EmbeddingExtractor<'a> {
                             )?;
                             let new_rendered = prompt.render_document(
                                 update.merged(
-                                    &context.txn,
+                                    &context.rtxn,
                                     context.index,
                                     context.db_fields_ids_map,
                                 )?,
diff --git a/crates/milli/src/update/new/indexer/document_changes.rs b/crates/milli/src/update/new/indexer/document_changes.rs
index b9bf79e47..e4b088f31 100644
--- a/crates/milli/src/update/new/indexer/document_changes.rs
+++ b/crates/milli/src/update/new/indexer/document_changes.rs
@@ -197,7 +197,7 @@ pub struct DocumentChangeContext<
     /// inside of the DB.
     pub db_fields_ids_map: &'indexer FieldsIdsMap,
     /// A transaction providing data from the DB before all indexing operations
-    pub txn: RoTxn<'indexer>,
+    pub rtxn: RoTxn<'indexer>,
 
     /// Global field id map that is up to date with the current state of the indexing process.
     ///
@@ -255,7 +255,7 @@ impl<
         let txn = index.read_txn()?;
         Ok(DocumentChangeContext {
             index,
-            txn,
+            rtxn: txn,
             db_fields_ids_map,
             new_fields_ids_map: fields_ids_map,
             doc_alloc,
diff --git a/crates/milli/src/update/new/indexer/document_deletion.rs b/crates/milli/src/update/new/indexer/document_deletion.rs
index d7648acd8..e89b04223 100644
--- a/crates/milli/src/update/new/indexer/document_deletion.rs
+++ b/crates/milli/src/update/new/indexer/document_deletion.rs
@@ -63,7 +63,7 @@ impl<'pl> DocumentChanges<'pl> for DocumentDeletionChanges<'pl> {
     where
         'pl: 'doc, // the payload must survive the process calls
     {
-        let current = context.index.document(&context.txn, *docid)?;
+        let current = context.index.document(&context.rtxn, *docid)?;
 
         let external_document_id = self.primary_key.extract_docid_from_db(
             current,
diff --git a/crates/milli/src/update/new/indexer/mod.rs b/crates/milli/src/update/new/indexer/mod.rs
index 6d1d0eea8..e3b24642e 100644
--- a/crates/milli/src/update/new/indexer/mod.rs
+++ b/crates/milli/src/update/new/indexer/mod.rs
@@ -33,6 +33,7 @@ use crate::index::main_key::{WORDS_FST_KEY, WORDS_PREFIXES_FST_KEY};
 use crate::proximity::ProximityPrecision;
 use crate::update::del_add::DelAdd;
 use crate::update::new::extract::EmbeddingExtractor;
+use crate::update::new::merger::merge_and_send_rtree;
 use crate::update::new::words_prefix_docids::compute_exact_word_prefix_docids;
 use crate::update::new::{merge_and_send_docids, merge_and_send_facet_docids, FacetDatabases};
 use crate::update::settings::InnerIndexSettings;
@@ -57,6 +58,7 @@ mod steps {
         "extracting words",
         "extracting word proximity",
         "extracting embeddings",
+        "writing geo points",
         "writing to database",
         "writing embeddings to database",
         "waiting for extractors",
@@ -93,29 +95,33 @@ mod steps {
         step(4)
     }
 
-    pub const fn write_db() -> (u16, &'static str) {
+    pub const fn extract_geo_points() -> (u16, &'static str) {
         step(5)
     }
 
-    pub const fn write_embedding_db() -> (u16, &'static str) {
+    pub const fn write_db() -> (u16, &'static str) {
         step(6)
     }
 
-    pub const fn waiting_extractors() -> (u16, &'static str) {
+    pub const fn write_embedding_db() -> (u16, &'static str) {
         step(7)
     }
 
-    pub const fn post_processing_facets() -> (u16, &'static str) {
+    pub const fn waiting_extractors() -> (u16, &'static str) {
         step(8)
     }
 
-    pub const fn post_processing_words() -> (u16, &'static str) {
+    pub const fn post_processing_facets() -> (u16, &'static str) {
         step(9)
     }
 
-    pub const fn finalizing() -> (u16, &'static str) {
+    pub const fn post_processing_words() -> (u16, &'static str) {
         step(10)
     }
+
+    pub const fn finalizing() -> (u16, &'static str) {
+        step(11)
+    }
 }
 
 /// This is the main function of this crate.
@@ -144,11 +150,8 @@ where
     let (extractor_sender, writer_receiver) = extractor_writer_channel(10_000);
 
     let metadata_builder = MetadataBuilder::from_index(index, wtxn)?;
-
     let new_fields_ids_map = FieldIdMapWithMetadata::new(new_fields_ids_map, metadata_builder);
-
     let new_fields_ids_map = RwLock::new(new_fields_ids_map);
-
     let fields_ids_map_store = ThreadLocal::with_capacity(pool.current_num_threads());
     let mut extractor_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
     let doc_allocs = ThreadLocal::with_capacity(pool.current_num_threads());
@@ -328,7 +331,15 @@ where
 
                     let (finished_steps, step_name) = steps::extract_word_proximity();
 
-                    let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters, document_changes, indexing_context, &mut extractor_allocs, finished_steps, total_steps, step_name)?;
+                    let caches = <WordPairProximityDocidsExtractor as DocidsExtractor>::run_extraction(grenad_parameters,
+                        document_changes,
+                        indexing_context,
+                        &mut extractor_allocs,
+                        finished_steps,
+                        total_steps,
+                        step_name,
+                    )?;
+
                     merge_and_send_docids(
                         caches,
                         index.word_pair_proximity_docids.remap_types(),
@@ -351,8 +362,6 @@ where
                     let extractor = EmbeddingExtractor::new(embedders, &embedding_sender, field_distribution, request_threads());
                     let mut datastore = ThreadLocal::with_capacity(pool.current_num_threads());
                     let (finished_steps, step_name) = steps::extract_embeddings();
-
-
                     extract(document_changes, &extractor, indexing_context, &mut extractor_allocs, &datastore, finished_steps, total_steps, step_name)?;
 
                     for config in &mut index_embeddings {
@@ -366,6 +375,35 @@ where
                     embedding_sender.finish(index_embeddings).unwrap();
                 }
 
+                'geo: {
+                    let span = tracing::trace_span!(target: "indexing::documents::extract", "geo");
+                    let _entered = span.enter();
+
+                    // let geo_sender = extractor_sender.geo_points();
+                    let Some(extractor) = GeoExtractor::new(&rtxn, index, grenad_parameters)? else {
+                        break 'geo;
+                    };
+                    let datastore = ThreadLocal::with_capacity(pool.current_num_threads());
+                    let (finished_steps, step_name) = steps::extract_geo_points();
+                    extract(document_changes,
+                        &extractor,
+                        indexing_context,
+                        &mut extractor_allocs,
+                        &datastore,
+                        finished_steps,
+                        total_steps,
+                        step_name,
+                    )?;
+
+                    merge_and_send_rtree(
+                        datastore,
+                        &rtxn,
+                        index,
+                        extractor_sender.geo(),
+                        &indexing_context.must_stop_processing,
+                    )?;
+                }
+
                 // TODO THIS IS TOO MUCH
                 // - [ ] Extract fieldid docid facet number
                 // - [ ] Extract fieldid docid facet string
diff --git a/crates/milli/src/update/new/indexer/update_by_function.rs b/crates/milli/src/update/new/indexer/update_by_function.rs
index eb7252445..f6df3981d 100644
--- a/crates/milli/src/update/new/indexer/update_by_function.rs
+++ b/crates/milli/src/update/new/indexer/update_by_function.rs
@@ -93,7 +93,7 @@ impl<'index> DocumentChanges<'index> for UpdateByFunctionChanges<'index> {
         let DocumentChangeContext {
             index,
             db_fields_ids_map,
-            txn,
+            rtxn: txn,
             new_fields_ids_map,
             doc_alloc,
             ..
diff --git a/crates/milli/src/update/new/merger.rs b/crates/milli/src/update/new/merger.rs
index 4eca113ea..c81f84f43 100644
--- a/crates/milli/src/update/new/merger.rs
+++ b/crates/milli/src/update/new/merger.rs
@@ -1,68 +1,63 @@
-use std::io::{self};
+use std::cell::RefCell;
+use std::io;
 
-use bincode::ErrorKind;
 use hashbrown::HashSet;
 use heed::types::Bytes;
 use heed::{Database, RoTxn};
+use memmap2::Mmap;
 use rayon::iter::{IntoParallelIterator, ParallelIterator};
 use roaring::RoaringBitmap;
 
 use super::channel::*;
 use super::extract::{
     merge_caches, transpose_and_freeze_caches, BalancedCaches, DelAddRoaringBitmap, FacetKind,
+    GeoExtractorData,
 };
-use super::DocumentChange;
-use crate::{
-    CboRoaringBitmapCodec, Error, FieldId, GeoPoint, GlobalFieldsIdsMap, Index, InternalError,
-    Result,
-};
+use crate::{CboRoaringBitmapCodec, FieldId, GeoPoint, Index, InternalError, Result};
 
-pub struct GeoExtractor {
-    rtree: Option<rstar::RTree<GeoPoint>>,
-}
+#[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]
+pub fn merge_and_send_rtree<'extractor, MSP>(
+    datastore: impl IntoIterator<Item = RefCell<GeoExtractorData<'extractor>>>,
+    rtxn: &RoTxn,
+    index: &Index,
+    geo_sender: GeoSender<'_>,
+    must_stop_processing: &MSP,
+) -> Result<()>
+where
+    MSP: Fn() -> bool + Sync,
+{
+    let mut rtree = index.geo_rtree(rtxn)?.unwrap_or_default();
+    let mut faceted = index.geo_faceted_documents_ids(rtxn)?;
 
-impl GeoExtractor {
-    pub fn new(rtxn: &RoTxn, index: &Index) -> Result<Option<Self>> {
-        let is_sortable = index.sortable_fields(rtxn)?.contains("_geo");
-        let is_filterable = index.filterable_fields(rtxn)?.contains("_geo");
-        if is_sortable || is_filterable {
-            Ok(Some(GeoExtractor { rtree: index.geo_rtree(rtxn)? }))
-        } else {
-            Ok(None)
+    for data in datastore {
+        if must_stop_processing() {
+            return Err(InternalError::AbortedIndexation.into());
+        }
+
+        let mut frozen = data.into_inner().freeze()?;
+        for result in frozen.iter_and_clear_removed() {
+            let extracted_geo_point = result?;
+            debug_assert!(rtree.remove(&GeoPoint::from(extracted_geo_point)).is_some());
+            debug_assert!(faceted.remove(extracted_geo_point.docid));
+        }
+
+        for result in frozen.iter_and_clear_inserted() {
+            let extracted_geo_point = result?;
+            rtree.insert(GeoPoint::from(extracted_geo_point));
+            debug_assert!(faceted.insert(extracted_geo_point.docid));
         }
     }
 
-    pub fn manage_change(
-        &mut self,
-        fidmap: &mut GlobalFieldsIdsMap,
-        change: &DocumentChange,
-    ) -> Result<()> {
-        match change {
-            DocumentChange::Deletion(_) => todo!(),
-            DocumentChange::Update(_) => todo!(),
-            DocumentChange::Insertion(_) => todo!(),
-        }
-    }
+    let mut file = tempfile::tempfile()?;
+    /// manage error
+    bincode::serialize_into(&mut file, dbg!(&rtree)).unwrap();
+    file.sync_all()?;
 
-    pub fn serialize_rtree<W: io::Write>(self, writer: &mut W) -> Result<bool> {
-        match self.rtree {
-            Some(rtree) => {
-                // TODO What should I do?
-                bincode::serialize_into(writer, &rtree).map(|_| true).map_err(|e| match *e {
-                    ErrorKind::Io(e) => Error::IoError(e),
-                    ErrorKind::InvalidUtf8Encoding(_) => todo!(),
-                    ErrorKind::InvalidBoolEncoding(_) => todo!(),
-                    ErrorKind::InvalidCharEncoding => todo!(),
-                    ErrorKind::InvalidTagEncoding(_) => todo!(),
-                    ErrorKind::DeserializeAnyNotSupported => todo!(),
-                    ErrorKind::SizeLimit => todo!(),
-                    ErrorKind::SequenceMustHaveLength => todo!(),
-                    ErrorKind::Custom(_) => todo!(),
-                })
-            }
-            None => Ok(false),
-        }
-    }
+    let rtree_mmap = unsafe { Mmap::map(&file)? };
+    geo_sender.set_rtree(rtree_mmap).unwrap();
+    geo_sender.set_geo_faceted(&faceted).unwrap();
+
+    Ok(())
 }
 
 #[tracing::instrument(level = "trace", skip_all, target = "indexing::merge")]