From df0a4a79aeeeb4cdd2cccfee8c1fcaa248b3a28c Mon Sep 17 00:00:00 2001 From: Hengfei Yang Date: Thu, 7 Nov 2024 17:30:10 +0800 Subject: [PATCH] feat: refactor merge file on compactor (#4971) --- deploy/build/buildspec-tag-arm64-pi.yml | 1 + src/cli/basic/cli.rs | 6 +- src/common/utils/stream.rs | 102 +-- src/config/src/config.rs | 2 +- src/config/src/meta/inverted_index/mod.rs | 37 - src/config/src/meta/inverted_index/reader.rs | 133 +-- src/config/src/meta/inverted_index/search.rs | 33 +- src/config/src/meta/inverted_index/writer.rs | 18 +- src/config/src/meta/puffin/mod.rs | 57 +- src/config/src/meta/puffin/reader.rs | 72 +- src/config/src/meta/puffin/writer.rs | 63 +- src/config/src/meta/search.rs | 1 + src/config/src/utils/inverted_index.rs | 17 +- src/config/src/utils/parquet.rs | 37 +- src/handler/http/request/search/mod.rs | 28 +- src/infra/src/cache/file_data/mod.rs | 23 +- src/infra/src/file_list/mysql.rs | 2 +- src/infra/src/file_list/postgres.rs | 2 +- src/infra/src/file_list/sqlite.rs | 2 +- src/infra/src/storage/mod.rs | 7 + src/job/file_list.rs | 2 +- src/job/files/idx.rs | 42 +- src/job/files/parquet.rs | 905 +++++++++---------- src/report_server/src/report.rs | 2 +- src/service/alerts/alert.rs | 4 +- src/service/compact/file_list.rs | 2 +- src/service/compact/merge.rs | 541 +++++------ src/service/dashboards/reports.rs | 2 +- src/service/db/file_list/remote.rs | 2 +- src/service/search/cluster/flight.rs | 6 + src/service/search/datafusion/exec.rs | 157 +--- src/service/search/grpc/storage.rs | 46 +- src/service/search/sql.rs | 2 +- 33 files changed, 1073 insertions(+), 1283 deletions(-) diff --git a/deploy/build/buildspec-tag-arm64-pi.yml b/deploy/build/buildspec-tag-arm64-pi.yml index 44a0140a2..f15d5568f 100644 --- a/deploy/build/buildspec-tag-arm64-pi.yml +++ b/deploy/build/buildspec-tag-arm64-pi.yml @@ -17,6 +17,7 @@ phases: - GIT_TAG="$(git describe --tags --abbrev=0)" # disable gxhash - sed -i 's/default = \[\"gxhash\"\]/default = []/g' src/config/Cargo.toml + - sed -i 's/+aes,//g' .cargo/config.toml # std version - docker build -t openobserve:latest-arm64 -f deploy/build/Dockerfile.tag.aarch64 . diff --git a/src/cli/basic/cli.rs b/src/cli/basic/cli.rs index dedc481db..674e89efd 100644 --- a/src/cli/basic/cli.rs +++ b/src/cli/basic/cli.rs @@ -132,7 +132,7 @@ pub async fn cli() -> Result { match command.get_one::("path") { Some(path) => { set_permission(path, 0o777)?; - println!("init dir {} succeeded", path); + println!("init dir {} successfully", path); } None => { return Err(anyhow::anyhow!("please set data path")); @@ -263,7 +263,7 @@ pub async fn cli() -> Result { let file = command.get_one::("file").unwrap(); match file_list::delete_parquet_file(file, true).await { Ok(_) => { - println!("delete parquet file {} succeeded", file); + println!("delete parquet file {} successfully", file); } Err(e) => { println!("delete parquet file {} failed, error: {}", file, e); @@ -291,6 +291,6 @@ pub async fn cli() -> Result { log::error!("waiting for db close failed, error: {}", e); } - println!("command {name} execute succeeded"); + println!("command {name} execute successfully"); Ok(true) } diff --git a/src/common/utils/stream.rs b/src/common/utils/stream.rs index 3eecf6eb6..c809d01c8 100644 --- a/src/common/utils/stream.rs +++ b/src/common/utils/stream.rs @@ -13,23 +13,15 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
-use std::{ - io::{Error, ErrorKind}, - sync::Arc, -}; +use std::io::{Error, ErrorKind}; use actix_web::HttpResponse; +use arrow::array::{Int64Array, RecordBatch}; use config::{ get_config, meta::stream::{FileMeta, StreamType}, - utils::{arrow::record_batches_to_json_rows, json}, FILE_EXT_JSON, }; -use datafusion::{ - arrow::{datatypes::Schema, record_batch::RecordBatch}, - datasource::MemTable, - prelude::SessionContext, -}; #[inline(always)] pub fn stream_type_query_param_error() -> Result { @@ -72,61 +64,59 @@ pub fn get_file_name_v1(org_id: &str, stream_name: &str, suffix: u32) -> String } pub async fn populate_file_meta( - schema: Arc, - batch: Vec>, + batches: &[&RecordBatch], file_meta: &mut FileMeta, min_field: Option<&str>, max_field: Option<&str>, ) -> Result<(), anyhow::Error> { - if schema.fields().is_empty() || batch.is_empty() { + if batches.is_empty() { return Ok(()); } let cfg = get_config(); let min_field = min_field.unwrap_or_else(|| cfg.common.column_timestamp.as_str()); let max_field = max_field.unwrap_or_else(|| cfg.common.column_timestamp.as_str()); - let ctx = SessionContext::new(); - let provider = MemTable::try_new(schema, batch)?; - ctx.register_table("temp", Arc::new(provider))?; - let sql = format!( - "SELECT min({}) as min, max({}) as max, count(*) as num_records FROM temp;", - min_field, max_field - ); - let df = ctx.sql(sql.as_str()).await?; - let batches = df.collect().await?; - let batches_ref: Vec<&RecordBatch> = batches.iter().collect(); - let json_rows = record_batches_to_json_rows(&batches_ref)?; - let mut result: Vec = json_rows.into_iter().map(json::Value::Object).collect(); - if result.is_empty() { - return Ok(()); + let total = batches.iter().map(|batch| batch.num_rows()).sum::(); + let mut min_val = i64::MAX; + let mut max_val = 0; + for batch in batches.iter() { + let num_row = batch.num_rows(); + let Some(min_field) = batch.column_by_name(min_field) else { + return Err(anyhow::anyhow!("No min_field found: {}", min_field)); + }; + let Some(max_field) = batch.column_by_name(max_field) else { + return Err(anyhow::anyhow!("No max_field found: {}", max_field)); + }; + let min_col = min_field.as_any().downcast_ref::().unwrap(); + let max_col = max_field.as_any().downcast_ref::().unwrap(); + for i in 0..num_row { + let val = min_col.value(i); + if val < min_val { + min_val = val; + } + let val = max_col.value(i); + if val > max_val { + max_val = val; + } + } } - let record = result.pop().expect("No record found"); - if record.is_null() { - return Ok(()); + if min_val == i64::MAX { + min_val = 0; } - file_meta.min_ts = record - .get("min") - .expect("No field found: min") - .as_i64() - .expect("No value found: min"); - file_meta.max_ts = record - .get("max") - .expect("No field found: max") - .as_i64() - .expect("No value found: max"); - file_meta.records = record - .get("num_records") - .expect("No field found: num_records") - .as_i64() - .expect("No value found: num_records"); + + file_meta.min_ts = min_val; + file_meta.max_ts = max_val; + file_meta.records = total as i64; Ok(()) } #[cfg(test)] mod tests { + use std::sync::Arc; + use datafusion::arrow::{ - array::{Int64Array, StringArray}, - datatypes::{DataType, Field}, + array::StringArray, + datatypes::{DataType, Field, Schema}, }; use super::*; @@ -173,7 +163,7 @@ mod tests { // define data. 
let batch = RecordBatch::try_new( - schema.clone(), + schema, vec![ Arc::new(StringArray::from(vec!["a", "b", "c", "d"])), Arc::new(Int64Array::from(vec![1, 2, 1, 2])), @@ -190,7 +180,7 @@ mod tests { compressed_size: 700, flattened: false, }; - populate_file_meta(schema, vec![vec![batch]], &mut file_meta, None, None) + populate_file_meta(&[&batch], &mut file_meta, None, None) .await .unwrap(); assert_eq!(file_meta.records, 4); @@ -209,7 +199,7 @@ mod tests { // define data. let batch = RecordBatch::try_new( - schema.clone(), + schema, vec![ Arc::new(StringArray::from(vec!["a", "b", "c", "d"])), Arc::new(Int64Array::from(vec![1, 2, 1, 2])), @@ -226,15 +216,9 @@ mod tests { compressed_size: 700, flattened: false, }; - populate_file_meta( - schema, - vec![vec![batch]], - &mut file_meta, - Some("time"), - Some("time"), - ) - .await - .unwrap(); + populate_file_meta(&[&batch], &mut file_meta, Some("time"), Some("time")) + .await + .unwrap(); assert_eq!(file_meta.records, 4); assert_eq!(file_meta.min_ts, val - 100); } diff --git a/src/config/src/config.rs b/src/config/src/config.rs index 7f3944fca..50ddd3e70 100644 --- a/src/config/src/config.rs +++ b/src/config/src/config.rs @@ -1587,7 +1587,7 @@ fn check_common_config(cfg: &mut Config) -> Result<(), anyhow::Error> { } if !["both", "parquet"].contains(&cfg.common.inverted_index_store_format.as_str()) { return Err(anyhow::anyhow!( - "ZO_INVERTED_INDEX_SEARCH_FORMAT must be one of both, parquet." + "ZO_INVERTED_INDEX_STORE_FORMAT must be one of both, parquet." )); } if cfg.common.inverted_index_store_format != "both" { diff --git a/src/config/src/meta/inverted_index/mod.rs b/src/config/src/meta/inverted_index/mod.rs index 6dbac7d86..450b1bf1e 100644 --- a/src/config/src/meta/inverted_index/mod.rs +++ b/src/config/src/meta/inverted_index/mod.rs @@ -17,8 +17,6 @@ pub mod reader; pub mod search; pub mod writer; -use std::{collections::HashMap, io::Write}; - use anyhow::Result; use serde::{Deserialize, Serialize}; @@ -27,43 +25,8 @@ const INDEX_FILE_METAS_SIZE_SIZE: u64 = 4; type Bytes = Vec; type BytesRef<'a> = &'a [u8]; -/// Tracks and consumes [`ColumnIndexMeta`] after each selected column within a parquet file -/// is indexed via [`ColumnIndexer`]. -/// The aggregated metas is then compressed and written to buffer for writing to file system. -#[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)] -pub struct IndexFileMetas { - #[serde(default)] - pub metas: HashMap, -} - -impl IndexFileMetas { - pub fn new() -> Self { - Self::default() - } - - /// Writes aggregated [`ColumnIndexMeta`] into writer and compresses all written bytes. - /// Returns the length of bytes written to writer. 
- pub fn finish(&self, writer: &mut Vec) -> Result { - if self.metas.is_empty() { - return Ok(0u64); - } - let original_size = writer.len() as u64; - let meta_bytes = serde_json::to_vec(&self)?; - writer.write_all(&meta_bytes)?; - let metas_size = meta_bytes.len() as u32; - writer.write_all(&metas_size.to_le_bytes())?; - writer.flush()?; - let new_size = writer.len() as u64; - - Ok(new_size - original_size) - } -} - #[derive(Debug, Clone, PartialEq, Default, Serialize, Deserialize)] pub struct ColumnIndexMeta { - // base byte offset for this column index date within the index file (multiple column indices) - #[serde(default)] - pub base_offset: u64, // total byte size of this column index date #[serde(default)] pub index_size: u64, diff --git a/src/config/src/meta/inverted_index/reader.rs b/src/config/src/meta/inverted_index/reader.rs index 5f4595706..ef71afdb4 100644 --- a/src/config/src/meta/inverted_index/reader.rs +++ b/src/config/src/meta/inverted_index/reader.rs @@ -16,25 +16,49 @@ /// interface use std::{io::SeekFrom, sync::Arc}; -use anyhow::{anyhow, ensure, Result}; -use futures::{AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; +use anyhow::{anyhow, Result}; +use futures::{io::Cursor, AsyncRead, AsyncReadExt, AsyncSeek, AsyncSeekExt}; -use super::{ColumnIndexMeta, IndexFileMetas, INDEX_FILE_METAS_SIZE_SIZE}; -use crate::{meta::bitvec::BitVec, utils::inverted_index::unpack_u32_pair}; +use super::{ColumnIndexMeta, INDEX_FILE_METAS_SIZE_SIZE}; +use crate::{ + meta::{bitvec::BitVec, puffin::reader::PuffinBytesReader}, + utils::inverted_index::unpack_u32_pair, +}; /// Index reader helps read Index Blob which consists of multiple ColumnIndex. pub struct IndexReader { - source: R, + source: PuffinBytesReader, } impl IndexReader { pub fn new(source: R) -> Self { + Self { + source: PuffinBytesReader::new(source), + } + } + + pub async fn field(&mut self, field: &str) -> Result> { + let Some(field_metdata) = self.source.get_field(field).await? else { + return Ok(None); + }; + let blob_bytes = self.source.read_blob_bytes(&field_metdata).await?; + let data = Cursor::new(blob_bytes); + Ok(Some(FieldReader::new(data))) + } +} + +pub struct FieldReader { + source: Cursor>, +} + +impl FieldReader { + pub fn new(source: Cursor>) -> Self { Self { source } } /// Reads and parse the bytes read from source to construct [`IndexFileMetas`]. /// IndexFileMetas is used to find and read ColumnIndex for a particular column. 
- pub async fn metadata(&mut self) -> Result> { + pub async fn metadata(&mut self) -> Result> { let end_offset = self.source.seek(SeekFrom::End(0)).await?; // read index_size @@ -44,17 +68,16 @@ impl IndexReader { self.source.read_exact(index_file_metas_size_buf).await?; let index_file_metas_size = u32::from_le_bytes(*index_file_metas_size_buf) as u64; - // read index_file_metas + // read column index meta let index_file_metas_offset = SeekFrom::Start(end_offset - INDEX_FILE_METAS_SIZE_SIZE - index_file_metas_size); self.source.seek(index_file_metas_offset).await?; let index_file_metas_buf = &mut vec![0u8; index_file_metas_size as usize]; self.source.read_exact(index_file_metas_buf).await?; - let index_file_metas: IndexFileMetas = serde_json::from_slice(index_file_metas_buf)?; - Self::validate_meta(&index_file_metas, index_file_metas_size, end_offset)?; + let column_meta: ColumnIndexMeta = serde_json::from_slice(index_file_metas_buf)?; - Ok(Arc::new(index_file_metas)) + Ok(Arc::new(column_meta)) } pub async fn fst(&mut self, offset: u64, size: u32) -> Result>> { @@ -67,14 +90,9 @@ impl IndexReader { }) } - pub async fn get_bitmap( - &mut self, - column_index_meta: &ColumnIndexMeta, - fst_val: u64, - ) -> Result { + pub async fn get_bitmap(&mut self, fst_val: u64) -> Result { let (relative_offset, size) = unpack_u32_pair(fst_val); - self.bitmap(column_index_meta.base_offset + relative_offset as u64, size) - .await + self.bitmap(relative_offset as u64, size).await } async fn bitmap(&mut self, offset: u64, size: u32) -> Result { @@ -89,34 +107,6 @@ impl IndexReader { } } -impl IndexReader { - fn validate_meta( - index_file_metas: &IndexFileMetas, - index_file_metas_size: u64, - end_offset: u64, - ) -> Result<()> { - for col_meta in index_file_metas.metas.values() { - let ColumnIndexMeta { - base_offset, - index_size, - .. - } = col_meta; - - let limit = end_offset - INDEX_FILE_METAS_SIZE_SIZE - index_file_metas_size; - ensure!( - *base_offset + *index_size <= limit, - anyhow!( - "ColumnIndexMeta unexpected offset: {} and size: {}. IndexFileMetas size {}", - base_offset, - index_size, - index_file_metas_size - ) - ); - } - Ok(()) - } -} - /// An automaton that matches if the input contains to a specific string. 
/// /// ```rust @@ -192,56 +182,3 @@ impl<'a> fst::automaton::Automaton for Contains<'a> { None } } - -#[cfg(test)] -mod test { - use std::collections::HashMap; - - use super::*; - #[test] - fn test_index_reader_validate_empty_meta() { - let index_file_metas = IndexFileMetas { - metas: HashMap::new(), - }; - let index_file_metas_size = 0; - let end_offset = 0; - let result = IndexReader::>::validate_meta( - &index_file_metas, - index_file_metas_size, - end_offset, - ); - assert!(result.is_ok()); - } - - #[test] - fn test_index_reader_validate_meta() { - let mut metas = HashMap::new(); - metas.insert( - "col1".to_string(), - ColumnIndexMeta { - base_offset: 0, - index_size: 10, - relative_fst_offset: 10, - ..Default::default() - }, - ); - metas.insert( - "col2".to_string(), - ColumnIndexMeta { - base_offset: 10, - index_size: 10, - relative_fst_offset: 10, - ..Default::default() - }, - ); - let index_file_metas = IndexFileMetas { metas }; - let index_file_metas_size = 0; - let end_offset = 30; - let result = IndexReader::>::validate_meta( - &index_file_metas, - index_file_metas_size, - end_offset, - ); - assert!(result.is_ok()); - } -} diff --git a/src/config/src/meta/inverted_index/search.rs b/src/config/src/meta/inverted_index/search.rs index 6b819eefe..2c174d88e 100644 --- a/src/config/src/meta/inverted_index/search.rs +++ b/src/config/src/meta/inverted_index/search.rs @@ -4,10 +4,9 @@ use fst::{ automaton::{StartsWith, Str}, Automaton, IntoStreamer, Streamer, }; -use futures::{AsyncRead, AsyncSeek}; use super::{ - reader::{Contains, IndexReader}, + reader::{Contains, FieldReader}, ColumnIndexMeta, }; @@ -24,17 +23,14 @@ impl<'b> SubstringSearch<'b> { } } - pub async fn search(&mut self, index_reader: &mut IndexReader) -> Result> - where - R: AsyncRead + AsyncSeek + Unpin + Send, - { + pub async fn search(&mut self, field_reader: &mut FieldReader) -> Result> { self.filter(); let matchers = self .terms .iter() .map(|term| Contains::new(term)) .collect::>(); - inverted_index_search(index_reader, &matchers, self.meta).await + inverted_index_search(field_reader, &matchers, self.meta).await } fn filter(&mut self) { @@ -55,17 +51,14 @@ impl<'b> PrefixSearch<'b> { } } - pub async fn search(&mut self, index_reader: &mut IndexReader) -> Result> - where - R: AsyncRead + AsyncSeek + Unpin + Send, - { + pub async fn search(&mut self, field_reader: &mut FieldReader) -> Result> { self.filter(); let matchers = self .terms .iter() .map(|term| Str::new(term).starts_with()) .collect::>>(); - inverted_index_search(index_reader, &matchers, self.meta).await + inverted_index_search(field_reader, &matchers, self.meta).await } fn filter(&mut self) { @@ -93,17 +86,14 @@ impl<'b> ExactSearch<'b> { } } - pub async fn search(&mut self, index_reader: &mut IndexReader) -> Result> - where - R: AsyncRead + AsyncSeek + Unpin + Send, - { + pub async fn search(&mut self, field_reader: &mut FieldReader) -> Result> { self.filter(); let matchers = self .terms .iter() .map(|term| Str::new(term)) .collect::>(); - inverted_index_search(index_reader, &matchers, self.meta).await + inverted_index_search(field_reader, &matchers, self.meta).await } fn filter(&mut self) { @@ -118,16 +108,15 @@ impl<'b> ExactSearch<'b> { } } -pub async fn inverted_index_search( - index_reader: &mut IndexReader, +pub async fn inverted_index_search( + index_reader: &mut FieldReader, matchers: &[A], column_index_meta: &ColumnIndexMeta, ) -> Result> where - R: AsyncRead + AsyncSeek + Unpin + Send, A: Automaton, { - let fst_offset = 
column_index_meta.base_offset + column_index_meta.relative_fst_offset as u64; + let fst_offset = column_index_meta.relative_fst_offset as u64; let fst_size = column_index_meta.fst_size; let fst_map = index_reader.fst(fst_offset, fst_size).await?; let mut res = BitVec::::new(); @@ -137,7 +126,7 @@ where let mut stream = fst_map.search(matcher).into_stream(); // We do not care about the key at this point, only the offset while let Some((_, value)) = stream.next() { - let bitmap = index_reader.get_bitmap(column_index_meta, value).await?; + let bitmap = index_reader.get_bitmap(value).await?; // Resize if the res map is smaller than the bitmap if res.len() < bitmap.len() { diff --git a/src/config/src/meta/inverted_index/writer.rs b/src/config/src/meta/inverted_index/writer.rs index 0449d445d..c9ae09b83 100644 --- a/src/config/src/meta/inverted_index/writer.rs +++ b/src/config/src/meta/inverted_index/writer.rs @@ -55,7 +55,7 @@ impl ColumnIndexer { pub fn push(&mut self, value: BytesRef<'_>, segment_id: usize, term_len: usize) { let bitmap = self.sorter.entry(value.into()).or_default(); if segment_id >= bitmap.len() { - bitmap.resize(segment_id + 1, false); + bitmap.resize(segment_id + 64, false); } bitmap.set(segment_id, true); @@ -75,7 +75,8 @@ impl ColumnIndexer { let min_val = self.sorter.keys().next().cloned(); let max_val = self.sorter.keys().next_back().cloned(); // 1. write bitmaps to writer - for (value, bitmap) in std::mem::take(&mut self.sorter) { + let sorter = std::mem::take(&mut self.sorter); + for (value, bitmap) in sorter { self.append_value(value, bitmap, writer)?; } @@ -84,12 +85,19 @@ impl ColumnIndexer { writer.write_all(&fst_bytes)?; // update meta - self.meta.relative_fst_offset = self.meta.index_size as _; - self.meta.fst_size = fst_bytes.len() as _; - self.meta.index_size += self.meta.fst_size as u64; + self.meta.index_size = 0; + self.meta.fst_size = fst_bytes.len() as u32; + self.meta.relative_fst_offset = writer.len() as u32 - self.meta.fst_size; self.meta.min_val = min_val.unwrap_or_default(); self.meta.max_val = max_val.unwrap_or_default(); + // write meta into writer buffer + let meta_bytes = serde_json::to_vec(&self.meta)?; + writer.write_all(&meta_bytes)?; + let metas_size = meta_bytes.len() as u32; + writer.write_all(&metas_size.to_le_bytes())?; + writer.flush()?; + Ok(self.meta) } diff --git a/src/config/src/meta/puffin/mod.rs b/src/config/src/meta/puffin/mod.rs index 12b7d93d7..4bffe8d67 100644 --- a/src/config/src/meta/puffin/mod.rs +++ b/src/config/src/meta/puffin/mod.rs @@ -28,14 +28,12 @@ pub const MIN_FILE_SIZE: u64 = MAGIC_SIZE + MIN_FOOTER_SIZE; pub const FLAGS_SIZE: u64 = 4; pub const FOOTER_PAYLOAD_SIZE_SIZE: u64 = 4; pub const MIN_FOOTER_SIZE: u64 = MAGIC_SIZE + FLAGS_SIZE + FOOTER_PAYLOAD_SIZE_SIZE + MAGIC_SIZE; // without any blobs -// Version 1 of the inverted index blob -pub const BLOB_TYPE: &str = "o2_inverted_index_v1"; bitflags! { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct PuffinFooterFlags: u32 { const DEFAULT = 0b00000000; - const COMPRESSED_ZSTD = 0b00000001; + const COMPRESSED = 0b00000001; } } @@ -43,7 +41,7 @@ bitflags! 
{ #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] pub struct PuffinMeta { /// Metadata for each blob in the file - pub blob_metadata: Vec, + pub blobs: Vec, /// Storage for arbitrary meta-information, like writer identification/version #[serde(default)] @@ -57,31 +55,31 @@ pub struct PuffinMeta { pub struct BlobMetadata { /// Blob type #[serde(rename = "type")] - pub blob_type: String, + pub blob_type: BlobTypes, - /// Required by specs. Not used for InvertedIndex within OpenObserve + /// Required by specs #[serde(default)] - pub fields: Vec, + pub fields: Vec, - /// Required by specs. Not used for InvertedIndex within OpenObserve + /// Required by specs #[serde(default)] - pub snapshot_id: i64, + pub snapshot_id: u64, - /// Required by specs. Not used for InvertedIndex within OpenObserve + /// Required by specs #[serde(default)] - pub sequence_number: i64, + pub sequence_number: u64, /// The offset in the file where the blob contents start - pub offset: i64, + pub offset: u64, /// The length of the blob stored in the file (after compression, if compressed) - pub length: i64, + pub length: u64, /// Default to ZSTD compression for OpenObserve inverted index #[serde(default, skip_serializing_if = "Option::is_none")] pub compression_codec: Option, - /// Additional meta information of the file. Not used for InvertedIndex within OpenObserve + /// Additional meta information of the file. #[serde(default, skip_serializing_if = "HashMap::is_empty")] pub properties: HashMap, } @@ -93,30 +91,41 @@ pub enum CompressionCodec { Zstd, } +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] +pub enum BlobTypes { + #[serde(rename = "apache-datasketches-theta-v1")] + ApacheDatasketchesThetaV1, + #[serde(rename = "deletion-vector-v1")] + DeletionVectorV1, + #[default] + #[serde(rename = "o2-fst-v1")] + O2FstV1, +} + #[derive(Default)] pub struct BlobMetadataBuilder { - blob_type: Option, - fields: Vec, - snapshot_id: i64, - sequence_number: i64, - offset: Option, - length: Option, + blob_type: Option, + fields: Vec, + snapshot_id: u64, + sequence_number: u64, + offset: Option, + length: Option, compression_codec: Option, properties: HashMap, } impl BlobMetadataBuilder { - pub fn blob_type(mut self, blob_type: String) -> Self { + pub fn blob_type(mut self, blob_type: BlobTypes) -> Self { self.blob_type = Some(blob_type); self } - pub fn offset(mut self, offset: i64) -> Self { + pub fn offset(mut self, offset: u64) -> Self { self.offset = Some(offset); self } - pub fn length(mut self, length: i64) -> Self { + pub fn length(mut self, length: u64) -> Self { self.length = Some(length); self } @@ -133,7 +142,7 @@ impl BlobMetadataBuilder { snapshot_id: self.snapshot_id, sequence_number: self.sequence_number, offset: self.offset.ok_or("offset is required")?, - length: self.length.ok_or("length is required")?, + length: self.length.unwrap_or_default(), compression_codec: self.compression_codec, properties: self.properties, }) diff --git a/src/config/src/meta/puffin/reader.rs b/src/config/src/meta/puffin/reader.rs index 61ddc355d..68970f88c 100644 --- a/src/config/src/meta/puffin/reader.rs +++ b/src/config/src/meta/puffin/reader.rs @@ -26,7 +26,6 @@ use crate::meta::puffin::{CompressionCodec, MIN_FILE_SIZE}; pub struct PuffinBytesReader { source: R, - metadata: Option, } @@ -46,23 +45,49 @@ impl PuffinBytesReader { .seek(SeekFrom::Start(blob_metadata.offset as _)) .await?; - // decompress bytes since OpenObserve InvertedIndex compresses index data by default - ensure!( - 
blob_metadata.compression_codec == Some(CompressionCodec::Zstd), - anyhow!("Unexpected CompressionCodex found in BlobMetadata") - ); - let mut compressed = vec![0u8; blob_metadata.length as usize]; - self.source.read_exact(&mut compressed).await?; + let mut raw_data = vec![0u8; blob_metadata.length as usize]; + self.source.read_exact(&mut raw_data).await?; - let mut decompressed = Vec::new(); - let mut decoder = zstd::Decoder::new(&compressed[..])?; - decoder.read_to_end(&mut decompressed)?; - Ok(decompressed) + let data = match blob_metadata.compression_codec { + None => raw_data, + Some(CompressionCodec::Zstd) => { + let mut decompressed = Vec::new(); + let mut decoder = zstd::Decoder::new(&raw_data[..])?; + decoder.read_to_end(&mut decompressed)?; + decompressed + } + Some(CompressionCodec::Lz4) => { + todo!("Lz4 decompression is not implemented yet") + } + }; + + Ok(data) } - pub async fn get_metadata(&mut self) -> Result { - if let Some(meta) = &self.metadata { - return Ok(meta.clone()); + pub async fn get_field(&mut self, field: &str) -> Result> { + self.parse_footer().await?; + match self.metadata.as_ref() { + None => Err(anyhow!("Metadata not found")), + Some(v) => { + let Some(idx) = v.properties.get(field) else { + return Ok(None); + }; + let idx = idx + .parse::() + .map_err(|_| anyhow!("Field not found"))?; + Ok(v.blobs.get(idx).cloned()) + } + } + } + + pub async fn get_metadata(&mut self) -> Result> { + self.parse_footer().await?; + Ok(self.metadata.clone()) + } + + pub async fn parse_footer(&mut self) -> Result<()> { + if self.metadata.is_some() { + return Ok(()); } // check MAGIC @@ -83,9 +108,8 @@ impl PuffinBytesReader { let puffin_meta = PuffinFooterBytesReader::new(&mut self.source, end_offset) .parse() .await?; - self.metadata = Some(puffin_meta.clone()); - - Ok(puffin_meta) + self.metadata = Some(puffin_meta); + Ok(()) } } @@ -181,7 +205,7 @@ impl PuffinFooterBytesReader { } fn parse_payload(&self, bytes: &[u8]) -> Result { - if self.flags.contains(PuffinFooterFlags::COMPRESSED_ZSTD) { + if self.flags.contains(PuffinFooterFlags::COMPRESSED) { let decoder = zstd::Decoder::new(bytes)?; serde_json::from_reader(decoder) .map_err(|e| anyhow!("Error decompress footer payload {}", e.to_string())) @@ -195,18 +219,18 @@ impl PuffinFooterBytesReader { let puffin_metadata = self.metadata.as_ref().expect("metadata is not set"); let mut offset = MAGIC_SIZE; - for blob in &puffin_metadata.blob_metadata { + for blob in &puffin_metadata.blobs { ensure!( - blob.offset as u64 == offset, + blob.offset == offset, anyhow!("Blob payload offset mismatch") ); - offset += blob.length as u64; + offset += blob.length; } let payload_ends_at = puffin_metadata - .blob_metadata + .blobs .last() - .map_or(MAGIC_SIZE, |blob| (blob.offset + blob.length) as u64); + .map_or(MAGIC_SIZE, |blob| blob.offset + blob.length); ensure!( payload_ends_at == self.head_magic_offset(), diff --git a/src/config/src/meta/puffin/writer.rs b/src/config/src/meta/puffin/writer.rs index 08b87b153..3ccb4d032 100644 --- a/src/config/src/meta/puffin/writer.rs +++ b/src/config/src/meta/puffin/writer.rs @@ -22,7 +22,7 @@ use std::{ use anyhow::{Context, Result}; use super::{ - BlobMetadata, BlobMetadataBuilder, CompressionCodec, PuffinFooterFlags, PuffinMeta, BLOB_TYPE, + BlobMetadata, BlobMetadataBuilder, BlobTypes, CompressionCodec, PuffinFooterFlags, PuffinMeta, MAGIC, MAGIC_SIZE, MIN_FOOTER_SIZE, }; @@ -50,44 +50,53 @@ impl PuffinBytesWriter { } } - fn add_blob_metadata( + fn build_blob_metadata( &self, - blob_type: String, 
+ blob_type: BlobTypes, compression_codec: Option, - size: u64, ) -> BlobMetadata { BlobMetadataBuilder::default() .blob_type(blob_type) .compression_codec(compression_codec) .offset(self.written_bytes as _) - .length(size as _) .build() .expect("Missing required fields") } } impl PuffinBytesWriter { - pub fn add_blob(&mut self, raw_data: Vec) -> Result<()> { + pub fn add_blob(&mut self, field: String, raw_data: Vec) -> Result<()> { self.add_header_if_needed() .context("Error writing puffin header")?; - // compress blob raw data - let mut encoder = zstd::Encoder::new(vec![], 3)?; - encoder - .write_all(&raw_data) - .context("Error encoding blob raw data")?; - let compressed_bytes = encoder.finish()?; - let compressed_size = compressed_bytes.len() as u64; - self.writer.write_all(&compressed_bytes)?; + // build blob metadata + let mut metadata = self.build_blob_metadata(BlobTypes::O2FstV1, None); + + // compress blob raw data + match metadata.compression_codec { + None => { + metadata.length = raw_data.len() as u64; + self.writer.write_all(&raw_data)?; + } + Some(CompressionCodec::Zstd) => { + let mut encoder = zstd::Encoder::new(vec![], 3)?; + encoder + .write_all(&raw_data) + .context("Error encoding blob raw data")?; + let compressed_bytes = encoder.finish()?; + self.writer.write_all(&compressed_bytes)?; + metadata.length = compressed_bytes.len() as u64; + } + Some(CompressionCodec::Lz4) => { + todo!("Lz4 compression is not implemented yet") + } + }; + + self.written_bytes += metadata.length; + self.properties + .insert(field, self.blobs_metadata.len().to_string()); + self.blobs_metadata.push(metadata); - // add metadata for this blob - let blob_metadata = self.add_blob_metadata( - BLOB_TYPE.to_string(), - Some(CompressionCodec::Zstd), - compressed_size, - ); - self.blobs_metadata.push(blob_metadata); - self.written_bytes += compressed_size; Ok(()) } @@ -113,7 +122,6 @@ impl PuffinBytesWriter { mem::take(&mut self.properties), ) .into_bytes()?; - self.writer.write_all(&footer_bytes)?; self.written_bytes += footer_bytes.len() as u64; Ok(()) @@ -150,7 +158,7 @@ impl PuffinFooterWriter { buf.extend_from_slice(&(payload_size as i32).to_le_bytes()); // flags - buf.extend_from_slice(&PuffinFooterFlags::COMPRESSED_ZSTD.bits().to_le_bytes()); + buf.extend_from_slice(&PuffinFooterFlags::DEFAULT.bits().to_le_bytes()); // FootMagic buf.extend_from_slice(&MAGIC); @@ -160,12 +168,9 @@ impl PuffinFooterWriter { fn get_payload(&mut self) -> Result> { let file_metdadata = PuffinMeta { - blob_metadata: mem::take(&mut self.blob_metadata), + blobs: mem::take(&mut self.blob_metadata), properties: mem::take(&mut self.file_properties), }; - - let mut encoder = zstd::Encoder::new(vec![], 3)?; - serde_json::to_writer(&mut encoder, &file_metdadata)?; - Ok(encoder.finish()?) 
+ serde_json::to_vec(&file_metdadata).context("Error serializing puffin metadata") } } diff --git a/src/config/src/meta/search.rs b/src/config/src/meta/search.rs index ad8e7cc6e..30d6be23a 100644 --- a/src/config/src/meta/search.rs +++ b/src/config/src/meta/search.rs @@ -708,6 +708,7 @@ impl From<&cluster_rpc::ScanStats> for ScanStats { } #[derive(Hash, Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "lowercase")] pub enum SearchEventType { UI, Dashboards, diff --git a/src/config/src/utils/inverted_index.rs b/src/config/src/utils/inverted_index.rs index 6e55ca8b0..e17ce2d15 100644 --- a/src/config/src/utils/inverted_index.rs +++ b/src/config/src/utils/inverted_index.rs @@ -15,14 +15,12 @@ use std::borrow::Cow; -use anyhow::{anyhow, ensure, Result}; +use anyhow::Result; use futures::io::Cursor; use itertools::Itertools; use crate::{ - meta::{ - inverted_index::reader::IndexReader, puffin::reader::PuffinBytesReader, stream::StreamType, - }, + meta::{inverted_index::reader::IndexReader, stream::StreamType}, FILE_EXT_PARQUET, FILE_EXT_PUFFIN, INDEX_MIN_CHAR_LEN, }; @@ -70,16 +68,7 @@ pub fn unpack_u32_pair(packed: u64) -> (u32, u32) { pub async fn create_index_reader_from_puffin_bytes( buf: Vec, ) -> Result>>> { - let mut puffin_reader = PuffinBytesReader::new(Cursor::new(buf)); - let puffin_meta = puffin_reader.get_metadata().await?; - ensure!( - puffin_meta.blob_metadata.len() == 1, - anyhow!("InvertedIndex should only have one blob each puffin file") - ); - let blob_bytes = puffin_reader - .read_blob_bytes(puffin_meta.blob_metadata.first().unwrap()) - .await?; - Ok(IndexReader::new(Cursor::new(blob_bytes))) + Ok(IndexReader::new(Cursor::new(buf))) } /// FST inverted index solution has a 1:1 mapping between parquet and idx files. 
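
Note (illustrative, not part of the diff): after this refactor each indexed field is written as its own puffin blob, located through the footer properties, and every blob carries its own ColumnIndexMeta at its tail, so FST offsets become blob-relative and the old IndexFileMetas/base_offset layer disappears. A minimal sketch of the resulting read path follows; the ExactSearch::new constructor and the config::... module paths are not shown in this patch and are assumptions inferred from it.

// Sketch only, not part of the patch; constructor shapes and paths are inferred from the diff above.
use anyhow::Result;
use config::{
    meta::inverted_index::search::ExactSearch,
    utils::inverted_index::create_index_reader_from_puffin_bytes,
};

async fn lookup_terms(puffin_bytes: Vec<u8>, field: &str, terms: Vec<String>) -> Result<()> {
    // One puffin blob per indexed field; the footer properties map field name -> blob index.
    let mut index_reader = create_index_reader_from_puffin_bytes(puffin_bytes).await?;
    let Some(mut field_reader) = index_reader.field(field).await? else {
        return Ok(()); // this field was not indexed in the file
    };
    // Each field blob carries its own ColumnIndexMeta at the tail, so no base_offset is needed.
    let meta = field_reader.metadata().await?;
    // ExactSearch::new(terms, &meta) is assumed; only search() appears in this diff.
    let mut searcher = ExactSearch::new(terms, &meta);
    let _hits = searcher.search(&mut field_reader).await?; // bitmap of matching row segments
    Ok(())
}
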
diff --git a/src/config/src/utils/parquet.rs b/src/config/src/utils/parquet.rs index a8b0ab937..e1f4ba30a 100644 --- a/src/config/src/utils/parquet.rs +++ b/src/config/src/utils/parquet.rs @@ -24,7 +24,10 @@ use arrow::record_batch::RecordBatch; use arrow_schema::Schema; use futures::TryStreamExt; use parquet::{ - arrow::{arrow_reader::ArrowReaderMetadata, AsyncArrowWriter, ParquetRecordBatchStreamBuilder}, + arrow::{ + arrow_reader::ArrowReaderMetadata, async_reader::ParquetRecordBatchStream, + AsyncArrowWriter, ParquetRecordBatchStreamBuilder, + }, basic::{Compression, Encoding}, file::{metadata::KeyValue, properties::WriterProperties}, }; @@ -115,25 +118,39 @@ pub fn parse_file_key_columns(key: &str) -> Result<(String, String, String), any Ok((stream_key, date_key, file_name)) } -pub async fn read_recordbatch_from_bytes( +pub async fn get_recordbatch_reader_from_bytes( data: &bytes::Bytes, -) -> Result<(Arc, Vec), anyhow::Error> { +) -> Result<(Arc, ParquetRecordBatchStream>), anyhow::Error> { let schema_reader = Cursor::new(data.clone()); let arrow_reader = ParquetRecordBatchStreamBuilder::new(schema_reader).await?; let schema = arrow_reader.schema().clone(); - let record_reader = arrow_reader.build()?; - let batches = record_reader.try_collect().await?; + let reader = arrow_reader.build()?; + Ok((schema, reader)) +} + +pub async fn get_recordbatch_reader_from_file( + path: &PathBuf, +) -> Result<(Arc, ParquetRecordBatchStream), anyhow::Error> { + let file = tokio::fs::File::open(path).await?; + let arrow_reader = ParquetRecordBatchStreamBuilder::new(file).await?; + let schema = arrow_reader.schema().clone(); + let reader = arrow_reader.build()?; + Ok((schema, reader)) +} + +pub async fn read_recordbatch_from_bytes( + data: &bytes::Bytes, +) -> Result<(Arc, Vec), anyhow::Error> { + let (schema, reader) = get_recordbatch_reader_from_bytes(data).await?; + let batches = reader.try_collect().await?; Ok((schema, batches)) } pub async fn read_recordbatch_from_file( path: &PathBuf, ) -> Result<(Arc, Vec), anyhow::Error> { - let file = tokio::fs::File::open(path).await?; - let arrow_reader = ParquetRecordBatchStreamBuilder::new(file).await?; - let schema = arrow_reader.schema().clone(); - let record_reader = arrow_reader.build()?; - let batches = record_reader.try_collect().await?; + let (schema, reader) = get_recordbatch_reader_from_file(path).await?; + let batches = reader.try_collect().await?; Ok((schema, batches)) } diff --git a/src/handler/http/request/search/mod.rs b/src/handler/http/request/search/mod.rs index c29277668..887eb7af8 100644 --- a/src/handler/http/request/search/mod.rs +++ b/src/handler/http/request/search/mod.rs @@ -147,20 +147,26 @@ pub async fn search( } // set search event type - req.search_type = match get_search_type_from_request(&query) { - Ok(v) => v, - Err(e) => return Ok(MetaHttpResponse::bad_request(e)), + if req.search_type.is_none() { + req.search_type = match get_search_type_from_request(&query) { + Ok(v) => v, + Err(e) => return Ok(MetaHttpResponse::bad_request(e)), + }; }; - req.search_event_context = req - .search_type - .as_ref() - .and_then(|event_type| get_search_event_context_from_request(event_type, &query)); + if req.search_event_context.is_none() { + req.search_event_context = req + .search_type + .as_ref() + .and_then(|event_type| get_search_event_context_from_request(event_type, &query)); + } // set index_type - req.index_type = match get_index_type_from_request(&query) { - Ok(typ) => typ, - Err(e) => return Ok(MetaHttpResponse::bad_request(e)), - 
}; + if req.index_type.is_empty() { + req.index_type = match get_index_type_from_request(&query) { + Ok(typ) => typ, + Err(e) => return Ok(MetaHttpResponse::bad_request(e)), + }; + } // get stream name let stream_names = match resolve_stream_names(&req.query.sql) { diff --git a/src/infra/src/cache/file_data/mod.rs b/src/infra/src/cache/file_data/mod.rs index 0325f8678..0f994729a 100644 --- a/src/infra/src/cache/file_data/mod.rs +++ b/src/infra/src/cache/file_data/mod.rs @@ -16,7 +16,7 @@ pub mod disk; pub mod memory; -use std::collections::VecDeque; +use std::{collections::VecDeque, ops::Range}; use hashbrown::HashSet; use hashlink::lru_cache::LruCache; @@ -133,6 +133,27 @@ pub async fn download(trace_id: &str, file: &str) -> Result<(), anyhow::Error> { } } +pub async fn get(file: &str, range: Option>) -> Result { + let cfg = config::get_config(); + // get from memory cache + if cfg.memory_cache.enabled { + if let Some(v) = memory::get(file, range.clone()).await { + return Ok(v); + } + } + // get from disk cache + if cfg.disk_cache.enabled { + if let Some(v) = disk::get(file, range.clone()).await { + return Ok(v); + } + } + // get from storage + match range { + Some(r) => crate::storage::get_range(file, r).await, + None => crate::storage::get(file).await, + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/infra/src/file_list/mysql.rs b/src/infra/src/file_list/mysql.rs index f6317c89f..4f0fd451f 100644 --- a/src/infra/src/file_list/mysql.rs +++ b/src/infra/src/file_list/mysql.rs @@ -1551,7 +1551,7 @@ pub async fn create_table_index() -> Result<()> { &["stream", "date", "file"], )) .await?; - log::warn!("[MYSQL] create table index(file_list_stream_file_idx) succeed"); + log::warn!("[MYSQL] create table index(file_list_stream_file_idx) successfully"); } Ok(()) diff --git a/src/infra/src/file_list/postgres.rs b/src/infra/src/file_list/postgres.rs index cb98ace1c..0733545d2 100644 --- a/src/infra/src/file_list/postgres.rs +++ b/src/infra/src/file_list/postgres.rs @@ -1517,7 +1517,7 @@ pub async fn create_table_index() -> Result<()> { &["stream", "date", "file"], )) .await?; - log::warn!("[POSTGRES] create table index(file_list_stream_file_idx) succeed"); + log::warn!("[POSTGRES] create table index(file_list_stream_file_idx) successfully"); } Ok(()) diff --git a/src/infra/src/file_list/sqlite.rs b/src/infra/src/file_list/sqlite.rs index 483a599a8..c5acba886 100644 --- a/src/infra/src/file_list/sqlite.rs +++ b/src/infra/src/file_list/sqlite.rs @@ -1356,7 +1356,7 @@ pub async fn create_table_index() -> Result<()> { &["stream", "date", "file"], )) .await?; - log::warn!("[SQLITE] create table index(file_list_stream_file_idx) succeed"); + log::warn!("[SQLITE] create table index(file_list_stream_file_idx) successfully"); } // delete trigger for old version diff --git a/src/infra/src/storage/mod.rs b/src/infra/src/storage/mod.rs index a78650806..98b7d9c02 100644 --- a/src/infra/src/storage/mod.rs +++ b/src/infra/src/storage/mod.rs @@ -13,6 +13,8 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
+use std::ops::Range; + use config::{get_config, is_local_disk_storage, metrics}; use datafusion::parquet::data_type::AsBytes; use futures::{StreamExt, TryStreamExt}; @@ -78,6 +80,11 @@ pub async fn get(file: &str) -> Result { Ok(data) } +pub async fn get_range(file: &str, range: Range) -> Result { + let data = DEFAULT.get_range(&file.into(), range).await?; + Ok(data) +} + pub async fn put(file: &str, data: bytes::Bytes) -> Result<(), anyhow::Error> { if bytes_size_in_mb(&data) >= MULTI_PART_UPLOAD_DATA_SIZE { put_multipart(file, data).await?; diff --git a/src/job/file_list.rs b/src/job/file_list.rs index c6535ed62..6935e6ede 100644 --- a/src/job/file_list.rs +++ b/src/job/file_list.rs @@ -131,7 +131,7 @@ async fn upload_file(path_str: &str, file_key: &str) -> Result<(), anyhow::Error let result = storage::put(&new_file_key, bytes::Bytes::from(compressed_bytes)).await; match result { Ok(_output) => { - log::info!("[JOB] File_list upload succeeded: {}", new_file_key); + log::info!("[JOB] File_list upload successfully: {}", new_file_key); Ok(()) } Err(err) => { diff --git a/src/job/files/idx.rs b/src/job/files/idx.rs index 76edb9db9..65777dcba 100644 --- a/src/job/files/idx.rs +++ b/src/job/files/idx.rs @@ -24,7 +24,9 @@ use config::{ ider, meta::stream::{FileMeta, StreamPartition, StreamPartitionType, StreamType}, utils::{ - parquet::new_parquet_writer, record_batch_ext::concat_batches, schema::format_partition_key, + parquet::new_parquet_writer, + record_batch_ext::{concat_batches, RecordBatchExt}, + schema::format_partition_key, }, FILE_EXT_PARQUET, }; @@ -57,12 +59,10 @@ fn generate_index_file_name_from_compacted_file( pub(crate) async fn write_parquet_index_to_disk( batches: Vec, - file_size: u64, org_id: &str, stream_type: StreamType, stream_name: &str, file_name: &str, - caller: &str, ) -> Result, anyhow::Error> { let schema = if let Some(first_batch) = batches.first() { first_batch.schema() @@ -71,7 +71,7 @@ pub(crate) async fn write_parquet_index_to_disk( }; log::debug!( - "write_parquet_index_to_disk: batches row counts: {:?}", + "[JOB:IDX] write_parquet_index_to_disk: batches row counts: {:?}", batches.iter().map(|b| b.num_rows()).sum::() ); @@ -97,22 +97,12 @@ pub(crate) async fn write_parquet_index_to_disk( let mut ret = Vec::new(); for (prefix, batch) in partitioned_batches.into_iter() { // write metadata + let batch_size = batch.size(); let mut file_meta = FileMeta { - min_ts: 0, - max_ts: 0, - records: 0, - original_size: file_size as i64, - compressed_size: 0, - flattened: false, + original_size: batch_size as i64, + ..Default::default() }; - populate_file_meta( - schema.clone(), - vec![vec![batch.clone()]], - &mut file_meta, - Some("min_ts"), - Some("max_ts"), - ) - .await?; + populate_file_meta(&[&batch], &mut file_meta, Some("min_ts"), Some("max_ts")).await?; // write parquet file let mut buf_parquet = Vec::new(); @@ -129,24 +119,18 @@ pub(crate) async fn write_parquet_index_to_disk( file_name, &prefix, ); - log::info!( - "[JOB] IDX: write_to_disk: {}/{}/{} {} {} {}", - org_id, - stream_name, - stream_type, - new_idx_file_name, - file_name, - caller, - ); let store_file_name = new_idx_file_name.clone(); match storage::put(&store_file_name, bytes::Bytes::from(buf_parquet)).await { Ok(_) => { - log::info!("[JOB] disk file upload succeeded: {}", &new_idx_file_name); + log::info!( + "[JOB:IDX] index file upload successfully: {}", + &new_idx_file_name + ); ret.push((new_idx_file_name, file_meta)); } Err(err) => { - log::error!("[JOB] disk file upload error: {:?}", err); + 
log::error!("[JOB] index file upload error: {:?}", err); return Err(anyhow::anyhow!(err)); } } diff --git a/src/job/files/parquet.rs b/src/job/files/parquet.rs index 9a5999f06..09e8f3254 100644 --- a/src/job/files/parquet.rs +++ b/src/job/files/parquet.rs @@ -36,27 +36,28 @@ use config::{ cluster, get_config, meta::{ bitvec::BitVec, - inverted_index::{writer::ColumnIndexer, IndexFileMetas, InvertedIndexFormat}, + inverted_index::{writer::ColumnIndexer, InvertedIndexFormat}, puffin::writer::PuffinBytesWriter, + search::StorageType, stream::{FileKey, FileMeta, PartitionTimeLevel, StreamSettings, StreamType}, }, metrics, utils::{ arrow::record_batches_to_json_rows, - asynchronism::file::{get_file_contents, get_file_meta}, + asynchronism::file::get_file_meta, file::scan_files_with_channel, inverted_index::{convert_parquet_idx_file_name, split_token}, json, parquet::{ - read_metadata_from_file, read_recordbatch_from_bytes, write_recordbatch_to_parquet, + get_recordbatch_reader_from_bytes, read_metadata_from_file, read_schema_from_file, }, schema_ext::SchemaExt, }, FxIndexMap, INDEX_FIELD_NAME_FOR_ALL, INDEX_SEGMENT_LENGTH, }; +use futures::TryStreamExt; use hashbrown::HashSet; use infra::{ - cache::tmpfs, schema::{ get_stream_setting_bloom_filter_fields, get_stream_setting_fts_fields, get_stream_setting_index_fields, unwrap_stream_settings, SchemaCache, @@ -65,6 +66,7 @@ use infra::{ }; use ingester::WAL_PARQUET_METADATA; use once_cell::sync::Lazy; +use parquet::arrow::async_reader::ParquetRecordBatchStream; use tokio::{ sync::{Mutex, RwLock}, time, @@ -77,10 +79,11 @@ use crate::{ }, job::files::idx::write_parquet_index_to_disk, service::{ - compact::merge::{generate_inverted_idx_recordbatch, merge_parquet_files}, db, schema::generate_schema_for_defined_schema_fields, - search::datafusion::exec::merge_parquet_files as merge_parquet_files_by_datafusion, + search::datafusion::exec::{ + self, merge_parquet_files as merge_parquet_files_by_datafusion, + }, }, }; @@ -591,11 +594,6 @@ async fn merge_files( let mut new_file_size: i64 = 0; let mut new_compressed_file_size = 0; let mut new_file_list = Vec::new(); - let mut deleted_files = Vec::new(); - let mut min_ts = i64::MAX; - let mut max_ts = i64::MIN; - let mut total_records = 0; - let stream_fields_num = latest_schema.fields().len(); let max_file_size = std::cmp::min( cfg.limit.max_file_size_on_disk as i64, @@ -614,45 +612,33 @@ async fn merge_files( new_compressed_file_size += file.meta.compressed_size; new_file_list.push(file.clone()); } - let mut retain_file_list = new_file_list.clone(); - - // write parquet files into tmpfs - let tmp_dir = tmpfs::Directory::default(); - for file in retain_file_list.iter_mut() { - log::info!("[INGESTER:JOB:{thread_id}] merge small file: {}", &file.key); - let data = match get_file_contents(&wal_dir.join(&file.key)).await { - Ok(body) => { - min_ts = std::cmp::min(min_ts, file.meta.min_ts); - max_ts = std::cmp::max(max_ts, file.meta.max_ts); - total_records += file.meta.records; - body - } - Err(err) => { - log::error!( - "[INGESTER:JOB:{thread_id}] merge small file: {}, err: {}", - &file.key, - err - ); - deleted_files.push(file.key.clone()); - continue; - } - }; - let file_size = data.len(); - file.meta.compressed_size = file_size as i64; - tmp_dir.set(&file.key, data.into())?; - } - if !deleted_files.is_empty() { - new_file_list.retain(|f| !deleted_files.contains(&f.key)); - } + // no files need to merge if new_file_list.is_empty() { - return Ok((String::from(""), FileMeta::default(), retain_file_list)); + 
return Ok((String::from(""), FileMeta::default(), Vec::new())); } - // eg: files/default/logs/olympics/0/2023/08/21/08/8b8a5451bbe1c44b/ - // 7099303408192061440f3XQ2p.parquet - // eg: files/default/traces/default/0/023/09/04/05/default/ - // service_name=ingester/7104328279989026816guOA4t.parquet - // let _ = columns[0].to_string(); // files/ + let retain_file_list = new_file_list.clone(); + + // get time range for these files + let min_ts = new_file_list.iter().map(|f| f.meta.min_ts).min().unwrap(); + let max_ts = new_file_list.iter().map(|f| f.meta.max_ts).max().unwrap(); + let total_records = new_file_list.iter().map(|f| f.meta.records).sum(); + let new_file_size = new_file_list.iter().map(|f| f.meta.original_size).sum(); + let mut new_file_meta = FileMeta { + min_ts, + max_ts, + records: total_records, + original_size: new_file_size, + compressed_size: 0, + flattened: false, + }; + if new_file_meta.records == 0 { + return Err(anyhow::anyhow!("merge_files error: records is 0")); + } + + // eg: files/default/logs/olympics/0/2023/08/21/08/8b8a5451bbe1c44b/7099303408192061440f3XQ2p. + // parquet eg: files/default/traces/default/2/2023/09/04/05/default/service_name=ingester/ + // 7104328279989026816guOA4t.parquet let _ = columns[0].to_string(); // files/ let file = new_file_list.first().unwrap(); let columns = file.key.splitn(5, '/').collect::>(); let org_id = columns[1].to_string(); @@ -660,19 +646,19 @@ async fn merge_files( let stream_name = columns[3].to_string(); let file_name = columns[4].to_string(); - // merge files - let stream_setting = infra::schema::get_settings(&org_id, &stream_name, stream_type).await; - let bloom_filter_fields = get_stream_setting_bloom_filter_fields(&stream_setting); - let full_text_search_fields = get_stream_setting_fts_fields(&stream_setting); - let index_fields = get_stream_setting_index_fields(&stream_setting); - let (defined_schema_fields, need_original) = match stream_setting { + // get latest version of schema + let stream_settings = infra::schema::get_settings(&org_id, &stream_name, stream_type).await; + let bloom_filter_fields = get_stream_setting_bloom_filter_fields(&stream_settings); + let full_text_search_fields = get_stream_setting_fts_fields(&stream_settings); + let index_fields = get_stream_setting_index_fields(&stream_settings); + let (defined_schema_fields, need_original) = match stream_settings { Some(s) => ( s.defined_schema_fields.unwrap_or_default(), s.store_original_data, ), None => (Vec::new(), false), }; - let schema = if !defined_schema_fields.is_empty() { + let _latest_schema = if !defined_schema_fields.is_empty() { let latest_schema = SchemaCache::new(latest_schema.as_ref().clone()); let latest_schema = generate_schema_for_defined_schema_fields( &latest_schema, @@ -684,43 +670,39 @@ async fn merge_files( latest_schema.clone() }; - let mut new_file_meta = FileMeta { - min_ts, - max_ts, - records: total_records, - original_size: new_file_size, - compressed_size: 0, - flattened: false, + // read schema from parquet file, there files have the same schema because they are under the + // same prefix + let schema = read_schema_from_file(&(&wal_dir.join(&file.key)).into()).await?; + let schema_key = schema + .as_ref() + .clone() + .with_metadata(Default::default()) + .hash_key(); + + // generate datafusion tables + let session = config::meta::search::Session { + id: format!("ingester-{schema_key}"), + storage_type: StorageType::Wal, + work_group: None, + target_partitions: 0, }; - if new_file_meta.records == 0 { - return 
Err(anyhow::anyhow!( - "merge_parquet_files error: records is 0 for org {} stream type {} stream {}", - org_id, - stream_type, - stream_name - )); - } + let rules = hashbrown::HashMap::new(); + let table = + exec::create_parquet_table(&session, schema.clone(), &new_file_list, rules, true, None) + .await?; + let tables = vec![table]; let start = std::time::Instant::now(); - let mut buf = Vec::new(); - let single_file = new_file_list.len() == 1; - let merge_result = if single_file { - move_single_file( - thread_id, - tmp_dir.name(), - file, - stream_type, - &stream_name, - &mut buf, - ) - .await - } else if stream_type == StreamType::Logs { - merge_parquet_files(thread_id, tmp_dir.name(), schema.clone()).await - } else { - merge_parquet_files_by_datafusion(tmp_dir.name(), stream_type, &stream_name, schema.clone()) - .await - }; - let (new_schema, new_batches) = match merge_result { + let merge_result = merge_parquet_files_by_datafusion( + stream_type, + &stream_name, + schema, + tables, + &bloom_filter_fields, + &new_file_meta, + ) + .await; + let (_new_schema, buf) = match merge_result { Ok(v) => v, Err(e) => { log::error!( @@ -737,15 +719,7 @@ async fn merge_files( return Err(e.into()); } }; - if !single_file { - buf = write_recordbatch_to_parquet( - new_schema.clone(), - &new_batches, - &bloom_filter_fields, - &new_file_meta, - ) - .await?; - } + new_file_meta.compressed_size = buf.len() as i64; if new_file_meta.compressed_size == 0 { return Err(anyhow::anyhow!( @@ -755,7 +729,7 @@ async fn merge_files( let new_file_key = super::generate_storage_file_name(&org_id, stream_type, &stream_name, &file_name); log::info!( - "[INGESTER:JOB:{thread_id}] merge file succeeded, {} files into a new file: {}, original_size: {}, compressed_size: {}, took: {} ms", + "[INGESTER:JOB:{thread_id}] merge file successfully, {} files into a new file: {}, original_size: {}, compressed_size: {}, took: {} ms", retain_file_list.len(), new_file_key, new_file_meta.original_size, @@ -765,69 +739,64 @@ async fn merge_files( // upload file let buf = Bytes::from(buf); - match storage::put(&new_file_key, buf).await { - Ok(_) => { - if cfg.common.inverted_index_enabled && stream_type.is_basic_type() { - // generate inverted index RecordBatch - if let Some(inverted_idx_batch) = generate_inverted_idx_recordbatch( - new_schema.clone(), - &new_batches, - stream_type, - &full_text_search_fields, - &index_fields, - )? 
{ - let index_format = - InvertedIndexFormat::from(&cfg.common.inverted_index_store_format); - if matches!( - index_format, - InvertedIndexFormat::Parquet | InvertedIndexFormat::Both - ) { - generate_index_on_ingester( - inverted_idx_batch.clone(), - new_file_key.clone(), - &org_id, - stream_type, - &stream_name, - &full_text_search_fields, - &index_fields, - ) - .await - .map_err(|e| { - anyhow::anyhow!("generate_parquet_index_on_ingester error: {}", e) - })?; - } - if matches!( - index_format, - InvertedIndexFormat::FST | InvertedIndexFormat::Both - ) { - // generate fst inverted index and write to storage - generate_fst_inverted_index( - inverted_idx_batch, - &new_file_key, - &full_text_search_fields, - &index_fields, - None, - ) - .await?; - } - } - } - Ok((new_file_key, new_file_meta, retain_file_list)) - } - Err(e) => Err(e), + storage::put(&new_file_key, buf.clone()).await?; + + if !cfg.common.inverted_index_enabled || !stream_type.is_basic_type() { + return Ok((new_file_key, new_file_meta, retain_file_list)); } + + // generate parquet format inverted index + let index_format = InvertedIndexFormat::from(&cfg.common.inverted_index_store_format); + if matches!( + index_format, + InvertedIndexFormat::Parquet | InvertedIndexFormat::Both + ) { + let (schema, mut reader) = get_recordbatch_reader_from_bytes(&buf).await?; + generate_index_on_ingester( + &new_file_key, + &org_id, + stream_type, + &stream_name, + &full_text_search_fields, + &index_fields, + schema, + &mut reader, + ) + .await + .map_err(|e| anyhow::anyhow!("generate_parquet_index_on_ingester error: {}", e))?; + } + + // generate fst format inverted index + if matches!( + index_format, + InvertedIndexFormat::FST | InvertedIndexFormat::Both + ) { + let (schema, mut reader) = get_recordbatch_reader_from_bytes(&buf).await?; + generate_fst_inverted_index( + &new_file_key, + &full_text_search_fields, + &index_fields, + None, + schema, + &mut reader, + ) + .await?; + } + + Ok((new_file_key, new_file_meta, retain_file_list)) } /// Create an inverted index file for the given file #[allow(clippy::too_many_arguments)] pub(crate) async fn generate_index_on_ingester( - inverted_idx_batch: RecordBatch, - new_file_key: String, + new_file_key: &str, org_id: &str, stream_type: StreamType, stream_name: &str, full_text_search_fields: &[String], index_fields: &[String], + schema: Arc, + reader: &mut ParquetRecordBatchStream>, ) -> Result<(), anyhow::Error> { let index_stream_name = if get_config().common.inverted_index_old_format && stream_type == StreamType::Logs { @@ -836,20 +805,21 @@ pub(crate) async fn generate_index_on_ingester( format!("{}_{}", stream_name, stream_type) }; let record_batches = prepare_index_record_batches( - inverted_idx_batch, org_id, stream_type, stream_name, - &new_file_key, + new_file_key, full_text_search_fields, index_fields, + schema, + reader, ) - .map_err(|e| anyhow::anyhow!("prepare_index_record_batches error: {}", e))?; + .await?; if record_batches.is_empty() || record_batches.iter().all(|b| b.num_rows() == 0) { return Ok(()); } - let idx_schema: SchemaRef = record_batches.first().unwrap().schema(); + let idx_schema: SchemaRef = record_batches.first().unwrap().schema(); let mut schema_map: HashMap = HashMap::new(); let schema_chk = crate::service::schema::stream_schema_exists( org_id, @@ -991,14 +961,17 @@ pub(crate) async fn generate_index_on_ingester( #[allow(clippy::too_many_arguments)] pub(crate) async fn generate_index_on_compactor( file_list_to_invalidate: &[FileKey], - inverted_idx_batch: RecordBatch, - 
new_file_key: String, + new_file_key: &str, org_id: &str, stream_type: StreamType, stream_name: &str, full_text_search_fields: &[String], index_fields: &[String], + schema: Arc, + reader: &mut ParquetRecordBatchStream>, ) -> Result, anyhow::Error> { + let start = std::time::Instant::now(); + let index_stream_name = if get_config().common.inverted_index_old_format && stream_type == StreamType::Logs { stream_name.to_string() @@ -1006,19 +979,21 @@ pub(crate) async fn generate_index_on_compactor( format!("{}_{}", stream_name, stream_type) }; let mut record_batches = prepare_index_record_batches( - inverted_idx_batch, org_id, stream_type, stream_name, - &new_file_key, + new_file_key, full_text_search_fields, index_fields, - )?; + schema, + reader, + ) + .await?; if record_batches.is_empty() || record_batches.iter().all(|b| b.num_rows() == 0) { return Ok(vec![(String::new(), FileMeta::default())]); } - let schema = record_batches.first().unwrap().schema(); + let schema = record_batches.first().unwrap().schema(); let prefix_to_remove = format!("files/{}/{}/{}/", org_id, stream_type, stream_name); let len_of_columns_to_invalidate = file_list_to_invalidate.len(); @@ -1075,33 +1050,42 @@ pub(crate) async fn generate_index_on_compactor( .map_err(|e| anyhow::anyhow!("RecordBatch::try_new error: {}", e))?; record_batches.push(batch); - let original_file_size = 0; // The file never existed before this function was called let files = write_parquet_index_to_disk( record_batches, - original_file_size, org_id, StreamType::Index, &index_stream_name, - &new_file_key, - "index_creator", + new_file_key, ) .await?; - log::debug!("[COMPACTOR:JOB] Written index files successfully"); + log::info!( + "[COMPACT:JOB] generate index successfully, data file: {}, index files: {:?}, took: {} ms", + new_file_key, + files.iter().map(|(k, _)| k).collect::>(), + start.elapsed().as_millis(), + ); + Ok(files) } -fn prepare_index_record_batches( - inverted_idx_batch: RecordBatch, +#[allow(clippy::too_many_arguments)] +async fn prepare_index_record_batches( org_id: &str, stream_type: StreamType, stream_name: &str, new_file_key: &str, full_text_search_fields: &[String], index_fields: &[String], + schema: Arc, + reader: &mut ParquetRecordBatchStream>, ) -> Result, anyhow::Error> { let cfg = get_config(); - let schema = inverted_idx_batch.schema(); + let schema_fields = schema + .fields() + .iter() + .map(|f| (f.name(), f)) + .collect::>(); let new_schema = Arc::new(Schema::new(vec![ Field::new(cfg.common.column_timestamp.as_str(), DataType::Int64, false), @@ -1115,168 +1099,138 @@ fn prepare_index_record_batches( Field::new("segment_ids", DataType::Binary, true), // bitmap ])); + let mut total_num_rows = 0; + let mut uniq_terms: HashMap> = HashMap::new(); + loop { + let batch = reader.try_next().await?; + let Some(batch) = batch else { + break; + }; + let num_rows = batch.num_rows(); + if num_rows == 0 { + continue; + } + + // update total_num_rows + let prev_total_num_rows = total_num_rows; + total_num_rows += num_rows; + + // get _timestamp column + let Some(time_data) = batch + .column_by_name(&cfg.common.column_timestamp) + .unwrap() + .as_any() + .downcast_ref::() + else { + continue; + }; + + // process full text search fields + for column_name in full_text_search_fields.iter() { + if !schema_fields.contains_key(column_name) + || schema_fields.get(column_name).unwrap().data_type() != &DataType::Utf8 + { + continue; + } + + // get full text search column + let Some(column_data) = batch + .column_by_name(column_name) + 
.unwrap() + .as_any() + .downcast_ref::() + else { + continue; + }; + + // split the column into terms + let terms = (0..num_rows) + .flat_map(|i| { + split_token(column_data.value(i), &cfg.common.inverted_index_split_chars) + .into_iter() + .map(|s| (s, i)) + .collect::>() + }) + .collect::>(); + if terms.is_empty() { + continue; + } + + // unique terms and get the min & max _timestamp + let column_uniq_terms = uniq_terms + .entry(INDEX_FIELD_NAME_FOR_ALL.to_string()) + .or_insert(BTreeMap::new()); + for (term, idx) in terms { + let term_time = time_data.value(idx); + let (min_ts, max_ts, ids) = column_uniq_terms.entry(term.to_string()).or_insert(( + term_time, + term_time, + Vec::new(), + )); + if *min_ts > term_time { + *min_ts = term_time; + } + if *max_ts < term_time { + *max_ts = term_time; + } + ids.push(idx + prev_total_num_rows); + } + } + + // process index fields + for column_name in index_fields.iter() { + if !schema_fields.contains_key(column_name) + || schema_fields.get(column_name).unwrap().data_type() != &DataType::Utf8 + { + continue; + } + + // get index column + let Some(column_data) = batch + .column_by_name(column_name) + .unwrap() + .as_any() + .downcast_ref::() + else { + continue; + }; + + // collect terms + let terms = (0..num_rows) + .map(|i| (column_data.value(i), i)) + .collect::>(); + if terms.is_empty() { + continue; + } + + // unique terms and get the min & max _timestamp + let column_uniq_terms = uniq_terms + .entry(column_name.to_string()) + .or_insert(BTreeMap::new()); + for (term, idx) in terms { + let term_time = time_data.value(idx); + let (min_ts, max_ts, ids) = column_uniq_terms.entry(term.to_string()).or_insert(( + term_time, + term_time, + Vec::new(), + )); + if *min_ts > term_time { + *min_ts = term_time; + } + if *max_ts < term_time { + *max_ts = term_time; + } + ids.push(idx + prev_total_num_rows); + } + } + } + + // build record batch let prefix_to_remove = format!("files/{}/{}/{}/", org_id, stream_type, stream_name); let file_name_without_prefix = new_file_key.trim_start_matches(&prefix_to_remove); let mut indexed_record_batches_to_merge = Vec::new(); - - // get _timestamp column - let Some(time_data) = inverted_idx_batch - .column_by_name(&cfg.common.column_timestamp) - .unwrap() - .as_any() - .downcast_ref::() - else { - return Ok(vec![]); - }; - - let num_rows = inverted_idx_batch.num_rows(); - // process full text search fields - for column in schema.fields().iter() { - let column_name = column.name(); - if !full_text_search_fields.contains(column_name) || column.data_type() != &DataType::Utf8 { - continue; - } - - // get full text search column - let Some(column_data) = inverted_idx_batch - .column_by_name(column_name) - .unwrap() - .as_any() - .downcast_ref::() - else { - continue; - }; - - // split the column into terms - let terms = (0..num_rows) - .flat_map(|i| { - split_token(column_data.value(i), &cfg.common.inverted_index_split_chars) - .into_iter() - .map(|s| (s, i)) - .collect::>() - }) - .collect::>(); - if terms.is_empty() { - continue; - } - - // unique terms and get the min & max _timestamp - let mut uniq_terms = BTreeMap::new(); - for (term, idx) in terms { - let term_time = time_data.value(idx); - let (min_ts, max_ts, ids) = - uniq_terms - .entry(term.to_string()) - .or_insert((term_time, term_time, Vec::new())); - if *min_ts > term_time { - *min_ts = term_time; - } - if *max_ts < term_time { - *max_ts = term_time; - } - ids.push(idx); - } - - // build record batch - let records_len = uniq_terms.len(); - let mut 
field_timestamp = Int64Builder::with_capacity(records_len); - let mut field_min_ts = Int64Builder::with_capacity(records_len); - let mut field_max_ts = Int64Builder::with_capacity(records_len); - let mut field_field = - StringBuilder::with_capacity(records_len, INDEX_FIELD_NAME_FOR_ALL.len() * records_len); - let mut field_term = StringBuilder::with_capacity( - records_len, - uniq_terms.iter().map(|x| x.0.len()).sum::(), - ); - let mut field_file_name = - StringBuilder::with_capacity(records_len, file_name_without_prefix.len() * records_len); - let mut field_count = Int64Builder::with_capacity(records_len); - let mut field_deleted = BooleanBuilder::with_capacity(records_len); - let mut field_segment_ids = BinaryBuilder::with_capacity(records_len, records_len); - for (term, (min_ts, max_ts, ids)) in uniq_terms { - field_timestamp.append_value(min_ts); - field_min_ts.append_value(min_ts); - field_max_ts.append_value(max_ts); - field_field.append_value(INDEX_FIELD_NAME_FOR_ALL); - field_term.append_value(term); - field_file_name.append_value(file_name_without_prefix); - field_count.append_value(ids.len() as i64); - field_deleted.append_value(false); - // calculate segment ids - let segment_ids = ids - .iter() - .map(|i| i / INDEX_SEGMENT_LENGTH) - .collect::>(); - let segment_num = (num_rows + INDEX_SEGMENT_LENGTH - 1) / INDEX_SEGMENT_LENGTH; - let mut bv = BitVec::with_capacity(segment_num); - for i in 0..segment_num { - bv.push(segment_ids.contains(&i)); - } - field_segment_ids.append_value(bv.into_vec()); - } - - let record_batch = RecordBatch::try_new( - new_schema.clone(), - vec![ - Arc::new(field_timestamp.finish()), - Arc::new(field_min_ts.finish()), - Arc::new(field_max_ts.finish()), - Arc::new(field_field.finish()), - Arc::new(field_term.finish()), - Arc::new(field_file_name.finish()), - Arc::new(field_count.finish()), - Arc::new(field_deleted.finish()), - Arc::new(field_segment_ids.finish()), - ], - ) - .map_err(|e| anyhow::anyhow!("RecordBatch::try_new error: {}", e))?; - indexed_record_batches_to_merge.push(record_batch); - } - - // process index fields - for column in schema.fields().iter() { - let column_name = column.name(); - if !index_fields.contains(column_name) || column.data_type() != &DataType::Utf8 { - continue; - } - - // get index column - let Some(column_data) = inverted_idx_batch - .column_by_name(column_name) - .unwrap() - .as_any() - .downcast_ref::() - else { - continue; - }; - - // split the column into terms - let terms = (0..num_rows) - .map(|i| (column_data.value(i), i)) - .collect::>(); - if terms.is_empty() { - continue; - } - - // unique terms and get the min & max _timestamp - let mut uniq_terms = BTreeMap::new(); - for (term, idx) in terms { - let term_time = time_data.value(idx); - let (min_ts, max_ts, ids) = - uniq_terms - .entry(term.to_string()) - .or_insert((term_time, term_time, Vec::new())); - if *min_ts > term_time { - *min_ts = term_time; - } - if *max_ts < term_time { - *max_ts = term_time; - } - ids.push(idx); - } - - // build record batch - let records_len = uniq_terms.len(); + for (column_name, column_uniq_terms) in uniq_terms { + let records_len = column_uniq_terms.len(); let mut field_timestamp = Int64Builder::with_capacity(records_len); let mut field_min_ts = Int64Builder::with_capacity(records_len); let mut field_max_ts = Int64Builder::with_capacity(records_len); @@ -1284,18 +1238,18 @@ fn prepare_index_record_batches( StringBuilder::with_capacity(records_len, column_name.len() * records_len); let mut field_term = 
StringBuilder::with_capacity( records_len, - uniq_terms.iter().map(|x| x.0.len()).sum::(), + column_uniq_terms.iter().map(|x| x.0.len()).sum::(), ); let mut field_file_name = StringBuilder::with_capacity(records_len, file_name_without_prefix.len() * records_len); let mut field_count = Int64Builder::with_capacity(records_len); let mut field_deleted = BooleanBuilder::with_capacity(records_len); let mut field_segment_ids = BinaryBuilder::with_capacity(records_len, records_len); - for (term, (min_ts, max_ts, ids)) in uniq_terms { + for (term, (min_ts, max_ts, ids)) in column_uniq_terms { field_timestamp.append_value(min_ts); field_min_ts.append_value(min_ts); field_max_ts.append_value(max_ts); - field_field.append_value(column_name); + field_field.append_value(&column_name); field_term.append_value(term); field_file_name.append_value(file_name_without_prefix); field_count.append_value(ids.len() as i64); @@ -1305,7 +1259,7 @@ fn prepare_index_record_batches( .iter() .map(|i| i / INDEX_SEGMENT_LENGTH) .collect::>(); - let segment_num = (num_rows + INDEX_SEGMENT_LENGTH - 1) / INDEX_SEGMENT_LENGTH; + let segment_num = (total_num_rows + INDEX_SEGMENT_LENGTH - 1) / INDEX_SEGMENT_LENGTH; let mut bv = BitVec::with_capacity(segment_num); for i in 0..segment_num { bv.push(segment_ids.contains(&i)); @@ -1338,15 +1292,19 @@ fn prepare_index_record_batches( /// Called by both ingester and compactor. Compactor needs to provide `file_list_to_invalidate` /// to delete previously created small index files pub(crate) async fn generate_fst_inverted_index( - inverted_idx_batch: RecordBatch, parquet_file_name: &str, full_text_search_fields: &[String], index_fields: &[String], file_list_to_invalidate: Option<&[FileKey]>, /* for compactor to delete corresponding small * .idx files */ + + schema: Arc, + reader: &mut ParquetRecordBatchStream>, ) -> Result<(), anyhow::Error> { + let start = std::time::Instant::now(); + let Some((compressed_bytes, file_meta)) = - prepare_fst_index_bytes(inverted_idx_batch, full_text_search_fields, index_fields)? + prepare_fst_index_bytes(schema, reader, full_text_search_fields, index_fields).await? else { log::info!("generate_fst_index_on_compactor creates empty index. 
skip"); return Ok(()); @@ -1380,182 +1338,181 @@ pub(crate) async fn generate_fst_inverted_index( match storage::put(&idx_file_name, Bytes::from(compressed_bytes)).await { Ok(_) => { log::info!( - "{} Written fst index file successfully compressed size {}, original size {}", + "{} Written fst index file successfully: {}, compressed size {}, original size {}, took: {} ms", caller, + idx_file_name, file_meta.compressed_size, file_meta.original_size, + start.elapsed().as_millis() ); Ok(()) } Err(e) => { - log::error!("{} Written fst index file error: {}", caller, e.to_string()); + log::error!( + "{} Written fst index file: {}, error: {}", + caller, + idx_file_name, + e.to_string() + ); Err(e) } } } /// Create and compressed inverted index bytes using FST solution for the given RecordBatch -pub(crate) fn prepare_fst_index_bytes( - inverted_idx_batch: RecordBatch, +pub(crate) async fn prepare_fst_index_bytes( + schema: Arc, + reader: &mut ParquetRecordBatchStream>, full_text_search_fields: &[String], index_fields: &[String], ) -> Result, FileMeta)>, anyhow::Error> { - let schema = inverted_idx_batch.schema(); + let cfg = get_config(); + let schema_fields = schema + .fields() + .iter() + .map(|f| (f.name(), f)) + .collect::>(); - let mut writer = Vec::new(); - let mut index_file_metas = IndexFileMetas::new(); - - let num_rows = inverted_idx_batch.num_rows(); - // Process full text search fields - let mut ft_indexer = ColumnIndexer::new(); - for column in schema.fields() { - let column_name = column.name(); - if !full_text_search_fields.contains(column_name) || column.data_type() != &DataType::Utf8 { - continue; - } - - let Some(column_data) = inverted_idx_batch - .column_by_name(column_name) - .unwrap() - .as_any() - .downcast_ref::() - else { - continue; + let mut total_num_rows = 0; + let mut uniq_terms: HashMap> = HashMap::new(); + loop { + let batch = reader.try_next().await?; + let Some(inverted_idx_batch) = batch else { + break; }; - - // split the column into terms - let terms = (0..num_rows) - .flat_map(|i| { - split_token( - column_data.value(i), - &config::get_config().common.inverted_index_split_chars, - ) - .into_iter() - .map(|s| (s, i)) - .collect::>() - }) - .collect::>(); - - if terms.is_empty() { + let num_rows = inverted_idx_batch.num_rows(); + if num_rows == 0 { continue; } - for (term, row_id) in terms { - let segment_id = row_id / INDEX_SEGMENT_LENGTH; - ft_indexer.push(term.as_bytes(), segment_id, term.len()); - } - } - // finish ft_indexer - if !ft_indexer.is_empty() { - let ft_index_meta = ft_indexer - .write(&mut writer) - .context("Error constructing FST ColumnIndex for full text search fields")?; - // add ft_indexer_meta to IndexFileMetas - index_file_metas - .metas - .insert(INDEX_FIELD_NAME_FOR_ALL.to_string(), ft_index_meta); - } + // update total_num_rows + let prev_total_num_rows = total_num_rows; + total_num_rows += num_rows; - // Process secondary index fields - for column in schema.fields() { - let column_name = column.name(); - if !index_fields.contains(column_name) || column.data_type() != &DataType::Utf8 { - continue; + // process full text search fields + for column_name in full_text_search_fields.iter() { + if !schema_fields.contains_key(column_name) { + continue; + } + if schema_fields.get(column_name).unwrap().data_type() != &DataType::Utf8 { + continue; + } + + // get full text search column + let Some(column_data) = inverted_idx_batch + .column_by_name(column_name) + .unwrap() + .as_any() + .downcast_ref::() + else { + continue; + }; + + // split the 
column into terms + let terms = (0..num_rows) + .flat_map(|i| { + split_token(column_data.value(i), &cfg.common.inverted_index_split_chars) + .into_iter() + .map(|s| (s, i)) + .collect::>() + }) + .collect::>(); + if terms.is_empty() { + continue; + } + + // unique terms and get the min & max _timestamp + let column_uniq_terms = uniq_terms + .entry(INDEX_FIELD_NAME_FOR_ALL.to_string()) + .or_insert(BTreeMap::new()); + for (term, idx) in terms { + let ids = column_uniq_terms + .entry(term.to_string()) + .or_insert(Vec::new()); + ids.push(idx + prev_total_num_rows); + } } - let Some(column_data) = inverted_idx_batch - .column_by_name(column_name) - .unwrap() - .as_any() - .downcast_ref::() - else { - continue; - }; + // process index fields + for column_name in index_fields.iter() { + if !schema_fields.contains_key(column_name) { + continue; + } + if schema_fields.get(column_name).unwrap().data_type() != &DataType::Utf8 { + continue; + } - let terms = (0..num_rows) - .map(|i| (column_data.value(i), i)) - .collect::>(); - if terms.is_empty() { - continue; - } + // get index column + let Some(column_data) = inverted_idx_batch + .column_by_name(column_name) + .unwrap() + .as_any() + .downcast_ref::() + else { + continue; + }; - let mut col_indexer = ColumnIndexer::new(); - for (term, row_id) in terms { - let segment_id = row_id / INDEX_SEGMENT_LENGTH; - col_indexer.push(term.as_bytes(), segment_id, term.len()); - } + // collect terms + let terms = (0..num_rows) + .map(|i| (column_data.value(i), i)) + .collect::>(); + if terms.is_empty() { + continue; + } - // finish col_indexer - if !col_indexer.is_empty() { - let col_index_meta = col_indexer.write(&mut writer).context(format!( - "Error constructing FST ColumnIndex for field {}", - column_name - ))?; - // add ft_indexer_meta to IndexFileMetas - index_file_metas - .metas - .insert(column_name.to_string(), col_index_meta); + // unique terms + let column_uniq_terms = uniq_terms + .entry(column_name.to_string()) + .or_insert(BTreeMap::new()); + for (term, idx) in terms { + let ids = column_uniq_terms + .entry(term.to_string()) + .or_insert(Vec::new()); + ids.push(idx + prev_total_num_rows); + } } } - // TODO(taiming): left for future improvement - write index files into file_list to show total - // index size - let mut file_meta = FileMeta { - min_ts: 0, - max_ts: 0, - records: 0, - original_size: 0, - compressed_size: 0, - flattened: false, - }; - - let _ = index_file_metas.finish(&mut writer)?; - let original_size = writer.len(); + // Process fst writer + let mut indexers = HashMap::new(); + for (column_name, terms) in uniq_terms { + let indexer = indexers + .entry(column_name) + .or_insert_with(ColumnIndexer::new); + for (term, ids) in terms { + for row_id in ids { + let segment_id = row_id / INDEX_SEGMENT_LENGTH; + indexer.push(term.as_bytes(), segment_id, term.len()); + } + } + } + // create puffin file let mut puffin_buf: Vec = Vec::new(); let mut puffin_writer = PuffinBytesWriter::new(&mut puffin_buf); - puffin_writer.add_blob(writer)?; + + let mut original_size = 0; + for (column_name, indexer) in indexers { + if indexer.is_empty() { + continue; + } + let mut buf = Vec::new(); + let _index_meta = indexer.write(&mut buf).context(format!( + "Error constructing FST ColumnIndex for field {}", + column_name + ))?; + original_size += buf.len(); + puffin_writer.add_blob(column_name, buf)?; + } + puffin_writer.finish()?; - file_meta.original_size = original_size as _; - file_meta.compressed_size = puffin_buf.len() as _; + // index size + let 
file_meta = FileMeta { + original_size: original_size as i64, + compressed_size: puffin_buf.len() as i64, + ..Default::default() + }; Ok(Some((puffin_buf, file_meta))) } - -#[allow(clippy::too_many_arguments)] -async fn move_single_file( - thread_id: usize, - trace_id: &str, - file: &FileKey, - stream_type: StreamType, - stream_name: &str, - buf: &mut Vec, -) -> datafusion::error::Result<(Arc, Vec)> { - let data = tmpfs::get(format!("{trace_id}/{}", &file.key).as_str()).map_err(|e| { - log::error!( - "[INGESTER:JOB:{thread_id}] merge small file: {}, err: {}", - file.key, - e - ); - datafusion::error::DataFusionError::Execution(e.to_string()) - })?; - - // copy data to buf - buf.extend_from_slice(&data); - - read_recordbatch_from_bytes(&data).await.map_err(|e| { - log::error!( - "[INGESTER:JOB:{thread_id}] read_recordbatch_from_bytes error for stream -> '{}/{}/{}'", - trace_id, - stream_type, - stream_name - ); - log::error!( - "[INGESTER:JOB:{thread_id}] read recordbatch for file: {}, err: {}", - file.key, - e - ); - datafusion::error::DataFusionError::Execution(e.to_string()) - }) -} diff --git a/src/report_server/src/report.rs b/src/report_server/src/report.rs index 234c06250..767bb550e 100644 --- a/src/report_server/src/report.rs +++ b/src/report_server/src/report.rs @@ -388,7 +388,7 @@ pub async fn send_email( let mut email = Message::builder() .from(config.from_email.parse()?) - .subject(format!("{}", &email_details.title)); + .subject(email_details.title.to_string()); for recipient in recipients { email = email.to(recipient.parse()?); diff --git a/src/service/alerts/alert.rs b/src/service/alerts/alert.rs index e3e5783b3..2db66484c 100644 --- a/src/service/alerts/alert.rs +++ b/src/service/alerts/alert.rs @@ -355,7 +355,7 @@ pub trait AlertExt: Sync + Send + 'static { start_time: Option, ) -> Result<(Option>>, i64), anyhow::Error>; - /// Returns a tuple containing a boolean - if all the send notification jobs succeeded + /// Returns a tuple containing a boolean - if all the send notification jobs successfully /// and the error message if any async fn send_notification( &self, @@ -556,7 +556,7 @@ pub async fn send_email_notification( let mut email = Message::builder() .from(cfg.smtp.smtp_from_email.parse()?) - .subject(format!("{}", email_subject)); + .subject(email_subject.to_string()); for recipient in recipients { email = email.to(recipient.parse()?); diff --git a/src/service/compact/file_list.rs b/src/service/compact/file_list.rs index f0eb6af85..815b1005e 100644 --- a/src/service/compact/file_list.rs +++ b/src/service/compact/file_list.rs @@ -327,7 +327,7 @@ async fn merge_file_list(offset: i64) -> Result<(), anyhow::Error> { match storage::put(&file_name, compressed_bytes.into()).await { Ok(_) => { log::info!( - "[COMPACT] file_list merge succeed, {} files into a new file: {}", + "[COMPACT] file_list merge successfully, {} files into a new file: {}", file_list.len(), file_name ); diff --git a/src/service/compact/merge.rs b/src/service/compact/merge.rs index 0654b8764..bd64355d4 100644 --- a/src/service/compact/merge.rs +++ b/src/service/compact/merge.rs @@ -13,34 +13,37 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . 
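// Outline (descriptive note, not part of the commit): the reworked merge_files()
// in this file replaces the old tmpfs + convert_parquet_file pipeline with,
// roughly: (1) cache_remote_files() pulls the input parquet files into the disk
// cache and reports keys that are missing or empty in object storage,
// (2) read_schema_from_bytes() reads each file's schema and files are grouped by
// schema hash, (3) generate_schema_diff() plus exec::create_parquet_table() build
// one table per schema group, (4) exec::merge_parquet_files() streams the union
// of those tables straight into a parquet buffer, and (5) the buffer is uploaded
// and re-read with get_recordbatch_reader_from_bytes() to regenerate the
// parquet/FST inverted indexes.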
-use std::{collections::HashMap, io::Write, sync::Arc}; +use std::{io::Write, sync::Arc}; use ::datafusion::{arrow::datatypes::Schema, error::DataFusionError}; use arrow::array::RecordBatch; +use arrow_schema::{DataType, Field}; use bytes::Bytes; use chrono::{DateTime, Datelike, Duration, TimeZone, Timelike, Utc}; use config::{ cluster::LOCAL_NODE, - get_config, ider, + get_config, ider, is_local_disk_storage, meta::{ inverted_index::InvertedIndexFormat, + search::StorageType, stream::{FileKey, FileMeta, MergeStrategy, PartitionTimeLevel, StreamStats, StreamType}, }, metrics, utils::{ json, parquet::{ - parse_file_key_columns, read_recordbatch_from_bytes, write_recordbatch_to_parquet, + get_recordbatch_reader_from_bytes, parse_file_key_columns, read_schema_from_bytes, }, - record_batch_ext::{concat_batches, merge_record_batches}, + record_batch_ext::concat_batches, schema_ext::SchemaExt, time::hour_micros, }, FILE_EXT_PARQUET, }; -use hashbrown::HashSet; +use hashbrown::{HashMap, HashSet}; use infra::{ - cache, dist_lock, file_list as infra_file_list, + cache::file_data, + dist_lock, file_list as infra_file_list, schema::{ get_stream_setting_bloom_filter_fields, get_stream_setting_fts_fields, get_stream_setting_index_fields, unwrap_partition_time_level, unwrap_stream_settings, @@ -57,9 +60,7 @@ use crate::{ common::infra::cluster::get_node_by_uuid, job::files::parquet::{generate_fst_inverted_index, generate_index_on_compactor}, service::{ - db, file_list, - schema::generate_schema_for_defined_schema_fields, - search::datafusion::{self, file_type::FileType}, + db, file_list, schema::generate_schema_for_defined_schema_fields, search::datafusion::exec, stream, }, }; @@ -429,7 +430,8 @@ pub async fn merge_by_stream( let cfg = get_config(); // sort by file size let mut files_with_size = files_with_size.to_owned(); - match MergeStrategy::from(&cfg.compact.strategy) { + let job_strategy = MergeStrategy::from(&cfg.compact.strategy); + match job_strategy { MergeStrategy::FileSize => { files_with_size.sort_by(|a, b| a.meta.original_size.cmp(&b.meta.original_size)); } @@ -451,7 +453,12 @@ pub async fn merge_by_stream( for file in files_with_size.iter() { if new_file_size + file.meta.original_size > cfg.compact.max_file_size as i64 { if new_file_list.len() <= 1 { - break; // no files need to merge + if job_strategy == MergeStrategy::FileSize { + break; + } + new_file_size = 0; + new_file_list.clear(); + continue; // this batch don't need to merge, skip } batch_groups.push(MergeBatch { batch_id: batch_groups.len(), @@ -608,9 +615,7 @@ pub async fn merge_files( let mut new_file_size = 0; let mut new_compressed_file_size = 0; - let mut total_records = 0; let mut new_file_list = Vec::new(); - let mut deleted_files = Vec::new(); let cfg = get_config(); for file in files_with_size.iter() { if new_file_size + file.meta.original_size > cfg.compact.max_file_size as i64 @@ -621,7 +626,6 @@ pub async fn merge_files( } new_file_size += file.meta.original_size; new_compressed_file_size += file.meta.compressed_size; - total_records += file.meta.records; new_file_list.push(file.clone()); // metrics metrics::COMPACT_MERGED_FILES @@ -638,40 +642,8 @@ pub async fn merge_files( let retain_file_list = new_file_list.clone(); - // write parquet files into tmpfs - let tmp_dir = cache::tmpfs::Directory::default(); - let mut fi = 0; - for file in new_file_list.iter() { - fi += 1; - log::info!("[COMPACT:{thread_id}:{fi}] merge small file: {}", &file.key); - let data = match storage::get(&file.key).await { - Ok(body) => body, - 
Err(err) => { - log::error!( - "[COMPACT:{thread_id}] merge small file: {}, err: {}", - &file.key, - err - ); - if err.to_string().to_lowercase().contains("not found") { - // delete file from file list - if let Err(err) = file_list::delete_parquet_file(&file.key, true).await { - log::error!( - "[COMPACT:{thread_id}] delete file: {}, from file_list err: {}", - &file.key, - err - ); - } - } - // QUESTION(taiming): since parquet files deleted, should we delete previously - // created fst index files as well if there's any? - deleted_files.push(file.key.clone()); - total_records -= file.meta.records; - new_file_size -= file.meta.original_size; - continue; - } - }; - tmp_dir.set(&file.key, data)?; - } + // cache parquet files + let deleted_files = cache_remote_files(&new_file_list).await?; if !deleted_files.is_empty() { new_file_list.retain(|f| !deleted_files.contains(&f.key)); } @@ -682,7 +654,8 @@ pub async fn merge_files( // get time range for these files let min_ts = new_file_list.iter().map(|f| f.meta.min_ts).min().unwrap(); let max_ts = new_file_list.iter().map(|f| f.meta.max_ts).max().unwrap(); - + let total_records = new_file_list.iter().map(|f| f.meta.records).sum(); + let new_file_size = new_file_list.iter().map(|f| f.meta.original_size).sum(); let mut new_file_meta = FileMeta { min_ts, max_ts, @@ -692,13 +665,16 @@ pub async fn merge_files( flattened: false, }; if new_file_meta.records == 0 { - return Err(anyhow::anyhow!("merge_parquet_files error: records is 0")); + return Err(anyhow::anyhow!("merge_files error: records is 0")); } - // convert the file to the latest version of schema + // get latest version of schema let schema_latest = infra::schema::get(org_id, stream_name, stream_type).await?; - let stream_setting = infra::schema::get_settings(org_id, stream_name, stream_type).await; - let (defined_schema_fields, need_original) = match stream_setting { + let stream_settings = infra::schema::get_settings(org_id, stream_name, stream_type).await; + let bloom_filter_fields = get_stream_setting_bloom_filter_fields(&stream_settings); + let full_text_search_fields = get_stream_setting_fts_fields(&stream_settings); + let index_fields = get_stream_setting_index_fields(&stream_settings); + let (defined_schema_fields, need_original) = match stream_settings { Some(s) => ( s.defined_schema_fields.unwrap_or_default(), s.store_original_data, @@ -717,102 +693,75 @@ pub async fn merge_files( Arc::new(schema_latest) }; - let schema_versions = - infra::schema::get_versions(org_id, stream_name, stream_type, Some((min_ts, max_ts))) - .await?; - let schema_latest_id = schema_versions.len() - 1; - let schema_settings = unwrap_stream_settings(&schema_latest); - let bloom_filter_fields = get_stream_setting_bloom_filter_fields(&schema_settings); - let full_text_search_fields = get_stream_setting_fts_fields(&schema_settings); - let index_fields = get_stream_setting_index_fields(&schema_settings); - if cfg.common.widening_schema_evolution && schema_versions.len() > 1 { - for file in new_file_list.iter() { - // get the schema version of the file - let schema_ver_id = match db::schema::filter_schema_version_id( - &schema_versions, - file.meta.min_ts, - file.meta.max_ts, - ) { - Some(id) => id, - None => { - log::error!( - "[COMPACT:{thread_id}] merge small file: {}, schema version not found, min_ts: {}, max_ts: {}", - &file.key, - file.meta.min_ts, - file.meta.max_ts - ); - // HACK: use the latest version if not found in schema versions - schema_latest_id - } - }; - if schema_ver_id == schema_latest_id { - 
continue; - } - // calculate the diff between latest schema and current schema - let schema = schema_versions[schema_ver_id] - .clone() - .with_metadata(HashMap::new()); - let mut diff_fields = hashbrown::HashMap::new(); - let cur_fields = schema.fields(); - for field in cur_fields { - if let Ok(v) = schema_latest.field_with_name(field.name()) { - if v.data_type() != field.data_type() { - diff_fields.insert(v.name().clone(), v.data_type().clone()); - } - } - } - if diff_fields.is_empty() { - continue; - } - - // do the convert - let mut buf = Vec::new(); - let file_tmp_dir = cache::tmpfs::Directory::default(); - let file_data = tmp_dir.get(&file.key)?; - if file_data.is_empty() { - // delete file from file list - log::warn!("found invalid file: {}", file.key); - if let Err(err) = file_list::delete_parquet_file(&file.key, true).await { - log::error!( - "[COMPACT:{thread_id}] delete file: {}, from file_list err: {}", - &file.key, - err - ); - } - return Err(anyhow::anyhow!("merge_files error: file data is empty")); - } - file_tmp_dir.set(&file.key, file_data)?; - datafusion::exec::convert_parquet_file( - file_tmp_dir.name(), - &mut buf, - Arc::new(schema), - &bloom_filter_fields, - diff_fields, - FileType::PARQUET, - ) - .await - .map_err(|e| { - DataFusionError::Plan(format!("convert_parquet_file {}, err: {}", &file.key, e)) - })?; - - // replace the file in tmpfs - tmp_dir.set(&file.key, buf.into())?; + // read schema from parquet file and group files by schema + let mut schemas = HashMap::new(); + let mut file_groups = HashMap::new(); + let mut fi = 0; + for file in new_file_list.iter() { + fi += 1; + log::info!("[COMPACT:{thread_id}:{fi}] merge small file: {}", &file.key); + let buf = file_data::get(&file.key, None).await?; + let schema = read_schema_from_bytes(&buf).await?; + let schema = schema.as_ref().clone().with_metadata(Default::default()); + let schema_key = schema.hash_key(); + if !schemas.contains_key(&schema_key) { + schemas.insert(schema_key.clone(), schema); + file_groups.insert(schema_key.clone(), vec![]); } + let entry = file_groups.get_mut(&schema_key).unwrap(); + entry.push(file.clone()); + } + + // generate the final schema + let all_fields = schemas + .values() + .flat_map(|s| s.fields().iter().map(|f| f.name().to_string())) + .collect::>(); + let schema_latest = Arc::new(schema_latest.retain(all_fields)); + let mut schema_latest_fields = HashMap::with_capacity(schema_latest.fields().len()); + for field in schema_latest.fields() { + schema_latest_fields.insert(field.name(), field); + } + + // generate datafusion tables + let mut tables = Vec::new(); + let trace_id = ider::generate(); + for (schema_key, files) in file_groups { + if files.is_empty() { + continue; + } + let schema = schemas.get(&schema_key).unwrap().clone(); + let session = config::meta::search::Session { + id: format!("{trace_id}-{schema_key}"), + storage_type: StorageType::Memory, + work_group: None, + target_partitions: 0, + }; + + let diff_fields = generate_schema_diff(&schema, &schema_latest_fields)?; + let table = exec::create_parquet_table( + &session, + schema_latest.clone(), + &files, + diff_fields, + true, + None, + ) + .await?; + tables.push(table); } let start = std::time::Instant::now(); - let merge_result = if stream_type.is_basic_type() { - merge_parquet_files(thread_id, tmp_dir.name(), schema_latest.clone()).await - } else { - datafusion::exec::merge_parquet_files( - tmp_dir.name(), - stream_type, - stream_name, - schema_latest.clone(), - ) - .await - }; - let (new_schema, new_batches) = 
merge_result.map_err(|e| { + let merge_result = exec::merge_parquet_files( + stream_type, + stream_name, + schema_latest.clone(), + tables, + &bloom_filter_fields, + &new_file_meta, + ) + .await; + let (_new_schema, buf) = merge_result.map_err(|e| { let files = new_file_list.into_iter().map(|f| f.key).collect::>(); log::error!( "merge_parquet_files err: {}, files: {:?}, schema: {:?}", @@ -824,13 +773,6 @@ pub async fn merge_files( DataFusionError::Plan(format!("merge_parquet_files err: {:?}", e)) })?; - let buf = write_recordbatch_to_parquet( - new_schema.clone(), - &new_batches, - &bloom_filter_fields, - &new_file_meta, - ) - .await?; new_file_meta.compressed_size = buf.len() as i64; if new_file_meta.compressed_size == 0 { return Err(anyhow::anyhow!( @@ -841,7 +783,7 @@ pub async fn merge_files( let id = ider::generate(); let new_file_key = format!("{prefix}/{id}{}", FILE_EXT_PARQUET); log::info!( - "[COMPACT:{thread_id}] merge file succeeded, {} files into a new file: {}, original_size: {}, compressed_size: {}, took: {} ms", + "[COMPACT:{thread_id}] merge file successfully, {} files into a new file: {}, original_size: {}, compressed_size: {}, took: {} ms", retain_file_list.len(), new_file_key, new_file_meta.original_size, @@ -849,89 +791,83 @@ pub async fn merge_files( start.elapsed().as_millis(), ); + // upload file to storage let buf = Bytes::from(buf); - // upload file - match storage::put(&new_file_key, buf.clone()).await { - Ok(_) => { - if cfg.common.inverted_index_enabled && stream_type.is_basic_type() { - // generate inverted index RecordBatch - if let Some(inverted_idx_batch) = generate_inverted_idx_recordbatch( - schema_latest.clone(), - &new_batches, - stream_type, - &full_text_search_fields, - &index_fields, - )? { - let index_format = - InvertedIndexFormat::from(&cfg.common.inverted_index_store_format); - if matches!( - index_format, - InvertedIndexFormat::Parquet | InvertedIndexFormat::Both - ) { - let files = generate_index_on_compactor( - &retain_file_list, - inverted_idx_batch.clone(), - new_file_key.clone(), - org_id, - stream_type, - stream_name, - &full_text_search_fields, - &index_fields, - ) - .await - .map_err(|e| { - anyhow::anyhow!( - "generate_index_on_compactor error: {}, need delete files: {:?}", - e, - retain_file_list - ) - })?; - for (file_name, filemeta) in files { - log::info!( - "Created parquet index file during compaction {}", - file_name - ); - // Notify that we wrote the index file to the db. 
- if let Err(e) = write_file_list( - org_id, - &[FileKey { - key: file_name.clone(), - meta: filemeta, - deleted: false, - segment_ids: None, - }], - ) - .await - { - log::error!( - "generate_index_on_compactor write to file list: {}, error: {}, need delete files: {:?}", - file_name, - e.to_string(), - retain_file_list - ); - } - } - } - if matches!( - index_format, - InvertedIndexFormat::FST | InvertedIndexFormat::Both - ) { - // generate fst inverted index and write to storage - generate_fst_inverted_index( - inverted_idx_batch, - &new_file_key, - &full_text_search_fields, - &index_fields, - Some(&retain_file_list), - ) - .await?; - } - } - } - Ok((new_file_key, new_file_meta, retain_file_list)) - } - Err(e) => Err(e), + storage::put(&new_file_key, buf.clone()).await?; + + if !cfg.common.inverted_index_enabled || !stream_type.is_basic_type() { + return Ok((new_file_key, new_file_meta, retain_file_list)); } + + // generate parquet format inverted index + let index_format = InvertedIndexFormat::from(&cfg.common.inverted_index_store_format); + if matches!( + index_format, + InvertedIndexFormat::Parquet | InvertedIndexFormat::Both + ) { + let (schema, mut reader) = get_recordbatch_reader_from_bytes(&buf).await?; + let files = generate_index_on_compactor( + &retain_file_list, + &new_file_key, + org_id, + stream_type, + stream_name, + &full_text_search_fields, + &index_fields, + schema, + &mut reader, + ) + .await + .map_err(|e| { + anyhow::anyhow!( + "generate_index_on_compactor error: {}, need delete files: {:?}", + e, + retain_file_list + ) + })?; + for (file_name, filemeta) in files { + log::info!("Created parquet index file during compaction {}", file_name); + // Notify that we wrote the index file to the db. + if let Err(e) = write_file_list( + org_id, + &[FileKey { + key: file_name.clone(), + meta: filemeta, + deleted: false, + segment_ids: None, + }], + ) + .await + { + log::error!( + "generate_index_on_compactor write to file list: {}, error: {}, need delete files: {:?}", + file_name, + e.to_string(), + retain_file_list + ); + } + } + } + + // generate fst format inverted index + if matches!( + index_format, + InvertedIndexFormat::FST | InvertedIndexFormat::Both + ) { + let (schema, mut reader) = get_recordbatch_reader_from_bytes(&buf).await?; + // generate fst inverted index and write to storage + generate_fst_inverted_index( + &new_file_key, + &full_text_search_fields, + &index_fields, + Some(&retain_file_list), + schema, + &mut reader, + ) + .await?; + } + + Ok((new_file_key, new_file_meta, retain_file_list)) } async fn write_file_list(org_id: &str, events: &[FileKey]) -> Result<(), anyhow::Error> { @@ -1174,22 +1110,13 @@ pub fn generate_inverted_idx_recordbatch( if inverted_idx_batches.is_empty() { Ok(None) } else { - let mut new_batch = if inverted_idx_batches.len() == 1 { + let new_batch = if inverted_idx_batches.len() == 1 { inverted_idx_batches.remove(0) } else { let new_schema = inverted_idx_batches.first().unwrap().schema(); concat_batches(new_schema, inverted_idx_batches).map_err(anyhow::Error::from)? 
}; - let mut null_columns = 0; - for i in 0..new_batch.num_columns() { - let ni = i - null_columns; - if new_batch.column(ni).null_count() == new_batch.num_rows() { - new_batch.remove_column(ni); - null_columns += 1; - } - } - if matches!( new_batch.schema().fields().len(), 0 | 1 if new_batch.schema().field(0).name() == &cfg.common.column_timestamp @@ -1201,57 +1128,85 @@ pub fn generate_inverted_idx_recordbatch( } } -pub async fn merge_parquet_files( - thread_id: usize, - trace_id: &str, - schema: Arc, -) -> ::datafusion::error::Result<(Arc, Vec)> { - let start = std::time::Instant::now(); +async fn cache_remote_files(files: &[FileKey]) -> Result, anyhow::Error> { + let cfg = get_config(); + let scan_size = files.iter().map(|f| f.meta.compressed_size).sum::(); + if is_local_disk_storage() + || !cfg.disk_cache.enabled + || scan_size >= cfg.disk_cache.skip_size as i64 + { + return Ok(Vec::new()); + }; - // get record batches from tmpfs - let temp_files = infra::cache::tmpfs::list(trace_id, "parquet").map_err(|e| { - log::error!( - "[MERGE:JOB:{thread_id}] merge small files failed at getting temp files. Error {}", - e - ); - DataFusionError::Execution(e.to_string()) - })?; - - let mut record_batches = Vec::new(); - let mut shared_fields = HashSet::new(); - for file in temp_files { - let bytes = infra::cache::tmpfs::get(&file.location).map_err(|e| { - log::error!( - "[MERGE:JOB:{thread_id}] merge small files failed at reading temp files to bytes. Error {}", - e - ); - DataFusionError::Execution(e.to_string()) - })?; - - let (file_schema, batches) = read_recordbatch_from_bytes(&bytes).await.map_err(|e| { - log::error!("[MERGE:JOB:{thread_id}] read_recordbatch_from_bytes error"); - log::error!( - "[MERGE:JOB:{thread_id}] read_recordbatch_from_bytes error for file: {}, err: {}", - file.location, - e - ); - DataFusionError::Execution(e.to_string()) - })?; - record_batches.extend(batches); - shared_fields.extend(file_schema.fields().iter().map(|f| f.name().to_string())); + let mut tasks = Vec::new(); + let semaphore = std::sync::Arc::new(Semaphore::new(cfg.limit.query_thread_num)); + for file in files.iter() { + let file_name = file.key.to_string(); + let permit = semaphore.clone().acquire_owned().await.unwrap(); + let task: tokio::task::JoinHandle> = tokio::task::spawn(async move { + let ret = if !file_data::disk::exist(&file_name).await { + file_data::disk::download("", &file_name).await.err() + } else { + None + }; + // In case where the parquet file is not found or has no data, we assume that it + // must have been deleted by some external entity, and hence we + // should remove the entry from file_list table. 
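// Note: only download errors whose message contains "not found" or
// "data size is zero" are treated as a missing file below; that file's key is
// returned so the caller can drop it from file_list, while any other download
// error is only logged and the file is kept in the merge list.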
+ let file_name = if let Some(e) = ret { + if e.to_string().to_lowercase().contains("not found") + || e.to_string().to_lowercase().contains("data size is zero") + { + // delete file from file list + log::warn!("found invalid file: {}", file_name); + if let Err(e) = file_list::delete_parquet_file(&file_name, true).await { + log::error!("[COMPACT] delete from file_list err: {}", e); + } + Some(file_name) + } else { + log::warn!("[COMPACT] download file to cache err: {}", e); + None + } + } else { + None + }; + drop(permit); + file_name + }); + tasks.push(task); } - // create new schema with the shared fields - let schema = Arc::new(schema.retain(shared_fields)); + let mut delete_files = Vec::new(); + for task in tasks { + match task.await { + Ok(file) => { + if let Some(file) = file { + delete_files.push(file); + } + } + Err(e) => { + log::error!("[COMPACT] load file task err: {}", e); + } + } + } - // merge record batches, the record batch have same schema - let (schema, new_record_batches) = - merge_record_batches("MERGE", thread_id, schema, record_batches)?; - - log::info!( - "[MERGE:JOB:{thread_id}] merge_parquet_files took {} ms", - start.elapsed().as_millis() - ); - - Ok((schema, vec![new_record_batches])) + Ok(delete_files) +} + +// generate parquet file compact schema +fn generate_schema_diff( + schema: &Schema, + schema_latest_map: &HashMap<&String, &Arc>, +) -> Result, anyhow::Error> { + // calculate the diff between latest schema and group schema + let mut diff_fields = HashMap::new(); + + for field in schema.fields().iter() { + if let Some(latest_field) = schema_latest_map.get(field.name()) { + if field.data_type() != latest_field.data_type() { + diff_fields.insert(field.name().clone(), latest_field.data_type().clone()); + } + } + } + + Ok(diff_fields) } diff --git a/src/service/dashboards/reports.rs b/src/service/dashboards/reports.rs index ee3847a32..af7729c6d 100644 --- a/src/service/dashboards/reports.rs +++ b/src/service/dashboards/reports.rs @@ -379,7 +379,7 @@ impl Report { let mut email = Message::builder() .from(cfg.smtp.smtp_from_email.parse()?) 
- .subject(format!("{}", &self.title)); + .subject(self.title.to_string()); for recipient in recipients { email = email.to(recipient.parse()?); diff --git a/src/service/db/file_list/remote.rs b/src/service/db/file_list/remote.rs index 2458bfe4f..48cc14680 100644 --- a/src/service/db/file_list/remote.rs +++ b/src/service/db/file_list/remote.rs @@ -97,7 +97,7 @@ pub async fn cache(prefix: &str, force: bool) -> Result<(), anyhow::Error> { stats.caching_time = start.elapsed().as_millis() as usize; log::info!( - "Load file_list [{prefix}] load {}:{} done, download: {}ms, caching: {}ms", + "Load file_list [{prefix}] load {}:{} done, download: {} ms, caching: {} ms", files_num, stats.file_count, stats.download_time, diff --git a/src/service/search/cluster/flight.rs b/src/service/search/cluster/flight.rs index 9aa9bc206..43253c97e 100644 --- a/src/service/search/cluster/flight.rs +++ b/src/service/search/cluster/flight.rs @@ -134,6 +134,12 @@ pub async fn search( return Err(Error::Message("no querier node online".to_string())); } + log::info!( + "[trace_id {trace_id}] flight->search: get nodes num: {}, querier num: {}", + nodes.len(), + querier_num, + ); + // waiting in work group queue metrics::QUERY_PENDING_NUMS .with_label_values(&[&req.org_id]) diff --git a/src/service/search/datafusion/exec.rs b/src/service/search/datafusion/exec.rs index e2b31f4f9..d7dff3204 100644 --- a/src/service/search/datafusion/exec.rs +++ b/src/service/search/datafusion/exec.rs @@ -26,14 +26,11 @@ use config::{ PARQUET_BATCH_SIZE, }; use datafusion::{ - arrow::{ - datatypes::{DataType, Schema}, - record_batch::RecordBatch, - }, + arrow::datatypes::{DataType, Schema}, catalog::TableProvider, common::Column, datasource::{ - file_format::{json::JsonFormat, parquet::ParquetFormat}, + file_format::parquet::ParquetFormat, listing::{ListingOptions, ListingTableConfig, ListingTableUrl}, object_store::{DefaultObjectStoreRegistry, ObjectStoreRegistry}, }, @@ -47,8 +44,10 @@ use datafusion::{ }, logical_expr::AggregateUDF, optimizer::OptimizerRule, + physical_plan::execute_stream, prelude::{Expr, SessionContext}, }; +use futures::TryStreamExt; use hashbrown::HashMap; #[cfg(feature = "enterprise")] use o2_enterprise::enterprise::{ @@ -59,112 +58,26 @@ use super::{ file_type::{FileType, GetExt}, optimizer::join_reorder::JoinReorderRule, storage::file_list, - table_provider::NewListingTable, + table_provider::{uniontable::NewUnionTable, NewListingTable}, udf::transform_udf::get_all_transform, }; const DATAFUSION_MIN_MEM: usize = 1024 * 1024 * 256; // 256MB const DATAFUSION_MIN_PARTITION: usize = 2; // CPU cores -pub async fn convert_parquet_file( - trace_id: &str, - buf: &mut Vec, - schema: Arc, - bloom_filter_fields: &[String], - rules: HashMap, - file_type: FileType, -) -> Result<()> { - let start = std::time::Instant::now(); - let cfg = get_config(); - - let query_sql = format!( - "SELECT * FROM tbl ORDER BY {} DESC", - cfg.common.column_timestamp - ); // select_wildcard -> without_optimizer - - // query data - let ctx = prepare_datafusion_context(None, vec![], false, 0).await?; - - // Configure listing options - let listing_options = match file_type { - FileType::PARQUET => { - let file_format = ParquetFormat::default(); - ListingOptions::new(Arc::new(file_format)) - .with_file_extension(FileType::PARQUET.get_ext()) - .with_target_partitions(ctx.state().config().target_partitions()) - } - FileType::JSON => { - let file_format = JsonFormat::default(); - ListingOptions::new(Arc::new(file_format)) - 
.with_file_extension(FileType::JSON.get_ext()) - .with_target_partitions(ctx.state().config().target_partitions()) - } - _ => { - return Err(DataFusionError::Execution(format!( - "Unsupported file type scheme {file_type:?}", - ))); - } - }; - - let prefix = match ListingTableUrl::parse(format!("tmpfs:///{trace_id}/")) { - Ok(url) => url, - Err(e) => { - return Err(datafusion::error::DataFusionError::Execution(format!( - "ListingTableUrl error: {e}" - ))); - } - }; - - let config = ListingTableConfig::new(prefix) - .with_listing_options(listing_options) - .with_schema(schema.clone()); - - let table = NewListingTable::try_new(config, rules)?; - ctx.register_table("tbl", Arc::new(table))?; - - // get all sorted data - let df = match ctx.sql(&query_sql).await { - Ok(df) => df, - Err(e) => { - log::error!( - "convert sql execute failed, sql: {}, err: {:?}", - query_sql, - e - ); - return Err(e); - } - }; - let schema: Schema = df.schema().into(); - let schema = Arc::new(schema); - let batches = df.collect().await?; - let file_meta = FileMeta::default(); - let mut writer = new_parquet_writer(buf, &schema, bloom_filter_fields, &file_meta); - for batch in batches { - writer.write(&batch).await?; - } - writer.close().await?; - ctx.deregister_table("tbl")?; - drop(ctx); - - log::info!( - "convert_parquet_file took {} ms", - start.elapsed().as_millis() - ); - - Ok(()) -} - pub async fn merge_parquet_files( - trace_id: &str, stream_type: StreamType, stream_name: &str, schema: Arc, -) -> Result<(Arc, Vec)> { + tables: Vec>, + bloom_filter_fields: &[String], + metadata: &FileMeta, +) -> Result<(Arc, Vec)> { let start = std::time::Instant::now(); let cfg = get_config(); // get all sorted data - let query_sql = if stream_type == StreamType::Index { + let sql = if stream_type == StreamType::Index { format!( "SELECT * FROM tbl WHERE file_name NOT IN (SELECT file_name FROM tbl WHERE deleted IS TRUE) ORDER BY {} DESC", cfg.common.column_timestamp @@ -185,24 +98,40 @@ pub async fn merge_parquet_files( }; // create datafusion context - let ctx = prepare_datafusion_context(None, vec![], false, 0).await?; + let sort_by_timestamp_desc = true; + let target_partitions = cfg.limit.cpu_num; + let ctx = + prepare_datafusion_context(None, vec![], sort_by_timestamp_desc, target_partitions).await?; + // register union table + let union_table = Arc::new(NewUnionTable::try_new(schema.clone(), tables)?); + ctx.register_table("tbl", union_table)?; - // Configure listing options - let file_format = ParquetFormat::default(); - let listing_options = ListingOptions::new(Arc::new(file_format)) - .with_file_extension(FileType::PARQUET.get_ext()) - .with_target_partitions(ctx.state().config().target_partitions()); - let prefix = ListingTableUrl::parse(format!("tmpfs:///{trace_id}/"))?; - let config = ListingTableConfig::new(prefix) - .with_listing_options(listing_options) - .with_schema(schema.clone()); - let table = Arc::new(NewListingTable::try_new(config, HashMap::default())?); - ctx.register_table("tbl", table.clone())?; + let plan = ctx.state().create_logical_plan(&sql).await?; + let physical_plan = ctx.state().create_physical_plan(&plan).await?; + let schema = physical_plan.schema(); - let df = ctx.sql(&query_sql).await?; - let schema: Schema = df.schema().into(); - let schema = Arc::new(schema); - let batches = df.collect().await?; + // write result to parquet file + let mut buf = Vec::new(); + let mut writer = new_parquet_writer(&mut buf, &schema, bloom_filter_fields, metadata); + let mut batch_stream = 
execute_stream(physical_plan, ctx.task_ctx())?; + loop { + match batch_stream.try_next().await { + Ok(Some(batch)) => { + if let Err(e) = writer.write(&batch).await { + log::error!("merge_parquet_files write Error: {}", e); + return Err(e.into()); + } + } + Ok(None) => { + break; + } + Err(e) => { + log::error!("merge_parquet_files execute stream Error: {}", e); + return Err(e); + } + } + } + writer.close().await?; ctx.deregister_table("tbl")?; drop(ctx); @@ -212,7 +141,7 @@ pub async fn merge_parquet_files( start.elapsed().as_millis() ); - Ok((schema, batches)) + Ok((schema, buf)) } pub fn create_session_config( diff --git a/src/service/search/grpc/storage.rs b/src/service/search/grpc/storage.rs index 92711da35..bd5a5d861 100644 --- a/src/service/search/grpc/storage.rs +++ b/src/service/search/grpc/storage.rs @@ -135,7 +135,7 @@ pub async fn search( ) .await?; log::info!( - "[trace_id {}] search->storage: stream {}/{}/{}, FST inverted index reduced file_list num to {} in {}ms", + "[trace_id {}] search->storage: stream {}/{}/{}, FST inverted index reduced file_list num to {} in {} ms", query.trace_id, query.org_id, query.stream_type, @@ -516,13 +516,13 @@ async fn filter_file_list_by_inverted_index( let full_text_term_clone = full_text_terms.clone(); let index_terms_clone = index_terms.clone(); let file_name = file.clone(); - let trace_id_clone = query.trace_id.to_string(); + let trace_id = query.trace_id.to_string(); let permit = semaphore.clone().acquire_owned().await.unwrap(); // Spawn a task for each file, wherein full text search and // secondary index search queries are executed let task = tokio::task::spawn(async move { let res = inverted_index_search_in_file( - trace_id_clone.as_str(), + &trace_id, &file_name, full_text_term_clone, index_terms_clone, @@ -535,10 +535,10 @@ async fn filter_file_list_by_inverted_index( tasks.push(task) } - for result in try_join_all(tasks) + let results = try_join_all(tasks) .await - .map_err(|e| Error::ErrorCode(ErrorCodes::ServerInternalError(e.to_string())))? - { + .map_err(|e| Error::ErrorCode(ErrorCodes::ServerInternalError(e.to_string())))?; + for result in results { // Each result corresponds to a file in the file list match result { Ok((file_name, bitvec)) => { @@ -551,8 +551,8 @@ async fn filter_file_list_by_inverted_index( // we expect each file name has atleast 1 file .unwrap(); file.segment_ids = Some(res.clone().into_vec()); - log::info!( - "[trace_id {}] search->storage: Final bitmap for fts_terms {:?} and index_terms: {:?} length {}", + log::debug!( + "[trace_id {}] search->storage: Final bitmap for fts_terms {:?} and index_terms: {:?} total: {}", query.trace_id, *full_text_terms, index_terms, @@ -560,7 +560,7 @@ async fn filter_file_list_by_inverted_index( ); } else { // if the bitmap is empty then we remove the file from the list - log::info!( + log::debug!( "[trace_id {}] search->storage: no match found in index for file {}", query.trace_id, file_name @@ -628,26 +628,23 @@ async fn inverted_index_search_in_file( Ok(bytes) => bytes, }; - let mut index_reader = create_index_reader_from_puffin_bytes(compressed_index_blob).await?; - let file_meta = index_reader.metadata().await?; - let mut res = BitVec::new(); - - if let Some(column_index_meta) = &file_meta.metas.get(INDEX_FIELD_NAME_FOR_ALL) { - // TODO: Add Eq and check performance + let mut index_reader = create_index_reader_from_puffin_bytes(compressed_index_blob).await?; + if let Some(mut field_reader) = index_reader.field(INDEX_FIELD_NAME_FOR_ALL).await? 
{ + let column_index_meta = field_reader.metadata().await?; let matched_bv = match cfg.common.full_text_search_type.as_str() { "eq" => { - let mut searcher = ExactSearch::new(fts_terms.as_ref(), column_index_meta); - searcher.search(&mut index_reader).await + let mut searcher = ExactSearch::new(fts_terms.as_ref(), &column_index_meta); + searcher.search(&mut field_reader).await } "contains" => { - let mut searcher = SubstringSearch::new(fts_terms.as_ref(), column_index_meta); - searcher.search(&mut index_reader).await + let mut searcher = SubstringSearch::new(fts_terms.as_ref(), &column_index_meta); + searcher.search(&mut field_reader).await } // Default to prefix search _ => { - let mut searcher = PrefixSearch::new(fts_terms.as_ref(), column_index_meta); - searcher.search(&mut index_reader).await + let mut searcher = PrefixSearch::new(fts_terms.as_ref(), &column_index_meta); + searcher.search(&mut field_reader).await } }; @@ -670,9 +667,10 @@ async fn inverted_index_search_in_file( if !index_terms.is_empty() { for (col, index_terms) in index_terms.iter() { - if let Some(column_index_meta) = file_meta.metas.get(col) { - let mut secondary_index_match = ExactSearch::new(index_terms, column_index_meta); - match secondary_index_match.search(&mut index_reader).await { + if let Some(mut field_reader) = index_reader.field(col).await? { + let column_index_meta = field_reader.metadata().await?; + let mut secondary_index_match = ExactSearch::new(index_terms, &column_index_meta); + match secondary_index_match.search(&mut field_reader).await { Ok(bitmap) => { if res.len() < bitmap.len() { res.resize(bitmap.len(), false); diff --git a/src/service/search/sql.rs b/src/service/search/sql.rs index 56531c0fe..05442c759 100644 --- a/src/service/search/sql.rs +++ b/src/service/search/sql.rs @@ -220,7 +220,7 @@ impl std::fmt::Display for Sql { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( f, - "sql: {}, time_range: {:?}, stream: {}/{:?}/{:?}, match_items: {:?}, equal_items: {:?}, prefix_items: {:?}, aliases: {:?}, limit: {}, offset: {}, group_by: {:?}, order_by: {:?}, histogram_interval: {:?}, sorted_by_time: {}", + "sql: {}, time_range: {:?}, stream: {}/{}/{:?}, match_items: {:?}, equal_items: {:?}, prefix_items: {:?}, aliases: {:?}, limit: {}, offset: {}, group_by: {:?}, order_by: {:?}, histogram_interval: {:?}, sorted_by_time: {}", self.sql, self.time_range, self.org_id,