Merge a2eb64a5de
into 5e7803632d
This commit is contained in:
commit
752b5a1ca1
|
@ -3733,6 +3733,7 @@ dependencies = [
|
|||
"indexmap",
|
||||
"meilisearch-auth",
|
||||
"meilisearch-types",
|
||||
"rand",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tempfile",
|
||||
|
|
|
@ -17,6 +17,7 @@ file-store = { path = "../file-store" }
|
|||
indexmap = { version = "2.7.0", features = ["serde"] }
|
||||
meilisearch-auth = { path = "../meilisearch-auth" }
|
||||
meilisearch-types = { path = "../meilisearch-types" }
|
||||
rand = { version = "0.8", default-features = false }
|
||||
serde = { version = "1.0.217", features = ["derive"] }
|
||||
serde_json = { version = "1.0.135", features = ["preserve_order"] }
|
||||
tempfile = "3.15.0"
|
||||
|
|
|
@ -1,6 +1,9 @@
|
|||
use std::fs::{read_dir, read_to_string, remove_file, File};
|
||||
use std::hint::black_box;
|
||||
use std::io::{BufWriter, Write as _};
|
||||
use std::path::PathBuf;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::thread;
|
||||
use std::time::Instant;
|
||||
|
||||
use anyhow::{bail, Context};
|
||||
|
@ -20,6 +23,8 @@ use meilisearch_types::milli::{obkv_to_json, BEU32};
|
|||
use meilisearch_types::tasks::{Status, Task};
|
||||
use meilisearch_types::versioning::{get_version, parse_version};
|
||||
use meilisearch_types::Index;
|
||||
use rand::rngs::StdRng;
|
||||
use rand::{Rng, SeedableRng};
|
||||
use serde_json::Value::Object;
|
||||
use time::macros::format_description;
|
||||
use time::OffsetDateTime;
|
||||
|
@ -640,28 +645,87 @@ fn hair_dryer(
|
|||
for part in index_parts {
|
||||
match part {
|
||||
IndexPart::Arroy => {
|
||||
// It would be better if it is a command parameter
|
||||
let total_threads = thread::available_parallelism().unwrap().get() * 10;
|
||||
eprintln!("Hair drying arroy for {uid} using {total_threads} threads...");
|
||||
|
||||
let database = index.vector_arroy.remap_types::<Bytes, Bytes>();
|
||||
let num_keys = database.len(&rtxn)? as usize;
|
||||
let first_entry = database.iter(&rtxn)?.next().transpose()?;
|
||||
let last_entry = database.rev_iter(&rtxn)?.next().transpose()?;
|
||||
// Visit more keys per thread to ensure wide coverage.
|
||||
let total_keys_to_visit = num_keys * 100;
|
||||
let keys_by_thread = num_keys / total_threads;
|
||||
|
||||
let Some(((first_key, _), (last_key, _))) = first_entry.zip(last_entry)
|
||||
else {
|
||||
continue;
|
||||
};
|
||||
|
||||
let first_key_num = first_key.try_into().map(u64::from_be_bytes).unwrap();
|
||||
let last_key_num = last_key.try_into().map(u64::from_be_bytes).unwrap();
|
||||
|
||||
eprintln!("Iterating over {keys_by_thread} entries by thread ({total_threads}x)...");
|
||||
|
||||
let progress = AtomicUsize::new(0);
|
||||
thread::scope(|s| -> anyhow::Result<()> {
|
||||
let mut handles = Vec::new();
|
||||
|
||||
for tid in 0..total_threads {
|
||||
let index = &index;
|
||||
let progress = &progress;
|
||||
let handle = s.spawn(move || -> anyhow::Result<()> {
|
||||
let rtxn = index.read_txn()?;
|
||||
|
||||
let mut rng = StdRng::seed_from_u64(tid as u64);
|
||||
for _ in 0..keys_by_thread {
|
||||
let random_key_num = rng.gen_range(first_key_num..=last_key_num);
|
||||
let random_key = random_key_num.to_be_bytes();
|
||||
|
||||
let Some((key, value)) = database.get_greater_than(&rtxn, &random_key)? else {
|
||||
continue;
|
||||
};
|
||||
|
||||
// All of this just to avoid compiler optimizations 🤞
|
||||
// We must read all the bytes to make the pages hot in cache.
|
||||
// <https://doc.rust-lang.org/std/hint/fn.black_box.html>
|
||||
black_box(key.iter().fold(0, |acc, _| acc + 1));
|
||||
black_box(value.iter().fold(0, |acc, _| acc + 1));
|
||||
|
||||
let current_progress = progress.fetch_add(1, Ordering::Relaxed);
|
||||
if current_progress % 10_000 == 0 {
|
||||
let perc = (current_progress as f64) / (total_keys_to_visit as f64) * 100.0;
|
||||
eprintln!("Visited {current_progress}/{total_keys_to_visit} ({perc:.2}%) keys");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
});
|
||||
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
handles.into_iter().try_for_each(|h| h.join().unwrap())
|
||||
})?;
|
||||
|
||||
eprintln!("Doing a last pass on all the keys...");
|
||||
|
||||
let mut count = 0;
|
||||
let total = index.vector_arroy.len(&rtxn)?;
|
||||
eprintln!("Hair drying arroy for {uid}...");
|
||||
for (i, result) in index
|
||||
.vector_arroy
|
||||
.remap_types::<Bytes, Bytes>()
|
||||
.iter(&rtxn)?
|
||||
.enumerate()
|
||||
{
|
||||
for (i, result) in database.iter(&rtxn)?.enumerate() {
|
||||
let (key, value) = result?;
|
||||
|
||||
// All of this just to avoid compiler optimizations 🤞
|
||||
// We must read all the bytes to make the pages hot in cache.
|
||||
// <https://doc.rust-lang.org/std/hint/fn.black_box.html>
|
||||
count += std::hint::black_box(key.iter().fold(0, |acc, _| acc + 1));
|
||||
count += std::hint::black_box(value.iter().fold(0, |acc, _| acc + 1));
|
||||
count += black_box(key.iter().fold(0, |acc, _| acc + 1));
|
||||
count += black_box(value.iter().fold(0, |acc, _| acc + 1));
|
||||
|
||||
if i % 10_000 == 0 {
|
||||
let perc = (i as f64) / (total as f64) * 100.0;
|
||||
eprintln!("Visited {i}/{total} ({perc:.2}%) keys")
|
||||
let perc = (i as f64) / (total_keys_to_visit as f64) * 100.0;
|
||||
eprintln!("Visited {i}/{total_keys_to_visit} ({perc:.2}%) keys");
|
||||
}
|
||||
}
|
||||
|
||||
eprintln!("Done hair drying a total of at least {count} bytes.");
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue