From 8e6ffbfc6f55580784d9322af0453b874fe5cb0e Mon Sep 17 00:00:00 2001
From: Tamo <tamo@meilisearch.com>
Date: Thu, 28 Mar 2024 18:22:31 +0100
Subject: [PATCH] stream documents

---
 Cargo.lock                                  |  12 +--
 meilisearch/Cargo.toml                      |   1 +
 meilisearch/src/routes/indexes/documents.rs | 114 ++++++++++++++------
 meilisearch/src/routes/mod.rs               |  28 +++--
 4 files changed, 107 insertions(+), 48 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 937fce64a..5d87830a5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3348,6 +3348,7 @@ dependencies = [
  "rayon",
  "regex",
  "reqwest",
+ "roaring",
  "rustls 0.21.12",
  "rustls-pemfile",
  "segment",
@@ -4416,12 +4417,6 @@ dependencies = [
  "winreg",
 ]
 
-[[package]]
-name = "retain_mut"
-version = "0.1.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086"
-
 [[package]]
 name = "ring"
 version = "0.17.8"
@@ -4439,13 +4434,12 @@ dependencies = [
 
 [[package]]
 name = "roaring"
-version = "0.10.2"
+version = "0.10.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6106b5cf8587f5834158895e9715a3c6c9716c8aefab57f1f7680917191c7873"
+checksum = "a1c77081a55300e016cb86f2864415b7518741879db925b8d488a0ee0d2da6bf"
 dependencies = [
  "bytemuck",
  "byteorder",
- "retain_mut",
  "serde",
 ]
 
diff --git a/meilisearch/Cargo.toml b/meilisearch/Cargo.toml
index ed62c5f48..612c6731b 100644
--- a/meilisearch/Cargo.toml
+++ b/meilisearch/Cargo.toml
@@ -108,6 +108,7 @@ tracing-subscriber = { version = "0.3.18", features = ["json"] }
 tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
 tracing-actix-web = "0.7.9"
 build-info = { version = "1.7.0", path = "../build-info" }
+roaring = "0.10.3"
 
 [dev-dependencies]
 actix-rt = "2.9.0"
diff --git a/meilisearch/src/routes/indexes/documents.rs b/meilisearch/src/routes/indexes/documents.rs
index 43fab1dae..78af7a098 100644
--- a/meilisearch/src/routes/indexes/documents.rs
+++ b/meilisearch/src/routes/indexes/documents.rs
@@ -1,12 +1,14 @@
-use std::io::ErrorKind;
+use std::io::{ErrorKind, Write};
 
 use actix_web::http::header::CONTENT_TYPE;
 use actix_web::web::Data;
 use actix_web::{web, HttpMessage, HttpRequest, HttpResponse};
 use bstr::ByteSlice as _;
+use bytes::Bytes;
 use deserr::actix_web::{AwebJson, AwebQueryParameter};
 use deserr::Deserr;
 use futures::StreamExt;
+use futures_util::Stream;
 use index_scheduler::{IndexScheduler, TaskId};
 use meilisearch_types::deserr::query_params::Param;
 use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
@@ -22,7 +24,9 @@ use meilisearch_types::tasks::KindWithContent;
 use meilisearch_types::{milli, Document, Index};
 use mime::Mime;
 use once_cell::sync::Lazy;
-use serde::Deserialize;
+use roaring::RoaringBitmap;
+use serde::ser::SerializeSeq;
+use serde::{Deserialize, Serialize};
 use serde_json::Value;
 use tempfile::tempfile;
 use tokio::fs::File;
@@ -230,6 +234,34 @@ pub async fn get_documents(
     documents_by_query(&index_scheduler, index_uid, query)
 }
 
+pub struct Writer2Streamer {
+    sender: tokio::sync::mpsc::Sender<Result<Bytes, anyhow::Error>>,
+}
+
+impl Write for Writer2Streamer {
+    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
+        self.sender.blocking_send(Ok(buf.to_vec().into())).map_err(std::io::Error::other)?;
+        Ok(buf.len())
+    }
+
+    fn flush(&mut self) -> std::io::Result<()> {
+        Ok(())
+    }
+}
+
+pub fn stream(
+    data: impl Serialize + Send + Sync + 'static,
+) -> impl Stream<Item = Result<Bytes, anyhow::Error>> {
+    let (sender, receiver) = tokio::sync::mpsc::channel::<Result<Bytes, anyhow::Error>>(1);
+
+    tokio::task::spawn_blocking(move || {
+        serde_json::to_writer(std::io::BufWriter::new(Writer2Streamer { sender }), &data)
+    });
+    futures_util::stream::unfold(receiver, |mut receiver| async {
+        receiver.recv().await.map(|value| (value, receiver))
+    })
+}
+
 fn documents_by_query(
     index_scheduler: &IndexScheduler,
     index_uid: web::Path<String>,
@@ -239,12 +271,13 @@ fn documents_by_query(
     let BrowseQuery { offset, limit, fields, filter } = query;
 
     let index = index_scheduler.index(&index_uid)?;
-    let (total, documents) = retrieve_documents(&index, offset, limit, filter, fields)?;
+    let documents = retrieve_documents(index, offset, limit, filter, fields)?;
 
-    let ret = PaginationView::new(offset, limit, total as usize, documents);
+    let ret = PaginationView::new(offset, limit, documents.total_documents as usize, documents);
 
     debug!(returns = ?ret, "Get documents");
-    Ok(HttpResponse::Ok().json(ret))
+
+    Ok(HttpResponse::Ok().streaming(stream(ret)))
 }
 
 #[derive(Deserialize, Debug, Deserr)]
@@ -590,13 +623,46 @@ fn some_documents<'a, 't: 'a>(
     }))
 }
 
-fn retrieve_documents<S: AsRef<str>>(
-    index: &Index,
+pub struct DocumentsStreamer {
+    attributes_to_retrieve: Option<Vec<String>>,
+    documents: RoaringBitmap,
+    index: Index,
+    pub total_documents: u64,
+}
+
+impl Serialize for DocumentsStreamer {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        let rtxn = self.index.read_txn().unwrap();
+
+        let mut seq = serializer.serialize_seq(Some(self.documents.len() as usize)).unwrap();
+
+        let documents = some_documents(&self.index, &rtxn, self.documents.iter()).unwrap();
+        for document in documents {
+            let document = document.unwrap();
+            let document = match self.attributes_to_retrieve {
+                Some(ref attributes_to_retrieve) => permissive_json_pointer::select_values(
+                    &document,
+                    attributes_to_retrieve.iter().map(|s| s.as_ref()),
+                ),
+                None => document,
+            };
+
+            seq.serialize_element(&document)?;
+        }
+        seq.end()
+    }
+}
+
+fn retrieve_documents(
+    index: Index,
     offset: usize,
     limit: usize,
     filter: Option<Value>,
-    attributes_to_retrieve: Option<Vec<S>>,
-) -> Result<(u64, Vec<Document>), ResponseError> {
+    attributes_to_retrieve: Option<Vec<String>>,
+) -> Result<DocumentsStreamer, ResponseError> {
     let rtxn = index.read_txn()?;
     let filter = &filter;
     let filter = if let Some(filter) = filter {
@@ -607,7 +673,7 @@ fn retrieve_documents<S: AsRef<str>>(
     };
 
     let candidates = if let Some(filter) = filter {
-        filter.evaluate(&rtxn, index).map_err(|err| match err {
+        filter.evaluate(&rtxn, &index).map_err(|err| match err {
             milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
                 ResponseError::from_msg(err.to_string(), Code::InvalidDocumentFilter)
             }
@@ -616,28 +682,14 @@ fn retrieve_documents<S: AsRef<str>>(
     } else {
         index.documents_ids(&rtxn)?
     };
+    drop(rtxn);
 
-    let (it, number_of_documents) = {
-        let number_of_documents = candidates.len();
-        (
-            some_documents(index, &rtxn, candidates.into_iter().skip(offset).take(limit))?,
-            number_of_documents,
-        )
-    };
-
-    let documents: Result<Vec<_>, ResponseError> = it
-        .map(|document| {
-            Ok(match &attributes_to_retrieve {
-                Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
-                    &document?,
-                    attributes_to_retrieve.iter().map(|s| s.as_ref()),
-                ),
-                None => document?,
-            })
-        })
-        .collect();
-
-    Ok((number_of_documents, documents?))
+    Ok(DocumentsStreamer {
+        total_documents: candidates.len(),
+        attributes_to_retrieve,
+        documents: candidates.into_iter().skip(offset).take(limit).collect(),
+        index,
+    })
 }
 
 fn retrieve_document<S: AsRef<str>>(
diff --git a/meilisearch/src/routes/mod.rs b/meilisearch/src/routes/mod.rs
index c25aeee70..a7e84d19c 100644
--- a/meilisearch/src/routes/mod.rs
+++ b/meilisearch/src/routes/mod.rs
@@ -1,4 +1,5 @@
 use std::collections::BTreeMap;
+use std::fmt;
 
 use actix_web::web::Data;
 use actix_web::{web, HttpRequest, HttpResponse};
@@ -124,20 +125,31 @@ pub struct Pagination {
     pub limit: usize,
 }
 
-#[derive(Debug, Clone, Serialize)]
-pub struct PaginationView<T> {
-    pub results: Vec<T>,
+#[derive(Clone, Serialize)]
+pub struct PaginationView<T: Serialize> {
+    pub results: T,
     pub offset: usize,
     pub limit: usize,
     pub total: usize,
 }
 
+impl<T: Serialize> fmt::Debug for PaginationView<T> {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        f.debug_struct("PaginationView")
+            .field("offset", &self.offset)
+            .field("limit", &self.limit)
+            .field("total", &self.total)
+            .field("results", &"[...]")
+            .finish()
+    }
+}
+
 impl Pagination {
     /// Given the full data to paginate, returns the selected section.
     pub fn auto_paginate_sized<T>(
         self,
         content: impl IntoIterator<Item = T> + ExactSizeIterator,
-    ) -> PaginationView<T>
+    ) -> PaginationView<Vec<T>>
     where
         T: Serialize,
     {
@@ -151,7 +163,7 @@ impl Pagination {
         self,
         total: usize,
         content: impl IntoIterator<Item = T>,
-    ) -> PaginationView<T>
+    ) -> PaginationView<Vec<T>>
     where
         T: Serialize,
     {
@@ -161,7 +173,7 @@ impl Pagination {
 
     /// Given the data already paginated + the total number of elements, it stores
     /// everything in a [PaginationResult].
-    pub fn format_with<T>(self, total: usize, results: Vec<T>) -> PaginationView<T>
+    pub fn format_with<T>(self, total: usize, results: Vec<T>) -> PaginationView<Vec<T>>
     where
         T: Serialize,
     {
@@ -169,8 +181,8 @@ impl Pagination {
     }
 }
 
-impl<T> PaginationView<T> {
-    pub fn new(offset: usize, limit: usize, total: usize, results: Vec<T>) -> Self {
+impl<T: Serialize> PaginationView<T> {
+    pub fn new(offset: usize, limit: usize, total: usize, results: T) -> Self {
         Self { offset, limit, results, total }
     }
 }
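
---

Note on the approach: the heart of the patch is the bridge from serde_json's
blocking `io::Write` serialization onto an async body stream. Below is a
minimal, self-contained sketch of that bridge that runs outside Meilisearch.
It assumes a plain cargo project with tokio (rt, sync, macros), futures-util,
bytes, serde, serde_json, and anyhow; the `Result<Bytes, anyhow::Error>`
channel item type is reconstructed from context rather than pinned down by
the patch text itself.

use std::io::Write;

use bytes::Bytes;
use futures_util::{Stream, StreamExt};
use serde::Serialize;

// serde_json serializes synchronously into an `io::Write`; each buffered
// chunk is forwarded over a bounded channel to the async side.
struct Writer2Streamer {
    sender: tokio::sync::mpsc::Sender<Result<Bytes, anyhow::Error>>,
}

impl Write for Writer2Streamer {
    fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
        // Parks the spawn_blocking thread until the consumer drains the
        // channel, so a slow client back-pressures the serializer.
        self.sender.blocking_send(Ok(Bytes::copy_from_slice(buf))).map_err(std::io::Error::other)?;
        Ok(buf.len())
    }

    fn flush(&mut self) -> std::io::Result<()> {
        Ok(())
    }
}

fn stream(data: impl Serialize + Send + 'static) -> impl Stream<Item = Result<Bytes, anyhow::Error>> {
    let (sender, receiver) = tokio::sync::mpsc::channel(1);
    // If serialization fails mid-way, this thread simply ends and the
    // channel closes: the body is truncated rather than surfaced as an
    // error, because the status line has already been sent by then.
    tokio::task::spawn_blocking(move || {
        serde_json::to_writer(std::io::BufWriter::new(Writer2Streamer { sender }), &data)
    });
    futures_util::stream::unfold(receiver, |mut receiver| async {
        receiver.recv().await.map(|value| (value, receiver))
    })
}

#[tokio::main]
async fn main() {
    // Reassemble the streamed chunks; the result is ordinary JSON.
    let mut body = Vec::new();
    let mut chunks = Box::pin(stream(vec![1, 2, 3]));
    while let Some(chunk) = chunks.next().await {
        body.extend_from_slice(&chunk.expect("the sender only ever sends Ok"));
    }
    assert_eq!(body, b"[1,2,3]");
}

The channel capacity of 1 is the design choice that keeps memory flat: at any
moment at most one chunk is in flight, and the serializing thread sleeps
whenever the HTTP peer is not reading.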
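The other half is `DocumentsStreamer`'s hand-written `Serialize` impl:
documents are pulled from LMDB and serialized one by one while the response
body is being written, instead of first being collected into a
`Vec<Document>`. A minimal analogue of that pattern (`LazySeq` and its
numeric documents are hypothetical stand-ins for the `some_documents`
iterator over the index):

use serde::ser::{Serialize, SerializeSeq, Serializer};

// The sequence is produced while serde walks it, so peak memory stays at
// roughly one document instead of the whole page of results.
struct LazySeq {
    total: u64,
}

impl Serialize for LazySeq {
    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
        let mut seq = serializer.serialize_seq(Some(self.total as usize))?;
        // Stand-in for `some_documents(&self.index, &rtxn, ...)` above.
        for id in 0..self.total {
            seq.serialize_element(&serde_json::json!({ "id": id }))?;
        }
        seq.end()
    }
}

fn main() {
    let json = serde_json::to_string(&LazySeq { total: 3 }).unwrap();
    assert_eq!(json, r#"[{"id":0},{"id":1},{"id":2}]"#);
}

Fed through `stream(...)` above, as `documents_by_query` does with its
`PaginationView<DocumentsStreamer>` value, the whole response is produced
without ever materializing the full result set in memory.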