feat: refactor merge file on compactor (#4971)

Hengfei Yang 2024-11-07 17:30:10 +08:00 committed by GitHub
parent 1179109e2e
commit df0a4a79ae
GPG Key ID: B5690EEEBB952194
33 changed files with 1073 additions and 1283 deletions


@ -17,6 +17,7 @@ phases:
- GIT_TAG="$(git describe --tags --abbrev=0)"
# disable gxhash
- sed -i 's/default = \[\"gxhash\"\]/default = []/g' src/config/Cargo.toml
- sed -i 's/+aes,//g' .cargo/config.toml
# std version
- docker build -t openobserve:latest-arm64 -f deploy/build/Dockerfile.tag.aarch64 .


@ -132,7 +132,7 @@ pub async fn cli() -> Result<bool, anyhow::Error> {
match command.get_one::<String>("path") {
Some(path) => {
set_permission(path, 0o777)?;
println!("init dir {} succeeded", path);
println!("init dir {} successfully", path);
}
None => {
return Err(anyhow::anyhow!("please set data path"));
@ -263,7 +263,7 @@ pub async fn cli() -> Result<bool, anyhow::Error> {
let file = command.get_one::<String>("file").unwrap();
match file_list::delete_parquet_file(file, true).await {
Ok(_) => {
println!("delete parquet file {} succeeded", file);
println!("delete parquet file {} successfully", file);
}
Err(e) => {
println!("delete parquet file {} failed, error: {}", file, e);
@ -291,6 +291,6 @@ pub async fn cli() -> Result<bool, anyhow::Error> {
log::error!("waiting for db close failed, error: {}", e);
}
println!("command {name} execute succeeded");
println!("command {name} execute successfully");
Ok(true)
}


@ -13,23 +13,15 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::{
io::{Error, ErrorKind},
sync::Arc,
};
use std::io::{Error, ErrorKind};
use actix_web::HttpResponse;
use arrow::array::{Int64Array, RecordBatch};
use config::{
get_config,
meta::stream::{FileMeta, StreamType},
utils::{arrow::record_batches_to_json_rows, json},
FILE_EXT_JSON,
};
use datafusion::{
arrow::{datatypes::Schema, record_batch::RecordBatch},
datasource::MemTable,
prelude::SessionContext,
};
#[inline(always)]
pub fn stream_type_query_param_error() -> Result<HttpResponse, Error> {
@ -72,61 +64,59 @@ pub fn get_file_name_v1(org_id: &str, stream_name: &str, suffix: u32) -> String
}
pub async fn populate_file_meta(
schema: Arc<Schema>,
batch: Vec<Vec<RecordBatch>>,
batches: &[&RecordBatch],
file_meta: &mut FileMeta,
min_field: Option<&str>,
max_field: Option<&str>,
) -> Result<(), anyhow::Error> {
if schema.fields().is_empty() || batch.is_empty() {
if batches.is_empty() {
return Ok(());
}
let cfg = get_config();
let min_field = min_field.unwrap_or_else(|| cfg.common.column_timestamp.as_str());
let max_field = max_field.unwrap_or_else(|| cfg.common.column_timestamp.as_str());
let ctx = SessionContext::new();
let provider = MemTable::try_new(schema, batch)?;
ctx.register_table("temp", Arc::new(provider))?;
let sql = format!(
"SELECT min({}) as min, max({}) as max, count(*) as num_records FROM temp;",
min_field, max_field
);
let df = ctx.sql(sql.as_str()).await?;
let batches = df.collect().await?;
let batches_ref: Vec<&RecordBatch> = batches.iter().collect();
let json_rows = record_batches_to_json_rows(&batches_ref)?;
let mut result: Vec<json::Value> = json_rows.into_iter().map(json::Value::Object).collect();
if result.is_empty() {
return Ok(());
let total = batches.iter().map(|batch| batch.num_rows()).sum::<usize>();
let mut min_val = i64::MAX;
let mut max_val = 0;
for batch in batches.iter() {
let num_row = batch.num_rows();
let Some(min_field) = batch.column_by_name(min_field) else {
return Err(anyhow::anyhow!("No min_field found: {}", min_field));
};
let Some(max_field) = batch.column_by_name(max_field) else {
return Err(anyhow::anyhow!("No max_field found: {}", max_field));
};
let min_col = min_field.as_any().downcast_ref::<Int64Array>().unwrap();
let max_col = max_field.as_any().downcast_ref::<Int64Array>().unwrap();
for i in 0..num_row {
let val = min_col.value(i);
if val < min_val {
min_val = val;
}
let val = max_col.value(i);
if val > max_val {
max_val = val;
}
}
}
let record = result.pop().expect("No record found");
if record.is_null() {
return Ok(());
if min_val == i64::MAX {
min_val = 0;
}
file_meta.min_ts = record
.get("min")
.expect("No field found: min")
.as_i64()
.expect("No value found: min");
file_meta.max_ts = record
.get("max")
.expect("No field found: max")
.as_i64()
.expect("No value found: max");
file_meta.records = record
.get("num_records")
.expect("No field found: num_records")
.as_i64()
.expect("No value found: num_records");
file_meta.min_ts = min_val;
file_meta.max_ts = max_val;
file_meta.records = total as i64;
Ok(())
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use datafusion::arrow::{
array::{Int64Array, StringArray},
datatypes::{DataType, Field},
array::StringArray,
datatypes::{DataType, Field, Schema},
};
use super::*;
@ -173,7 +163,7 @@ mod tests {
// define data.
let batch = RecordBatch::try_new(
schema.clone(),
schema,
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(Int64Array::from(vec![1, 2, 1, 2])),
@ -190,7 +180,7 @@ mod tests {
compressed_size: 700,
flattened: false,
};
populate_file_meta(schema, vec![vec![batch]], &mut file_meta, None, None)
populate_file_meta(&[&batch], &mut file_meta, None, None)
.await
.unwrap();
assert_eq!(file_meta.records, 4);
@ -209,7 +199,7 @@ mod tests {
// define data.
let batch = RecordBatch::try_new(
schema.clone(),
schema,
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(Int64Array::from(vec![1, 2, 1, 2])),
@ -226,15 +216,9 @@ mod tests {
compressed_size: 700,
flattened: false,
};
populate_file_meta(
schema,
vec![vec![batch]],
&mut file_meta,
Some("time"),
Some("time"),
)
.await
.unwrap();
populate_file_meta(&[&batch], &mut file_meta, Some("time"), Some("time"))
.await
.unwrap();
assert_eq!(file_meta.records, 4);
assert_eq!(file_meta.min_ts, val - 100);
}
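The refactored populate_file_meta above drops the DataFusion SessionContext and scans the Int64 timestamp columns of the record batches directly. A minimal standalone sketch of the same min/max/count pass, assuming the arrow and anyhow crates; the zero fallback for an empty minimum mirrors the code in the diff:

use arrow::array::{Array, Int64Array, RecordBatch};

// Returns (min_ts, max_ts, records) over the given Int64 column of each batch.
fn scan_time_range(batches: &[&RecordBatch], ts_field: &str) -> anyhow::Result<(i64, i64, i64)> {
    let mut min_val = i64::MAX;
    let mut max_val = 0i64;
    let mut records = 0usize;
    for batch in batches {
        records += batch.num_rows();
        let col = batch
            .column_by_name(ts_field)
            .ok_or_else(|| anyhow::anyhow!("No field found: {}", ts_field))?;
        let col = col
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| anyhow::anyhow!("{} is not an Int64 column", ts_field))?;
        for i in 0..col.len() {
            let v = col.value(i);
            min_val = min_val.min(v);
            max_val = max_val.max(v);
        }
    }
    if min_val == i64::MAX {
        min_val = 0; // no rows: fall back to 0, matching the behaviour above
    }
    Ok((min_val, max_val, records as i64))
}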


@ -1587,7 +1587,7 @@ fn check_common_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
}
if !["both", "parquet"].contains(&cfg.common.inverted_index_store_format.as_str()) {
return Err(anyhow::anyhow!(
"ZO_INVERTED_INDEX_SEARCH_FORMAT must be one of both, parquet."
"ZO_INVERTED_INDEX_STORE_FORMAT must be one of both, parquet."
));
}
if cfg.common.inverted_index_store_format != "both" {


@ -17,8 +17,6 @@ pub mod reader;
pub mod search;
pub mod writer;
use std::{collections::HashMap, io::Write};
use anyhow::Result;
use serde::{Deserialize, Serialize};
@ -27,43 +25,8 @@ const INDEX_FILE_METAS_SIZE_SIZE: u64 = 4;
type Bytes = Vec<u8>;
type BytesRef<'a> = &'a [u8];
/// Tracks and consumes [`ColumnIndexMeta`] after each selected column within a parquet file
/// is indexed via [`ColumnIndexer`].
/// The aggregated metas is then compressed and written to buffer for writing to file system.
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub struct IndexFileMetas {
#[serde(default)]
pub metas: HashMap<String, ColumnIndexMeta>,
}
impl IndexFileMetas {
pub fn new() -> Self {
Self::default()
}
/// Writes aggregated [`ColumnIndexMeta`] into writer and compresses all written bytes.
/// Returns the length of bytes written to writer.
pub fn finish(&self, writer: &mut Vec<u8>) -> Result<u64> {
if self.metas.is_empty() {
return Ok(0u64);
}
let original_size = writer.len() as u64;
let meta_bytes = serde_json::to_vec(&self)?;
writer.write_all(&meta_bytes)?;
let metas_size = meta_bytes.len() as u32;
writer.write_all(&metas_size.to_le_bytes())?;
writer.flush()?;
let new_size = writer.len() as u64;
Ok(new_size - original_size)
}
}
#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)]
pub struct ColumnIndexMeta {
// base byte offset for this column index date within the index file (multiple column indices)
#[serde(default)]
pub base_offset: u64,
// total byte size of this column index date
#[serde(default)]
pub index_size: u64,


@ -16,25 +16,49 @@
/// interface
use std::{io::SeekFrom, sync::Arc};
use anyhow::{anyhow, ensure, Result};
use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use anyhow::{anyhow, Result};
use futures::{io::Cursor, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt};
use super::{ColumnIndexMeta, IndexFileMetas, INDEX_FILE_METAS_SIZE_SIZE};
use crate::{meta::bitvec::BitVec, utils::inverted_index::unpack_u32_pair};
use super::{ColumnIndexMeta, INDEX_FILE_METAS_SIZE_SIZE};
use crate::{
meta::{bitvec::BitVec, puffin::reader::PuffinBytesReader},
utils::inverted_index::unpack_u32_pair,
};
/// Index reader helps read Index Blob which consists of multiple ColumnIndex.
pub struct IndexReader<R> {
source: R,
source: PuffinBytesReader<R>,
}
impl<R: AsyncRead + AsyncSeek + Unpin + Send> IndexReader<R> {
pub fn new(source: R) -> Self {
Self {
source: PuffinBytesReader::new(source),
}
}
pub async fn field(&mut self, field: &str) -> Result<Option<FieldReader>> {
let Some(field_metdata) = self.source.get_field(field).await? else {
return Ok(None);
};
let blob_bytes = self.source.read_blob_bytes(&field_metdata).await?;
let data = Cursor::new(blob_bytes);
Ok(Some(FieldReader::new(data)))
}
}
pub struct FieldReader {
source: Cursor<Vec<u8>>,
}
impl FieldReader {
pub fn new(source: Cursor<Vec<u8>>) -> Self {
Self { source }
}
/// Reads and parse the bytes read from source to construct [`IndexFileMetas`].
/// IndexFileMetas is used to find and read ColumnIndex for a particular column.
pub async fn metadata(&mut self) -> Result<Arc<IndexFileMetas>> {
pub async fn metadata(&mut self) -> Result<Arc<ColumnIndexMeta>> {
let end_offset = self.source.seek(SeekFrom::End(0)).await?;
// read index_size
@ -44,17 +68,16 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send> IndexReader<R> {
self.source.read_exact(index_file_metas_size_buf).await?;
let index_file_metas_size = u32::from_le_bytes(*index_file_metas_size_buf) as u64;
// read index_file_metas
// read column index meta
let index_file_metas_offset =
SeekFrom::Start(end_offset - INDEX_FILE_METAS_SIZE_SIZE - index_file_metas_size);
self.source.seek(index_file_metas_offset).await?;
let index_file_metas_buf = &mut vec![0u8; index_file_metas_size as usize];
self.source.read_exact(index_file_metas_buf).await?;
let index_file_metas: IndexFileMetas = serde_json::from_slice(index_file_metas_buf)?;
Self::validate_meta(&index_file_metas, index_file_metas_size, end_offset)?;
let column_meta: ColumnIndexMeta = serde_json::from_slice(index_file_metas_buf)?;
Ok(Arc::new(index_file_metas))
Ok(Arc::new(column_meta))
}
pub async fn fst(&mut self, offset: u64, size: u32) -> Result<fst::Map<Vec<u8>>> {
@ -67,14 +90,9 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send> IndexReader<R> {
})
}
pub async fn get_bitmap(
&mut self,
column_index_meta: &ColumnIndexMeta,
fst_val: u64,
) -> Result<BitVec> {
pub async fn get_bitmap(&mut self, fst_val: u64) -> Result<BitVec> {
let (relative_offset, size) = unpack_u32_pair(fst_val);
self.bitmap(column_index_meta.base_offset + relative_offset as u64, size)
.await
self.bitmap(relative_offset as u64, size).await
}
async fn bitmap(&mut self, offset: u64, size: u32) -> Result<BitVec> {
@ -89,34 +107,6 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send> IndexReader<R> {
}
}
impl<R> IndexReader<R> {
fn validate_meta(
index_file_metas: &IndexFileMetas,
index_file_metas_size: u64,
end_offset: u64,
) -> Result<()> {
for col_meta in index_file_metas.metas.values() {
let ColumnIndexMeta {
base_offset,
index_size,
..
} = col_meta;
let limit = end_offset - INDEX_FILE_METAS_SIZE_SIZE - index_file_metas_size;
ensure!(
*base_offset + *index_size <= limit,
anyhow!(
"ColumnIndexMeta unexpected offset: {} and size: {}. IndexFileMetas size {}",
base_offset,
index_size,
index_file_metas_size
)
);
}
Ok(())
}
}
/// An automaton that matches if the input contains to a specific string.
///
/// ```rust
@ -192,56 +182,3 @@ impl<'a> fst::automaton::Automaton for Contains<'a> {
None
}
}
#[cfg(test)]
mod test {
use std::collections::HashMap;
use super::*;
#[test]
fn test_index_reader_validate_empty_meta() {
let index_file_metas = IndexFileMetas {
metas: HashMap::new(),
};
let index_file_metas_size = 0;
let end_offset = 0;
let result = IndexReader::<Vec<u8>>::validate_meta(
&index_file_metas,
index_file_metas_size,
end_offset,
);
assert!(result.is_ok());
}
#[test]
fn test_index_reader_validate_meta() {
let mut metas = HashMap::new();
metas.insert(
"col1".to_string(),
ColumnIndexMeta {
base_offset: 0,
index_size: 10,
relative_fst_offset: 10,
..Default::default()
},
);
metas.insert(
"col2".to_string(),
ColumnIndexMeta {
base_offset: 10,
index_size: 10,
relative_fst_offset: 10,
..Default::default()
},
);
let index_file_metas = IndexFileMetas { metas };
let index_file_metas_size = 0;
let end_offset = 30;
let result = IndexReader::<Vec<u8>>::validate_meta(
&index_file_metas,
index_file_metas_size,
end_offset,
);
assert!(result.is_ok());
}
}
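With IndexFileMetas gone, each field blob now carries its own trailer: a JSON-encoded ColumnIndexMeta followed by a 4-byte little-endian size, which FieldReader::metadata() above seeks out from the end of the blob. A hedged sketch of parsing that trailer from an in-memory slice; only the length-prefixed layout is taken from this diff, the helper name is made up:

use anyhow::Result;

// Reads the trailing [meta JSON][u32 LE meta size] layout written by ColumnIndexer::finish.
fn read_trailing_meta(blob: &[u8]) -> Result<&[u8]> {
    const SIZE_LEN: usize = 4; // INDEX_FILE_METAS_SIZE_SIZE in the diff
    if blob.len() < SIZE_LEN {
        anyhow::bail!("blob too small to contain a meta trailer");
    }
    let size_start = blob.len() - SIZE_LEN;
    let mut size_buf = [0u8; SIZE_LEN];
    size_buf.copy_from_slice(&blob[size_start..]);
    let meta_size = u32::from_le_bytes(size_buf) as usize;
    if meta_size > size_start {
        anyhow::bail!("meta size {} exceeds blob length", meta_size);
    }
    // the caller would serde_json::from_slice::<ColumnIndexMeta>(..) this slice
    Ok(&blob[size_start - meta_size..size_start])
}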


@ -4,10 +4,9 @@ use fst::{
automaton::{StartsWith, Str},
Automaton, IntoStreamer, Streamer,
};
use futures::{AsyncRead, AsyncSeek};
use super::{
reader::{Contains, IndexReader},
reader::{Contains, FieldReader},
ColumnIndexMeta,
};
@ -24,17 +23,14 @@ impl<'b> SubstringSearch<'b> {
}
}
pub async fn search<R>(&mut self, index_reader: &mut IndexReader<R>) -> Result<BitVec<u8>>
where
R: AsyncRead + AsyncSeek + Unpin + Send,
{
pub async fn search(&mut self, field_reader: &mut FieldReader) -> Result<BitVec<u8>> {
self.filter();
let matchers = self
.terms
.iter()
.map(|term| Contains::new(term))
.collect::<Vec<Contains>>();
inverted_index_search(index_reader, &matchers, self.meta).await
inverted_index_search(field_reader, &matchers, self.meta).await
}
fn filter(&mut self) {
@ -55,17 +51,14 @@ impl<'b> PrefixSearch<'b> {
}
}
pub async fn search<R>(&mut self, index_reader: &mut IndexReader<R>) -> Result<BitVec<u8>>
where
R: AsyncRead + AsyncSeek + Unpin + Send,
{
pub async fn search(&mut self, field_reader: &mut FieldReader) -> Result<BitVec<u8>> {
self.filter();
let matchers = self
.terms
.iter()
.map(|term| Str::new(term).starts_with())
.collect::<Vec<StartsWith<Str>>>();
inverted_index_search(index_reader, &matchers, self.meta).await
inverted_index_search(field_reader, &matchers, self.meta).await
}
fn filter(&mut self) {
@ -93,17 +86,14 @@ impl<'b> ExactSearch<'b> {
}
}
pub async fn search<R>(&mut self, index_reader: &mut IndexReader<R>) -> Result<BitVec<u8>>
where
R: AsyncRead + AsyncSeek + Unpin + Send,
{
pub async fn search(&mut self, field_reader: &mut FieldReader) -> Result<BitVec<u8>> {
self.filter();
let matchers = self
.terms
.iter()
.map(|term| Str::new(term))
.collect::<Vec<Str>>();
inverted_index_search(index_reader, &matchers, self.meta).await
inverted_index_search(field_reader, &matchers, self.meta).await
}
fn filter(&mut self) {
@ -118,16 +108,15 @@ impl<'b> ExactSearch<'b> {
}
}
pub async fn inverted_index_search<A, R>(
index_reader: &mut IndexReader<R>,
pub async fn inverted_index_search<A>(
index_reader: &mut FieldReader,
matchers: &[A],
column_index_meta: &ColumnIndexMeta,
) -> Result<BitVec<u8>>
where
R: AsyncRead + AsyncSeek + Unpin + Send,
A: Automaton,
{
let fst_offset = column_index_meta.base_offset + column_index_meta.relative_fst_offset as u64;
let fst_offset = column_index_meta.relative_fst_offset as u64;
let fst_size = column_index_meta.fst_size;
let fst_map = index_reader.fst(fst_offset, fst_size).await?;
let mut res = BitVec::<u8>::new();
@ -137,7 +126,7 @@ where
let mut stream = fst_map.search(matcher).into_stream();
// We do not care about the key at this point, only the offset
while let Some((_, value)) = stream.next() {
let bitmap = index_reader.get_bitmap(column_index_meta, value).await?;
let bitmap = index_reader.get_bitmap(value).await?;
// Resize if the res map is smaller than the bitmap
if res.len() < bitmap.len() {


@ -55,7 +55,7 @@ impl ColumnIndexer {
pub fn push(&mut self, value: BytesRef<'_>, segment_id: usize, term_len: usize) {
let bitmap = self.sorter.entry(value.into()).or_default();
if segment_id >= bitmap.len() {
bitmap.resize(segment_id + 1, false);
bitmap.resize(segment_id + 64, false);
}
bitmap.set(segment_id, true);
@ -75,7 +75,8 @@ impl ColumnIndexer {
let min_val = self.sorter.keys().next().cloned();
let max_val = self.sorter.keys().next_back().cloned();
// 1. write bitmaps to writer
for (value, bitmap) in std::mem::take(&mut self.sorter) {
let sorter = std::mem::take(&mut self.sorter);
for (value, bitmap) in sorter {
self.append_value(value, bitmap, writer)?;
}
@ -84,12 +85,19 @@ impl ColumnIndexer {
writer.write_all(&fst_bytes)?;
// update meta
self.meta.relative_fst_offset = self.meta.index_size as _;
self.meta.fst_size = fst_bytes.len() as _;
self.meta.index_size += self.meta.fst_size as u64;
self.meta.index_size = 0;
self.meta.fst_size = fst_bytes.len() as u32;
self.meta.relative_fst_offset = writer.len() as u32 - self.meta.fst_size;
self.meta.min_val = min_val.unwrap_or_default();
self.meta.max_val = max_val.unwrap_or_default();
// write meta into writer buffer
let meta_bytes = serde_json::to_vec(&self.meta)?;
writer.write_all(&meta_bytes)?;
let metas_size = meta_bytes.len() as u32;
writer.write_all(&metas_size.to_le_bytes())?;
writer.flush()?;
Ok(self.meta)
}
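After this change a column index blob is laid out as term bitmaps, then the FST bytes, then the JSON meta, then the meta's u32 LE size. A rough sketch of producing that layout with the fst crate; MiniMeta is a simplified stand-in for ColumnIndexMeta and the bitmap offset/size packing is omitted:

use std::io::Write;
use anyhow::Result;
use serde::Serialize;

#[derive(Default, Serialize)]
struct MiniMeta {
    fst_size: u32,
    relative_fst_offset: u32,
}

// Terms must be inserted into the FST in sorted order, which a BTreeMap guarantees.
fn write_column_index(
    sorter: std::collections::BTreeMap<Vec<u8>, Vec<u8>>, // term -> serialized bitmap bytes
    writer: &mut Vec<u8>,
) -> Result<MiniMeta> {
    let mut meta = MiniMeta::default();
    let mut fst = fst::MapBuilder::memory();
    for (term, bitmap) in sorter {
        let offset = writer.len() as u64; // real code packs offset and size into the value
        writer.write_all(&bitmap)?;
        fst.insert(&term, offset)?;
    }
    let fst_bytes = fst.into_inner()?;
    writer.write_all(&fst_bytes)?;
    meta.fst_size = fst_bytes.len() as u32;
    meta.relative_fst_offset = writer.len() as u32 - meta.fst_size;
    let meta_bytes = serde_json::to_vec(&meta)?;
    writer.write_all(&meta_bytes)?;
    writer.write_all(&(meta_bytes.len() as u32).to_le_bytes())?;
    Ok(meta)
}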


@ -28,14 +28,12 @@ pub const MIN_FILE_SIZE: u64 = MAGIC_SIZE + MIN_FOOTER_SIZE;
pub const FLAGS_SIZE: u64 = 4;
pub const FOOTER_PAYLOAD_SIZE_SIZE: u64 = 4;
pub const MIN_FOOTER_SIZE: u64 = MAGIC_SIZE + FLAGS_SIZE + FOOTER_PAYLOAD_SIZE_SIZE + MAGIC_SIZE; // without any blobs
// Version 1 of the inverted index blob
pub const BLOB_TYPE: &str = "o2_inverted_index_v1";
bitflags! {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PuffinFooterFlags: u32 {
const DEFAULT = 0b00000000;
const COMPRESSED_ZSTD = 0b00000001;
const COMPRESSED = 0b00000001;
}
}
@ -43,7 +41,7 @@ bitflags! {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PuffinMeta {
/// Metadata for each blob in the file
pub blob_metadata: Vec<BlobMetadata>,
pub blobs: Vec<BlobMetadata>,
/// Storage for arbitrary meta-information, like writer identification/version
#[serde(default)]
@ -57,31 +55,31 @@ pub struct PuffinMeta {
pub struct BlobMetadata {
/// Blob type
#[serde(rename = "type")]
pub blob_type: String,
pub blob_type: BlobTypes,
/// Required by specs. Not used for InvertedIndex within OpenObserve
/// Required by specs
#[serde(default)]
pub fields: Vec<i32>,
pub fields: Vec<u32>,
/// Required by specs. Not used for InvertedIndex within OpenObserve
/// Required by specs
#[serde(default)]
pub snapshot_id: i64,
pub snapshot_id: u64,
/// Required by specs. Not used for InvertedIndex within OpenObserve
/// Required by specs
#[serde(default)]
pub sequence_number: i64,
pub sequence_number: u64,
/// The offset in the file where the blob contents start
pub offset: i64,
pub offset: u64,
/// The length of the blob stored in the file (after compression, if compressed)
pub length: i64,
pub length: u64,
/// Default to ZSTD compression for OpenObserve inverted index
#[serde(default, skip_serializing_if = "Option::is_none")]
pub compression_codec: Option<CompressionCodec>,
/// Additional meta information of the file. Not used for InvertedIndex within OpenObserve
/// Additional meta information of the file.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub properties: HashMap<String, String>,
}
@ -93,30 +91,41 @@ pub enum CompressionCodec {
Zstd,
}
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub enum BlobTypes {
#[serde(rename = "apache-datasketches-theta-v1")]
ApacheDatasketchesThetaV1,
#[serde(rename = "deletion-vector-v1")]
DeletionVectorV1,
#[default]
#[serde(rename = "o2-fst-v1")]
O2FstV1,
}
#[derive(Default)]
pub struct BlobMetadataBuilder {
blob_type: Option<String>,
fields: Vec<i32>,
snapshot_id: i64,
sequence_number: i64,
offset: Option<i64>,
length: Option<i64>,
blob_type: Option<BlobTypes>,
fields: Vec<u32>,
snapshot_id: u64,
sequence_number: u64,
offset: Option<u64>,
length: Option<u64>,
compression_codec: Option<CompressionCodec>,
properties: HashMap<String, String>,
}
impl BlobMetadataBuilder {
pub fn blob_type(mut self, blob_type: String) -> Self {
pub fn blob_type(mut self, blob_type: BlobTypes) -> Self {
self.blob_type = Some(blob_type);
self
}
pub fn offset(mut self, offset: i64) -> Self {
pub fn offset(mut self, offset: u64) -> Self {
self.offset = Some(offset);
self
}
pub fn length(mut self, length: i64) -> Self {
pub fn length(mut self, length: u64) -> Self {
self.length = Some(length);
self
}
@ -133,7 +142,7 @@ impl BlobMetadataBuilder {
snapshot_id: self.snapshot_id,
sequence_number: self.sequence_number,
offset: self.offset.ok_or("offset is required")?,
length: self.length.ok_or("length is required")?,
length: self.length.unwrap_or_default(),
compression_codec: self.compression_codec,
properties: self.properties,
})
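A hedged example of driving the builder above, written as a test inside this module so the types are in scope; the offset and length values are illustrative, and treating the 4-byte magic as the blob start is an assumption:

#[cfg(test)]
mod blob_builder_example {
    use super::*;

    #[test]
    fn build_fst_blob_metadata() {
        // Illustrative values; only the builder API and the O2FstV1 type come from this file.
        let meta = BlobMetadataBuilder::default()
            .blob_type(BlobTypes::O2FstV1)
            .offset(4) // assume the blob starts right after the 4-byte magic
            .length(1024)
            .build()
            .expect("offset is required");
        assert_eq!(meta.snapshot_id, 0); // spec-required fields default to zero
    }
}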


@ -26,7 +26,6 @@ use crate::meta::puffin::{CompressionCodec, MIN_FILE_SIZE};
pub struct PuffinBytesReader<R> {
source: R,
metadata: Option<PuffinMeta>,
}
@ -46,23 +45,49 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send> PuffinBytesReader<R> {
.seek(SeekFrom::Start(blob_metadata.offset as _))
.await?;
// decompress bytes since OpenObserve InvertedIndex compresses index data by default
ensure!(
blob_metadata.compression_codec == Some(CompressionCodec::Zstd),
anyhow!("Unexpected CompressionCodex found in BlobMetadata")
);
let mut compressed = vec![0u8; blob_metadata.length as usize];
self.source.read_exact(&mut compressed).await?;
let mut raw_data = vec![0u8; blob_metadata.length as usize];
self.source.read_exact(&mut raw_data).await?;
let mut decompressed = Vec::new();
let mut decoder = zstd::Decoder::new(&compressed[..])?;
decoder.read_to_end(&mut decompressed)?;
Ok(decompressed)
let data = match blob_metadata.compression_codec {
None => raw_data,
Some(CompressionCodec::Zstd) => {
let mut decompressed = Vec::new();
let mut decoder = zstd::Decoder::new(&raw_data[..])?;
decoder.read_to_end(&mut decompressed)?;
decompressed
}
Some(CompressionCodec::Lz4) => {
todo!("Lz4 decompression is not implemented yet")
}
};
Ok(data)
}
pub async fn get_metadata(&mut self) -> Result<PuffinMeta> {
if let Some(meta) = &self.metadata {
return Ok(meta.clone());
pub async fn get_field(&mut self, field: &str) -> Result<Option<BlobMetadata>> {
self.parse_footer().await?;
match self.metadata.as_ref() {
None => Err(anyhow!("Metadata not found")),
Some(v) => {
let Some(idx) = v.properties.get(field) else {
return Ok(None);
};
let idx = idx
.parse::<usize>()
.map_err(|_| anyhow!("Field not found"))?;
Ok(v.blobs.get(idx).cloned())
}
}
}
pub async fn get_metadata(&mut self) -> Result<Option<PuffinMeta>> {
self.parse_footer().await?;
Ok(self.metadata.clone())
}
pub async fn parse_footer(&mut self) -> Result<()> {
if self.metadata.is_some() {
return Ok(());
}
// check MAGIC
@ -83,9 +108,8 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send> PuffinBytesReader<R> {
let puffin_meta = PuffinFooterBytesReader::new(&mut self.source, end_offset)
.parse()
.await?;
self.metadata = Some(puffin_meta.clone());
Ok(puffin_meta)
self.metadata = Some(puffin_meta);
Ok(())
}
}
@ -181,7 +205,7 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send> PuffinFooterBytesReader<R> {
}
fn parse_payload(&self, bytes: &[u8]) -> Result<PuffinMeta> {
if self.flags.contains(PuffinFooterFlags::COMPRESSED_ZSTD) {
if self.flags.contains(PuffinFooterFlags::COMPRESSED) {
let decoder = zstd::Decoder::new(bytes)?;
serde_json::from_reader(decoder)
.map_err(|e| anyhow!("Error decompress footer payload {}", e.to_string()))
@ -195,18 +219,18 @@ impl<R: AsyncRead + AsyncSeek + Unpin + Send> PuffinFooterBytesReader<R> {
let puffin_metadata = self.metadata.as_ref().expect("metadata is not set");
let mut offset = MAGIC_SIZE;
for blob in &puffin_metadata.blob_metadata {
for blob in &puffin_metadata.blobs {
ensure!(
blob.offset as u64 == offset,
blob.offset == offset,
anyhow!("Blob payload offset mismatch")
);
offset += blob.length as u64;
offset += blob.length;
}
let payload_ends_at = puffin_metadata
.blob_metadata
.blobs
.last()
.map_or(MAGIC_SIZE, |blob| (blob.offset + blob.length) as u64);
.map_or(MAGIC_SIZE, |blob| blob.offset + blob.length);
ensure!(
payload_ends_at == self.head_magic_offset(),


@ -22,7 +22,7 @@ use std::{
use anyhow::{Context, Result};
use super::{
BlobMetadata, BlobMetadataBuilder, CompressionCodec, PuffinFooterFlags, PuffinMeta, BLOB_TYPE,
BlobMetadata, BlobMetadataBuilder, BlobTypes, CompressionCodec, PuffinFooterFlags, PuffinMeta,
MAGIC, MAGIC_SIZE, MIN_FOOTER_SIZE,
};
@ -50,44 +50,53 @@ impl<W> PuffinBytesWriter<W> {
}
}
fn add_blob_metadata(
fn build_blob_metadata(
&self,
blob_type: String,
blob_type: BlobTypes,
compression_codec: Option<CompressionCodec>,
size: u64,
) -> BlobMetadata {
BlobMetadataBuilder::default()
.blob_type(blob_type)
.compression_codec(compression_codec)
.offset(self.written_bytes as _)
.length(size as _)
.build()
.expect("Missing required fields")
}
}
impl<W: io::Write> PuffinBytesWriter<W> {
pub fn add_blob(&mut self, raw_data: Vec<u8>) -> Result<()> {
pub fn add_blob(&mut self, field: String, raw_data: Vec<u8>) -> Result<()> {
self.add_header_if_needed()
.context("Error writing puffin header")?;
// compress blob raw data
let mut encoder = zstd::Encoder::new(vec![], 3)?;
encoder
.write_all(&raw_data)
.context("Error encoding blob raw data")?;
let compressed_bytes = encoder.finish()?;
let compressed_size = compressed_bytes.len() as u64;
self.writer.write_all(&compressed_bytes)?;
// build blob metadata
let mut metadata = self.build_blob_metadata(BlobTypes::O2FstV1, None);
// compress blob raw data
match metadata.compression_codec {
None => {
metadata.length = raw_data.len() as u64;
self.writer.write_all(&raw_data)?;
}
Some(CompressionCodec::Zstd) => {
let mut encoder = zstd::Encoder::new(vec![], 3)?;
encoder
.write_all(&raw_data)
.context("Error encoding blob raw data")?;
let compressed_bytes = encoder.finish()?;
self.writer.write_all(&compressed_bytes)?;
metadata.length = compressed_bytes.len() as u64;
}
Some(CompressionCodec::Lz4) => {
todo!("Lz4 compression is not implemented yet")
}
};
self.written_bytes += metadata.length;
self.properties
.insert(field, self.blobs_metadata.len().to_string());
self.blobs_metadata.push(metadata);
// add metadata for this blob
let blob_metadata = self.add_blob_metadata(
BLOB_TYPE.to_string(),
Some(CompressionCodec::Zstd),
compressed_size,
);
self.blobs_metadata.push(blob_metadata);
self.written_bytes += compressed_size;
Ok(())
}
@ -113,7 +122,6 @@ impl<W: io::Write> PuffinBytesWriter<W> {
mem::take(&mut self.properties),
)
.into_bytes()?;
self.writer.write_all(&footer_bytes)?;
self.written_bytes += footer_bytes.len() as u64;
Ok(())
@ -150,7 +158,7 @@ impl PuffinFooterWriter {
buf.extend_from_slice(&(payload_size as i32).to_le_bytes());
// flags
buf.extend_from_slice(&PuffinFooterFlags::COMPRESSED_ZSTD.bits().to_le_bytes());
buf.extend_from_slice(&PuffinFooterFlags::DEFAULT.bits().to_le_bytes());
// FootMagic
buf.extend_from_slice(&MAGIC);
@ -160,12 +168,9 @@ impl PuffinFooterWriter {
fn get_payload(&mut self) -> Result<Vec<u8>> {
let file_metdadata = PuffinMeta {
blob_metadata: mem::take(&mut self.blob_metadata),
blobs: mem::take(&mut self.blob_metadata),
properties: mem::take(&mut self.file_properties),
};
let mut encoder = zstd::Encoder::new(vec![], 3)?;
serde_json::to_writer(&mut encoder, &file_metdadata)?;
Ok(encoder.finish()?)
serde_json::to_vec(&file_metdadata).context("Error serializing puffin metadata")
}
}
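With COMPRESSED_ZSTD renamed to COMPRESSED and the footer payload now written as plain JSON under the DEFAULT flag, the trailer the writer emits follows the usual puffin footer layout: magic, payload, payload size, flags, magic. A hedged sketch of assembling such a trailer for an arbitrary payload; the "PFA1" magic is assumed from the puffin spec rather than taken from this diff:

// Sketch of the footer layout: MAGIC | payload | size (i32 LE) | flags (u32 LE) | MAGIC.
fn build_footer(payload_json: &[u8]) -> Vec<u8> {
    const MAGIC: [u8; 4] = *b"PFA1"; // assumption: standard puffin magic bytes
    const FLAGS_DEFAULT: u32 = 0;    // PuffinFooterFlags::DEFAULT, payload not compressed

    let mut buf = Vec::with_capacity(payload_json.len() + 16);
    buf.extend_from_slice(&MAGIC);                                      // footer start magic
    buf.extend_from_slice(payload_json);                                // serialized PuffinMeta
    buf.extend_from_slice(&(payload_json.len() as i32).to_le_bytes());  // payload size
    buf.extend_from_slice(&FLAGS_DEFAULT.to_le_bytes());                // flags
    buf.extend_from_slice(&MAGIC);                                      // footer end magic
    buf
}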


@ -708,6 +708,7 @@ impl From<&cluster_rpc::ScanStats> for ScanStats {
}
#[derive(Hash, Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "lowercase")]
pub enum SearchEventType {
UI,
Dashboards,


@ -15,14 +15,12 @@
use std::borrow::Cow;
use anyhow::{anyhow, ensure, Result};
use anyhow::Result;
use futures::io::Cursor;
use itertools::Itertools;
use crate::{
meta::{
inverted_index::reader::IndexReader, puffin::reader::PuffinBytesReader, stream::StreamType,
},
meta::{inverted_index::reader::IndexReader, stream::StreamType},
FILE_EXT_PARQUET, FILE_EXT_PUFFIN, INDEX_MIN_CHAR_LEN,
};
@ -70,16 +68,7 @@ pub fn unpack_u32_pair(packed: u64) -> (u32, u32) {
pub async fn create_index_reader_from_puffin_bytes(
buf: Vec<u8>,
) -> Result<IndexReader<Cursor<Vec<u8>>>> {
let mut puffin_reader = PuffinBytesReader::new(Cursor::new(buf));
let puffin_meta = puffin_reader.get_metadata().await?;
ensure!(
puffin_meta.blob_metadata.len() == 1,
anyhow!("InvertedIndex should only have one blob each puffin file")
);
let blob_bytes = puffin_reader
.read_blob_bytes(puffin_meta.blob_metadata.first().unwrap())
.await?;
Ok(IndexReader::new(Cursor::new(blob_bytes)))
Ok(IndexReader::new(Cursor::new(buf)))
}
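get_bitmap now takes only the packed FST value, so unpack_u32_pair is what turns one u64 back into a (relative offset, size) pair. A hedged sketch of one possible packing, assuming the offset sits in the high 32 bits and the size in the low 32 bits; the actual bit layout is defined elsewhere in this crate:

// Assumption: high 32 bits = offset, low 32 bits = size; mirror of unpack_u32_pair.
fn pack_u32_pair(offset: u32, size: u32) -> u64 {
    ((offset as u64) << 32) | size as u64
}

fn unpack_u32_pair_sketch(packed: u64) -> (u32, u32) {
    ((packed >> 32) as u32, (packed & 0xFFFF_FFFF) as u32)
}

#[test]
fn pack_unpack_round_trips() {
    let packed = pack_u32_pair(7, 42);
    assert_eq!(unpack_u32_pair_sketch(packed), (7, 42));
}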
/// FST inverted index solution has a 1:1 mapping between parquet and idx files.


@ -24,7 +24,10 @@ use arrow::record_batch::RecordBatch;
use arrow_schema::Schema;
use futures::TryStreamExt;
use parquet::{
arrow::{arrow_reader::ArrowReaderMetadata, AsyncArrowWriter, ParquetRecordBatchStreamBuilder},
arrow::{
arrow_reader::ArrowReaderMetadata, async_reader::ParquetRecordBatchStream,
AsyncArrowWriter, ParquetRecordBatchStreamBuilder,
},
basic::{Compression, Encoding},
file::{metadata::KeyValue, properties::WriterProperties},
};
@ -115,25 +118,39 @@ pub fn parse_file_key_columns(key: &str) -> Result<(String, String, String), any
Ok((stream_key, date_key, file_name))
}
pub async fn read_recordbatch_from_bytes(
pub async fn get_recordbatch_reader_from_bytes(
data: &bytes::Bytes,
) -> Result<(Arc<Schema>, Vec<RecordBatch>), anyhow::Error> {
) -> Result<(Arc<Schema>, ParquetRecordBatchStream<Cursor<bytes::Bytes>>), anyhow::Error> {
let schema_reader = Cursor::new(data.clone());
let arrow_reader = ParquetRecordBatchStreamBuilder::new(schema_reader).await?;
let schema = arrow_reader.schema().clone();
let record_reader = arrow_reader.build()?;
let batches = record_reader.try_collect().await?;
let reader = arrow_reader.build()?;
Ok((schema, reader))
}
pub async fn get_recordbatch_reader_from_file(
path: &PathBuf,
) -> Result<(Arc<Schema>, ParquetRecordBatchStream<tokio::fs::File>), anyhow::Error> {
let file = tokio::fs::File::open(path).await?;
let arrow_reader = ParquetRecordBatchStreamBuilder::new(file).await?;
let schema = arrow_reader.schema().clone();
let reader = arrow_reader.build()?;
Ok((schema, reader))
}
pub async fn read_recordbatch_from_bytes(
data: &bytes::Bytes,
) -> Result<(Arc<Schema>, Vec<RecordBatch>), anyhow::Error> {
let (schema, reader) = get_recordbatch_reader_from_bytes(data).await?;
let batches = reader.try_collect().await?;
Ok((schema, batches))
}
pub async fn read_recordbatch_from_file(
path: &PathBuf,
) -> Result<(Arc<Schema>, Vec<RecordBatch>), anyhow::Error> {
let file = tokio::fs::File::open(path).await?;
let arrow_reader = ParquetRecordBatchStreamBuilder::new(file).await?;
let schema = arrow_reader.schema().clone();
let record_reader = arrow_reader.build()?;
let batches = record_reader.try_collect().await?;
let (schema, reader) = get_recordbatch_reader_from_file(path).await?;
let batches = reader.try_collect().await?;
Ok((schema, batches))
}
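get_recordbatch_reader_from_bytes now hands back a lazy ParquetRecordBatchStream instead of a collected Vec<RecordBatch>, so callers can process batches as they arrive. A minimal hedged consumer using futures::TryStreamExt; the row-counting helper is made up, the stream API is the parquet crate's:

use futures::TryStreamExt;

// Drains the stream one batch at a time instead of buffering the whole file.
async fn count_rows(data: &bytes::Bytes) -> Result<usize, anyhow::Error> {
    let (_schema, mut reader) = get_recordbatch_reader_from_bytes(data).await?;
    let mut rows = 0;
    while let Some(batch) = reader.try_next().await? {
        rows += batch.num_rows();
    }
    Ok(rows)
}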


@ -147,20 +147,26 @@ pub async fn search(
}
// set search event type
req.search_type = match get_search_type_from_request(&query) {
Ok(v) => v,
Err(e) => return Ok(MetaHttpResponse::bad_request(e)),
if req.search_type.is_none() {
req.search_type = match get_search_type_from_request(&query) {
Ok(v) => v,
Err(e) => return Ok(MetaHttpResponse::bad_request(e)),
};
};
req.search_event_context = req
.search_type
.as_ref()
.and_then(|event_type| get_search_event_context_from_request(event_type, &query));
if req.search_event_context.is_none() {
req.search_event_context = req
.search_type
.as_ref()
.and_then(|event_type| get_search_event_context_from_request(event_type, &query));
}
// set index_type
req.index_type = match get_index_type_from_request(&query) {
Ok(typ) => typ,
Err(e) => return Ok(MetaHttpResponse::bad_request(e)),
};
if req.index_type.is_empty() {
req.index_type = match get_index_type_from_request(&query) {
Ok(typ) => typ,
Err(e) => return Ok(MetaHttpResponse::bad_request(e)),
};
}
// get stream name
let stream_names = match resolve_stream_names(&req.query.sql) {


@ -16,7 +16,7 @@
pub mod disk;
pub mod memory;
use std::collections::VecDeque;
use std::{collections::VecDeque, ops::Range};
use hashbrown::HashSet;
use hashlink::lru_cache::LruCache;
@ -133,6 +133,27 @@ pub async fn download(trace_id: &str, file: &str) -> Result<(), anyhow::Error> {
}
}
pub async fn get(file: &str, range: Option<Range<usize>>) -> Result<bytes::Bytes, anyhow::Error> {
let cfg = config::get_config();
// get from memory cache
if cfg.memory_cache.enabled {
if let Some(v) = memory::get(file, range.clone()).await {
return Ok(v);
}
}
// get from disk cache
if cfg.disk_cache.enabled {
if let Some(v) = disk::get(file, range.clone()).await {
return Ok(v);
}
}
// get from storage
match range {
Some(r) => crate::storage::get_range(file, r).await,
None => crate::storage::get(file).await,
}
}
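The new get helper tries the memory cache, then the disk cache, then object storage, optionally for a byte range. A small hedged usage sketch; the file key is illustrative:

// Illustrative usage; the file key is made up.
async fn read_example() -> Result<(), anyhow::Error> {
    let key = "files/default/logs/app/2024/11/07/00/example.parquet";
    // Range read: memory cache, then disk cache, then storage::get_range.
    let head = get(key, Some(0..1024)).await?;
    // Full read: same lookup order, falling back to storage::get.
    let all = get(key, None).await?;
    assert!(head.len() <= all.len());
    Ok(())
}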
#[cfg(test)]
mod tests {
use super::*;


@ -1551,7 +1551,7 @@ pub async fn create_table_index() -> Result<()> {
&["stream", "date", "file"],
))
.await?;
log::warn!("[MYSQL] create table index(file_list_stream_file_idx) succeed");
log::warn!("[MYSQL] create table index(file_list_stream_file_idx) successfully");
}
Ok(())


@ -1517,7 +1517,7 @@ pub async fn create_table_index() -> Result<()> {
&["stream", "date", "file"],
))
.await?;
log::warn!("[POSTGRES] create table index(file_list_stream_file_idx) succeed");
log::warn!("[POSTGRES] create table index(file_list_stream_file_idx) successfully");
}
Ok(())


@ -1356,7 +1356,7 @@ pub async fn create_table_index() -> Result<()> {
&["stream", "date", "file"],
))
.await?;
log::warn!("[SQLITE] create table index(file_list_stream_file_idx) succeed");
log::warn!("[SQLITE] create table index(file_list_stream_file_idx) successfully");
}
// delete trigger for old version


@ -13,6 +13,8 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::ops::Range;
use config::{get_config, is_local_disk_storage, metrics};
use datafusion::parquet::data_type::AsBytes;
use futures::{StreamExt, TryStreamExt};
@ -78,6 +80,11 @@ pub async fn get(file: &str) -> Result<bytes::Bytes, anyhow::Error> {
Ok(data)
}
pub async fn get_range(file: &str, range: Range<usize>) -> Result<bytes::Bytes, anyhow::Error> {
let data = DEFAULT.get_range(&file.into(), range).await?;
Ok(data)
}
pub async fn put(file: &str, data: bytes::Bytes) -> Result<(), anyhow::Error> {
if bytes_size_in_mb(&data) >= MULTI_PART_UPLOAD_DATA_SIZE {
put_multipart(file, data).await?;


@ -131,7 +131,7 @@ async fn upload_file(path_str: &str, file_key: &str) -> Result<(), anyhow::Error
let result = storage::put(&new_file_key, bytes::Bytes::from(compressed_bytes)).await;
match result {
Ok(_output) => {
log::info!("[JOB] File_list upload succeeded: {}", new_file_key);
log::info!("[JOB] File_list upload successfully: {}", new_file_key);
Ok(())
}
Err(err) => {


@ -24,7 +24,9 @@ use config::{
ider,
meta::stream::{FileMeta, StreamPartition, StreamPartitionType, StreamType},
utils::{
parquet::new_parquet_writer, record_batch_ext::concat_batches, schema::format_partition_key,
parquet::new_parquet_writer,
record_batch_ext::{concat_batches, RecordBatchExt},
schema::format_partition_key,
},
FILE_EXT_PARQUET,
};
@ -57,12 +59,10 @@ fn generate_index_file_name_from_compacted_file(
pub(crate) async fn write_parquet_index_to_disk(
batches: Vec<arrow::record_batch::RecordBatch>,
file_size: u64,
org_id: &str,
stream_type: StreamType,
stream_name: &str,
file_name: &str,
caller: &str,
) -> Result<Vec<(String, FileMeta)>, anyhow::Error> {
let schema = if let Some(first_batch) = batches.first() {
first_batch.schema()
@ -71,7 +71,7 @@ pub(crate) async fn write_parquet_index_to_disk(
};
log::debug!(
"write_parquet_index_to_disk: batches row counts: {:?}",
"[JOB:IDX] write_parquet_index_to_disk: batches row counts: {:?}",
batches.iter().map(|b| b.num_rows()).sum::<usize>()
);
@ -97,22 +97,12 @@ pub(crate) async fn write_parquet_index_to_disk(
let mut ret = Vec::new();
for (prefix, batch) in partitioned_batches.into_iter() {
// write metadata
let batch_size = batch.size();
let mut file_meta = FileMeta {
min_ts: 0,
max_ts: 0,
records: 0,
original_size: file_size as i64,
compressed_size: 0,
flattened: false,
original_size: batch_size as i64,
..Default::default()
};
populate_file_meta(
schema.clone(),
vec![vec![batch.clone()]],
&mut file_meta,
Some("min_ts"),
Some("max_ts"),
)
.await?;
populate_file_meta(&[&batch], &mut file_meta, Some("min_ts"), Some("max_ts")).await?;
// write parquet file
let mut buf_parquet = Vec::new();
@ -129,24 +119,18 @@ pub(crate) async fn write_parquet_index_to_disk(
file_name,
&prefix,
);
log::info!(
"[JOB] IDX: write_to_disk: {}/{}/{} {} {} {}",
org_id,
stream_name,
stream_type,
new_idx_file_name,
file_name,
caller,
);
let store_file_name = new_idx_file_name.clone();
match storage::put(&store_file_name, bytes::Bytes::from(buf_parquet)).await {
Ok(_) => {
log::info!("[JOB] disk file upload succeeded: {}", &new_idx_file_name);
log::info!(
"[JOB:IDX] index file upload successfully: {}",
&new_idx_file_name
);
ret.push((new_idx_file_name, file_meta));
}
Err(err) => {
log::error!("[JOB] disk file upload error: {:?}", err);
log::error!("[JOB] index file upload error: {:?}", err);
return Err(anyhow::anyhow!(err));
}
}

File diff suppressed because it is too large


@ -388,7 +388,7 @@ pub async fn send_email(
let mut email = Message::builder()
.from(config.from_email.parse()?)
.subject(format!("{}", &email_details.title));
.subject(email_details.title.to_string());
for recipient in recipients {
email = email.to(recipient.parse()?);


@ -355,7 +355,7 @@ pub trait AlertExt: Sync + Send + 'static {
start_time: Option<i64>,
) -> Result<(Option<Vec<Map<String, Value>>>, i64), anyhow::Error>;
/// Returns a tuple containing a boolean - if all the send notification jobs succeeded
/// Returns a tuple containing a boolean - if all the send notification jobs successfully
/// and the error message if any
async fn send_notification(
&self,
@ -556,7 +556,7 @@ pub async fn send_email_notification(
let mut email = Message::builder()
.from(cfg.smtp.smtp_from_email.parse()?)
.subject(format!("{}", email_subject));
.subject(email_subject.to_string());
for recipient in recipients {
email = email.to(recipient.parse()?);


@ -327,7 +327,7 @@ async fn merge_file_list(offset: i64) -> Result<(), anyhow::Error> {
match storage::put(&file_name, compressed_bytes.into()).await {
Ok(_) => {
log::info!(
"[COMPACT] file_list merge succeed, {} files into a new file: {}",
"[COMPACT] file_list merge successfully, {} files into a new file: {}",
file_list.len(),
file_name
);


@ -13,34 +13,37 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::HashMap, io::Write, sync::Arc};
use std::{io::Write, sync::Arc};
use ::datafusion::{arrow::datatypes::Schema, error::DataFusionError};
use arrow::array::RecordBatch;
use arrow_schema::{DataType, Field};
use bytes::Bytes;
use chrono::{DateTime, Datelike, Duration, TimeZone, Timelike, Utc};
use config::{
cluster::LOCAL_NODE,
get_config, ider,
get_config, ider, is_local_disk_storage,
meta::{
inverted_index::InvertedIndexFormat,
search::StorageType,
stream::{FileKey, FileMeta, MergeStrategy, PartitionTimeLevel, StreamStats, StreamType},
},
metrics,
utils::{
json,
parquet::{
parse_file_key_columns, read_recordbatch_from_bytes, write_recordbatch_to_parquet,
get_recordbatch_reader_from_bytes, parse_file_key_columns, read_schema_from_bytes,
},
record_batch_ext::{concat_batches, merge_record_batches},
record_batch_ext::concat_batches,
schema_ext::SchemaExt,
time::hour_micros,
},
FILE_EXT_PARQUET,
};
use hashbrown::HashSet;
use hashbrown::{HashMap, HashSet};
use infra::{
cache, dist_lock, file_list as infra_file_list,
cache::file_data,
dist_lock, file_list as infra_file_list,
schema::{
get_stream_setting_bloom_filter_fields, get_stream_setting_fts_fields,
get_stream_setting_index_fields, unwrap_partition_time_level, unwrap_stream_settings,
@ -57,9 +60,7 @@ use crate::{
common::infra::cluster::get_node_by_uuid,
job::files::parquet::{generate_fst_inverted_index, generate_index_on_compactor},
service::{
db, file_list,
schema::generate_schema_for_defined_schema_fields,
search::datafusion::{self, file_type::FileType},
db, file_list, schema::generate_schema_for_defined_schema_fields, search::datafusion::exec,
stream,
},
};
@ -429,7 +430,8 @@ pub async fn merge_by_stream(
let cfg = get_config();
// sort by file size
let mut files_with_size = files_with_size.to_owned();
match MergeStrategy::from(&cfg.compact.strategy) {
let job_strategy = MergeStrategy::from(&cfg.compact.strategy);
match job_strategy {
MergeStrategy::FileSize => {
files_with_size.sort_by(|a, b| a.meta.original_size.cmp(&b.meta.original_size));
}
@ -451,7 +453,12 @@ pub async fn merge_by_stream(
for file in files_with_size.iter() {
if new_file_size + file.meta.original_size > cfg.compact.max_file_size as i64 {
if new_file_list.len() <= 1 {
break; // no files need to merge
if job_strategy == MergeStrategy::FileSize {
break;
}
new_file_size = 0;
new_file_list.clear();
continue; // this batch don't need to merge, skip
}
batch_groups.push(MergeBatch {
batch_id: batch_groups.len(),
@ -608,9 +615,7 @@ pub async fn merge_files(
let mut new_file_size = 0;
let mut new_compressed_file_size = 0;
let mut total_records = 0;
let mut new_file_list = Vec::new();
let mut deleted_files = Vec::new();
let cfg = get_config();
for file in files_with_size.iter() {
if new_file_size + file.meta.original_size > cfg.compact.max_file_size as i64
@ -621,7 +626,6 @@ pub async fn merge_files(
}
new_file_size += file.meta.original_size;
new_compressed_file_size += file.meta.compressed_size;
total_records += file.meta.records;
new_file_list.push(file.clone());
// metrics
metrics::COMPACT_MERGED_FILES
@ -638,40 +642,8 @@ pub async fn merge_files(
let retain_file_list = new_file_list.clone();
// write parquet files into tmpfs
let tmp_dir = cache::tmpfs::Directory::default();
let mut fi = 0;
for file in new_file_list.iter() {
fi += 1;
log::info!("[COMPACT:{thread_id}:{fi}] merge small file: {}", &file.key);
let data = match storage::get(&file.key).await {
Ok(body) => body,
Err(err) => {
log::error!(
"[COMPACT:{thread_id}] merge small file: {}, err: {}",
&file.key,
err
);
if err.to_string().to_lowercase().contains("not found") {
// delete file from file list
if let Err(err) = file_list::delete_parquet_file(&file.key, true).await {
log::error!(
"[COMPACT:{thread_id}] delete file: {}, from file_list err: {}",
&file.key,
err
);
}
}
// QUESTION(taiming): since parquet files deleted, should we delete previously
// created fst index files as well if there's any?
deleted_files.push(file.key.clone());
total_records -= file.meta.records;
new_file_size -= file.meta.original_size;
continue;
}
};
tmp_dir.set(&file.key, data)?;
}
// cache parquet files
let deleted_files = cache_remote_files(&new_file_list).await?;
if !deleted_files.is_empty() {
new_file_list.retain(|f| !deleted_files.contains(&f.key));
}
@ -682,7 +654,8 @@ pub async fn merge_files(
// get time range for these files
let min_ts = new_file_list.iter().map(|f| f.meta.min_ts).min().unwrap();
let max_ts = new_file_list.iter().map(|f| f.meta.max_ts).max().unwrap();
let total_records = new_file_list.iter().map(|f| f.meta.records).sum();
let new_file_size = new_file_list.iter().map(|f| f.meta.original_size).sum();
let mut new_file_meta = FileMeta {
min_ts,
max_ts,
@ -692,13 +665,16 @@ pub async fn merge_files(
flattened: false,
};
if new_file_meta.records == 0 {
return Err(anyhow::anyhow!("merge_parquet_files error: records is 0"));
return Err(anyhow::anyhow!("merge_files error: records is 0"));
}
// convert the file to the latest version of schema
// get latest version of schema
let schema_latest = infra::schema::get(org_id, stream_name, stream_type).await?;
let stream_setting = infra::schema::get_settings(org_id, stream_name, stream_type).await;
let (defined_schema_fields, need_original) = match stream_setting {
let stream_settings = infra::schema::get_settings(org_id, stream_name, stream_type).await;
let bloom_filter_fields = get_stream_setting_bloom_filter_fields(&stream_settings);
let full_text_search_fields = get_stream_setting_fts_fields(&stream_settings);
let index_fields = get_stream_setting_index_fields(&stream_settings);
let (defined_schema_fields, need_original) = match stream_settings {
Some(s) => (
s.defined_schema_fields.unwrap_or_default(),
s.store_original_data,
@ -717,102 +693,75 @@ pub async fn merge_files(
Arc::new(schema_latest)
};
let schema_versions =
infra::schema::get_versions(org_id, stream_name, stream_type, Some((min_ts, max_ts)))
.await?;
let schema_latest_id = schema_versions.len() - 1;
let schema_settings = unwrap_stream_settings(&schema_latest);
let bloom_filter_fields = get_stream_setting_bloom_filter_fields(&schema_settings);
let full_text_search_fields = get_stream_setting_fts_fields(&schema_settings);
let index_fields = get_stream_setting_index_fields(&schema_settings);
if cfg.common.widening_schema_evolution && schema_versions.len() > 1 {
for file in new_file_list.iter() {
// get the schema version of the file
let schema_ver_id = match db::schema::filter_schema_version_id(
&schema_versions,
file.meta.min_ts,
file.meta.max_ts,
) {
Some(id) => id,
None => {
log::error!(
"[COMPACT:{thread_id}] merge small file: {}, schema version not found, min_ts: {}, max_ts: {}",
&file.key,
file.meta.min_ts,
file.meta.max_ts
);
// HACK: use the latest version if not found in schema versions
schema_latest_id
}
};
if schema_ver_id == schema_latest_id {
continue;
}
// calculate the diff between latest schema and current schema
let schema = schema_versions[schema_ver_id]
.clone()
.with_metadata(HashMap::new());
let mut diff_fields = hashbrown::HashMap::new();
let cur_fields = schema.fields();
for field in cur_fields {
if let Ok(v) = schema_latest.field_with_name(field.name()) {
if v.data_type() != field.data_type() {
diff_fields.insert(v.name().clone(), v.data_type().clone());
}
}
}
if diff_fields.is_empty() {
continue;
}
// do the convert
let mut buf = Vec::new();
let file_tmp_dir = cache::tmpfs::Directory::default();
let file_data = tmp_dir.get(&file.key)?;
if file_data.is_empty() {
// delete file from file list
log::warn!("found invalid file: {}", file.key);
if let Err(err) = file_list::delete_parquet_file(&file.key, true).await {
log::error!(
"[COMPACT:{thread_id}] delete file: {}, from file_list err: {}",
&file.key,
err
);
}
return Err(anyhow::anyhow!("merge_files error: file data is empty"));
}
file_tmp_dir.set(&file.key, file_data)?;
datafusion::exec::convert_parquet_file(
file_tmp_dir.name(),
&mut buf,
Arc::new(schema),
&bloom_filter_fields,
diff_fields,
FileType::PARQUET,
)
.await
.map_err(|e| {
DataFusionError::Plan(format!("convert_parquet_file {}, err: {}", &file.key, e))
})?;
// replace the file in tmpfs
tmp_dir.set(&file.key, buf.into())?;
// read schema from parquet file and group files by schema
let mut schemas = HashMap::new();
let mut file_groups = HashMap::new();
let mut fi = 0;
for file in new_file_list.iter() {
fi += 1;
log::info!("[COMPACT:{thread_id}:{fi}] merge small file: {}", &file.key);
let buf = file_data::get(&file.key, None).await?;
let schema = read_schema_from_bytes(&buf).await?;
let schema = schema.as_ref().clone().with_metadata(Default::default());
let schema_key = schema.hash_key();
if !schemas.contains_key(&schema_key) {
schemas.insert(schema_key.clone(), schema);
file_groups.insert(schema_key.clone(), vec![]);
}
let entry = file_groups.get_mut(&schema_key).unwrap();
entry.push(file.clone());
}
// generate the final schema
let all_fields = schemas
.values()
.flat_map(|s| s.fields().iter().map(|f| f.name().to_string()))
.collect::<HashSet<_>>();
let schema_latest = Arc::new(schema_latest.retain(all_fields));
let mut schema_latest_fields = HashMap::with_capacity(schema_latest.fields().len());
for field in schema_latest.fields() {
schema_latest_fields.insert(field.name(), field);
}
// generate datafusion tables
let mut tables = Vec::new();
let trace_id = ider::generate();
for (schema_key, files) in file_groups {
if files.is_empty() {
continue;
}
let schema = schemas.get(&schema_key).unwrap().clone();
let session = config::meta::search::Session {
id: format!("{trace_id}-{schema_key}"),
storage_type: StorageType::Memory,
work_group: None,
target_partitions: 0,
};
let diff_fields = generate_schema_diff(&schema, &schema_latest_fields)?;
let table = exec::create_parquet_table(
&session,
schema_latest.clone(),
&files,
diff_fields,
true,
None,
)
.await?;
tables.push(table);
}
let start = std::time::Instant::now();
let merge_result = if stream_type.is_basic_type() {
merge_parquet_files(thread_id, tmp_dir.name(), schema_latest.clone()).await
} else {
datafusion::exec::merge_parquet_files(
tmp_dir.name(),
stream_type,
stream_name,
schema_latest.clone(),
)
.await
};
let (new_schema, new_batches) = merge_result.map_err(|e| {
let merge_result = exec::merge_parquet_files(
stream_type,
stream_name,
schema_latest.clone(),
tables,
&bloom_filter_fields,
&new_file_meta,
)
.await;
let (_new_schema, buf) = merge_result.map_err(|e| {
let files = new_file_list.into_iter().map(|f| f.key).collect::<Vec<_>>();
log::error!(
"merge_parquet_files err: {}, files: {:?}, schema: {:?}",
@ -824,13 +773,6 @@ pub async fn merge_files(
DataFusionError::Plan(format!("merge_parquet_files err: {:?}", e))
})?;
let buf = write_recordbatch_to_parquet(
new_schema.clone(),
&new_batches,
&bloom_filter_fields,
&new_file_meta,
)
.await?;
new_file_meta.compressed_size = buf.len() as i64;
if new_file_meta.compressed_size == 0 {
return Err(anyhow::anyhow!(
@ -841,7 +783,7 @@ pub async fn merge_files(
let id = ider::generate();
let new_file_key = format!("{prefix}/{id}{}", FILE_EXT_PARQUET);
log::info!(
"[COMPACT:{thread_id}] merge file succeeded, {} files into a new file: {}, original_size: {}, compressed_size: {}, took: {} ms",
"[COMPACT:{thread_id}] merge file successfully, {} files into a new file: {}, original_size: {}, compressed_size: {}, took: {} ms",
retain_file_list.len(),
new_file_key,
new_file_meta.original_size,
@ -849,89 +791,83 @@ pub async fn merge_files(
start.elapsed().as_millis(),
);
// upload file to storage
let buf = Bytes::from(buf);
// upload file
match storage::put(&new_file_key, buf.clone()).await {
Ok(_) => {
if cfg.common.inverted_index_enabled && stream_type.is_basic_type() {
// generate inverted index RecordBatch
if let Some(inverted_idx_batch) = generate_inverted_idx_recordbatch(
schema_latest.clone(),
&new_batches,
stream_type,
&full_text_search_fields,
&index_fields,
)? {
let index_format =
InvertedIndexFormat::from(&cfg.common.inverted_index_store_format);
if matches!(
index_format,
InvertedIndexFormat::Parquet | InvertedIndexFormat::Both
) {
let files = generate_index_on_compactor(
&retain_file_list,
inverted_idx_batch.clone(),
new_file_key.clone(),
org_id,
stream_type,
stream_name,
&full_text_search_fields,
&index_fields,
)
.await
.map_err(|e| {
anyhow::anyhow!(
"generate_index_on_compactor error: {}, need delete files: {:?}",
e,
retain_file_list
)
})?;
for (file_name, filemeta) in files {
log::info!(
"Created parquet index file during compaction {}",
file_name
);
// Notify that we wrote the index file to the db.
if let Err(e) = write_file_list(
org_id,
&[FileKey {
key: file_name.clone(),
meta: filemeta,
deleted: false,
segment_ids: None,
}],
)
.await
{
log::error!(
"generate_index_on_compactor write to file list: {}, error: {}, need delete files: {:?}",
file_name,
e.to_string(),
retain_file_list
);
}
}
}
if matches!(
index_format,
InvertedIndexFormat::FST | InvertedIndexFormat::Both
) {
// generate fst inverted index and write to storage
generate_fst_inverted_index(
inverted_idx_batch,
&new_file_key,
&full_text_search_fields,
&index_fields,
Some(&retain_file_list),
)
.await?;
}
}
}
Ok((new_file_key, new_file_meta, retain_file_list))
}
Err(e) => Err(e),
storage::put(&new_file_key, buf.clone()).await?;
if !cfg.common.inverted_index_enabled || !stream_type.is_basic_type() {
return Ok((new_file_key, new_file_meta, retain_file_list));
}
// generate parquet format inverted index
let index_format = InvertedIndexFormat::from(&cfg.common.inverted_index_store_format);
if matches!(
index_format,
InvertedIndexFormat::Parquet | InvertedIndexFormat::Both
) {
let (schema, mut reader) = get_recordbatch_reader_from_bytes(&buf).await?;
let files = generate_index_on_compactor(
&retain_file_list,
&new_file_key,
org_id,
stream_type,
stream_name,
&full_text_search_fields,
&index_fields,
schema,
&mut reader,
)
.await
.map_err(|e| {
anyhow::anyhow!(
"generate_index_on_compactor error: {}, need delete files: {:?}",
e,
retain_file_list
)
})?;
for (file_name, filemeta) in files {
log::info!("Created parquet index file during compaction {}", file_name);
// Notify that we wrote the index file to the db.
if let Err(e) = write_file_list(
org_id,
&[FileKey {
key: file_name.clone(),
meta: filemeta,
deleted: false,
segment_ids: None,
}],
)
.await
{
log::error!(
"generate_index_on_compactor write to file list: {}, error: {}, need delete files: {:?}",
file_name,
e.to_string(),
retain_file_list
);
}
}
}
// generate fst format inverted index
if matches!(
index_format,
InvertedIndexFormat::FST | InvertedIndexFormat::Both
) {
let (schema, mut reader) = get_recordbatch_reader_from_bytes(&buf).await?;
// generate fst inverted index and write to storage
generate_fst_inverted_index(
&new_file_key,
&full_text_search_fields,
&index_fields,
Some(&retain_file_list),
schema,
&mut reader,
)
.await?;
}
Ok((new_file_key, new_file_meta, retain_file_list))
}
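The core of the refactor: merge_files no longer rewrites every small file to the latest schema up front. It reads each parquet file's schema, groups files whose metadata-free schemas hash to the same key, and builds one DataFusion table per group with only that group's type diffs. A stripped-down hedged sketch of the grouping step; hash_key is assumed to come from the crate's SchemaExt trait, everything else is plain std:

use std::collections::HashMap;
use arrow_schema::Schema;

// Groups file keys by a stable hash of their (metadata-free) schema.
fn group_by_schema(files: Vec<(String, Schema)>) -> HashMap<String, (Schema, Vec<String>)> {
    let mut groups: HashMap<String, (Schema, Vec<String>)> = HashMap::new();
    for (key, schema) in files {
        let schema = schema.with_metadata(Default::default());
        let schema_key = schema.hash_key(); // assumed: SchemaExt::hash_key from this crate
        groups
            .entry(schema_key)
            .or_insert_with(|| (schema, Vec::new()))
            .1
            .push(key);
    }
    groups
}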
async fn write_file_list(org_id: &str, events: &[FileKey]) -> Result<(), anyhow::Error> {
@ -1174,22 +1110,13 @@ pub fn generate_inverted_idx_recordbatch(
if inverted_idx_batches.is_empty() {
Ok(None)
} else {
let mut new_batch = if inverted_idx_batches.len() == 1 {
let new_batch = if inverted_idx_batches.len() == 1 {
inverted_idx_batches.remove(0)
} else {
let new_schema = inverted_idx_batches.first().unwrap().schema();
concat_batches(new_schema, inverted_idx_batches).map_err(anyhow::Error::from)?
};
let mut null_columns = 0;
for i in 0..new_batch.num_columns() {
let ni = i - null_columns;
if new_batch.column(ni).null_count() == new_batch.num_rows() {
new_batch.remove_column(ni);
null_columns += 1;
}
}
if matches!(
new_batch.schema().fields().len(),
0 | 1 if new_batch.schema().field(0).name() == &cfg.common.column_timestamp
@ -1201,57 +1128,85 @@ pub fn generate_inverted_idx_recordbatch(
}
}
pub async fn merge_parquet_files(
thread_id: usize,
trace_id: &str,
schema: Arc<Schema>,
) -> ::datafusion::error::Result<(Arc<Schema>, Vec<RecordBatch>)> {
let start = std::time::Instant::now();
async fn cache_remote_files(files: &[FileKey]) -> Result<Vec<String>, anyhow::Error> {
let cfg = get_config();
let scan_size = files.iter().map(|f| f.meta.compressed_size).sum::<i64>();
if is_local_disk_storage()
|| !cfg.disk_cache.enabled
|| scan_size >= cfg.disk_cache.skip_size as i64
{
return Ok(Vec::new());
};
// get record batches from tmpfs
let temp_files = infra::cache::tmpfs::list(trace_id, "parquet").map_err(|e| {
log::error!(
"[MERGE:JOB:{thread_id}] merge small files failed at getting temp files. Error {}",
e
);
DataFusionError::Execution(e.to_string())
})?;
let mut record_batches = Vec::new();
let mut shared_fields = HashSet::new();
for file in temp_files {
let bytes = infra::cache::tmpfs::get(&file.location).map_err(|e| {
log::error!(
"[MERGE:JOB:{thread_id}] merge small files failed at reading temp files to bytes. Error {}",
e
);
DataFusionError::Execution(e.to_string())
})?;
let (file_schema, batches) = read_recordbatch_from_bytes(&bytes).await.map_err(|e| {
log::error!("[MERGE:JOB:{thread_id}] read_recordbatch_from_bytes error");
log::error!(
"[MERGE:JOB:{thread_id}] read_recordbatch_from_bytes error for file: {}, err: {}",
file.location,
e
);
DataFusionError::Execution(e.to_string())
})?;
record_batches.extend(batches);
shared_fields.extend(file_schema.fields().iter().map(|f| f.name().to_string()));
let mut tasks = Vec::new();
let semaphore = std::sync::Arc::new(Semaphore::new(cfg.limit.query_thread_num));
for file in files.iter() {
let file_name = file.key.to_string();
let permit = semaphore.clone().acquire_owned().await.unwrap();
let task: tokio::task::JoinHandle<Option<String>> = tokio::task::spawn(async move {
let ret = if !file_data::disk::exist(&file_name).await {
file_data::disk::download("", &file_name).await.err()
} else {
None
};
// If the parquet file is not found or has no data, we assume it has been
// deleted by some external entity, so we remove its entry from the
// file_list table.
let file_name = if let Some(e) = ret {
if e.to_string().to_lowercase().contains("not found")
|| e.to_string().to_lowercase().contains("data size is zero")
{
// delete file from file list
log::warn!("found invalid file: {}", file_name);
if let Err(e) = file_list::delete_parquet_file(&file_name, true).await {
log::error!("[COMPACT] delete from file_list err: {}", e);
}
Some(file_name)
} else {
log::warn!("[COMPACT] download file to cache err: {}", e);
None
}
} else {
None
};
drop(permit);
file_name
});
tasks.push(task);
}
// create new schema with the shared fields
let schema = Arc::new(schema.retain(shared_fields));
let mut delete_files = Vec::new();
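// collect files that the download tasks reported as missing or empty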
for task in tasks {
match task.await {
Ok(file) => {
if let Some(file) = file {
delete_files.push(file);
}
}
Err(e) => {
log::error!("[COMPACT] load file task err: {}", e);
}
}
}
// merge record batches; all record batches share the same schema
let (schema, new_record_batches) =
merge_record_batches("MERGE", thread_id, schema, record_batches)?;
log::info!(
"[MERGE:JOB:{thread_id}] merge_parquet_files took {} ms",
start.elapsed().as_millis()
);
Ok((schema, vec![new_record_batches]))
Ok(delete_files)
}
// generate parquet file compact schema
fn generate_schema_diff(
schema: &Schema,
schema_latest_map: &HashMap<&String, &Arc<Field>>,
) -> Result<HashMap<String, DataType>, anyhow::Error> {
// calculate the diff between latest schema and group schema
let mut diff_fields = HashMap::new();
for field in schema.fields().iter() {
if let Some(latest_field) = schema_latest_map.get(field.name()) {
if field.data_type() != latest_field.data_type() {
diff_fields.insert(field.name().clone(), latest_field.data_type().clone());
}
}
}
Ok(diff_fields)
}

View File

@ -379,7 +379,7 @@ impl Report {
let mut email = Message::builder()
.from(cfg.smtp.smtp_from_email.parse()?)
.subject(format!("{}", &self.title));
.subject(self.title.to_string());
for recipient in recipients {
email = email.to(recipient.parse()?);

View File

@ -97,7 +97,7 @@ pub async fn cache(prefix: &str, force: bool) -> Result<(), anyhow::Error> {
stats.caching_time = start.elapsed().as_millis() as usize;
log::info!(
"Load file_list [{prefix}] load {}:{} done, download: {}ms, caching: {}ms",
"Load file_list [{prefix}] load {}:{} done, download: {} ms, caching: {} ms",
files_num,
stats.file_count,
stats.download_time,

View File

@ -134,6 +134,12 @@ pub async fn search(
return Err(Error::Message("no querier node online".to_string()));
}
log::info!(
"[trace_id {trace_id}] flight->search: get nodes num: {}, querier num: {}",
nodes.len(),
querier_num,
);
// waiting in work group queue
metrics::QUERY_PENDING_NUMS
.with_label_values(&[&req.org_id])

View File

@ -26,14 +26,11 @@ use config::{
PARQUET_BATCH_SIZE,
};
use datafusion::{
arrow::{
datatypes::{DataType, Schema},
record_batch::RecordBatch,
},
arrow::datatypes::{DataType, Schema},
catalog::TableProvider,
common::Column,
datasource::{
file_format::{json::JsonFormat, parquet::ParquetFormat},
file_format::parquet::ParquetFormat,
listing::{ListingOptions, ListingTableConfig, ListingTableUrl},
object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry},
},
@ -47,8 +44,10 @@ use datafusion::{
},
logical_expr::AggregateUDF,
optimizer::OptimizerRule,
physical_plan::execute_stream,
prelude::{Expr, SessionContext},
};
use futures::TryStreamExt;
use hashbrown::HashMap;
#[cfg(feature = "enterprise")]
use o2_enterprise::enterprise::{
@ -59,112 +58,26 @@ use super::{
file_type::{FileType, GetExt},
optimizer::join_reorder::JoinReorderRule,
storage::file_list,
table_provider::NewListingTable,
table_provider::{uniontable::NewUnionTable, NewListingTable},
udf::transform_udf::get_all_transform,
};
const DATAFUSION_MIN_MEM: usize = 1024 * 1024 * 256; // 256MB
const DATAFUSION_MIN_PARTITION: usize = 2; // CPU cores
pub async fn convert_parquet_file(
trace_id: &str,
buf: &mut Vec<u8>,
schema: Arc<Schema>,
bloom_filter_fields: &[String],
rules: HashMap<String, DataType>,
file_type: FileType,
) -> Result<()> {
let start = std::time::Instant::now();
let cfg = get_config();
let query_sql = format!(
"SELECT * FROM tbl ORDER BY {} DESC",
cfg.common.column_timestamp
); // select_wildcard -> without_optimizer
// query data
let ctx = prepare_datafusion_context(None, vec![], false, 0).await?;
// Configure listing options
let listing_options = match file_type {
FileType::PARQUET => {
let file_format = ParquetFormat::default();
ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::PARQUET.get_ext())
.with_target_partitions(ctx.state().config().target_partitions())
}
FileType::JSON => {
let file_format = JsonFormat::default();
ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::JSON.get_ext())
.with_target_partitions(ctx.state().config().target_partitions())
}
_ => {
return Err(DataFusionError::Execution(format!(
"Unsupported file type scheme {file_type:?}",
)));
}
};
let prefix = match ListingTableUrl::parse(format!("tmpfs:///{trace_id}/")) {
Ok(url) => url,
Err(e) => {
return Err(datafusion::error::DataFusionError::Execution(format!(
"ListingTableUrl error: {e}"
)));
}
};
let config = ListingTableConfig::new(prefix)
.with_listing_options(listing_options)
.with_schema(schema.clone());
let table = NewListingTable::try_new(config, rules)?;
ctx.register_table("tbl", Arc::new(table))?;
// get all sorted data
let df = match ctx.sql(&query_sql).await {
Ok(df) => df,
Err(e) => {
log::error!(
"convert sql execute failed, sql: {}, err: {:?}",
query_sql,
e
);
return Err(e);
}
};
let schema: Schema = df.schema().into();
let schema = Arc::new(schema);
let batches = df.collect().await?;
let file_meta = FileMeta::default();
let mut writer = new_parquet_writer(buf, &schema, bloom_filter_fields, &file_meta);
for batch in batches {
writer.write(&batch).await?;
}
writer.close().await?;
ctx.deregister_table("tbl")?;
drop(ctx);
log::info!(
"convert_parquet_file took {} ms",
start.elapsed().as_millis()
);
Ok(())
}
pub async fn merge_parquet_files(
trace_id: &str,
stream_type: StreamType,
stream_name: &str,
schema: Arc<Schema>,
) -> Result<(Arc<Schema>, Vec<RecordBatch>)> {
tables: Vec<Arc<dyn TableProvider>>,
bloom_filter_fields: &[String],
metadata: &FileMeta,
) -> Result<(Arc<Schema>, Vec<u8>)> {
let start = std::time::Instant::now();
let cfg = get_config();
// get all sorted data
let query_sql = if stream_type == StreamType::Index {
let sql = if stream_type == StreamType::Index {
format!(
"SELECT * FROM tbl WHERE file_name NOT IN (SELECT file_name FROM tbl WHERE deleted IS TRUE) ORDER BY {} DESC",
cfg.common.column_timestamp
@ -185,24 +98,40 @@ pub async fn merge_parquet_files(
};
// create datafusion context
let ctx = prepare_datafusion_context(None, vec![], false, 0).await?;
let sort_by_timestamp_desc = true;
let target_partitions = cfg.limit.cpu_num;
let ctx =
prepare_datafusion_context(None, vec![], sort_by_timestamp_desc, target_partitions).await?;
// register union table
let union_table = Arc::new(NewUnionTable::try_new(schema.clone(), tables)?);
ctx.register_table("tbl", union_table)?;
// Configure listing options
let file_format = ParquetFormat::default();
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(FileType::PARQUET.get_ext())
.with_target_partitions(ctx.state().config().target_partitions());
let prefix = ListingTableUrl::parse(format!("tmpfs:///{trace_id}/"))?;
let config = ListingTableConfig::new(prefix)
.with_listing_options(listing_options)
.with_schema(schema.clone());
let table = Arc::new(NewListingTable::try_new(config, HashMap::default())?);
ctx.register_table("tbl", table.clone())?;
let plan = ctx.state().create_logical_plan(&sql).await?;
let physical_plan = ctx.state().create_physical_plan(&plan).await?;
let schema = physical_plan.schema();
let df = ctx.sql(&query_sql).await?;
let schema: Schema = df.schema().into();
let schema = Arc::new(schema);
let batches = df.collect().await?;
// write result to parquet file
let mut buf = Vec::new();
let mut writer = new_parquet_writer(&mut buf, &schema, bloom_filter_fields, metadata);
let mut batch_stream = execute_stream(physical_plan, ctx.task_ctx())?;
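// pull record batches from the stream one at a time and append them to the parquet writer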
loop {
match batch_stream.try_next().await {
Ok(Some(batch)) => {
if let Err(e) = writer.write(&batch).await {
log::error!("merge_parquet_files write Error: {}", e);
return Err(e.into());
}
}
Ok(None) => {
break;
}
Err(e) => {
log::error!("merge_parquet_files execute stream Error: {}", e);
return Err(e);
}
}
}
writer.close().await?;
ctx.deregister_table("tbl")?;
drop(ctx);
@ -212,7 +141,7 @@ pub async fn merge_parquet_files(
start.elapsed().as_millis()
);
Ok((schema, batches))
Ok((schema, buf))
}
pub fn create_session_config(

View File

@ -135,7 +135,7 @@ pub async fn search(
)
.await?;
log::info!(
"[trace_id {}] search->storage: stream {}/{}/{}, FST inverted index reduced file_list num to {} in {}ms",
"[trace_id {}] search->storage: stream {}/{}/{}, FST inverted index reduced file_list num to {} in {} ms",
query.trace_id,
query.org_id,
query.stream_type,
@ -516,13 +516,13 @@ async fn filter_file_list_by_inverted_index(
let full_text_term_clone = full_text_terms.clone();
let index_terms_clone = index_terms.clone();
let file_name = file.clone();
let trace_id_clone = query.trace_id.to_string();
let trace_id = query.trace_id.to_string();
let permit = semaphore.clone().acquire_owned().await.unwrap();
// Spawn a task for each file, wherein full text search and
// secondary index search queries are executed
let task = tokio::task::spawn(async move {
let res = inverted_index_search_in_file(
trace_id_clone.as_str(),
&trace_id,
&file_name,
full_text_term_clone,
index_terms_clone,
@ -535,10 +535,10 @@ async fn filter_file_list_by_inverted_index(
tasks.push(task)
}
for result in try_join_all(tasks)
let results = try_join_all(tasks)
.await
.map_err(|e| Error::ErrorCode(ErrorCodes::ServerInternalError(e.to_string())))?
{
.map_err(|e| Error::ErrorCode(ErrorCodes::ServerInternalError(e.to_string())))?;
for result in results {
// Each result corresponds to a file in the file list
match result {
Ok((file_name, bitvec)) => {
@ -551,8 +551,8 @@ async fn filter_file_list_by_inverted_index(
// we expect each file name to have at least 1 file
.unwrap();
file.segment_ids = Some(res.clone().into_vec());
log::info!(
"[trace_id {}] search->storage: Final bitmap for fts_terms {:?} and index_terms: {:?} length {}",
log::debug!(
"[trace_id {}] search->storage: Final bitmap for fts_terms {:?} and index_terms: {:?} total: {}",
query.trace_id,
*full_text_terms,
index_terms,
@ -560,7 +560,7 @@ async fn filter_file_list_by_inverted_index(
);
} else {
// if the bitmap is empty then we remove the file from the list
log::info!(
log::debug!(
"[trace_id {}] search->storage: no match found in index for file {}",
query.trace_id,
file_name
@ -628,26 +628,23 @@ async fn inverted_index_search_in_file(
Ok(bytes) => bytes,
};
let mut index_reader = create_index_reader_from_puffin_bytes(compressed_index_blob).await?;
let file_meta = index_reader.metadata().await?;
let mut res = BitVec::new();
if let Some(column_index_meta) = &file_meta.metas.get(INDEX_FIELD_NAME_FOR_ALL) {
// TODO: Add Eq and check performance
let mut index_reader = create_index_reader_from_puffin_bytes(compressed_index_blob).await?;
if let Some(mut field_reader) = index_reader.field(INDEX_FIELD_NAME_FOR_ALL).await? {
let column_index_meta = field_reader.metadata().await?;
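// choose the search strategy based on the configured full text search type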
let matched_bv = match cfg.common.full_text_search_type.as_str() {
"eq" => {
let mut searcher = ExactSearch::new(fts_terms.as_ref(), column_index_meta);
searcher.search(&mut index_reader).await
let mut searcher = ExactSearch::new(fts_terms.as_ref(), &column_index_meta);
searcher.search(&mut field_reader).await
}
"contains" => {
let mut searcher = SubstringSearch::new(fts_terms.as_ref(), column_index_meta);
searcher.search(&mut index_reader).await
let mut searcher = SubstringSearch::new(fts_terms.as_ref(), &column_index_meta);
searcher.search(&mut field_reader).await
}
// Default to prefix search
_ => {
let mut searcher = PrefixSearch::new(fts_terms.as_ref(), column_index_meta);
searcher.search(&mut index_reader).await
let mut searcher = PrefixSearch::new(fts_terms.as_ref(), &column_index_meta);
searcher.search(&mut field_reader).await
}
};
@ -670,9 +667,10 @@ async fn inverted_index_search_in_file(
if !index_terms.is_empty() {
for (col, index_terms) in index_terms.iter() {
if let Some(column_index_meta) = file_meta.metas.get(col) {
let mut secondary_index_match = ExactSearch::new(index_terms, column_index_meta);
match secondary_index_match.search(&mut index_reader).await {
if let Some(mut field_reader) = index_reader.field(col).await? {
let column_index_meta = field_reader.metadata().await?;
let mut secondary_index_match = ExactSearch::new(index_terms, &column_index_meta);
match secondary_index_match.search(&mut field_reader).await {
Ok(bitmap) => {
if res.len() < bitmap.len() {
res.resize(bitmap.len(), false);

View File

@ -220,7 +220,7 @@ impl std::fmt::Display for Sql {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"sql: {}, time_range: {:?}, stream: {}/{:?}/{:?}, match_items: {:?}, equal_items: {:?}, prefix_items: {:?}, aliases: {:?}, limit: {}, offset: {}, group_by: {:?}, order_by: {:?}, histogram_interval: {:?}, sorted_by_time: {}",
"sql: {}, time_range: {:?}, stream: {}/{}/{:?}, match_items: {:?}, equal_items: {:?}, prefix_items: {:?}, aliases: {:?}, limit: {}, offset: {}, group_by: {:?}, order_by: {:?}, histogram_interval: {:?}, sorted_by_time: {}",
self.sql,
self.time_range,
self.org_id,