stream documents
This commit is contained in:
parent
673b6e1dc0
commit
8e6ffbfc6f
|
@ -3348,6 +3348,7 @@ dependencies = [
|
|||
"rayon",
|
||||
"regex",
|
||||
"reqwest",
|
||||
"roaring",
|
||||
"rustls 0.21.12",
|
||||
"rustls-pemfile",
|
||||
"segment",
|
||||
|
@ -4416,12 +4417,6 @@ dependencies = [
|
|||
"winreg",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "retain_mut"
|
||||
version = "0.1.7"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8c31b5c4033f8fdde8700e4657be2c497e7288f01515be52168c631e2e4d4086"
|
||||
|
||||
[[package]]
|
||||
name = "ring"
|
||||
version = "0.17.8"
|
||||
|
@ -4439,13 +4434,12 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "roaring"
|
||||
version = "0.10.2"
|
||||
version = "0.10.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "6106b5cf8587f5834158895e9715a3c6c9716c8aefab57f1f7680917191c7873"
|
||||
checksum = "a1c77081a55300e016cb86f2864415b7518741879db925b8d488a0ee0d2da6bf"
|
||||
dependencies = [
|
||||
"bytemuck",
|
||||
"byteorder",
|
||||
"retain_mut",
|
||||
"serde",
|
||||
]
|
||||
|
||||
|
|
|
@ -108,6 +108,7 @@ tracing-subscriber = { version = "0.3.18", features = ["json"] }
|
|||
tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
|
||||
tracing-actix-web = "0.7.9"
|
||||
build-info = { version = "1.7.0", path = "../build-info" }
|
||||
roaring = "0.10.3"
|
||||
|
||||
[dev-dependencies]
|
||||
actix-rt = "2.9.0"
|
||||
|
|
|
@ -1,12 +1,14 @@
|
|||
use std::io::ErrorKind;
|
||||
use std::io::{ErrorKind, Write};
|
||||
|
||||
use actix_web::http::header::CONTENT_TYPE;
|
||||
use actix_web::web::Data;
|
||||
use actix_web::{web, HttpMessage, HttpRequest, HttpResponse};
|
||||
use bstr::ByteSlice as _;
|
||||
use bytes::Bytes;
|
||||
use deserr::actix_web::{AwebJson, AwebQueryParameter};
|
||||
use deserr::Deserr;
|
||||
use futures::StreamExt;
|
||||
use futures_util::Stream;
|
||||
use index_scheduler::{IndexScheduler, TaskId};
|
||||
use meilisearch_types::deserr::query_params::Param;
|
||||
use meilisearch_types::deserr::{DeserrJsonError, DeserrQueryParamError};
|
||||
|
@ -22,7 +24,9 @@ use meilisearch_types::tasks::KindWithContent;
|
|||
use meilisearch_types::{milli, Document, Index};
|
||||
use mime::Mime;
|
||||
use once_cell::sync::Lazy;
|
||||
use serde::Deserialize;
|
||||
use roaring::RoaringBitmap;
|
||||
use serde::ser::SerializeSeq;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
use tempfile::tempfile;
|
||||
use tokio::fs::File;
|
||||
|
@ -230,6 +234,34 @@ pub async fn get_documents(
|
|||
documents_by_query(&index_scheduler, index_uid, query)
|
||||
}
|
||||
|
||||
pub struct Writer2Streamer {
|
||||
sender: tokio::sync::mpsc::Sender<Result<Bytes, anyhow::Error>>,
|
||||
}
|
||||
|
||||
impl Write for Writer2Streamer {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
self.sender.blocking_send(Ok(buf.to_vec().into())).map_err(std::io::Error::other)?;
|
||||
Ok(buf.len())
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stream(
|
||||
data: impl Serialize + Send + Sync + 'static,
|
||||
) -> impl Stream<Item = Result<Bytes, anyhow::Error>> {
|
||||
let (sender, receiver) = tokio::sync::mpsc::channel::<Result<Bytes, anyhow::Error>>(1);
|
||||
|
||||
tokio::task::spawn_blocking(move || {
|
||||
serde_json::to_writer(std::io::BufWriter::new(Writer2Streamer { sender }), &data)
|
||||
});
|
||||
futures_util::stream::unfold(receiver, |mut receiver| async {
|
||||
receiver.recv().await.map(|value| (value, receiver))
|
||||
})
|
||||
}
|
||||
|
||||
fn documents_by_query(
|
||||
index_scheduler: &IndexScheduler,
|
||||
index_uid: web::Path<String>,
|
||||
|
@ -239,12 +271,13 @@ fn documents_by_query(
|
|||
let BrowseQuery { offset, limit, fields, filter } = query;
|
||||
|
||||
let index = index_scheduler.index(&index_uid)?;
|
||||
let (total, documents) = retrieve_documents(&index, offset, limit, filter, fields)?;
|
||||
let documents = retrieve_documents(index, offset, limit, filter, fields)?;
|
||||
|
||||
let ret = PaginationView::new(offset, limit, total as usize, documents);
|
||||
let ret = PaginationView::new(offset, limit, documents.total_documents as usize, documents);
|
||||
|
||||
debug!(returns = ?ret, "Get documents");
|
||||
Ok(HttpResponse::Ok().json(ret))
|
||||
|
||||
Ok(HttpResponse::Ok().streaming(stream(ret)))
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Debug, Deserr)]
|
||||
|
@ -590,13 +623,46 @@ fn some_documents<'a, 't: 'a>(
|
|||
}))
|
||||
}
|
||||
|
||||
fn retrieve_documents<S: AsRef<str>>(
|
||||
index: &Index,
|
||||
pub struct DocumentsStreamer {
|
||||
attributes_to_retrieve: Option<Vec<String>>,
|
||||
documents: RoaringBitmap,
|
||||
index: Index,
|
||||
pub total_documents: u64,
|
||||
}
|
||||
|
||||
impl Serialize for DocumentsStreamer {
|
||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
let rtxn = self.index.read_txn().unwrap();
|
||||
|
||||
let mut seq = serializer.serialize_seq(Some(self.documents.len() as usize)).unwrap();
|
||||
|
||||
let documents = some_documents(&self.index, &rtxn, self.documents.iter()).unwrap();
|
||||
for document in documents {
|
||||
let document = document.unwrap();
|
||||
let document = match self.attributes_to_retrieve {
|
||||
Some(ref attributes_to_retrieve) => permissive_json_pointer::select_values(
|
||||
&document,
|
||||
attributes_to_retrieve.iter().map(|s| s.as_ref()),
|
||||
),
|
||||
None => document,
|
||||
};
|
||||
|
||||
seq.serialize_element(&document)?;
|
||||
}
|
||||
seq.end()
|
||||
}
|
||||
}
|
||||
|
||||
fn retrieve_documents(
|
||||
index: Index,
|
||||
offset: usize,
|
||||
limit: usize,
|
||||
filter: Option<Value>,
|
||||
attributes_to_retrieve: Option<Vec<S>>,
|
||||
) -> Result<(u64, Vec<Document>), ResponseError> {
|
||||
attributes_to_retrieve: Option<Vec<String>>,
|
||||
) -> Result<DocumentsStreamer, ResponseError> {
|
||||
let rtxn = index.read_txn()?;
|
||||
let filter = &filter;
|
||||
let filter = if let Some(filter) = filter {
|
||||
|
@ -607,7 +673,7 @@ fn retrieve_documents<S: AsRef<str>>(
|
|||
};
|
||||
|
||||
let candidates = if let Some(filter) = filter {
|
||||
filter.evaluate(&rtxn, index).map_err(|err| match err {
|
||||
filter.evaluate(&rtxn, &index).map_err(|err| match err {
|
||||
milli::Error::UserError(milli::UserError::InvalidFilter(_)) => {
|
||||
ResponseError::from_msg(err.to_string(), Code::InvalidDocumentFilter)
|
||||
}
|
||||
|
@ -616,28 +682,14 @@ fn retrieve_documents<S: AsRef<str>>(
|
|||
} else {
|
||||
index.documents_ids(&rtxn)?
|
||||
};
|
||||
drop(rtxn);
|
||||
|
||||
let (it, number_of_documents) = {
|
||||
let number_of_documents = candidates.len();
|
||||
(
|
||||
some_documents(index, &rtxn, candidates.into_iter().skip(offset).take(limit))?,
|
||||
number_of_documents,
|
||||
)
|
||||
};
|
||||
|
||||
let documents: Result<Vec<_>, ResponseError> = it
|
||||
.map(|document| {
|
||||
Ok(match &attributes_to_retrieve {
|
||||
Some(attributes_to_retrieve) => permissive_json_pointer::select_values(
|
||||
&document?,
|
||||
attributes_to_retrieve.iter().map(|s| s.as_ref()),
|
||||
),
|
||||
None => document?,
|
||||
Ok(DocumentsStreamer {
|
||||
total_documents: candidates.len(),
|
||||
attributes_to_retrieve,
|
||||
documents: candidates.into_iter().skip(offset).take(limit).collect(),
|
||||
index,
|
||||
})
|
||||
})
|
||||
.collect();
|
||||
|
||||
Ok((number_of_documents, documents?))
|
||||
}
|
||||
|
||||
fn retrieve_document<S: AsRef<str>>(
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use std::collections::BTreeMap;
|
||||
use std::fmt;
|
||||
|
||||
use actix_web::web::Data;
|
||||
use actix_web::{web, HttpRequest, HttpResponse};
|
||||
|
@ -124,20 +125,31 @@ pub struct Pagination {
|
|||
pub limit: usize,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct PaginationView<T> {
|
||||
pub results: Vec<T>,
|
||||
#[derive(Clone, Serialize)]
|
||||
pub struct PaginationView<T: Serialize> {
|
||||
pub results: T,
|
||||
pub offset: usize,
|
||||
pub limit: usize,
|
||||
pub total: usize,
|
||||
}
|
||||
|
||||
impl<T: Serialize> fmt::Debug for PaginationView<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.debug_struct("PaginationView")
|
||||
.field("offset", &self.offset)
|
||||
.field("limit", &self.limit)
|
||||
.field("total", &self.total)
|
||||
.field("results", &"[...]")
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl Pagination {
|
||||
/// Given the full data to paginate, returns the selected section.
|
||||
pub fn auto_paginate_sized<T>(
|
||||
self,
|
||||
content: impl IntoIterator<Item = T> + ExactSizeIterator,
|
||||
) -> PaginationView<T>
|
||||
) -> PaginationView<Vec<T>>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
|
@ -151,7 +163,7 @@ impl Pagination {
|
|||
self,
|
||||
total: usize,
|
||||
content: impl IntoIterator<Item = T>,
|
||||
) -> PaginationView<T>
|
||||
) -> PaginationView<Vec<T>>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
|
@ -161,7 +173,7 @@ impl Pagination {
|
|||
|
||||
/// Given the data already paginated + the total number of elements, it stores
|
||||
/// everything in a [PaginationResult].
|
||||
pub fn format_with<T>(self, total: usize, results: Vec<T>) -> PaginationView<T>
|
||||
pub fn format_with<T>(self, total: usize, results: Vec<T>) -> PaginationView<Vec<T>>
|
||||
where
|
||||
T: Serialize,
|
||||
{
|
||||
|
@ -169,8 +181,8 @@ impl Pagination {
|
|||
}
|
||||
}
|
||||
|
||||
impl<T> PaginationView<T> {
|
||||
pub fn new(offset: usize, limit: usize, total: usize, results: Vec<T>) -> Self {
|
||||
impl<T: Serialize> PaginationView<T> {
|
||||
pub fn new(offset: usize, limit: usize, total: usize, results: T) -> Self {
|
||||
Self { offset, limit, results, total }
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue