Update heed and arroy

This commit is contained in:
Louis Dureuil 2025-03-04 16:46:44 +01:00
parent e751342dfb
commit f1f2b5d0d3
No known key found for this signature in database
34 changed files with 261 additions and 150 deletions

86
Cargo.lock generated
View File

@ -391,6 +391,24 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711"
[[package]]
name = "arroy"
version = "0.5.0"
dependencies = [
"bytemuck",
"byteorder",
"heed 0.21.0",
"log",
"memmap2",
"nohash",
"ordered-float",
"rand",
"rayon",
"roaring",
"tempfile",
"thiserror 2.0.9",
]
[[package]]
name = "arroy"
version = "0.5.0"
@ -399,7 +417,7 @@ checksum = "dfc5f272f38fa063bbff0a7ab5219404e221493de005e2b4078c62d626ef567e"
dependencies = [
"bytemuck",
"byteorder",
"heed",
"heed 0.20.5",
"log",
"memmap2",
"nohash",
@ -418,7 +436,7 @@ source = "git+https://github.com/meilisearch/arroy/?tag=DO-NOT-DELETE-upgrade-v0
dependencies = [
"bytemuck",
"byteorder",
"heed",
"heed 0.20.5",
"log",
"memmap2",
"nohash",
@ -944,13 +962,13 @@ checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5"
[[package]]
name = "cc"
version = "1.0.104"
version = "1.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "74b6a57f98764a267ff415d50a25e6e166f3831a5071af4995296ea97d210490"
checksum = "be714c154be609ec7f5dad223a33bf1482fff90472de28f7362806e6d4832b8c"
dependencies = [
"jobserver",
"libc",
"once_cell",
"shlex",
]
[[package]]
@ -2403,16 +2421,37 @@ checksum = "7d4f449bab7320c56003d37732a917e18798e2f1709d80263face2b4f9436ddb"
dependencies = [
"bitflags 2.6.0",
"byteorder",
"heed-traits",
"heed-types",
"heed-traits 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)",
"heed-types 0.20.1",
"libc",
"lmdb-master-sys",
"lmdb-master-sys 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"once_cell",
"page_size",
"serde",
"synchronoise",
"url",
]
[[package]]
name = "heed"
version = "0.21.0"
dependencies = [
"bitflags 2.6.0",
"byteorder",
"heed-traits 0.20.0",
"heed-types 0.21.0",
"libc",
"lmdb-master-sys 0.2.4",
"once_cell",
"page_size",
"synchronoise",
"url",
]
[[package]]
name = "heed-traits"
version = "0.20.0"
[[package]]
name = "heed-traits"
version = "0.20.0"
@ -2427,7 +2466,18 @@ checksum = "9d3f528b053a6d700b2734eabcd0fd49cb8230647aa72958467527b0b7917114"
dependencies = [
"bincode",
"byteorder",
"heed-traits",
"heed-traits 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde",
"serde_json",
]
[[package]]
name = "heed-types"
version = "0.21.0"
dependencies = [
"bincode",
"byteorder",
"heed-traits 0.20.0",
"serde",
"serde_json",
]
@ -3466,6 +3516,15 @@ version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4ee93343901ab17bd981295f2cf0026d4ad018c7c31ba84549a4ddbb47a45104"
[[package]]
name = "lmdb-master-sys"
version = "0.2.4"
dependencies = [
"cc",
"doxygen-rs",
"libc",
]
[[package]]
name = "lmdb-master-sys"
version = "0.2.4"
@ -3513,9 +3572,9 @@ checksum = "9374ef4228402d4b7e403e5838cb880d9ee663314b0a900d5a6aabf0c213552e"
[[package]]
name = "log"
version = "0.4.21"
version = "0.4.26"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c"
checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"
[[package]]
name = "lzma-rs"
@ -3730,6 +3789,7 @@ dependencies = [
"clap",
"dump",
"file-store",
"heed 0.20.5",
"indexmap",
"meilisearch-auth",
"meilisearch-types",
@ -3761,7 +3821,7 @@ name = "milli"
version = "1.13.1"
dependencies = [
"allocator-api2",
"arroy 0.5.0 (registry+https://github.com/rust-lang/crates.io-index)",
"arroy 0.5.0",
"bbqueue",
"big_s",
"bimap",
@ -3790,7 +3850,7 @@ dependencies = [
"geoutils",
"grenad",
"hashbrown 0.15.2",
"heed",
"heed 0.21.0",
"hf-hub",
"indexmap",
"insta",

View File

@ -35,7 +35,7 @@ fn setup_dir(path: impl AsRef<Path>) {
fn setup_index() -> Index {
let path = "benches.mmdb";
setup_dir(path);
let mut options = EnvOpenOptions::new();
let mut options = EnvOpenOptions::new().read_txn_without_tls();
options.map_size(100 * 1024 * 1024 * 1024); // 100 GB
options.max_readers(100);
Index::new(options, path, true).unwrap()

View File

@ -65,7 +65,7 @@ pub fn base_setup(conf: &Conf) -> Index {
}
create_dir_all(conf.database_name).unwrap();
let mut options = EnvOpenOptions::new();
let mut options = EnvOpenOptions::new().read_txn_without_tls();
options.map_size(100 * 1024 * 1024 * 1024); // 100 GB
options.max_readers(100);
let index = Index::new(options, conf.database_name, true).unwrap();

View File

@ -57,7 +57,7 @@ fn main() {
let opt = opt.clone();
let handle = std::thread::spawn(move || {
let mut options = EnvOpenOptions::new();
let mut options = EnvOpenOptions::new().read_txn_without_tls();
options.map_size(1024 * 1024 * 1024 * 1024);
let tempdir = match opt.path {
Some(path) => TempDir::new_in(path).unwrap(),

View File

@ -2,7 +2,7 @@ use std::sync::{Arc, RwLock};
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RwTxn};
use meilisearch_types::heed::{Database, Env, RwTxn, WithoutTls};
use crate::error::FeatureNotEnabledError;
use crate::Result;
@ -126,7 +126,7 @@ impl FeatureData {
}
pub fn new(
env: &Env,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
instance_features: InstanceTogglableFeatures,
) -> Result<Self> {

View File

@ -304,7 +304,7 @@ fn create_or_open_index(
map_size: usize,
creation: bool,
) -> Result<Index> {
let mut options = EnvOpenOptions::new();
let mut options = EnvOpenOptions::new().read_txn_without_tls();
options.map_size(clamp_to_page_size(map_size));
// You can find more details about this experimental
@ -333,7 +333,7 @@ fn create_or_open_index(
#[cfg(test)]
mod tests {
use meilisearch_types::heed::Env;
use meilisearch_types::heed::{Env, WithoutTls};
use meilisearch_types::Index;
use uuid::Uuid;
@ -343,7 +343,7 @@ mod tests {
use crate::IndexScheduler;
impl IndexMapper {
fn test() -> (Self, Env, IndexSchedulerHandle) {
fn test() -> (Self, Env<WithoutTls>, IndexSchedulerHandle) {
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
(index_scheduler.index_mapper, index_scheduler.env, handle)
}

View File

@ -4,7 +4,7 @@ use std::time::Duration;
use std::{fs, thread};
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli;
use meilisearch_types::milli::database_stats::DatabaseStats;
use meilisearch_types::milli::update::IndexerConfig;
@ -159,7 +159,7 @@ impl IndexMapper {
}
pub fn new(
env: &Env,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
options: &IndexSchedulerOptions,
budget: IndexBudget,

View File

@ -40,6 +40,7 @@ pub type TaskId = u32;
use std::collections::{BTreeMap, HashMap};
use std::io::{self, BufReader, Read};
use std::ops::Deref as _;
use std::panic::{catch_unwind, AssertUnwindSafe};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
@ -54,7 +55,7 @@ use meilisearch_types::batches::Batch;
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
use meilisearch_types::heed::byteorder::BE;
use meilisearch_types::heed::types::I128;
use meilisearch_types::heed::{self, Env, RoTxn};
use meilisearch_types::heed::{self, Env, RoTxn, WithoutTls};
use meilisearch_types::milli::index::IndexEmbeddingConfig;
use meilisearch_types::milli::update::IndexerConfig;
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
@ -131,7 +132,7 @@ pub struct IndexSchedulerOptions {
/// to be performed on them.
pub struct IndexScheduler {
/// The LMDB environment which the DBs are associated with.
pub(crate) env: Env,
pub(crate) env: Env<WithoutTls>,
/// The list of tasks currently processing
pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
@ -241,6 +242,7 @@ impl IndexScheduler {
let env = unsafe {
heed::EnvOpenOptions::new()
.read_txn_without_tls()
.max_dbs(Self::nb_db())
.map_size(budget.task_db_size)
.open(&options.tasks_path)
@ -358,7 +360,7 @@ impl IndexScheduler {
}
}
pub fn read_txn(&self) -> Result<RoTxn> {
pub fn read_txn(&self) -> Result<RoTxn<WithoutTls>> {
self.env.read_txn().map_err(|e| e.into())
}
@ -427,12 +429,12 @@ impl IndexScheduler {
/// If you need to fetch information from or perform an action on all indexes,
/// see the `try_for_each_index` function.
pub fn index(&self, name: &str) -> Result<Index> {
self.index_mapper.index(&self.env.read_txn()?, name)
self.index_mapper.index(self.env.read_txn()?.deref(), name)
}
/// Return the boolean referring if index exists.
pub fn index_exists(&self, name: &str) -> Result<bool> {
self.index_mapper.index_exists(&self.env.read_txn()?, name)
self.index_mapper.index_exists(self.env.read_txn()?.deref(), name)
}
/// Return the name of all indexes without opening them.
@ -507,7 +509,7 @@ impl IndexScheduler {
/// 2. The name of the specific data related to the property can be `enqueued` for the `statuses`, `settingsUpdate` for the `types`, or the name of the index for the `indexes`, for example.
/// 3. The number of times the properties appeared.
pub fn get_stats(&self) -> Result<BTreeMap<String, BTreeMap<String, u64>>> {
self.queue.get_stats(&self.read_txn()?, &self.processing_tasks.read().unwrap())
self.queue.get_stats(self.read_txn()?.deref(), &self.processing_tasks.read().unwrap())
}
// Return true if there is at least one task that is processing.

View File

@ -3,7 +3,7 @@ use std::ops::{Bound, RangeBounds};
use meilisearch_types::batches::{Batch, BatchId};
use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, Status};
use roaring::{MultiOps, RoaringBitmap};
@ -66,7 +66,7 @@ impl BatchQueue {
NUMBER_OF_DATABASES
}
pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
pub(super) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
Ok(Self {
all_batches: env.create_database(wtxn, Some(db_name::ALL_BATCHES))?,
status: env.create_database(wtxn, Some(db_name::BATCH_STATUS))?,

View File

@ -13,7 +13,7 @@ use std::time::Duration;
use file_store::FileStore;
use meilisearch_types::batches::BatchId;
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
use roaring::RoaringBitmap;
@ -157,7 +157,7 @@ impl Queue {
/// Create an index scheduler and start its run loop.
pub(crate) fn new(
env: &Env,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
options: &IndexSchedulerOptions,
) -> Result<Self> {

View File

@ -1,7 +1,7 @@
use std::ops::{Bound, RangeBounds};
use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
use meilisearch_types::tasks::{Kind, Status, Task};
use roaring::{MultiOps, RoaringBitmap};
@ -68,7 +68,7 @@ impl TaskQueue {
NUMBER_OF_DATABASES
}
pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
pub(crate) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
Ok(Self {
all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?,
status: env.create_database(wtxn, Some(db_name::STATUS))?,

View File

@ -42,7 +42,8 @@ impl IndexScheduler {
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
let dst = temp_snapshot_dir.path().join("tasks");
fs::create_dir_all(&dst)?;
self.env.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
let mut file = std::fs::File::create(dst.join("data.mdb"))?;
self.env.copy_to_file(&mut file, CompactionOption::Enabled)?;
// 2.2 Create a read transaction on the index-scheduler
let rtxn = self.env.read_txn()?;
@ -98,7 +99,8 @@ impl IndexScheduler {
.max_dbs(2)
.open(&self.scheduler.auth_path)
}?;
auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
let mut file = std::fs::File::create(dst.join("data.mdb"))?;
auth.copy_to_file(&mut file, CompactionOption::Enabled)?;
// 5. Copy and tarball the flat snapshot
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);

View File

@ -1,5 +1,5 @@
use anyhow::bail;
use meilisearch_types::heed::{Env, RwTxn};
use meilisearch_types::heed::{Env, RwTxn, WithoutTls};
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
use time::OffsetDateTime;
@ -9,13 +9,17 @@ use crate::queue::TaskQueue;
use crate::versioning::Versioning;
trait UpgradeIndexScheduler {
fn upgrade(&self, env: &Env, wtxn: &mut RwTxn, original: (u32, u32, u32))
-> anyhow::Result<()>;
fn upgrade(
&self,
env: &Env<WithoutTls>,
wtxn: &mut RwTxn,
original: (u32, u32, u32),
) -> anyhow::Result<()>;
fn target_version(&self) -> (u32, u32, u32);
}
pub fn upgrade_index_scheduler(
env: &Env,
env: &Env<WithoutTls>,
versioning: &Versioning,
from: (u32, u32, u32),
to: (u32, u32, u32),
@ -90,7 +94,7 @@ struct V1_12_ToCurrent {}
impl UpgradeIndexScheduler for V1_12_ToCurrent {
fn upgrade(
&self,
_env: &Env,
_env: &Env<WithoutTls>,
_wtxn: &mut RwTxn,
_original: (u32, u32, u32),
) -> anyhow::Result<()> {

View File

@ -1,5 +1,5 @@
use meilisearch_types::heed::types::Str;
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn};
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn, WithoutTls};
use meilisearch_types::milli::heed_codec::version::VersionCodec;
use meilisearch_types::versioning;
@ -46,12 +46,12 @@ impl Versioning {
}
/// Return `Self` without checking anything about the version
pub fn raw_new(env: &Env, wtxn: &mut RwTxn) -> Result<Self, heed::Error> {
pub fn raw_new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self, heed::Error> {
let version = env.create_database(wtxn, Some(db_name::VERSION))?;
Ok(Self { version })
}
pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result<Self> {
pub(crate) fn new(env: &Env<WithoutTls>, db_version: (u32, u32, u32)) -> Result<Self> {
let mut wtxn = env.write_txn()?;
let this = Self::raw_new(env, &mut wtxn)?;
let from = match this.get_version(&wtxn)? {

View File

@ -9,7 +9,7 @@ use std::str::FromStr;
use std::sync::Arc;
use hmac::{Hmac, Mac};
use meilisearch_types::heed::BoxedError;
use meilisearch_types::heed::{BoxedError, WithoutTls};
use meilisearch_types::index_uid_pattern::IndexUidPattern;
use meilisearch_types::keys::KeyId;
use meilisearch_types::milli;
@ -31,7 +31,7 @@ const KEY_ID_ACTION_INDEX_EXPIRATION_DB_NAME: &str = "keyid-action-index-expirat
#[derive(Clone)]
pub struct HeedAuthStore {
env: Arc<Env>,
env: Arc<Env<WithoutTls>>,
keys: Database<Bytes, SerdeJson<Key>>,
action_keyid_index_expiration: Database<KeyIdActionCodec, SerdeJson<Option<OffsetDateTime>>>,
should_close_on_drop: bool,
@ -45,8 +45,8 @@ impl Drop for HeedAuthStore {
}
}
pub fn open_auth_store_env(path: &Path) -> milli::heed::Result<milli::heed::Env> {
let mut options = EnvOpenOptions::new();
pub fn open_auth_store_env(path: &Path) -> milli::heed::Result<milli::heed::Env<WithoutTls>> {
let mut options = EnvOpenOptions::new().read_txn_without_tls();
options.map_size(AUTH_STORE_SIZE); // 1GB
options.max_dbs(2);
unsafe { options.open(path) }

View File

@ -500,8 +500,7 @@ impl ErrorCode for HeedError {
HeedError::Mdb(_)
| HeedError::Encoding(_)
| HeedError::Decoding(_)
| HeedError::DatabaseClosing
| HeedError::BadOpenOptions { .. } => Code::Internal,
| HeedError::EnvAlreadyOpened => Code::Internal,
}
}
}

View File

@ -420,6 +420,7 @@ pub fn update_version_file_for_dumpless_upgrade(
if from_major == 1 && from_minor == 12 {
let env = unsafe {
heed::EnvOpenOptions::new()
.read_txn_without_tls()
.max_dbs(Versioning::nb_db())
.map_size(index_scheduler_opt.task_db_size)
.open(&index_scheduler_opt.tasks_path)

View File

@ -1,6 +1,7 @@
use core::fmt;
use std::cmp::min;
use std::collections::{BTreeMap, BTreeSet, HashSet};
use std::ops::Deref as _;
use std::str::FromStr;
use std::sync::Arc;
use std::time::{Duration, Instant};
@ -340,7 +341,7 @@ impl SearchKind {
vector_len: Option<usize>,
route: Route,
) -> Result<(String, Arc<Embedder>, bool), ResponseError> {
let embedder_configs = index.embedding_configs(&index.read_txn()?)?;
let embedder_configs = index.embedding_configs(index.read_txn()?.deref())?;
let embedders = index_scheduler.embedders(index_uid, embedder_configs)?;
let (embedder, _, quantized) = embedders

View File

@ -11,6 +11,7 @@ license.workspace = true
[dependencies]
anyhow = "1.0.95"
arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/arroy/", tag = "DO-NOT-DELETE-upgrade-v04-to-v05" }
heed_arroy_v04_to_v05 = { package = "heed", version = "0.20.5" }
clap = { version = "4.5.24", features = ["derive"] }
dump = { path = "../dump" }
file-store = { path = "../file-store" }

View File

@ -11,7 +11,7 @@ use meilisearch_auth::AuthController;
use meilisearch_types::batches::Batch;
use meilisearch_types::heed::types::{Bytes, SerdeJson, Str};
use meilisearch_types::heed::{
CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified, WithoutTls,
};
use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
@ -172,7 +172,7 @@ fn main() -> anyhow::Result<()> {
/// Clears the task queue located at `db_path`.
fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
let path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&path) }
let env = unsafe { EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&path) }
.with_context(|| format!("While trying to open {:?}", path.display()))?;
eprintln!("Deleting tasks from the database...");
@ -225,8 +225,8 @@ fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
}
fn try_opening_database<KC: 'static, DC: 'static>(
env: &Env,
rtxn: &RoTxn,
env: &Env<WithoutTls>,
rtxn: &RoTxn<WithoutTls>,
db_name: &str,
) -> anyhow::Result<Database<KC, DC>> {
env.open_database(rtxn, Some(db_name))
@ -235,8 +235,8 @@ fn try_opening_database<KC: 'static, DC: 'static>(
}
fn try_opening_poly_database(
env: &Env,
rtxn: &RoTxn,
env: &Env<WithoutTls>,
rtxn: &RoTxn<WithoutTls>,
db_name: &str,
) -> anyhow::Result<Database<Unspecified, Unspecified>> {
env.database_options()
@ -246,6 +246,23 @@ fn try_opening_poly_database(
.with_context(|| format!("Missing the {db_name:?} poly database"))
}
fn try_opening_poly_database_arroy_v04_to_v05(
env: &heed_arroy_v04_to_v05::Env,
rtxn: &heed_arroy_v04_to_v05::RoTxn,
db_name: &str,
) -> anyhow::Result<
heed_arroy_v04_to_v05::Database<
heed_arroy_v04_to_v05::Unspecified,
heed_arroy_v04_to_v05::Unspecified,
>,
> {
env.database_options()
.name(db_name)
.open(rtxn)
.with_context(|| format!("While opening the {db_name:?} poly database"))?
.with_context(|| format!("Missing the {db_name:?} poly database"))
}
fn try_clearing_poly_database(
wtxn: &mut RwTxn,
database: Database<Unspecified, Unspecified>,
@ -284,8 +301,10 @@ fn export_a_dump(
FileStore::new(db_path.join("update_files")).context("While opening the FileStore")?;
let index_scheduler_path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let env = unsafe {
EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path)
}
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
eprintln!("Dumping the keys...");
@ -386,9 +405,10 @@ fn export_a_dump(
for result in index_mapping.iter(&rtxn)? {
let (uid, uuid) = result?;
let index_path = db_path.join("indexes").join(uuid.to_string());
let index = Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
let index = Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
.with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
let rtxn = index.read_txn()?;
let metadata = IndexMetadata {
@ -438,8 +458,10 @@ fn export_a_dump(
fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
let index_scheduler_path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let env = unsafe {
EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path)
}
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let rtxn = env.read_txn()?;
let index_mapping: Database<Str, UuidCodec> =
@ -456,9 +478,10 @@ fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
}
let index_path = db_path.join("indexes").join(uuid.to_string());
let index = Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
let index = Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
.with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
eprintln!("Awaiting for a mutable transaction...");
let _wtxn = index.write_txn().context("While awaiting for a write transaction")?;
@ -514,8 +537,10 @@ fn export_documents(
offset: Option<usize>,
) -> anyhow::Result<()> {
let index_scheduler_path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let env = unsafe {
EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path)
}
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let rtxn = env.read_txn()?;
let index_mapping: Database<Str, UuidCodec> =
@ -526,9 +551,10 @@ fn export_documents(
if uid == index_name {
let index_path = db_path.join("indexes").join(uuid.to_string());
let index =
Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
.with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
let rtxn = index.read_txn()?;
let fields_ids_map = index.fields_ids_map(&rtxn)?;
@ -616,8 +642,10 @@ fn hair_dryer(
index_parts: &[IndexPart],
) -> anyhow::Result<()> {
let index_scheduler_path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let env = unsafe {
EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path)
}
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
eprintln!("Trying to get a read transaction on the index scheduler...");
@ -630,9 +658,10 @@ fn hair_dryer(
if index_names.iter().any(|i| i == uid) {
let index_path = db_path.join("indexes").join(uuid.to_string());
let index =
Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
.with_context(|| {
format!("While trying to open the index at path {:?}", index_path.display())
})?;
eprintln!("Trying to get a read transaction on the {uid} index...");

View File

@ -2,7 +2,9 @@ use std::path::Path;
use anyhow::{bail, Context};
use meilisearch_types::heed::types::{SerdeJson, Str};
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
use meilisearch_types::heed::{
Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified, WithoutTls,
};
use meilisearch_types::milli::index::{db_name, main_key};
use super::v1_9;
@ -92,7 +94,7 @@ fn update_index_stats(
fn update_date_format(
index_uid: &str,
index_env: &Env,
index_env: &Env<WithoutTls>,
index_wtxn: &mut RwTxn,
) -> anyhow::Result<()> {
let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN)
@ -106,8 +108,8 @@ fn update_date_format(
fn find_rest_embedders(
index_uid: &str,
index_env: &Env,
index_txn: &RoTxn,
index_env: &Env<WithoutTls>,
index_txn: &RoTxn<WithoutTls>,
) -> anyhow::Result<Vec<String>> {
let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN)
.with_context(|| format!("while checking REST embedders for index `{index_uid}`"))?;
@ -164,8 +166,10 @@ pub fn v1_9_to_v1_10(
// 2. REST embedders. We don't support this case right now, so bail
let index_scheduler_path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let env = unsafe {
EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path)
}
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let mut sched_wtxn = env.write_txn()?;
@ -205,9 +209,13 @@ pub fn v1_9_to_v1_10(
let index_env = unsafe {
// FIXME: fetch the 25 magic number from the index file
EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| {
format!("while opening index {uid} at '{}'", index_path.display())
})?
EnvOpenOptions::new()
.read_txn_without_tls()
.max_dbs(25)
.open(&index_path)
.with_context(|| {
format!("while opening index {uid} at '{}'", index_path.display())
})?
};
let index_txn = index_env.read_txn().with_context(|| {
@ -252,9 +260,13 @@ pub fn v1_9_to_v1_10(
let index_env = unsafe {
// FIXME: fetch the 25 magic number from the index file
EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| {
format!("while opening index {uid} at '{}'", index_path.display())
})?
EnvOpenOptions::new()
.read_txn_without_tls()
.max_dbs(25)
.open(&index_path)
.with_context(|| {
format!("while opening index {uid} at '{}'", index_path.display())
})?
};
let mut index_wtxn = index_env.write_txn().with_context(|| {

View File

@ -12,7 +12,7 @@ use meilisearch_types::heed::{Database, EnvOpenOptions};
use meilisearch_types::milli::index::db_name;
use crate::uuid_codec::UuidCodec;
use crate::{try_opening_database, try_opening_poly_database};
use crate::{try_opening_database, try_opening_poly_database_arroy_v04_to_v05};
pub fn v1_10_to_v1_11(
db_path: &Path,
@ -23,8 +23,10 @@ pub fn v1_10_to_v1_11(
println!("Upgrading from v1.10.0 to v1.11.0");
let index_scheduler_path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let env = unsafe {
EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path)
}
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let sched_rtxn = env.read_txn()?;
@ -50,9 +52,12 @@ pub fn v1_10_to_v1_11(
);
let index_env = unsafe {
EnvOpenOptions::new().max_dbs(25).open(&index_path).with_context(|| {
format!("while opening index {uid} at '{}'", index_path.display())
})?
heed_arroy_v04_to_v05::EnvOpenOptions::new()
.max_dbs(25)
.open(&index_path)
.with_context(|| {
format!("while opening index {uid} at '{}'", index_path.display())
})?
};
let index_rtxn = index_env.read_txn().with_context(|| {
@ -61,9 +66,12 @@ pub fn v1_10_to_v1_11(
index_path.display()
)
})?;
let index_read_database =
try_opening_poly_database(&index_env, &index_rtxn, db_name::VECTOR_ARROY)
.with_context(|| format!("while updating date format for index `{uid}`"))?;
let index_read_database = try_opening_poly_database_arroy_v04_to_v05(
&index_env,
&index_rtxn,
db_name::VECTOR_ARROY,
)
.with_context(|| format!("while updating date format for index `{uid}`"))?;
let mut index_wtxn = index_env.write_txn().with_context(|| {
format!(
@ -72,9 +80,12 @@ pub fn v1_10_to_v1_11(
)
})?;
let index_write_database =
try_opening_poly_database(&index_env, &index_wtxn, db_name::VECTOR_ARROY)
.with_context(|| format!("while updating date format for index `{uid}`"))?;
let index_write_database = try_opening_poly_database_arroy_v04_to_v05(
&index_env,
&index_wtxn,
db_name::VECTOR_ARROY,
)
.with_context(|| format!("while updating date format for index `{uid}`"))?;
arroy_v04_to_v05::ugrade_from_prev_version(
&index_rtxn,

View File

@ -115,8 +115,10 @@ fn convert_update_files(db_path: &Path) -> anyhow::Result<()> {
/// Rebuild field distribution as it was wrongly computed in v1.12.x if x < 3
fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
let index_scheduler_path = db_path.join("tasks");
let env = unsafe { EnvOpenOptions::new().max_dbs(100).open(&index_scheduler_path) }
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let env = unsafe {
EnvOpenOptions::new().read_txn_without_tls().max_dbs(100).open(&index_scheduler_path)
}
.with_context(|| format!("While trying to open {:?}", index_scheduler_path.display()))?;
let mut sched_wtxn = env.write_txn()?;
@ -173,11 +175,12 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
println!("\t- Rebuilding field distribution");
let index =
meilisearch_types::milli::Index::new(EnvOpenOptions::new(), &index_path, false)
.with_context(|| {
format!("while opening index {uid} at '{}'", index_path.display())
})?;
let index = meilisearch_types::milli::Index::new(
EnvOpenOptions::new().read_txn_without_tls(),
&index_path,
false,
)
.with_context(|| format!("while opening index {uid} at '{}'", index_path.display()))?;
let mut index_txn = index.write_txn()?;

View File

@ -29,10 +29,9 @@ fst = "0.4.7"
fxhash = "0.2.1"
geoutils = "0.5.1"
grenad = { version = "0.5.0", default-features = false, features = ["rayon", "tempfile"] }
heed = { version = "0.20.5", default-features = false, features = [
heed = { path = "../../../heed/heed", version = "0.21.0", default-features = false, features = [
"serde-json",
"serde-bincode",
"read-txn-no-tls",
] }
indexmap = { version = "2.7.0", features = ["serde"] }
json-depth-checker = { path = "../json-depth-checker" }
@ -85,7 +84,7 @@ rhai = { git = "https://github.com/rhaiscript/rhai", rev = "ef3df63121d27aacd838
"no_time",
"sync",
] }
arroy = "0.5.0"
arroy = { version = "0.5.0", path = "../../../arroy" }
rand = "0.8.5"
tracing = "0.1.41"
ureq = { version = "2.12.1", features = ["json"] }

View File

@ -75,21 +75,6 @@ impl DocumentsBatchIndex {
pub fn id(&self, name: &str) -> Option<FieldId> {
self.0.get_by_right(name).cloned()
}
pub fn recreate_json(&self, document: &obkv::KvReaderU16) -> Result<Object> {
let mut map = Object::new();
for (k, v) in document.iter() {
// TODO: TAMO: update the error type
let key =
self.0.get_by_left(&k).ok_or(crate::error::InternalError::DatabaseClosing)?.clone();
let value = serde_json::from_slice::<serde_json::Value>(v)
.map_err(crate::error::InternalError::SerdeJson)?;
map.insert(key, value);
}
Ok(map)
}
}
impl FieldIdMapper for DocumentsBatchIndex {

View File

@ -32,8 +32,6 @@ pub enum Error {
#[derive(Error, Debug)]
pub enum InternalError {
#[error("{}", HeedError::DatabaseClosing)]
DatabaseClosing,
#[error("missing {} in the {db_name} database", key.unwrap_or("key"))]
DatabaseMissingEntry { db_name: &'static str, key: Option<&'static str> },
#[error("missing {key} in the fieldids weights mapping")]
@ -80,6 +78,9 @@ pub enum InternalError {
ArroyError(#[from] arroy::Error),
#[error(transparent)]
VectorEmbeddingError(#[from] crate::vector::Error),
#[error("{}", HeedError::EnvAlreadyOpened)]
EnvAlreadyOpened,
}
#[derive(Error, Debug)]
@ -471,8 +472,7 @@ impl From<HeedError> for Error {
// TODO use the encoding
HeedError::Encoding(_) => InternalError(Serialization(Encoding { db_name: None })),
HeedError::Decoding(_) => InternalError(Serialization(Decoding { db_name: None })),
HeedError::DatabaseClosing => InternalError(DatabaseClosing),
HeedError::BadOpenOptions { .. } => UserError(InvalidLmdbOpenOptions),
HeedError::EnvAlreadyOpened => InternalError(EnvAlreadyOpened),
}
}
}

View File

@ -5,7 +5,7 @@ use std::fs::File;
use std::path::Path;
use heed::types::*;
use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified};
use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified, WithoutTls};
use roaring::RoaringBitmap;
use rstar::RTree;
use serde::{Deserialize, Serialize};
@ -109,7 +109,7 @@ pub mod db_name {
#[derive(Clone)]
pub struct Index {
/// The LMDB environment which this index is associated with.
pub(crate) env: heed::Env,
pub(crate) env: heed::Env<heed::WithoutTls>,
/// Contains many different types (e.g. the fields ids map).
pub(crate) main: Database<Unspecified, Unspecified>,
@ -176,7 +176,7 @@ pub struct Index {
impl Index {
pub fn new_with_creation_dates<P: AsRef<Path>>(
mut options: heed::EnvOpenOptions,
mut options: heed::EnvOpenOptions<WithoutTls>,
path: P,
created_at: time::OffsetDateTime,
updated_at: time::OffsetDateTime,
@ -274,7 +274,7 @@ impl Index {
}
pub fn new<P: AsRef<Path>>(
options: heed::EnvOpenOptions,
options: heed::EnvOpenOptions<WithoutTls>,
path: P,
creation: bool,
) -> Result<Index> {
@ -283,7 +283,7 @@ impl Index {
}
fn set_creation_dates(
env: &heed::Env,
env: &heed::Env<WithoutTls>,
main: Database<Unspecified, Unspecified>,
created_at: time::OffsetDateTime,
updated_at: time::OffsetDateTime,
@ -305,12 +305,12 @@ impl Index {
}
/// Create a read transaction to be able to read the index.
pub fn read_txn(&self) -> heed::Result<RoTxn<'_>> {
pub fn read_txn(&self) -> heed::Result<RoTxn<'_, WithoutTls>> {
self.env.read_txn()
}
/// Create a static read transaction to be able to read the index without keeping a reference to it.
pub fn static_read_txn(&self) -> heed::Result<RoTxn<'static>> {
pub fn static_read_txn(&self) -> heed::Result<RoTxn<'static, WithoutTls>> {
self.env.clone().static_read_txn()
}
@ -340,7 +340,9 @@ impl Index {
}
pub fn copy_to_file<P: AsRef<Path>>(&self, path: P, option: CompactionOption) -> Result<File> {
self.env.copy_to_file(path, option).map_err(Into::into)
let mut file = std::fs::File::create(path.as_ref())?;
self.env.copy_to_file(&mut file, option)?;
Ok(file)
}
/// Returns an `EnvClosingEvent` that can be used to wait for the closing event,
@ -1870,7 +1872,7 @@ pub(crate) mod tests {
impl TempIndex {
/// Creates a temporary index
pub fn new_with_map_size(size: usize) -> Self {
let mut options = EnvOpenOptions::new();
let mut options = EnvOpenOptions::new().read_txn_without_tls();
options.map_size(size);
let _tempdir = TempDir::new_in(".").unwrap();
let inner = Index::new(options, _tempdir.path(), true).unwrap();

View File

@ -15,7 +15,7 @@ use crate::constants::RESERVED_GEO_FIELD_NAME;
pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
let mut options = EnvOpenOptions::new().read_txn_without_tls();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path, true).unwrap();

View File

@ -301,7 +301,7 @@ pub(crate) mod test_helpers {
use grenad::MergerBuilder;
use heed::types::Bytes;
use heed::{BytesDecode, BytesEncode, Env, RoTxn, RwTxn};
use heed::{BytesDecode, BytesEncode, Env, RoTxn, RwTxn, WithoutTls};
use roaring::RoaringBitmap;
use super::bulk::FacetsUpdateBulkInner;
@ -339,7 +339,7 @@ pub(crate) mod test_helpers {
for<'a> BoundCodec:
BytesEncode<'a> + BytesDecode<'a, DItem = <BoundCodec as BytesEncode<'a>>::EItem>,
{
pub env: Env,
pub env: Env<WithoutTls>,
pub content: heed::Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
pub group_size: Cell<u8>,
pub min_level_size: Cell<u8>,
@ -361,7 +361,7 @@ pub(crate) mod test_helpers {
let group_size = group_size.clamp(2, 127);
let max_group_size = std::cmp::min(127, std::cmp::max(group_size * 2, max_group_size)); // 2*group_size <= x <= 127
let min_level_size = std::cmp::max(1, min_level_size); // 1 <= x <= inf
let mut options = heed::EnvOpenOptions::new();
let mut options = heed::EnvOpenOptions::new().read_txn_without_tls();
let options = options.map_size(4096 * 4 * 1000 * 100);
let tempdir = tempfile::TempDir::new().unwrap();
let env = unsafe { options.open(tempdir.path()) }.unwrap();

View File

@ -3,7 +3,7 @@ use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock};
use bumpalo::Bump;
use heed::RoTxn;
use heed::{RoTxn, WithoutTls};
use rayon::iter::IndexedParallelIterator;
use super::super::document_change::DocumentChange;
@ -28,7 +28,7 @@ pub struct DocumentChangeContext<
/// inside of the DB.
pub db_fields_ids_map: &'indexer FieldsIdsMap,
/// A transaction providing data from the DB before all indexing operations
pub rtxn: RoTxn<'indexer>,
pub rtxn: RoTxn<'indexer, WithoutTls>,
/// Global field id map that is up to date with the current state of the indexing process.
///

View File

@ -13,7 +13,7 @@ use serde_json::{from_value, json};
#[test]
fn test_facet_distribution_with_no_facet_values() {
let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
let mut options = EnvOpenOptions::new().read_txn_without_tls();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path, true).unwrap();

View File

@ -32,7 +32,7 @@ pub const CONTENT: &str = include_str!("../assets/test_set.ndjson");
pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
let mut options = EnvOpenOptions::new().read_txn_without_tls();
options.map_size(10 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path, true).unwrap();

View File

@ -262,7 +262,7 @@ fn criteria_mixup() {
#[test]
fn criteria_ascdesc() {
let path = tempfile::tempdir().unwrap();
let mut options = EnvOpenOptions::new();
let mut options = EnvOpenOptions::new().read_txn_without_tls();
options.map_size(12 * 1024 * 1024); // 10 MB
let index = Index::new(options, &path, true).unwrap();

View File

@ -108,7 +108,7 @@ fn test_typo_tolerance_two_typo() {
#[test]
fn test_typo_disabled_on_word() {
let tmp = tempdir().unwrap();
let mut options = EnvOpenOptions::new();
let mut options = EnvOpenOptions::new().read_txn_without_tls();
options.map_size(4096 * 100);
let index = Index::new(options, tmp.path(), true).unwrap();