Merge bbb681361c
into 1cd00f37c0
This commit is contained in:
commit
cca1dbd186
File diff suppressed because it is too large
Load Diff
|
@ -36,6 +36,9 @@ license = "MIT"
|
|||
[profile.release]
|
||||
codegen-units = 1
|
||||
|
||||
[profile.release.package.heed]
|
||||
debug-assertions = true
|
||||
|
||||
[profile.dev.package.flate2]
|
||||
opt-level = 3
|
||||
|
||||
|
|
|
@ -35,7 +35,8 @@ fn setup_dir(path: impl AsRef<Path>) {
|
|||
fn setup_index() -> Index {
|
||||
let path = "benches.mmdb";
|
||||
setup_dir(path);
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let options = EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
options.map_size(100 * 1024 * 1024 * 1024); // 100 GB
|
||||
options.max_readers(100);
|
||||
Index::new(options, path, true).unwrap()
|
||||
|
|
|
@ -65,7 +65,8 @@ pub fn base_setup(conf: &Conf) -> Index {
|
|||
}
|
||||
create_dir_all(conf.database_name).unwrap();
|
||||
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let options = EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
options.map_size(100 * 1024 * 1024 * 1024); // 100 GB
|
||||
options.max_readers(100);
|
||||
let index = Index::new(options, conf.database_name, true).unwrap();
|
||||
|
|
|
@ -57,7 +57,8 @@ fn main() {
|
|||
let opt = opt.clone();
|
||||
|
||||
let handle = std::thread::spawn(move || {
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let options = EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
options.map_size(1024 * 1024 * 1024 * 1024);
|
||||
let tempdir = match opt.path {
|
||||
Some(path) => TempDir::new_in(path).unwrap(),
|
||||
|
|
|
@ -44,7 +44,7 @@ ureq = "2.12.1"
|
|||
uuid = { version = "1.11.0", features = ["serde", "v4"] }
|
||||
|
||||
[dev-dependencies]
|
||||
arroy = "0.5.0"
|
||||
arroy = { git = "https://github.com/meilisearch/arroy", branch = "main" }
|
||||
big_s = "1.0.2"
|
||||
crossbeam-channel = "0.5.14"
|
||||
# fixed version due to format breakages in v1.40
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::sync::{Arc, RwLock};
|
|||
|
||||
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
|
||||
use meilisearch_types::heed::types::{SerdeJson, Str};
|
||||
use meilisearch_types::heed::{Database, Env, RwTxn};
|
||||
use meilisearch_types::heed::{Database, Env, RwTxn, WithoutTls};
|
||||
|
||||
use crate::error::FeatureNotEnabledError;
|
||||
use crate::Result;
|
||||
|
@ -139,7 +139,7 @@ impl FeatureData {
|
|||
}
|
||||
|
||||
pub fn new(
|
||||
env: &Env,
|
||||
env: &Env<WithoutTls>,
|
||||
wtxn: &mut RwTxn,
|
||||
instance_features: InstanceTogglableFeatures,
|
||||
) -> Result<Self> {
|
||||
|
|
|
@ -304,7 +304,8 @@ fn create_or_open_index(
|
|||
map_size: usize,
|
||||
creation: bool,
|
||||
) -> Result<Index> {
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let options = EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
options.map_size(clamp_to_page_size(map_size));
|
||||
|
||||
// You can find more details about this experimental
|
||||
|
@ -333,7 +334,7 @@ fn create_or_open_index(
|
|||
#[cfg(test)]
|
||||
mod tests {
|
||||
|
||||
use meilisearch_types::heed::Env;
|
||||
use meilisearch_types::heed::{Env, WithoutTls};
|
||||
use meilisearch_types::Index;
|
||||
use uuid::Uuid;
|
||||
|
||||
|
@ -343,7 +344,7 @@ mod tests {
|
|||
use crate::IndexScheduler;
|
||||
|
||||
impl IndexMapper {
|
||||
fn test() -> (Self, Env, IndexSchedulerHandle) {
|
||||
fn test() -> (Self, Env<WithoutTls>, IndexSchedulerHandle) {
|
||||
let (index_scheduler, handle) = IndexScheduler::test(true, vec![]);
|
||||
(index_scheduler.index_mapper, index_scheduler.env, handle)
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@ use std::time::Duration;
|
|||
use std::{fs, thread};
|
||||
|
||||
use meilisearch_types::heed::types::{SerdeJson, Str};
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
|
||||
use meilisearch_types::milli;
|
||||
use meilisearch_types::milli::database_stats::DatabaseStats;
|
||||
use meilisearch_types::milli::update::IndexerConfig;
|
||||
|
@ -164,7 +164,7 @@ impl IndexMapper {
|
|||
}
|
||||
|
||||
pub fn new(
|
||||
env: &Env,
|
||||
env: &Env<WithoutTls>,
|
||||
wtxn: &mut RwTxn,
|
||||
options: &IndexSchedulerOptions,
|
||||
budget: IndexBudget,
|
||||
|
|
|
@ -54,7 +54,7 @@ use meilisearch_types::batches::Batch;
|
|||
use meilisearch_types::features::{InstanceTogglableFeatures, Network, RuntimeTogglableFeatures};
|
||||
use meilisearch_types::heed::byteorder::BE;
|
||||
use meilisearch_types::heed::types::I128;
|
||||
use meilisearch_types::heed::{self, Env, RoTxn};
|
||||
use meilisearch_types::heed::{self, Env, RoTxn, WithoutTls};
|
||||
use meilisearch_types::milli::index::IndexEmbeddingConfig;
|
||||
use meilisearch_types::milli::update::IndexerConfig;
|
||||
use meilisearch_types::milli::vector::{Embedder, EmbedderOptions, EmbeddingConfigs};
|
||||
|
@ -131,7 +131,7 @@ pub struct IndexSchedulerOptions {
|
|||
/// to be performed on them.
|
||||
pub struct IndexScheduler {
|
||||
/// The LMDB environment which the DBs are associated with.
|
||||
pub(crate) env: Env,
|
||||
pub(crate) env: Env<WithoutTls>,
|
||||
|
||||
/// The list of tasks currently processing
|
||||
pub(crate) processing_tasks: Arc<RwLock<ProcessingTasks>>,
|
||||
|
@ -209,6 +209,7 @@ impl IndexScheduler {
|
|||
#[allow(private_interfaces)] // because test_utils is private
|
||||
pub fn new(
|
||||
options: IndexSchedulerOptions,
|
||||
auth_env: Env<WithoutTls>,
|
||||
from_db_version: (u32, u32, u32),
|
||||
#[cfg(test)] test_breakpoint_sdr: crossbeam_channel::Sender<(test_utils::Breakpoint, bool)>,
|
||||
#[cfg(test)] planned_failures: Vec<(usize, test_utils::FailureLocation)>,
|
||||
|
@ -240,7 +241,9 @@ impl IndexScheduler {
|
|||
};
|
||||
|
||||
let env = unsafe {
|
||||
heed::EnvOpenOptions::new()
|
||||
let env_options = heed::EnvOpenOptions::new();
|
||||
let mut env_options = env_options.read_txn_without_tls();
|
||||
env_options
|
||||
.max_dbs(Self::nb_db())
|
||||
.map_size(budget.task_db_size)
|
||||
.open(&options.tasks_path)
|
||||
|
@ -260,7 +263,7 @@ impl IndexScheduler {
|
|||
processing_tasks: Arc::new(RwLock::new(ProcessingTasks::new())),
|
||||
version,
|
||||
queue,
|
||||
scheduler: Scheduler::new(&options),
|
||||
scheduler: Scheduler::new(&options, auth_env),
|
||||
|
||||
index_mapper,
|
||||
env,
|
||||
|
@ -358,7 +361,7 @@ impl IndexScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn read_txn(&self) -> Result<RoTxn> {
|
||||
pub fn read_txn(&self) -> Result<RoTxn<WithoutTls>> {
|
||||
self.env.read_txn().map_err(|e| e.into())
|
||||
}
|
||||
|
||||
|
@ -427,12 +430,14 @@ impl IndexScheduler {
|
|||
/// If you need to fetch information from or perform an action on all indexes,
|
||||
/// see the `try_for_each_index` function.
|
||||
pub fn index(&self, name: &str) -> Result<Index> {
|
||||
self.index_mapper.index(&self.env.read_txn()?, name)
|
||||
let rtxn = self.env.read_txn()?;
|
||||
self.index_mapper.index(&rtxn, name)
|
||||
}
|
||||
|
||||
/// Return whether the index exists.
|
||||
pub fn index_exists(&self, name: &str) -> Result<bool> {
|
||||
self.index_mapper.index_exists(&self.env.read_txn()?, name)
|
||||
let rtxn = self.env.read_txn()?;
|
||||
self.index_mapper.index_exists(&rtxn, name)
|
||||
}
|
||||
|
||||
/// Return the name of all indexes without opening them.
|
||||
|
@ -507,7 +512,8 @@ impl IndexScheduler {
|
|||
/// 2. The name of the specific data related to the property can be `enqueued` for the `statuses`, `settingsUpdate` for the `types`, or the name of the index for the `indexes`, for example.
|
||||
/// 3. The number of times the properties appeared.
|
||||
pub fn get_stats(&self) -> Result<BTreeMap<String, BTreeMap<String, u64>>> {
|
||||
self.queue.get_stats(&self.read_txn()?, &self.processing_tasks.read().unwrap())
|
||||
let rtxn = self.read_txn()?;
|
||||
self.queue.get_stats(&rtxn, &self.processing_tasks.read().unwrap())
|
||||
}
|
||||
|
||||
// Return true if there is at least one task that is processing.
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::ops::{Bound, RangeBounds};
|
|||
|
||||
use meilisearch_types::batches::{Batch, BatchId};
|
||||
use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
|
||||
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
|
||||
use meilisearch_types::tasks::{Kind, Status};
|
||||
use roaring::{MultiOps, RoaringBitmap};
|
||||
|
@ -66,7 +66,7 @@ impl BatchQueue {
|
|||
NUMBER_OF_DATABASES
|
||||
}
|
||||
|
||||
pub(super) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
|
||||
pub(super) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
|
||||
Ok(Self {
|
||||
all_batches: env.create_database(wtxn, Some(db_name::ALL_BATCHES))?,
|
||||
status: env.create_database(wtxn, Some(db_name::BATCH_STATUS))?,
|
||||
|
|
|
@ -13,7 +13,7 @@ use std::time::Duration;
|
|||
|
||||
use file_store::FileStore;
|
||||
use meilisearch_types::batches::BatchId;
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
|
||||
use meilisearch_types::milli::{CboRoaringBitmapCodec, BEU32};
|
||||
use meilisearch_types::tasks::{Kind, KindWithContent, Status, Task};
|
||||
use roaring::RoaringBitmap;
|
||||
|
@ -157,7 +157,7 @@ impl Queue {
|
|||
|
||||
/// Create an index scheduler and start its run loop.
|
||||
pub(crate) fn new(
|
||||
env: &Env,
|
||||
env: &Env<WithoutTls>,
|
||||
wtxn: &mut RwTxn,
|
||||
options: &IndexSchedulerOptions,
|
||||
) -> Result<Self> {
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::ops::{Bound, RangeBounds};
|
||||
|
||||
use meilisearch_types::heed::types::{DecodeIgnore, SerdeBincode, SerdeJson, Str};
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn};
|
||||
use meilisearch_types::heed::{Database, Env, RoTxn, RwTxn, WithoutTls};
|
||||
use meilisearch_types::milli::{CboRoaringBitmapCodec, RoaringBitmapCodec, BEU32};
|
||||
use meilisearch_types::tasks::{Kind, Status, Task};
|
||||
use roaring::{MultiOps, RoaringBitmap};
|
||||
|
@ -68,7 +68,7 @@ impl TaskQueue {
|
|||
NUMBER_OF_DATABASES
|
||||
}
|
||||
|
||||
pub(crate) fn new(env: &Env, wtxn: &mut RwTxn) -> Result<Self> {
|
||||
pub(crate) fn new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self> {
|
||||
Ok(Self {
|
||||
all_tasks: env.create_database(wtxn, Some(db_name::ALL_TASKS))?,
|
||||
status: env.create_database(wtxn, Some(db_name::STATUS))?,
|
||||
|
|
|
@ -21,6 +21,7 @@ use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
|
|||
use std::sync::Arc;
|
||||
|
||||
use meilisearch_types::error::ResponseError;
|
||||
use meilisearch_types::heed::{Env, WithoutTls};
|
||||
use meilisearch_types::milli;
|
||||
use meilisearch_types::tasks::Status;
|
||||
use rayon::current_num_threads;
|
||||
|
@ -71,7 +72,7 @@ pub struct Scheduler {
|
|||
pub(crate) snapshots_path: PathBuf,
|
||||
|
||||
/// The path to the folder containing the auth LMDB env.
|
||||
pub(crate) auth_path: PathBuf,
|
||||
pub(crate) auth_env: Env<WithoutTls>,
|
||||
|
||||
/// The path to the version file of Meilisearch.
|
||||
pub(crate) version_file_path: PathBuf,
|
||||
|
@ -87,12 +88,12 @@ impl Scheduler {
|
|||
batched_tasks_size_limit: self.batched_tasks_size_limit,
|
||||
dumps_path: self.dumps_path.clone(),
|
||||
snapshots_path: self.snapshots_path.clone(),
|
||||
auth_path: self.auth_path.clone(),
|
||||
auth_env: self.auth_env.clone(),
|
||||
version_file_path: self.version_file_path.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new(options: &IndexSchedulerOptions) -> Scheduler {
|
||||
pub fn new(options: &IndexSchedulerOptions, auth_env: Env<WithoutTls>) -> Scheduler {
|
||||
Scheduler {
|
||||
must_stop_processing: MustStopProcessing::default(),
|
||||
// we want to start the loop right away in case meilisearch was ctrl+Ced while processing things
|
||||
|
@ -102,7 +103,7 @@ impl Scheduler {
|
|||
batched_tasks_size_limit: options.batched_tasks_size_limit,
|
||||
dumps_path: options.dumps_path.clone(),
|
||||
snapshots_path: options.snapshots_path.clone(),
|
||||
auth_path: options.auth_path.clone(),
|
||||
auth_env,
|
||||
version_file_path: options.version_file_path.clone(),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,6 @@ use std::sync::atomic::Ordering;
|
|||
|
||||
use meilisearch_types::heed::CompactionOption;
|
||||
use meilisearch_types::milli::progress::{Progress, VariableNameStep};
|
||||
use meilisearch_types::milli::{self};
|
||||
use meilisearch_types::tasks::{Status, Task};
|
||||
use meilisearch_types::{compression, VERSION_FILE_NAME};
|
||||
|
||||
|
@ -28,7 +27,7 @@ impl IndexScheduler {
|
|||
|
||||
// 2. Snapshot the index-scheduler LMDB env
|
||||
//
|
||||
// When we call copy_to_file, LMDB opens a read transaction by itself,
|
||||
// When we call copy_to_path, LMDB opens a read transaction by itself,
|
||||
// we can't provide our own. It is an issue as we would like to know
|
||||
// the update files to copy but new ones can be enqueued between the copy
|
||||
// of the env and the new transaction we open to retrieve the enqueued tasks.
|
||||
|
@ -42,7 +41,7 @@ impl IndexScheduler {
|
|||
progress.update_progress(SnapshotCreationProgress::SnapshotTheIndexScheduler);
|
||||
let dst = temp_snapshot_dir.path().join("tasks");
|
||||
fs::create_dir_all(&dst)?;
|
||||
self.env.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
self.env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
|
||||
// 2.2 Create a read transaction on the index-scheduler
|
||||
let rtxn = self.env.read_txn()?;
|
||||
|
@ -81,7 +80,7 @@ impl IndexScheduler {
|
|||
let dst = temp_snapshot_dir.path().join("indexes").join(uuid.to_string());
|
||||
fs::create_dir_all(&dst)?;
|
||||
index
|
||||
.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)
|
||||
.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)
|
||||
.map_err(|e| Error::from_milli(e, Some(name.to_string())))?;
|
||||
}
|
||||
|
||||
|
@ -91,14 +90,7 @@ impl IndexScheduler {
|
|||
progress.update_progress(SnapshotCreationProgress::SnapshotTheApiKeys);
|
||||
let dst = temp_snapshot_dir.path().join("auth");
|
||||
fs::create_dir_all(&dst)?;
|
||||
// TODO We can't use the open_auth_store_env function here but we should
|
||||
let auth = unsafe {
|
||||
milli::heed::EnvOpenOptions::new()
|
||||
.map_size(1024 * 1024 * 1024) // 1 GiB
|
||||
.max_dbs(2)
|
||||
.open(&self.scheduler.auth_path)
|
||||
}?;
|
||||
auth.copy_to_file(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
self.scheduler.auth_env.copy_to_path(dst.join("data.mdb"), CompactionOption::Enabled)?;
|
||||
|
||||
// 5. Copy and tarball the flat snapshot
|
||||
progress.update_progress(SnapshotCreationProgress::CreateTheTarball);
|
||||
|
|
|
@ -5,6 +5,7 @@ use std::time::Duration;
|
|||
use big_s::S;
|
||||
use crossbeam_channel::RecvTimeoutError;
|
||||
use file_store::File;
|
||||
use meilisearch_auth::open_auth_store_env;
|
||||
use meilisearch_types::document_formats::DocumentFormatError;
|
||||
use meilisearch_types::milli::update::IndexDocumentsMethod::ReplaceDocuments;
|
||||
use meilisearch_types::milli::update::IndexerConfig;
|
||||
|
@ -120,7 +121,10 @@ impl IndexScheduler {
|
|||
)
|
||||
});
|
||||
|
||||
let index_scheduler = Self::new(options, version, sender, planned_failures).unwrap();
|
||||
std::fs::create_dir_all(&options.auth_path).unwrap();
|
||||
let auth_env = open_auth_store_env(&options.auth_path).unwrap();
|
||||
let index_scheduler =
|
||||
Self::new(options, auth_env, version, sender, planned_failures).unwrap();
|
||||
|
||||
// To be 100% consistent between all tests we're going to start the scheduler right now
|
||||
// and ensure it's in the expected starting state.
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use anyhow::bail;
|
||||
use meilisearch_types::heed::{Env, RwTxn};
|
||||
use meilisearch_types::heed::{Env, RwTxn, WithoutTls};
|
||||
use meilisearch_types::tasks::{Details, KindWithContent, Status, Task};
|
||||
use meilisearch_types::versioning::{VERSION_MAJOR, VERSION_MINOR, VERSION_PATCH};
|
||||
use time::OffsetDateTime;
|
||||
|
@ -9,13 +9,17 @@ use crate::queue::TaskQueue;
|
|||
use crate::versioning::Versioning;
|
||||
|
||||
trait UpgradeIndexScheduler {
|
||||
fn upgrade(&self, env: &Env, wtxn: &mut RwTxn, original: (u32, u32, u32))
|
||||
-> anyhow::Result<()>;
|
||||
fn upgrade(
|
||||
&self,
|
||||
env: &Env<WithoutTls>,
|
||||
wtxn: &mut RwTxn,
|
||||
original: (u32, u32, u32),
|
||||
) -> anyhow::Result<()>;
|
||||
fn target_version(&self) -> (u32, u32, u32);
|
||||
}
|
||||
|
||||
pub fn upgrade_index_scheduler(
|
||||
env: &Env,
|
||||
env: &Env<WithoutTls>,
|
||||
versioning: &Versioning,
|
||||
from: (u32, u32, u32),
|
||||
to: (u32, u32, u32),
|
||||
|
@ -91,7 +95,7 @@ struct ToCurrentNoOp {}
|
|||
impl UpgradeIndexScheduler for ToCurrentNoOp {
|
||||
fn upgrade(
|
||||
&self,
|
||||
_env: &Env,
|
||||
_env: &Env<WithoutTls>,
|
||||
_wtxn: &mut RwTxn,
|
||||
_original: (u32, u32, u32),
|
||||
) -> anyhow::Result<()> {
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
use meilisearch_types::heed::types::Str;
|
||||
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn};
|
||||
use meilisearch_types::heed::{self, Database, Env, RoTxn, RwTxn, WithoutTls};
|
||||
use meilisearch_types::milli::heed_codec::version::VersionCodec;
|
||||
use meilisearch_types::versioning;
|
||||
|
||||
|
@ -46,12 +46,12 @@ impl Versioning {
|
|||
}
|
||||
|
||||
/// Return `Self` without checking anything about the version
|
||||
pub fn raw_new(env: &Env, wtxn: &mut RwTxn) -> Result<Self, heed::Error> {
|
||||
pub fn raw_new(env: &Env<WithoutTls>, wtxn: &mut RwTxn) -> Result<Self, heed::Error> {
|
||||
let version = env.create_database(wtxn, Some(db_name::VERSION))?;
|
||||
Ok(Self { version })
|
||||
}
|
||||
|
||||
pub(crate) fn new(env: &Env, db_version: (u32, u32, u32)) -> Result<Self> {
|
||||
pub(crate) fn new(env: &Env<WithoutTls>, db_version: (u32, u32, u32)) -> Result<Self> {
|
||||
let mut wtxn = env.write_txn()?;
|
||||
let this = Self::raw_new(env, &mut wtxn)?;
|
||||
let from = match this.get_version(&wtxn)? {
|
||||
|
|
|
@ -2,6 +2,7 @@ use std::fs::File;
|
|||
use std::io::{BufReader, Write};
|
||||
use std::path::Path;
|
||||
|
||||
use meilisearch_types::heed::{Env, WithoutTls};
|
||||
use serde_json::Deserializer;
|
||||
|
||||
use crate::{AuthController, HeedAuthStore, Result};
|
||||
|
@ -9,11 +10,8 @@ use crate::{AuthController, HeedAuthStore, Result};
|
|||
const KEYS_PATH: &str = "keys";
|
||||
|
||||
impl AuthController {
|
||||
pub fn dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<()> {
|
||||
let mut store = HeedAuthStore::new(&src)?;
|
||||
|
||||
// do not attempt to close the database on drop!
|
||||
store.set_drop_on_close(false);
|
||||
pub fn dump(auth_env: Env<WithoutTls>, dst: impl AsRef<Path>) -> Result<()> {
|
||||
let store = HeedAuthStore::new(auth_env)?;
|
||||
|
||||
let keys_file_path = dst.as_ref().join(KEYS_PATH);
|
||||
|
||||
|
@ -27,8 +25,8 @@ impl AuthController {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub fn load_dump(src: impl AsRef<Path>, dst: impl AsRef<Path>) -> Result<()> {
|
||||
let store = HeedAuthStore::new(&dst)?;
|
||||
pub fn load_dump(src: impl AsRef<Path>, auth_env: Env<WithoutTls>) -> Result<()> {
|
||||
let store = HeedAuthStore::new(auth_env)?;
|
||||
|
||||
let keys_file_path = src.as_ref().join(KEYS_PATH);
|
||||
|
||||
|
|
|
@ -3,11 +3,10 @@ pub mod error;
|
|||
mod store;
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
|
||||
use error::{AuthControllerError, Result};
|
||||
use maplit::hashset;
|
||||
use meilisearch_types::heed::{Env, WithoutTls};
|
||||
use meilisearch_types::index_uid_pattern::IndexUidPattern;
|
||||
use meilisearch_types::keys::{Action, CreateApiKey, Key, PatchApiKey};
|
||||
use meilisearch_types::milli::update::Setting;
|
||||
|
@ -19,19 +18,19 @@ use uuid::Uuid;
|
|||
|
||||
#[derive(Clone)]
|
||||
pub struct AuthController {
|
||||
store: Arc<HeedAuthStore>,
|
||||
store: HeedAuthStore,
|
||||
master_key: Option<String>,
|
||||
}
|
||||
|
||||
impl AuthController {
|
||||
pub fn new(db_path: impl AsRef<Path>, master_key: &Option<String>) -> Result<Self> {
|
||||
let store = HeedAuthStore::new(db_path)?;
|
||||
pub fn new(auth_env: Env<WithoutTls>, master_key: &Option<String>) -> Result<Self> {
|
||||
let store = HeedAuthStore::new(auth_env)?;
|
||||
|
||||
if store.is_empty()? {
|
||||
generate_default_keys(&store)?;
|
||||
}
|
||||
|
||||
Ok(Self { store: Arc::new(store), master_key: master_key.clone() })
|
||||
Ok(Self { store, master_key: master_key.clone() })
|
||||
}
|
||||
|
||||
/// Return `Ok(())` if the auth controller is able to access one of its databases.
|
||||
|
|
|
@ -1,18 +1,16 @@
|
|||
use std::borrow::Cow;
|
||||
use std::cmp::Reverse;
|
||||
use std::collections::HashSet;
|
||||
use std::fs::create_dir_all;
|
||||
use std::path::Path;
|
||||
use std::result::Result as StdResult;
|
||||
use std::str;
|
||||
use std::str::FromStr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use hmac::{Hmac, Mac};
|
||||
use meilisearch_types::heed::BoxedError;
|
||||
use meilisearch_types::heed::{BoxedError, WithoutTls};
|
||||
use meilisearch_types::index_uid_pattern::IndexUidPattern;
|
||||
use meilisearch_types::keys::KeyId;
|
||||
use meilisearch_types::milli;
|
||||
use meilisearch_types::milli::heed;
|
||||
use meilisearch_types::milli::heed::types::{Bytes, DecodeIgnore, SerdeJson};
|
||||
use meilisearch_types::milli::heed::{Database, Env, EnvOpenOptions, RwTxn};
|
||||
use sha2::Sha256;
|
||||
|
@ -25,44 +23,32 @@ use super::error::{AuthControllerError, Result};
|
|||
use super::{Action, Key};
|
||||
|
||||
const AUTH_STORE_SIZE: usize = 1_073_741_824; //1GiB
|
||||
const AUTH_DB_PATH: &str = "auth";
|
||||
const KEY_DB_NAME: &str = "api-keys";
|
||||
const KEY_ID_ACTION_INDEX_EXPIRATION_DB_NAME: &str = "keyid-action-index-expiration";
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct HeedAuthStore {
|
||||
env: Arc<Env>,
|
||||
env: Env<WithoutTls>,
|
||||
keys: Database<Bytes, SerdeJson<Key>>,
|
||||
action_keyid_index_expiration: Database<KeyIdActionCodec, SerdeJson<Option<OffsetDateTime>>>,
|
||||
should_close_on_drop: bool,
|
||||
}
|
||||
|
||||
impl Drop for HeedAuthStore {
|
||||
fn drop(&mut self) {
|
||||
if self.should_close_on_drop && Arc::strong_count(&self.env) == 1 {
|
||||
self.env.as_ref().clone().prepare_for_closing();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn open_auth_store_env(path: &Path) -> milli::heed::Result<milli::heed::Env> {
|
||||
let mut options = EnvOpenOptions::new();
|
||||
pub fn open_auth_store_env(path: &Path) -> heed::Result<Env<WithoutTls>> {
|
||||
let options = EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
options.map_size(AUTH_STORE_SIZE); // 1GB
|
||||
options.max_dbs(2);
|
||||
unsafe { options.open(path) }
|
||||
}
|
||||
|
||||
impl HeedAuthStore {
|
||||
pub fn new(path: impl AsRef<Path>) -> Result<Self> {
|
||||
let path = path.as_ref().join(AUTH_DB_PATH);
|
||||
create_dir_all(&path)?;
|
||||
let env = Arc::new(open_auth_store_env(path.as_ref())?);
|
||||
pub fn new(env: Env<WithoutTls>) -> Result<Self> {
|
||||
let mut wtxn = env.write_txn()?;
|
||||
let keys = env.create_database(&mut wtxn, Some(KEY_DB_NAME))?;
|
||||
let action_keyid_index_expiration =
|
||||
env.create_database(&mut wtxn, Some(KEY_ID_ACTION_INDEX_EXPIRATION_DB_NAME))?;
|
||||
wtxn.commit()?;
|
||||
Ok(Self { env, keys, action_keyid_index_expiration, should_close_on_drop: true })
|
||||
Ok(Self { env, keys, action_keyid_index_expiration })
|
||||
}
|
||||
|
||||
/// Return `Ok(())` if the auth store is able to access one of its databases.
|
||||
|
@ -82,10 +68,6 @@ impl HeedAuthStore {
|
|||
Ok(self.env.non_free_pages_size()?)
|
||||
}
|
||||
|
||||
pub fn set_drop_on_close(&mut self, v: bool) {
|
||||
self.should_close_on_drop = v;
|
||||
}
|
||||
|
||||
pub fn is_empty(&self) -> Result<bool> {
|
||||
let rtxn = self.env.read_txn()?;
|
||||
|
||||
|
@ -293,7 +275,7 @@ impl HeedAuthStore {
|
|||
/// optionally on a specific index, for a given key.
|
||||
pub struct KeyIdActionCodec;
|
||||
|
||||
impl<'a> milli::heed::BytesDecode<'a> for KeyIdActionCodec {
|
||||
impl<'a> heed::BytesDecode<'a> for KeyIdActionCodec {
|
||||
type DItem = (KeyId, Action, Option<&'a [u8]>);
|
||||
|
||||
fn bytes_decode(bytes: &'a [u8]) -> StdResult<Self::DItem, BoxedError> {
|
||||
|
@ -310,7 +292,7 @@ impl<'a> milli::heed::BytesDecode<'a> for KeyIdActionCodec {
|
|||
}
|
||||
}
|
||||
|
||||
impl<'a> milli::heed::BytesEncode<'a> for KeyIdActionCodec {
|
||||
impl<'a> heed::BytesEncode<'a> for KeyIdActionCodec {
|
||||
type EItem = (&'a KeyId, &'a Action, Option<&'a [u8]>);
|
||||
|
||||
fn bytes_encode((key_id, action, index): &Self::EItem) -> StdResult<Cow<[u8]>, BoxedError> {
|
||||
|
|
|
@ -405,7 +405,7 @@ impl ErrorCode for milli::Error {
|
|||
match error {
|
||||
// TODO: wait for spec for new error codes.
|
||||
UserError::SerdeJson(_)
|
||||
| UserError::InvalidLmdbOpenOptions
|
||||
| UserError::EnvAlreadyOpened
|
||||
| UserError::DocumentLimitReached
|
||||
| UserError::UnknownInternalDocumentId { .. } => Code::Internal,
|
||||
UserError::InvalidStoreFile => Code::InvalidStoreFile,
|
||||
|
@ -502,8 +502,7 @@ impl ErrorCode for HeedError {
|
|||
HeedError::Mdb(_)
|
||||
| HeedError::Encoding(_)
|
||||
| HeedError::Decoding(_)
|
||||
| HeedError::DatabaseClosing
|
||||
| HeedError::BadOpenOptions { .. } => Code::Internal,
|
||||
| HeedError::EnvAlreadyOpened => Code::Internal,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ use error::PayloadError;
|
|||
use extractors::payload::PayloadConfig;
|
||||
use index_scheduler::versioning::Versioning;
|
||||
use index_scheduler::{IndexScheduler, IndexSchedulerOptions};
|
||||
use meilisearch_auth::AuthController;
|
||||
use meilisearch_auth::{open_auth_store_env, AuthController};
|
||||
use meilisearch_types::milli::constants::VERSION_MAJOR;
|
||||
use meilisearch_types::milli::documents::{DocumentsBatchBuilder, DocumentsBatchReader};
|
||||
use meilisearch_types::milli::update::{IndexDocumentsConfig, IndexDocumentsMethod};
|
||||
|
@ -335,9 +335,12 @@ fn open_or_create_database_unchecked(
|
|||
) -> anyhow::Result<(IndexScheduler, AuthController)> {
|
||||
// we don't want to create anything in the data.ms yet, thus we
|
||||
// wrap our two builders in a closure that'll be executed later.
|
||||
let auth_controller = AuthController::new(&opt.db_path, &opt.master_key);
|
||||
let index_scheduler_builder =
|
||||
|| -> anyhow::Result<_> { Ok(IndexScheduler::new(index_scheduler_opt, version)?) };
|
||||
std::fs::create_dir_all(&index_scheduler_opt.auth_path)?;
|
||||
let auth_env = open_auth_store_env(&index_scheduler_opt.auth_path).unwrap();
|
||||
let auth_controller = AuthController::new(auth_env.clone(), &opt.master_key);
|
||||
let index_scheduler_builder = || -> anyhow::Result<_> {
|
||||
Ok(IndexScheduler::new(index_scheduler_opt, auth_env, version)?)
|
||||
};
|
||||
|
||||
match (
|
||||
index_scheduler_builder(),
|
||||
|
@ -420,6 +423,7 @@ pub fn update_version_file_for_dumpless_upgrade(
|
|||
if from_major == 1 && from_minor == 12 {
|
||||
let env = unsafe {
|
||||
heed::EnvOpenOptions::new()
|
||||
.read_txn_without_tls()
|
||||
.max_dbs(Versioning::nb_db())
|
||||
.map_size(index_scheduler_opt.task_db_size)
|
||||
.open(&index_scheduler_opt.tasks_path)
|
||||
|
|
|
@ -340,7 +340,8 @@ impl SearchKind {
|
|||
vector_len: Option<usize>,
|
||||
route: Route,
|
||||
) -> Result<(String, Arc<Embedder>, bool), ResponseError> {
|
||||
let embedder_configs = index.embedding_configs(&index.read_txn()?)?;
|
||||
let rtxn = index.read_txn()?;
|
||||
let embedder_configs = index.embedding_configs(&rtxn)?;
|
||||
let embedders = index_scheduler.embedders(index_uid, embedder_configs)?;
|
||||
|
||||
let (embedder, _, quantized) = embedders
|
||||
|
|
|
@ -144,14 +144,6 @@ async fn experimental_feature_metrics() {
|
|||
let (response, code) = server.get_metrics().await;
|
||||
meili_snap::snapshot!(code, @"200 OK");
|
||||
meili_snap::snapshot!(response, @"null");
|
||||
|
||||
// startup without flag respects persisted metrics value
|
||||
let disable_metrics =
|
||||
Opt { experimental_enable_metrics: false, ..default_settings(dir.path()) };
|
||||
let server_no_flag = Server::new_with_options(disable_metrics).await.unwrap();
|
||||
let (response, code) = server_no_flag.get_metrics().await;
|
||||
meili_snap::snapshot!(code, @"200 OK");
|
||||
meili_snap::snapshot!(response, @"null");
|
||||
}
|
||||
|
||||
#[actix_rt::test]
|
||||
|
|
|
@ -64,8 +64,8 @@ async fn version_requires_downgrade() {
|
|||
#[actix_rt::test]
|
||||
async fn upgrade_to_the_current_version() {
|
||||
let temp = tempfile::tempdir().unwrap();
|
||||
let server = Server::new_with_options(default_settings(temp.path())).await.unwrap();
|
||||
drop(server);
|
||||
// let server = Server::new_with_options(default_settings(temp.path())).await.unwrap();
|
||||
// drop(server);
|
||||
|
||||
let server = Server::new_with_options(Opt {
|
||||
experimental_dumpless_upgrade: true,
|
||||
|
|
|
@ -108,6 +108,10 @@ async fn check_the_keys(server: &Server) {
|
|||
/// 5.2. Enqueue a new task
|
||||
/// 5.3. Create an index
|
||||
async fn check_the_index_scheduler(server: &Server) {
|
||||
// Wait until the upgrade has been applied to all indexes to avoid flakiness
|
||||
let (tasks, _) = server.tasks_filter("types=upgradeDatabase&limit=1").await;
|
||||
server.wait_task(Value(tasks["results"][0].clone()).uid()).await.succeeded();
|
||||
|
||||
// All the indexes are still present
|
||||
let (indexes, _) = server.list_indexes(None, None).await;
|
||||
snapshot!(indexes, @r#"
|
||||
|
@ -156,10 +160,6 @@ async fn check_the_index_scheduler(server: &Server) {
|
|||
}
|
||||
"###);
|
||||
|
||||
// Wait until the upgrade has been applied to all indexes to avoid flakiness
|
||||
let (tasks, _) = server.tasks_filter("types=upgradeDatabase&limit=1").await;
|
||||
server.wait_task(Value(tasks["results"][0].clone()).uid()).await.succeeded();
|
||||
|
||||
// Tasks and batches should still work
|
||||
// We rewrite the first task for all calls because it may be the upgrade database with unknown dates and duration.
|
||||
// The other tasks should NOT change
|
||||
|
|
|
@ -10,7 +10,6 @@ license.workspace = true
|
|||
|
||||
[dependencies]
|
||||
anyhow = "1.0.95"
|
||||
arroy_v04_to_v05 = { package = "arroy", git = "https://github.com/meilisearch/arroy/", tag = "DO-NOT-DELETE-upgrade-v04-to-v05" }
|
||||
clap = { version = "4.5.24", features = ["derive"] }
|
||||
dump = { path = "../dump" }
|
||||
file-store = { path = "../file-store" }
|
||||
|
|
|
@ -7,11 +7,11 @@ use anyhow::{bail, Context};
|
|||
use clap::{Parser, Subcommand, ValueEnum};
|
||||
use dump::{DumpWriter, IndexMetadata};
|
||||
use file_store::FileStore;
|
||||
use meilisearch_auth::AuthController;
|
||||
use meilisearch_auth::{open_auth_store_env, AuthController};
|
||||
use meilisearch_types::batches::Batch;
|
||||
use meilisearch_types::heed::types::{Bytes, SerdeJson, Str};
|
||||
use meilisearch_types::heed::{
|
||||
CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified,
|
||||
CompactionOption, Database, Env, EnvOpenOptions, RoTxn, RwTxn, TlsUsage, Unspecified,
|
||||
};
|
||||
use meilisearch_types::milli::constants::RESERVED_VECTORS_FIELD_NAME;
|
||||
use meilisearch_types::milli::documents::{obkv_to_object, DocumentsBatchReader};
|
||||
|
@ -224,8 +224,8 @@ fn clear_task_queue(db_path: PathBuf) -> anyhow::Result<()> {
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn try_opening_database<KC: 'static, DC: 'static>(
|
||||
env: &Env,
|
||||
fn try_opening_database<KC: 'static, DC: 'static, T: TlsUsage>(
|
||||
env: &Env<T>,
|
||||
rtxn: &RoTxn,
|
||||
db_name: &str,
|
||||
) -> anyhow::Result<Database<KC, DC>> {
|
||||
|
@ -234,8 +234,8 @@ fn try_opening_database<KC: 'static, DC: 'static>(
|
|||
.with_context(|| format!("Missing the {db_name:?} database"))
|
||||
}
|
||||
|
||||
fn try_opening_poly_database(
|
||||
env: &Env,
|
||||
fn try_opening_poly_database<T: TlsUsage>(
|
||||
env: &Env<T>,
|
||||
rtxn: &RoTxn,
|
||||
db_name: &str,
|
||||
) -> anyhow::Result<Database<Unspecified, Unspecified>> {
|
||||
|
@ -290,7 +290,10 @@ fn export_a_dump(
|
|||
eprintln!("Dumping the keys...");
|
||||
|
||||
// 2. dump the keys
|
||||
let auth_store = AuthController::new(&db_path, &None)
|
||||
let auth_path = db_path.join("auth");
|
||||
std::fs::create_dir_all(&auth_path).context("While creating the auth directory")?;
|
||||
let auth_env = open_auth_store_env(&auth_path).context("While opening the auth store")?;
|
||||
let auth_store = AuthController::new(auth_env, &None)
|
||||
.with_context(|| format!("While opening the auth store at {}", db_path.display()))?;
|
||||
let mut dump_keys = dump.create_keys()?;
|
||||
let mut count = 0;
|
||||
|
@ -386,9 +389,10 @@ fn export_a_dump(
|
|||
for result in index_mapping.iter(&rtxn)? {
|
||||
let (uid, uuid) = result?;
|
||||
let index_path = db_path.join("indexes").join(uuid.to_string());
|
||||
let index = Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
|
||||
format!("While trying to open the index at path {:?}", index_path.display())
|
||||
})?;
|
||||
let index = Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
|
||||
.with_context(|| {
|
||||
format!("While trying to open the index at path {:?}", index_path.display())
|
||||
})?;
|
||||
|
||||
let rtxn = index.read_txn()?;
|
||||
let metadata = IndexMetadata {
|
||||
|
@ -456,9 +460,10 @@ fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
|
|||
}
|
||||
|
||||
let index_path = db_path.join("indexes").join(uuid.to_string());
|
||||
let index = Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
|
||||
format!("While trying to open the index at path {:?}", index_path.display())
|
||||
})?;
|
||||
let index = Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
|
||||
.with_context(|| {
|
||||
format!("While trying to open the index at path {:?}", index_path.display())
|
||||
})?;
|
||||
|
||||
eprintln!("Awaiting for a mutable transaction...");
|
||||
let _wtxn = index.write_txn().context("While awaiting for a write transaction")?;
|
||||
|
@ -470,7 +475,7 @@ fn compact_index(db_path: PathBuf, index_name: &str) -> anyhow::Result<()> {
|
|||
eprintln!("Compacting the index...");
|
||||
let before_compaction = Instant::now();
|
||||
let new_file = index
|
||||
.copy_to_file(&compacted_index_file_path, CompactionOption::Enabled)
|
||||
.copy_to_path(&compacted_index_file_path, CompactionOption::Enabled)
|
||||
.with_context(|| format!("While compacting {}", compacted_index_file_path.display()))?;
|
||||
|
||||
let after_size = new_file.metadata()?.len();
|
||||
|
@ -526,9 +531,10 @@ fn export_documents(
|
|||
if uid == index_name {
|
||||
let index_path = db_path.join("indexes").join(uuid.to_string());
|
||||
let index =
|
||||
Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
|
||||
format!("While trying to open the index at path {:?}", index_path.display())
|
||||
})?;
|
||||
Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
|
||||
.with_context(|| {
|
||||
format!("While trying to open the index at path {:?}", index_path.display())
|
||||
})?;
|
||||
|
||||
let rtxn = index.read_txn()?;
|
||||
let fields_ids_map = index.fields_ids_map(&rtxn)?;
|
||||
|
@ -630,9 +636,10 @@ fn hair_dryer(
|
|||
if index_names.iter().any(|i| i == uid) {
|
||||
let index_path = db_path.join("indexes").join(uuid.to_string());
|
||||
let index =
|
||||
Index::new(EnvOpenOptions::new(), &index_path, false).with_context(|| {
|
||||
format!("While trying to open the index at path {:?}", index_path.display())
|
||||
})?;
|
||||
Index::new(EnvOpenOptions::new().read_txn_without_tls(), &index_path, false)
|
||||
.with_context(|| {
|
||||
format!("While trying to open the index at path {:?}", index_path.display())
|
||||
})?;
|
||||
|
||||
eprintln!("Trying to get a read transaction on the {uid} index...");
|
||||
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::path::Path;
|
|||
|
||||
use anyhow::{bail, Context};
|
||||
use meilisearch_types::heed::types::{SerdeJson, Str};
|
||||
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, Unspecified};
|
||||
use meilisearch_types::heed::{Database, Env, EnvOpenOptions, RoTxn, RwTxn, TlsUsage, Unspecified};
|
||||
use meilisearch_types::milli::index::{db_name, main_key};
|
||||
|
||||
use super::v1_9;
|
||||
|
@ -90,9 +90,9 @@ fn update_index_stats(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn update_date_format(
|
||||
fn update_date_format<T: TlsUsage>(
|
||||
index_uid: &str,
|
||||
index_env: &Env,
|
||||
index_env: &Env<T>,
|
||||
index_wtxn: &mut RwTxn,
|
||||
) -> anyhow::Result<()> {
|
||||
let main = try_opening_poly_database(index_env, index_wtxn, db_name::MAIN)
|
||||
|
@ -104,9 +104,9 @@ fn update_date_format(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
fn find_rest_embedders(
|
||||
fn find_rest_embedders<T: TlsUsage>(
|
||||
index_uid: &str,
|
||||
index_env: &Env,
|
||||
index_env: &Env<T>,
|
||||
index_txn: &RoTxn,
|
||||
) -> anyhow::Result<Vec<String>> {
|
||||
let main = try_opening_poly_database(index_env, index_txn, db_name::MAIN)
|
||||
|
|
|
@ -76,11 +76,11 @@ pub fn v1_10_to_v1_11(
|
|||
try_opening_poly_database(&index_env, &index_wtxn, db_name::VECTOR_ARROY)
|
||||
.with_context(|| format!("while updating date format for index `{uid}`"))?;
|
||||
|
||||
arroy_v04_to_v05::ugrade_from_prev_version(
|
||||
meilisearch_types::milli::arroy::upgrade::cosine_from_0_4_to_0_5(
|
||||
&index_rtxn,
|
||||
index_read_database,
|
||||
index_read_database.remap_types(),
|
||||
&mut index_wtxn,
|
||||
index_write_database,
|
||||
index_write_database.remap_types(),
|
||||
)?;
|
||||
|
||||
index_wtxn.commit()?;
|
||||
|
|
|
@ -173,11 +173,12 @@ fn rebuild_field_distribution(db_path: &Path) -> anyhow::Result<()> {
|
|||
|
||||
println!("\t- Rebuilding field distribution");
|
||||
|
||||
let index =
|
||||
meilisearch_types::milli::Index::new(EnvOpenOptions::new(), &index_path, false)
|
||||
.with_context(|| {
|
||||
format!("while opening index {uid} at '{}'", index_path.display())
|
||||
})?;
|
||||
let index = meilisearch_types::milli::Index::new(
|
||||
EnvOpenOptions::new().read_txn_without_tls(),
|
||||
&index_path,
|
||||
false,
|
||||
)
|
||||
.with_context(|| format!("while opening index {uid} at '{}'", index_path.display()))?;
|
||||
|
||||
let mut index_txn = index.write_txn()?;
|
||||
|
||||
|
|
|
@ -28,11 +28,13 @@ flatten-serde-json = { path = "../flatten-serde-json" }
|
|||
fst = "0.4.7"
|
||||
fxhash = "0.2.1"
|
||||
geoutils = "0.5.1"
|
||||
grenad = { version = "0.5.0", default-features = false, features = ["rayon", "tempfile"] }
|
||||
heed = { version = "0.20.5", default-features = false, features = [
|
||||
grenad = { version = "0.5.0", default-features = false, features = [
|
||||
"rayon",
|
||||
"tempfile",
|
||||
] }
|
||||
heed = { version = "0.22.0", branch = "main", git = "https://github.com/meilisearch/heed", default-features = false, features = [
|
||||
"serde-json",
|
||||
"serde-bincode",
|
||||
"read-txn-no-tls",
|
||||
] }
|
||||
indexmap = { version = "2.7.0", features = ["serde"] }
|
||||
json-depth-checker = { path = "../json-depth-checker" }
|
||||
|
@ -85,7 +87,7 @@ rhai = { git = "https://github.com/rhaiscript/rhai", rev = "ef3df63121d27aacd838
|
|||
"no_time",
|
||||
"sync",
|
||||
] }
|
||||
arroy = "0.5.0"
|
||||
arroy = { git = "https://github.com/meilisearch/arroy", branch = "main" }
|
||||
rand = "0.8.5"
|
||||
tracing = "0.1.41"
|
||||
ureq = { version = "2.12.1", features = ["json"] }
|
||||
|
@ -101,7 +103,13 @@ uell = "0.1.0"
|
|||
enum-iterator = "2.1.0"
|
||||
bbqueue = { git = "https://github.com/meilisearch/bbqueue" }
|
||||
flume = { version = "0.11.1", default-features = false }
|
||||
utoipa = { version = "5.3.1", features = ["non_strict_integers", "preserve_order", "uuid", "time", "openapi_extensions"] }
|
||||
utoipa = { version = "5.3.1", features = [
|
||||
"non_strict_integers",
|
||||
"preserve_order",
|
||||
"uuid",
|
||||
"time",
|
||||
"openapi_extensions",
|
||||
] }
|
||||
|
||||
[dev-dependencies]
|
||||
mimalloc = { version = "0.1.43", default-features = false }
|
||||
|
@ -113,9 +121,7 @@ meili-snap = { path = "../meili-snap" }
|
|||
rand = { version = "0.8.5", features = ["small_rng"] }
|
||||
|
||||
[features]
|
||||
all-tokenizations = [
|
||||
"charabia/default",
|
||||
]
|
||||
all-tokenizations = ["charabia/default"]
|
||||
|
||||
# Use POSIX semaphores instead of SysV semaphores in LMDB
|
||||
# For more information on this feature, see heed's Cargo.toml
|
||||
|
|
|
@ -80,9 +80,13 @@ impl DocumentsBatchIndex {
|
|||
let mut map = Object::new();
|
||||
|
||||
for (k, v) in document.iter() {
|
||||
// TODO: TAMO: update the error type
|
||||
let key =
|
||||
self.0.get_by_left(&k).ok_or(crate::error::InternalError::DatabaseClosing)?.clone();
|
||||
let key = self
|
||||
.0
|
||||
.get_by_left(&k)
|
||||
.ok_or(crate::error::InternalError::FieldIdMapMissingEntry(
|
||||
FieldIdMapMissingEntry::FieldId { field_id: k, process: "recreate_json" },
|
||||
))?
|
||||
.clone();
|
||||
let value = serde_json::from_slice::<serde_json::Value>(v)
|
||||
.map_err(crate::error::InternalError::SerdeJson)?;
|
||||
map.insert(key, value);
|
||||
|
|
|
@ -33,8 +33,6 @@ pub enum Error {
|
|||
|
||||
#[derive(Error, Debug)]
|
||||
pub enum InternalError {
|
||||
#[error("{}", HeedError::DatabaseClosing)]
|
||||
DatabaseClosing,
|
||||
#[error("missing {} in the {db_name} database", key.unwrap_or("key"))]
|
||||
DatabaseMissingEntry { db_name: &'static str, key: Option<&'static str> },
|
||||
#[error("missing {key} in the fieldids weights mapping")]
|
||||
|
@ -197,8 +195,8 @@ and can not be more than 511 bytes.", .document_id.to_string()
|
|||
valid_fields: BTreeSet<String>,
|
||||
hidden_fields: bool,
|
||||
},
|
||||
#[error("an environment is already opened with different options")]
|
||||
InvalidLmdbOpenOptions,
|
||||
#[error("An LMDB environment is already opened")]
|
||||
EnvAlreadyOpened,
|
||||
#[error("You must specify where `sort` is listed in the rankingRules setting to use the sort parameter at search time.")]
|
||||
SortRankingRuleMissing,
|
||||
#[error("The database file is in an invalid state.")]
|
||||
|
@ -362,7 +360,8 @@ impl From<arroy::Error> for Error {
|
|||
| arroy::Error::UnmatchingDistance { .. }
|
||||
| arroy::Error::NeedBuild(_)
|
||||
| arroy::Error::MissingKey { .. }
|
||||
| arroy::Error::MissingMetadata(_) => {
|
||||
| arroy::Error::MissingMetadata(_)
|
||||
| arroy::Error::CannotDecodeKeyMode { .. } => {
|
||||
Error::InternalError(InternalError::ArroyError(value))
|
||||
}
|
||||
}
|
||||
|
@ -516,8 +515,7 @@ impl From<HeedError> for Error {
|
|||
// TODO use the encoding
|
||||
HeedError::Encoding(_) => InternalError(Serialization(Encoding { db_name: None })),
|
||||
HeedError::Decoding(_) => InternalError(Serialization(Decoding { db_name: None })),
|
||||
HeedError::DatabaseClosing => InternalError(DatabaseClosing),
|
||||
HeedError::BadOpenOptions { .. } => UserError(InvalidLmdbOpenOptions),
|
||||
HeedError::EnvAlreadyOpened { .. } => UserError(EnvAlreadyOpened),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
use std::borrow::Cow;
|
||||
use std::collections::{BTreeMap, BTreeSet, HashMap, HashSet};
|
||||
use std::fs::File;
|
||||
use std::io::Seek;
|
||||
use std::path::Path;
|
||||
|
||||
use heed::types::*;
|
||||
use heed::{types::*, WithoutTls};
|
||||
use heed::{CompactionOption, Database, RoTxn, RwTxn, Unspecified};
|
||||
use roaring::RoaringBitmap;
|
||||
use rstar::RTree;
|
||||
|
@ -110,7 +111,7 @@ pub mod db_name {
|
|||
#[derive(Clone)]
|
||||
pub struct Index {
|
||||
/// The LMDB environment which this index is associated with.
|
||||
pub(crate) env: heed::Env,
|
||||
pub(crate) env: heed::Env<WithoutTls>,
|
||||
|
||||
/// Contains many different types (e.g. the fields ids map).
|
||||
pub(crate) main: Database<Unspecified, Unspecified>,
|
||||
|
@ -177,7 +178,7 @@ pub struct Index {
|
|||
|
||||
impl Index {
|
||||
pub fn new_with_creation_dates<P: AsRef<Path>>(
|
||||
mut options: heed::EnvOpenOptions,
|
||||
mut options: heed::EnvOpenOptions<WithoutTls>,
|
||||
path: P,
|
||||
created_at: time::OffsetDateTime,
|
||||
updated_at: time::OffsetDateTime,
|
||||
|
@ -275,7 +276,7 @@ impl Index {
|
|||
}
|
||||
|
||||
pub fn new<P: AsRef<Path>>(
|
||||
options: heed::EnvOpenOptions,
|
||||
options: heed::EnvOpenOptions<WithoutTls>,
|
||||
path: P,
|
||||
creation: bool,
|
||||
) -> Result<Index> {
|
||||
|
@ -284,7 +285,7 @@ impl Index {
|
|||
}
|
||||
|
||||
fn set_creation_dates(
|
||||
env: &heed::Env,
|
||||
env: &heed::Env<WithoutTls>,
|
||||
main: Database<Unspecified, Unspecified>,
|
||||
created_at: time::OffsetDateTime,
|
||||
updated_at: time::OffsetDateTime,
|
||||
|
@ -306,12 +307,12 @@ impl Index {
|
|||
}
|
||||
|
||||
/// Create a read transaction to be able to read the index.
|
||||
pub fn read_txn(&self) -> heed::Result<RoTxn<'_>> {
|
||||
pub fn read_txn(&self) -> heed::Result<RoTxn<'_, WithoutTls>> {
|
||||
self.env.read_txn()
|
||||
}
|
||||
|
||||
/// Create a static read transaction to be able to read the index without keeping a reference to it.
|
||||
pub fn static_read_txn(&self) -> heed::Result<RoTxn<'static>> {
|
||||
pub fn static_read_txn(&self) -> heed::Result<RoTxn<'static, WithoutTls>> {
|
||||
self.env.clone().static_read_txn()
|
||||
}
|
||||
|
||||
|
@ -340,8 +341,16 @@ impl Index {
|
|||
self.env.info().map_size
|
||||
}
|
||||
|
||||
pub fn copy_to_file<P: AsRef<Path>>(&self, path: P, option: CompactionOption) -> Result<File> {
|
||||
self.env.copy_to_file(path, option).map_err(Into::into)
|
||||
pub fn copy_to_file(&self, file: &mut File, option: CompactionOption) -> Result<()> {
|
||||
self.env.copy_to_file(file, option).map_err(Into::into)
|
||||
}
|
||||
|
||||
pub fn copy_to_path<P: AsRef<Path>>(&self, path: P, option: CompactionOption) -> Result<File> {
|
||||
let mut file =
|
||||
File::options().create(true).write(true).truncate(true).read(true).open(path)?;
|
||||
self.copy_to_file(&mut file, option)?;
|
||||
file.rewind()?;
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
/// Returns an `EnvClosingEvent` that can be used to wait for the closing event,
|
||||
|
@ -1825,7 +1834,8 @@ pub(crate) mod tests {
|
|||
impl TempIndex {
|
||||
/// Creates a temporary index
|
||||
pub fn new_with_map_size(size: usize) -> Self {
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let options = EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
options.map_size(size);
|
||||
let _tempdir = TempDir::new_in(".").unwrap();
|
||||
let inner = Index::new(options, _tempdir.path(), true).unwrap();
|
||||
|
|
|
@ -83,6 +83,8 @@ pub use self::search::{
|
|||
};
|
||||
pub use self::update::ChannelCongestion;
|
||||
|
||||
pub use arroy;
|
||||
|
||||
pub type Result<T> = std::result::Result<T, error::Error>;
|
||||
|
||||
pub type Attribute = u32;
|
||||
|
|
|
@ -15,7 +15,8 @@ use crate::constants::RESERVED_GEO_FIELD_NAME;
|
|||
|
||||
pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
|
||||
let path = tempfile::tempdir().unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let options = EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
options.map_size(10 * 1024 * 1024); // 10 MB
|
||||
let index = Index::new(options, &path, true).unwrap();
|
||||
|
||||
|
|
|
@ -352,7 +352,7 @@ pub(crate) mod test_helpers {
|
|||
|
||||
use grenad::MergerBuilder;
|
||||
use heed::types::Bytes;
|
||||
use heed::{BytesDecode, BytesEncode, Env, RoTxn, RwTxn};
|
||||
use heed::{BytesDecode, BytesEncode, Env, RoTxn, RwTxn, WithoutTls};
|
||||
use roaring::RoaringBitmap;
|
||||
|
||||
use super::bulk::FacetsUpdateBulkInner;
|
||||
|
@ -390,7 +390,7 @@ pub(crate) mod test_helpers {
|
|||
for<'a> BoundCodec:
|
||||
BytesEncode<'a> + BytesDecode<'a, DItem = <BoundCodec as BytesEncode<'a>>::EItem>,
|
||||
{
|
||||
pub env: Env,
|
||||
pub env: Env<WithoutTls>,
|
||||
pub content: heed::Database<FacetGroupKeyCodec<BytesRefCodec>, FacetGroupValueCodec>,
|
||||
pub group_size: Cell<u8>,
|
||||
pub min_level_size: Cell<u8>,
|
||||
|
@ -412,7 +412,8 @@ pub(crate) mod test_helpers {
|
|||
let group_size = group_size.clamp(2, 127);
|
||||
let max_group_size = std::cmp::min(127, std::cmp::max(group_size * 2, max_group_size)); // 2*group_size <= x <= 127
|
||||
let min_level_size = std::cmp::max(1, min_level_size); // 1 <= x <= inf
|
||||
let mut options = heed::EnvOpenOptions::new();
|
||||
let options = heed::EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
let options = options.map_size(4096 * 4 * 1000 * 100);
|
||||
let tempdir = tempfile::TempDir::new().unwrap();
|
||||
let env = unsafe { options.open(tempdir.path()) }.unwrap();
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::sync::atomic::Ordering;
|
|||
use std::sync::{Arc, RwLock};
|
||||
|
||||
use bumpalo::Bump;
|
||||
use heed::RoTxn;
|
||||
use heed::{RoTxn, WithoutTls};
|
||||
use rayon::iter::IndexedParallelIterator;
|
||||
|
||||
use super::super::document_change::DocumentChange;
|
||||
|
@ -28,7 +28,7 @@ pub struct DocumentChangeContext<
|
|||
/// inside of the DB.
|
||||
pub db_fields_ids_map: &'indexer FieldsIdsMap,
|
||||
/// A transaction providing data from the DB before all indexing operations
|
||||
pub rtxn: RoTxn<'indexer>,
|
||||
pub rtxn: RoTxn<'indexer, WithoutTls>,
|
||||
|
||||
/// Global field id map that is up to date with the current state of the indexing process.
|
||||
///
|
||||
|
|
|
@ -1,15 +1,17 @@
|
|||
mod v1_12;
|
||||
mod v1_13;
|
||||
mod v1_14;
|
||||
|
||||
use heed::RwTxn;
|
||||
use v1_12::{V1_12_3_To_V1_13_0, V1_12_To_V1_12_3};
|
||||
use v1_13::{V1_13_0_To_V1_13_1, V1_13_1_To_Current};
|
||||
use v1_13::{V1_13_0_To_V1_13_1, V1_13_1_To_Latest_V1_13};
|
||||
use v1_14::Latest_V1_13_To_Latest_V1_14;
|
||||
|
||||
use crate::progress::{Progress, VariableNameStep};
|
||||
use crate::{Index, InternalError, Result};
|
||||
|
||||
trait UpgradeIndex {
|
||||
/// Returns true if the index scheduler must regenerate its cached stats
|
||||
/// Returns `true` if the index scheduler must regenerate its cached stats.
|
||||
fn upgrade(
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
|
@ -32,15 +34,17 @@ pub fn upgrade(
|
|||
&V1_12_To_V1_12_3 {},
|
||||
&V1_12_3_To_V1_13_0 {},
|
||||
&V1_13_0_To_V1_13_1 {},
|
||||
&V1_13_1_To_Current {},
|
||||
&V1_13_1_To_Latest_V1_13 {},
|
||||
&Latest_V1_13_To_Latest_V1_14 {},
|
||||
];
|
||||
|
||||
let start = match from {
|
||||
(1, 12, 0..=2) => 0,
|
||||
(1, 12, 3..) => 1,
|
||||
(1, 13, 0) => 2,
|
||||
(1, 13, _) => 4,
|
||||
// We must handle the current version in the match because, in case of a failure, some indexes may have been upgraded but not others.
|
||||
(1, 13, _) => 3,
|
||||
(1, 14, _) => 4,
|
||||
(major, minor, patch) => {
|
||||
return Err(InternalError::CannotUpgradeToVersion(major, minor, patch).into())
|
||||
}
|
||||
|
@ -50,7 +54,6 @@ pub fn upgrade(
|
|||
let upgrade_path = &upgrade_functions[start..];
|
||||
|
||||
let mut current_version = from;
|
||||
|
||||
let mut regenerate_stats = false;
|
||||
for (i, upgrade) in upgrade_path.iter().enumerate() {
|
||||
let target = upgrade.target_version();
|
||||
|
|
|
@ -37,9 +37,9 @@ impl UpgradeIndex for V1_13_0_To_V1_13_1 {
|
|||
}
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
pub(super) struct V1_13_1_To_Current();
|
||||
pub(super) struct V1_13_1_To_Latest_V1_13();
|
||||
|
||||
impl UpgradeIndex for V1_13_1_To_Current {
|
||||
impl UpgradeIndex for V1_13_1_To_Latest_V1_13 {
|
||||
fn upgrade(
|
||||
&self,
|
||||
_wtxn: &mut RwTxn,
|
||||
|
|
|
@ -0,0 +1,41 @@
|
|||
use arroy::distances::Cosine;
|
||||
use heed::RwTxn;
|
||||
|
||||
use super::UpgradeIndex;
|
||||
use crate::progress::Progress;
|
||||
use crate::{make_enum_progress, Index, Result};
|
||||
|
||||
#[allow(non_camel_case_types)]
|
||||
pub(super) struct Latest_V1_13_To_Latest_V1_14();
|
||||
|
||||
impl UpgradeIndex for Latest_V1_13_To_Latest_V1_14 {
|
||||
fn upgrade(
|
||||
&self,
|
||||
wtxn: &mut RwTxn,
|
||||
index: &Index,
|
||||
_original: (u32, u32, u32),
|
||||
progress: Progress,
|
||||
) -> Result<bool> {
|
||||
make_enum_progress! {
|
||||
enum VectorStore {
|
||||
UpdateInternalVersions,
|
||||
}
|
||||
};
|
||||
|
||||
progress.update_progress(VectorStore::UpdateInternalVersions);
|
||||
|
||||
let rtxn = index.read_txn()?;
|
||||
arroy::upgrade::from_0_5_to_0_6::<Cosine>(
|
||||
&rtxn,
|
||||
index.vector_arroy.remap_data_type(),
|
||||
wtxn,
|
||||
index.vector_arroy.remap_data_type(),
|
||||
)?;
|
||||
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
fn target_version(&self) -> (u32, u32, u32) {
|
||||
(1, 14, 0)
|
||||
}
|
||||
}
|
|
@ -12,7 +12,8 @@ use serde_json::{from_value, json};
|
|||
#[test]
|
||||
fn test_facet_distribution_with_no_facet_values() {
|
||||
let path = tempfile::tempdir().unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let options = EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
options.map_size(10 * 1024 * 1024); // 10 MB
|
||||
let index = Index::new(options, &path, true).unwrap();
|
||||
|
||||
|
|
|
@ -34,7 +34,8 @@ pub const CONTENT: &str = include_str!("../assets/test_set.ndjson");
|
|||
|
||||
pub fn setup_search_index_with_criteria(criteria: &[Criterion]) -> Index {
|
||||
let path = tempfile::tempdir().unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let options = EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
options.map_size(10 * 1024 * 1024); // 10 MB
|
||||
let index = Index::new(options, &path, true).unwrap();
|
||||
|
||||
|
|
|
@ -262,7 +262,8 @@ fn criteria_mixup() {
|
|||
#[test]
|
||||
fn criteria_ascdesc() {
|
||||
let path = tempfile::tempdir().unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let options = EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
options.map_size(12 * 1024 * 1024); // 12 MB
|
||||
let index = Index::new(options, &path, true).unwrap();
|
||||
|
||||
|
|
|
@ -108,7 +108,8 @@ fn test_typo_tolerance_two_typo() {
|
|||
#[test]
|
||||
fn test_typo_disabled_on_word() {
|
||||
let tmp = tempdir().unwrap();
|
||||
let mut options = EnvOpenOptions::new();
|
||||
let options = EnvOpenOptions::new();
|
||||
let mut options = options.read_txn_without_tls();
|
||||
options.map_size(4096 * 100);
|
||||
let index = Index::new(options, tmp.path(), true).unwrap();
|
||||
|
||||
|
|
Loading…
Reference in New Issue