fix: compacted files are too big (#2456)
This commit is contained in:
parent
da08a7f5e8
commit
a32d5912a8
|
@ -1,7 +1,7 @@
|
|||
version = "Two"
|
||||
unstable_features = true
|
||||
|
||||
comment_width = 100
|
||||
comment_width = 120
|
||||
wrap_comments = true
|
||||
format_code_in_doc_comments = true
|
||||
format_macro_bodies = true
|
||||
|
|
|
@ -125,7 +125,6 @@ pub async fn move_files_to_storage() -> Result<(), anyhow::Error> {
|
|||
let wal_dir = wal_dir.clone();
|
||||
let permit = semaphore.clone().acquire_owned().await.unwrap();
|
||||
let task: JoinHandle<Result<(), anyhow::Error>> = tokio::task::spawn(async move {
|
||||
let mut force_upload = false;
|
||||
// sort by created time
|
||||
let mut files_with_size = files_with_size.to_owned();
|
||||
files_with_size.sort_by(|a, b| a.meta.min_ts.cmp(&b.meta.min_ts));
|
||||
|
@ -135,6 +134,7 @@ pub async fn move_files_to_storage() -> Result<(), anyhow::Error> {
|
|||
.map(|f| f.meta.original_size)
|
||||
.sum::<i64>();
|
||||
if total_original_size < CONFIG.limit.max_file_size_on_disk as i64 {
|
||||
let mut has_expired_files = false;
|
||||
// not enough files to upload, check if some files are too old
|
||||
let min_ts = Utc::now().timestamp_micros()
|
||||
- Duration::seconds(CONFIG.limit.max_file_retention_time as i64)
|
||||
|
@ -151,11 +151,11 @@ pub async fn move_files_to_storage() -> Result<(), anyhow::Error> {
|
|||
.unwrap()
|
||||
.as_micros() as i64;
|
||||
if file_created <= min_ts {
|
||||
force_upload = true;
|
||||
has_expired_files = true;
|
||||
break;
|
||||
}
|
||||
}
|
||||
if !force_upload {
|
||||
if !has_expired_files {
|
||||
drop(permit);
|
||||
return Ok(());
|
||||
}
|
||||
|
@ -170,9 +170,7 @@ pub async fn move_files_to_storage() -> Result<(), anyhow::Error> {
|
|||
tokio::task::yield_now().await;
|
||||
// merge file and get the big file key
|
||||
let (new_file_name, new_file_meta, new_file_list) =
|
||||
match merge_files(&latest_schema, &wal_dir, &files_with_size, force_upload)
|
||||
.await
|
||||
{
|
||||
match merge_files(&latest_schema, &wal_dir, &files_with_size).await {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
log::error!("[INGESTER:JOB] merge files failed: {}", e);
|
||||
|
@ -266,7 +264,6 @@ async fn merge_files(
|
|||
latest_schema: &Schema,
|
||||
wal_dir: &Path,
|
||||
files_with_size: &[FileKey],
|
||||
force_upload: bool,
|
||||
) -> Result<(String, FileMeta, Vec<FileKey>), anyhow::Error> {
|
||||
if files_with_size.is_empty() {
|
||||
return Ok((String::from(""), FileMeta::default(), Vec::new()));
|
||||
|
@ -276,18 +273,12 @@ async fn merge_files(
|
|||
let mut new_file_list = Vec::new();
|
||||
let mut deleted_files = Vec::new();
|
||||
for file in files_with_size.iter() {
|
||||
if new_file_size > CONFIG.limit.max_file_size_on_disk as i64
|
||||
&& new_file_size + file.meta.original_size > CONFIG.compact.max_file_size as i64
|
||||
{
|
||||
if new_file_size + file.meta.original_size > CONFIG.compact.max_file_size as i64 {
|
||||
break;
|
||||
}
|
||||
new_file_size += file.meta.original_size;
|
||||
new_file_list.push(file.clone());
|
||||
}
|
||||
// these files are too small, just skip upload and wait for next round
|
||||
if !force_upload && new_file_size < CONFIG.limit.max_file_size_on_disk as i64 {
|
||||
return Ok((String::from(""), FileMeta::default(), Vec::new()));
|
||||
}
|
||||
let mut retain_file_list = new_file_list.clone();
|
||||
|
||||
// write parquet files into tmpfs
|
||||
|
|
|
@ -325,14 +325,11 @@ async fn merge_files(
|
|||
let mut new_file_list = Vec::new();
|
||||
let mut deleted_files = Vec::new();
|
||||
for file in files_with_size.iter() {
|
||||
if new_file_size > CONFIG.limit.max_file_size_on_disk as i64
|
||||
&& new_file_size + file.meta.original_size > CONFIG.compact.max_file_size as i64
|
||||
{
|
||||
if new_file_size + file.meta.original_size > CONFIG.compact.max_file_size as i64 {
|
||||
break;
|
||||
}
|
||||
new_file_size += file.meta.original_size;
|
||||
new_file_list.push(file.clone());
|
||||
log::info!("[COMPACT] merge small file: {}", &file.key);
|
||||
// metrics
|
||||
metrics::COMPACT_MERGED_FILES
|
||||
.with_label_values(&[org_id, stream_type.to_string().as_str()])
|
||||
|
@ -350,6 +347,7 @@ async fn merge_files(
|
|||
// write parquet files into tmpfs
|
||||
let tmp_dir = cache::tmpfs::Directory::default();
|
||||
for file in &new_file_list {
|
||||
log::info!("[COMPACT] merge small file: {}", &file.key);
|
||||
let data = match storage::get(&file.key).await {
|
||||
Ok(body) => body,
|
||||
Err(err) => {
|
||||
|
|
Loading…
Reference in New Issue