From a32d5912a8a89296794261a6f371e546d7cd17e7 Mon Sep 17 00:00:00 2001 From: Hengfei Yang Date: Fri, 12 Jan 2024 10:26:29 +0800 Subject: [PATCH] fix: compact files is too big (#2456) --- rustfmt.toml | 2 +- src/job/files/parquet.rs | 19 +++++-------------- src/service/compact/merge.rs | 6 ++---- 3 files changed, 8 insertions(+), 19 deletions(-) diff --git a/rustfmt.toml b/rustfmt.toml index 39f95309b..3726713b9 100644 --- a/rustfmt.toml +++ b/rustfmt.toml @@ -1,7 +1,7 @@ version = "Two" unstable_features = true -comment_width = 100 +comment_width = 120 wrap_comments = true format_code_in_doc_comments = true format_macro_bodies = true diff --git a/src/job/files/parquet.rs b/src/job/files/parquet.rs index fe032e846..242e4a563 100644 --- a/src/job/files/parquet.rs +++ b/src/job/files/parquet.rs @@ -125,7 +125,6 @@ pub async fn move_files_to_storage() -> Result<(), anyhow::Error> { let wal_dir = wal_dir.clone(); let permit = semaphore.clone().acquire_owned().await.unwrap(); let task: JoinHandle<Result<(), anyhow::Error>> = tokio::task::spawn(async move { - let mut force_upload = false; // sort by created time let mut files_with_size = files_with_size.to_owned(); files_with_size.sort_by(|a, b| a.meta.min_ts.cmp(&b.meta.min_ts)); @@ -135,6 +134,7 @@ .map(|f| f.meta.original_size) .sum::<i64>(); if total_original_size < CONFIG.limit.max_file_size_on_disk as i64 { + let mut has_expired_files = false; // not enough files to upload, check if some files are too old let min_ts = Utc::now().timestamp_micros() - Duration::seconds(CONFIG.limit.max_file_retention_time as i64) @@ -151,11 +151,11 @@ .unwrap() .as_micros() as i64; if file_created <= min_ts { - force_upload = true; + has_expired_files = true; break; } } - if !force_upload { + if !has_expired_files { drop(permit); return Ok(()); } @@ -170,9 +170,7 @@ pub async fn move_files_to_storage() -> Result<(), anyhow::Error>
{ tokio::task::yield_now().await; // merge file and get the big file key let (new_file_name, new_file_meta, new_file_list) = - match merge_files(&latest_schema, &wal_dir, &files_with_size, force_upload) - .await - { + match merge_files(&latest_schema, &wal_dir, &files_with_size).await { Ok(v) => v, Err(e) => { log::error!("[INGESTER:JOB] merge files failed: {}", e); @@ -266,7 +264,6 @@ async fn merge_files( latest_schema: &Schema, wal_dir: &Path, files_with_size: &[FileKey], - force_upload: bool, ) -> Result<(String, FileMeta, Vec<FileKey>), anyhow::Error> { if files_with_size.is_empty() { return Ok((String::from(""), FileMeta::default(), Vec::new())); } @@ -276,18 +273,12 @@ async fn merge_files( let mut new_file_list = Vec::new(); let mut deleted_files = Vec::new(); for file in files_with_size.iter() { - if new_file_size > CONFIG.limit.max_file_size_on_disk as i64 - && new_file_size + file.meta.original_size > CONFIG.compact.max_file_size as i64 - { + if new_file_size + file.meta.original_size > CONFIG.compact.max_file_size as i64 { break; } new_file_size += file.meta.original_size; new_file_list.push(file.clone()); } - // these files are too small, just skip upload and wait for next round - if !force_upload && new_file_size < CONFIG.limit.max_file_size_on_disk as i64 { - return Ok((String::from(""), FileMeta::default(), Vec::new())); - } let mut retain_file_list = new_file_list.clone(); // write parquet files into tmpfs diff --git a/src/service/compact/merge.rs b/src/service/compact/merge.rs index 7dfbb1868..e41a8956c 100644 --- a/src/service/compact/merge.rs +++ b/src/service/compact/merge.rs @@ -325,14 +325,11 @@ async fn merge_files( let mut new_file_list = Vec::new(); let mut deleted_files = Vec::new(); for file in files_with_size.iter() { - if new_file_size > CONFIG.limit.max_file_size_on_disk as i64 - && new_file_size + file.meta.original_size > CONFIG.compact.max_file_size as i64 - { + if new_file_size + file.meta.original_size > CONFIG.compact.max_file_size as i64
{ break; } new_file_size += file.meta.original_size; new_file_list.push(file.clone()); - log::info!("[COMPACT] merge small file: {}", &file.key); // metrics metrics::COMPACT_MERGED_FILES .with_label_values(&[org_id, stream_type.to_string().as_str()]) @@ -350,6 +347,7 @@ async fn merge_files( // write parquet files into tmpfs let tmp_dir = cache::tmpfs::Directory::default(); for file in &new_file_list { + log::info!("[COMPACT] merge small file: {}", &file.key); let data = match storage::get(&file.key).await { Ok(body) => body, Err(err) => {