diff --git a/src/config/src/config.rs b/src/config/src/config.rs
index c68b95a7c..e731ea28f 100644
--- a/src/config/src/config.rs
+++ b/src/config/src/config.rs
@@ -1044,18 +1044,6 @@ pub struct Limit {
help = "Maximum size of a single enrichment table in mb"
)]
pub max_enrichment_table_size: usize,
- #[env_config(
- name = "ZO_USE_UPPER_BOUND_FOR_MAX_TS",
- default = false,
- help = "use upper bound for max tx"
- )]
- pub use_upper_bound_for_max_ts: bool,
- #[env_config(
- name = "ZO_BUFFER_FOR_MAX_TS",
- default = 60,
- help = "buffer for upper bound in mins"
- )]
- pub upper_bound_for_max_ts: i64,
#[env_config(name = "ZO_SHORT_URL_RETENTION_DAYS", default = 30)] // days
pub short_url_retention_days: i64,
}
@@ -1066,18 +1054,22 @@ pub struct Compact {
pub enabled: bool,
#[env_config(name = "ZO_COMPACT_INTERVAL", default = 60)] // seconds
pub interval: u64,
+ #[env_config(name = "ZO_COMPACT_OLD_DATA_INTERVAL", default = 3600)] // seconds
+ pub old_data_interval: u64,
#[env_config(name = "ZO_COMPACT_STRATEGY", default = "file_time")] // file_size, file_time
pub strategy: String,
- #[env_config(name = "ZO_COMPACT_LOOKBACK_HOURS", default = 0)] // hours
- pub lookback_hours: i64,
- #[env_config(name = "ZO_COMPACT_STEP_SECS", default = 3600)] // seconds
- pub step_secs: i64,
#[env_config(name = "ZO_COMPACT_SYNC_TO_DB_INTERVAL", default = 600)] // seconds
pub sync_to_db_interval: u64,
#[env_config(name = "ZO_COMPACT_MAX_FILE_SIZE", default = 512)] // MB
pub max_file_size: usize,
#[env_config(name = "ZO_COMPACT_DATA_RETENTION_DAYS", default = 3650)] // days
pub data_retention_days: i64,
+ #[env_config(name = "ZO_COMPACT_OLD_DATA_MAX_DAYS", default = 7)] // days
+ pub old_data_max_days: i64,
+ #[env_config(name = "ZO_COMPACT_OLD_DATA_MIN_RECORDS", default = 100)] // records
+ pub old_data_min_records: i64,
+ #[env_config(name = "ZO_COMPACT_OLD_DATA_MIN_FILES", default = 10)] // files
+ pub old_data_min_files: i64,
#[env_config(name = "ZO_COMPACT_DELETE_FILES_DELAY_HOURS", default = 2)] // hours
pub delete_files_delay_hours: i64,
#[env_config(name = "ZO_COMPACT_BLOCKED_ORGS", default = "")] // use comma to split
@@ -1098,7 +1090,7 @@ pub struct Compact {
pub job_run_timeout: i64,
#[env_config(
name = "ZO_COMPACT_JOB_CLEAN_WAIT_TIME",
- default = 86400, // 1 day
+ default = 7200, // 2 hours
help = "Clean the jobs which are finished more than this time"
)]
pub job_clean_wait_time: i64,
@@ -1458,6 +1450,11 @@ pub fn init() -> Config {
panic!("disk cache config error: {e}");
}
+ // check compact config
+ if let Err(e) = check_compact_config(&mut cfg) {
+ panic!("compact config error: {e}");
+ }
+
// check etcd config
if let Err(e) = check_etcd_config(&mut cfg) {
panic!("etcd config error: {e}");
@@ -1561,34 +1558,6 @@ fn check_common_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
));
}
- // check compact_max_file_size to MB
- cfg.compact.max_file_size *= 1024 * 1024;
- if cfg.compact.interval == 0 {
- cfg.compact.interval = 60;
- }
- if cfg.compact.pending_jobs_metric_interval == 0 {
- cfg.compact.pending_jobs_metric_interval = 300;
- }
- // check compact_step_secs, min value is 600s
- if cfg.compact.step_secs == 0 {
- cfg.compact.step_secs = 3600;
- } else if cfg.compact.step_secs <= 600 {
- cfg.compact.step_secs = 600;
- }
- if cfg.compact.data_retention_days > 0 && cfg.compact.data_retention_days < 3 {
- return Err(anyhow::anyhow!(
- "Data retention is not allowed to be less than 3 days."
- ));
- }
- if cfg.compact.delete_files_delay_hours < 1 {
- return Err(anyhow::anyhow!(
- "Delete files delay is not allowed to be less than 1 hour."
- ));
- }
- if cfg.compact.batch_size < 1 {
- cfg.compact.batch_size = 100;
- }
-
// If the default scrape interval is less than 5s, raise an error
if cfg.common.default_scrape_interval < 5 {
return Err(anyhow::anyhow!(
@@ -1870,6 +1839,48 @@ fn check_disk_cache_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
Ok(())
}
+fn check_compact_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
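+ // Out-of-range values below are reset to their defaults; only a data retention
+ // shorter than 3 days is treated as a hard error.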
+ if cfg.compact.data_retention_days > 0 && cfg.compact.data_retention_days < 3 {
+ return Err(anyhow::anyhow!(
+ "Data retention is not allowed to be less than 3 days."
+ ));
+ }
+ if cfg.compact.interval < 1 {
+ cfg.compact.interval = 60;
+ }
+
+ // convert compact_max_file_size from MB to bytes
+ if cfg.compact.max_file_size < 1 {
+ cfg.compact.max_file_size = 512;
+ }
+ cfg.compact.max_file_size *= 1024 * 1024;
+ if cfg.compact.delete_files_delay_hours < 1 {
+ cfg.compact.delete_files_delay_hours = 2;
+ }
+
+ if cfg.compact.old_data_interval < 1 {
+ cfg.compact.old_data_interval = 3600;
+ }
+ if cfg.compact.old_data_max_days < 1 {
+ cfg.compact.old_data_max_days = 7;
+ }
+ if cfg.compact.old_data_min_records < 1 {
+ cfg.compact.old_data_min_records = 100;
+ }
+ if cfg.compact.old_data_min_files < 1 {
+ cfg.compact.old_data_min_files = 10;
+ }
+
+ if cfg.compact.batch_size < 1 {
+ cfg.compact.batch_size = 100;
+ }
+ if cfg.compact.pending_jobs_metric_interval == 0 {
+ cfg.compact.pending_jobs_metric_interval = 300;
+ }
+
+ Ok(())
+}
+
fn check_sns_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
// Validate endpoint URL if provided
if !cfg.sns.endpoint.is_empty()
diff --git a/src/config/src/meta/cluster.rs b/src/config/src/meta/cluster.rs
index 360d8167f..ef5053ec3 100644
--- a/src/config/src/meta/cluster.rs
+++ b/src/config/src/meta/cluster.rs
@@ -216,3 +216,9 @@ pub fn get_internal_grpc_token() -> String {
cfg.grpc.internal_grpc_token.clone()
}
}
+
+/// CompactionJobType is used to distinguish between current and historical compaction jobs.
+pub enum CompactionJobType {
+ Current,
+ Historical,
+}
diff --git a/src/config/src/utils/time.rs b/src/config/src/utils/time.rs
index 55af3d33a..926f8fd62 100644
--- a/src/config/src/utils/time.rs
+++ b/src/config/src/utils/time.rs
@@ -13,7 +13,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
-use chrono::{DateTime, Datelike, NaiveDateTime, TimeZone, Utc};
+use chrono::{DateTime, Datelike, Duration, NaiveDateTime, TimeZone, Utc};
use once_cell::sync::Lazy;
use crate::utils::json;
@@ -46,6 +46,19 @@ pub fn now_micros() -> i64 {
Utc::now().timestamp_micros()
}
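+/// Number of microseconds in `n` hours.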
+#[inline(always)]
+pub fn hour_micros(n: i64) -> i64 {
+ Duration::try_hours(n).unwrap().num_microseconds().unwrap()
+}
+
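+/// Number of microseconds in `n` seconds.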
+#[inline(always)]
+pub fn second_micros(n: i64) -> i64 {
+ Duration::try_seconds(n)
+ .unwrap()
+ .num_microseconds()
+ .unwrap()
+}
+
#[inline(always)]
pub fn parse_i64_to_timestamp_micros(v: i64) -> i64 {
if v == 0 {
diff --git a/src/handler/http/auth/validator.rs b/src/handler/http/auth/validator.rs
index d48033ff1..b26b1e4b5 100644
--- a/src/handler/http/auth/validator.rs
+++ b/src/handler/http/auth/validator.rs
@@ -546,8 +546,10 @@ async fn oo_validator_internal(
path_prefix: &str,
) -> Result<ServiceRequest, (Error, ServiceRequest)> {
if auth_info.auth.starts_with("Basic") {
- let decoded = base64::decode(auth_info.auth.strip_prefix("Basic").unwrap().trim())
- .expect("Failed to decode base64 string");
+ let decoded = match base64::decode(auth_info.auth.strip_prefix("Basic").unwrap().trim()) {
+ Ok(val) => val,
+ Err(_) => return Err((ErrorUnauthorized("Unauthorized Access"), req)),
+ };
let (username, password) = match get_user_details(decoded) {
Some(value) => value,
@@ -562,14 +564,16 @@ async fn oo_validator_internal(
if chrono::Utc::now().timestamp() - auth_tokens.request_time > auth_tokens.expires_in {
Err((ErrorUnauthorized("Unauthorized Access"), req))
} else {
- let decoded = base64::decode(
+ let decoded = match base64::decode(
auth_tokens
.auth_ext
.strip_prefix("auth_ext")
.unwrap()
.trim(),
- )
- .expect("Failed to decode base64 string");
+ ) {
+ Ok(val) => val,
+ Err(_) => return Err((ErrorUnauthorized("Unauthorized Access"), req)),
+ };
let (username, password) = match get_user_details(decoded) {
Some(value) => value,
None => return Err((ErrorUnauthorized("Unauthorized Access"), req)),
@@ -584,8 +588,10 @@ async fn oo_validator_internal(
#[cfg(feature = "enterprise")]
pub async fn get_user_email_from_auth_str(auth_str: &str) -> Option<String> {
if auth_str.starts_with("Basic") {
- let decoded = base64::decode(auth_str.strip_prefix("Basic").unwrap().trim())
- .expect("Failed to decode base64 string");
+ let decoded = match base64::decode(auth_str.strip_prefix("Basic").unwrap().trim()) {
+ Ok(val) => val,
+ Err(_) => return None,
+ };
match get_user_details(decoded) {
Some(value) => Some(value.0),
@@ -599,14 +605,16 @@ pub async fn get_user_email_from_auth_str(auth_str: &str) -> Option<String> {
if chrono::Utc::now().timestamp() - auth_tokens.request_time > auth_tokens.expires_in {
None
} else {
- let decoded = base64::decode(
+ let decoded = match base64::decode(
auth_tokens
.auth_ext
.strip_prefix("auth_ext")
.unwrap()
.trim(),
- )
- .expect("Failed to decode base64 string");
+ ) {
+ Ok(val) => val,
+ Err(_) => return None,
+ };
match get_user_details(decoded) {
Some(value) => Some(value.0),
None => None,
@@ -618,9 +626,10 @@ pub async fn get_user_email_from_auth_str(auth_str: &str) -> Option {
}
fn get_user_details(decoded: String) -> Option<(String, String)> {
- let credentials = String::from_utf8(decoded.into())
- .map_err(|_| ())
- .expect("Failed to decode base64 string");
+ let credentials = match String::from_utf8(decoded.into()).map_err(|_| ()) {
+ Ok(val) => val,
+ Err(_) => return None,
+ };
let parts: Vec<&str> = credentials.splitn(2, ':').collect();
if parts.len() != 2 {
return None;
diff --git a/src/handler/http/router/mod.rs b/src/handler/http/router/mod.rs
index 9096888e3..d4ddab8af 100644
--- a/src/handler/http/router/mod.rs
+++ b/src/handler/http/router/mod.rs
@@ -547,6 +547,6 @@ mod tests {
.uri("/proxy/org1/https://cloud.openobserve.ai/assets/flUhRq6tzZclQEJ-Vdg-IuiaDsNa.fd84f88b.woff")
.to_request();
let resp = call_service(&mut app, req).await;
- assert_eq!(resp.status().as_u16(), 200);
+ assert_eq!(resp.status().as_u16(), 404);
}
}
diff --git a/src/infra/src/errors/grpc.rs b/src/infra/src/errors/grpc.rs
index 7b0dd62da..c0022fbaa 100644
--- a/src/infra/src/errors/grpc.rs
+++ b/src/infra/src/errors/grpc.rs
@@ -56,6 +56,7 @@ impl From<tonic::Status> for Error {
};
}
if err.contains("parquet not found") {
+ log::error!("[Datafusion] Parquet file not found: {}", err);
return Error::ErrorCode(ErrorCodes::SearchParquetFileNotFound);
}
if err.contains("Invalid function ") {
diff --git a/src/infra/src/file_list/mod.rs b/src/infra/src/file_list/mod.rs
index a2c2054a3..991ad289d 100644
--- a/src/infra/src/file_list/mod.rs
+++ b/src/infra/src/file_list/mod.rs
@@ -85,6 +85,13 @@ pub trait FileList: Sync + Send + 'static {
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<FileId>>;
+ async fn query_old_data_hours(
+ &self,
+ org_id: &str,
+ stream_type: StreamType,
+ stream_name: &str,
+ time_range: Option<(i64, i64)>,
+ ) -> Result<Vec<String>>;
async fn query_deleted(
&self,
org_id: &str,
@@ -225,11 +232,7 @@ pub async fn list() -> Result> {
}
#[inline]
-#[tracing::instrument(
- name = "infra:file_list:query_db",
- skip_all,
- fields(org_id = org_id, stream_name = stream_name)
-)]
+#[tracing::instrument(name = "infra:file_list:db:query")]
pub async fn query(
org_id: &str,
stream_type: StreamType,
@@ -238,11 +241,7 @@ pub async fn query(
time_range: Option<(i64, i64)>,
flattened: Option<bool>,
) -> Result<Vec<(String, FileMeta)>> {
- if let Some((start, end)) = time_range {
- if start > end || start == 0 || end == 0 {
- return Err(Error::Message("[file_list] invalid time range".to_string()));
- }
- }
+ validate_time_range(time_range)?;
CLIENT
.query(
org_id,
@@ -262,27 +261,33 @@ pub async fn query_by_ids(ids: &[i64]) -> Result> {
}
#[inline]
-#[tracing::instrument(
- name = "infra:file_list:db:query_ids",
- skip_all,
- fields(org_id = org_id, stream_name = stream_name)
-)]
+#[tracing::instrument(name = "infra:file_list:db:query_ids")]
pub async fn query_ids(
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<FileId>> {
- if let Some((start, end)) = time_range {
- if start > end || start == 0 || end == 0 {
- return Err(Error::Message("[file_list] invalid time range".to_string()));
- }
- }
+ validate_time_range(time_range)?;
CLIENT
.query_ids(org_id, stream_type, stream_name, time_range)
.await
}
+#[inline]
+#[tracing::instrument(name = "infra:file_list:db:query_old_data_hours")]
+pub async fn query_old_data_hours(
+ org_id: &str,
+ stream_type: StreamType,
+ stream_name: &str,
+ time_range: Option<(i64, i64)>,
+) -> Result<Vec<String>> {
+ validate_time_range(time_range)?;
+ CLIENT
+ .query_old_data_hours(org_id, stream_type, stream_name, time_range)
+ .await
+}
+
#[inline]
pub async fn query_deleted(org_id: &str, time_max: i64, limit: i64) -> Result> {
CLIENT.query_deleted(org_id, time_max, limit).await
@@ -440,6 +445,24 @@ pub async fn local_cache_gc() -> Result<()> {
Ok(())
}
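+/// A time range is valid when both bounds are non-zero and start <= end.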
+fn validate_time_range(time_range: Option<(i64, i64)>) -> Result<()> {
+ if let Some((start, end)) = time_range {
+ if start > end || start == 0 || end == 0 {
+ return Err(Error::Message("[file_list] invalid time range".to_string()));
+ }
+ }
+ Ok(())
+}
+
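+/// Extend the query end time by the default partition duration for the stream type
+/// (falling back to one hour) so the bounded `max_ts <= upper_bound` predicate can
+/// narrow the scan without missing files whose max_ts spills slightly past `time_end`.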
+fn calculate_max_ts_upper_bound(time_end: i64, stream_type: StreamType) -> i64 {
+ let ts = super::schema::unwrap_partition_time_level(None, stream_type).duration();
+ if ts > 0 {
+ time_end + ts
+ } else {
+ time_end + PartitionTimeLevel::Hourly.duration()
+ }
+}
+
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
pub struct FileRecord {
#[sqlx(default)]
diff --git a/src/infra/src/file_list/mysql.rs b/src/infra/src/file_list/mysql.rs
index 710a228aa..f6317c89f 100644
--- a/src/infra/src/file_list/mysql.rs
+++ b/src/infra/src/file_list/mysql.rs
@@ -363,11 +363,8 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.await
} else {
let (time_start, time_end) = time_range.unwrap_or((0, 0));
- let cfg = get_config();
- if cfg.limit.use_upper_bound_for_max_ts {
- let max_ts_upper_bound =
- time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
- sqlx::query_as::<_, super::FileRecord>(
+ let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
+ sqlx::query_as::<_, super::FileRecord>(
r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
@@ -380,20 +377,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
- } else {
- sqlx::query_as::<_, super::FileRecord>(
- r#"
-SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
- FROM file_list
- WHERE stream = ? AND max_ts >= ? AND min_ts <= ?;
- "#,
- )
- .bind(stream_key)
- .bind(time_start)
- .bind(time_end)
- .fetch_all(&pool)
- .await
- }
};
let time = start.elapsed().as_secs_f64();
DB_QUERY_TIME
@@ -495,12 +478,10 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
let stream_key = stream_key.clone();
tasks.push(tokio::task::spawn(async move {
let pool = CLIENT.clone();
- let cfg = get_config();
DB_QUERY_NUMS
.with_label_values(&["select", "file_list"])
.inc();
- if cfg.limit.use_upper_bound_for_max_ts {
- let max_ts_upper_bound = time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
+ let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let query = "SELECT id, records, original_size FROM file_list WHERE stream = ? AND max_ts >= ? AND max_ts <= ? AND min_ts <= ?;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
@@ -509,15 +490,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
- } else {
- let query = "SELECT id, records, original_size FROM file_list WHERE stream = ? AND max_ts >= ? AND min_ts <= ?;";
- sqlx::query_as::<_, super::FileId>(query)
- .bind(stream_key)
- .bind(time_start)
- .bind(time_end)
- .fetch_all(&pool)
- .await
- }
}));
}
@@ -540,6 +512,56 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
Ok(rets)
}
+ async fn query_old_data_hours(
+ &self,
+ org_id: &str,
+ stream_type: StreamType,
+ stream_name: &str,
+ time_range: Option<(i64, i64)>,
+ ) -> Result<Vec<String>> {
+ if let Some((start, end)) = time_range {
+ if start == 0 && end == 0 {
+ return Ok(Vec::new());
+ }
+ }
+
+ let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
+
+ let pool = CLIENT.clone();
+ DB_QUERY_NUMS
+ .with_label_values(&["select", "file_list"])
+ .inc();
+ let start = std::time::Instant::now();
+ let (time_start, time_end) = time_range.unwrap_or((0, 0));
+ let cfg = get_config();
+ let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
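+ // find hour partitions (the `date` column) that still hold at least
+ // `old_data_min_files` small files, each with fewer than `old_data_min_records` records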
+ let sql = r#"
+SELECT date
+ FROM file_list
+ WHERE stream = ? AND max_ts >= ? AND max_ts <= ? AND min_ts <= ? AND records < ?
+ GROUP BY date HAVING count(*) >= ?;
+ "#;
+
+ let ret = sqlx::query(sql)
+ .bind(stream_key)
+ .bind(time_start)
+ .bind(max_ts_upper_bound)
+ .bind(time_end)
+ .bind(cfg.compact.old_data_min_records)
+ .bind(cfg.compact.old_data_min_files)
+ .fetch_all(&pool)
+ .await?;
+
+ let time = start.elapsed().as_secs_f64();
+ DB_QUERY_TIME
+ .with_label_values(&["query_old_data_hours", "file_list"])
+ .observe(time);
+ Ok(ret
+ .into_iter()
+ .map(|r| r.try_get::<String, &str>("date").unwrap_or_default())
+ .collect())
+ }
+
async fn query_deleted(
&self,
org_id: &str,
diff --git a/src/infra/src/file_list/postgres.rs b/src/infra/src/file_list/postgres.rs
index b2a277178..cb98ace1c 100644
--- a/src/infra/src/file_list/postgres.rs
+++ b/src/infra/src/file_list/postgres.rs
@@ -376,37 +376,20 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.fetch_all(&pool).await
} else {
let (time_start, time_end) = time_range.unwrap_or((0, 0));
- let cfg = get_config();
- if cfg.limit.use_upper_bound_for_max_ts {
- let max_ts_upper_bound =
- time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
- let sql = r#"
+ let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
+ let sql = r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;
"#;
- sqlx::query_as::<_, super::FileRecord>(sql)
- .bind(stream_key)
- .bind(time_start)
- .bind(max_ts_upper_bound)
- .bind(time_end)
- .fetch_all(&pool)
- .await
- } else {
- let sql = r#"
-SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
- FROM file_list
- WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;
- "#;
-
- sqlx::query_as::<_, super::FileRecord>(sql)
- .bind(stream_key)
- .bind(time_start)
- .bind(time_end)
- .fetch_all(&pool)
- .await
- }
+ sqlx::query_as::<_, super::FileRecord>(sql)
+ .bind(stream_key)
+ .bind(time_start)
+ .bind(max_ts_upper_bound)
+ .bind(time_end)
+ .fetch_all(&pool)
+ .await
};
let time = start.elapsed().as_secs_f64();
DB_QUERY_TIME
@@ -508,29 +491,18 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
let stream_key = stream_key.clone();
tasks.push(tokio::task::spawn(async move {
let pool = CLIENT.clone();
- let cfg = get_config();
DB_QUERY_NUMS
.with_label_values(&["select", "file_list"])
.inc();
- if cfg.limit.use_upper_bound_for_max_ts {
- let max_ts_upper_bound = time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
- let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;";
- sqlx::query_as::<_, super::FileId>(query)
- .bind(stream_key)
- .bind(time_start)
- .bind(max_ts_upper_bound)
- .bind(time_end)
- .fetch_all(&pool)
- .await
- } else {
- let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;";
- sqlx::query_as::<_, super::FileId>(query)
- .bind(stream_key)
- .bind(time_start)
- .bind(time_end)
- .fetch_all(&pool)
- .await
- }
+ let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
+ let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;";
+ sqlx::query_as::<_, super::FileId>(query)
+ .bind(stream_key)
+ .bind(time_start)
+ .bind(max_ts_upper_bound)
+ .bind(time_end)
+ .fetch_all(&pool)
+ .await
}));
}
@@ -553,6 +525,56 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
Ok(rets)
}
+ async fn query_old_data_hours(
+ &self,
+ org_id: &str,
+ stream_type: StreamType,
+ stream_name: &str,
+ time_range: Option<(i64, i64)>,
+ ) -> Result<Vec<String>> {
+ if let Some((start, end)) = time_range {
+ if start == 0 && end == 0 {
+ return Ok(Vec::new());
+ }
+ }
+
+ let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
+
+ let pool = CLIENT.clone();
+ DB_QUERY_NUMS
+ .with_label_values(&["select", "file_list"])
+ .inc();
+ let start = std::time::Instant::now();
+ let (time_start, time_end) = time_range.unwrap_or((0, 0));
+ let cfg = get_config();
+ let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
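+ // find hour partitions (the `date` column) that still hold at least
+ // `old_data_min_files` small files, each with fewer than `old_data_min_records` records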
+ let sql = r#"
+SELECT date
+ FROM file_list
+ WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4 AND records < $5
+ GROUP BY date HAVING count(*) >= $6;
+ "#;
+
+ let ret = sqlx::query(sql)
+ .bind(stream_key)
+ .bind(time_start)
+ .bind(max_ts_upper_bound)
+ .bind(time_end)
+ .bind(cfg.compact.old_data_min_records)
+ .bind(cfg.compact.old_data_min_files)
+ .fetch_all(&pool)
+ .await?;
+
+ let time = start.elapsed().as_secs_f64();
+ DB_QUERY_TIME
+ .with_label_values(&["query_old_data_hours", "file_list"])
+ .observe(time);
+ Ok(ret
+ .into_iter()
+ .map(|r| r.try_get::<String, &str>("date").unwrap_or_default())
+ .collect())
+ }
+
async fn query_deleted(
&self,
org_id: &str,
diff --git a/src/infra/src/file_list/sqlite.rs b/src/infra/src/file_list/sqlite.rs
index 3a031ac22..483a599a8 100644
--- a/src/infra/src/file_list/sqlite.rs
+++ b/src/infra/src/file_list/sqlite.rs
@@ -326,11 +326,8 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.await
} else {
let (time_start, time_end) = time_range.unwrap_or((0, 0));
- let cfg = get_config();
- if cfg.limit.use_upper_bound_for_max_ts {
- let max_ts_upper_bound =
- time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
- sqlx::query_as::<_, super::FileRecord>(
+ let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
+ sqlx::query_as::<_, super::FileRecord>(
r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
@@ -343,20 +340,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
- } else {
- sqlx::query_as::<_, super::FileRecord>(
- r#"
-SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
- FROM file_list
- WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;
- "#,
- )
- .bind(stream_key)
- .bind(time_start)
- .bind(time_end)
- .fetch_all(&pool)
- .await
- }
};
Ok(ret?
.iter()
@@ -446,9 +429,7 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
let stream_key = stream_key.clone();
tasks.push(tokio::task::spawn(async move {
let pool = CLIENT_RO.clone();
- let cfg = get_config();
- if cfg.limit.use_upper_bound_for_max_ts {
- let max_ts_upper_bound = time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
+ let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
@@ -457,15 +438,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
- } else {
- let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;";
- sqlx::query_as::<_, super::FileId>(query)
- .bind(stream_key)
- .bind(time_start)
- .bind(time_end)
- .fetch_all(&pool)
- .await
- }
}));
}
@@ -484,6 +456,47 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
Ok(rets)
}
+ async fn query_old_data_hours(
+ &self,
+ org_id: &str,
+ stream_type: StreamType,
+ stream_name: &str,
+ time_range: Option<(i64, i64)>,
+ ) -> Result<Vec<String>> {
+ if let Some((start, end)) = time_range {
+ if start == 0 && end == 0 {
+ return Ok(Vec::new());
+ }
+ }
+
+ let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
+
+ let pool = CLIENT_RO.clone();
+ let (time_start, time_end) = time_range.unwrap_or((0, 0));
+ let cfg = get_config();
+ let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
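+ // find hour partitions (the `date` column) that still hold at least
+ // `old_data_min_files` small files, each with fewer than `old_data_min_records` records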
+ let sql = r#"
+SELECT date
+ FROM file_list
+ WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4 AND records < $5
+ GROUP BY date HAVING count(*) >= $6;
+ "#;
+
+ let ret = sqlx::query(sql)
+ .bind(stream_key)
+ .bind(time_start)
+ .bind(max_ts_upper_bound)
+ .bind(time_end)
+ .bind(cfg.compact.old_data_min_records)
+ .bind(cfg.compact.old_data_min_files)
+ .fetch_all(&pool)
+ .await?;
+ Ok(ret
+ .into_iter()
+ .map(|r| r.try_get::<String, &str>("date").unwrap_or_default())
+ .collect())
+ }
+
async fn query_deleted(
&self,
org_id: &str,
diff --git a/src/job/compactor.rs b/src/job/compactor.rs
index 3506f449e..1b4cfbb20 100644
--- a/src/job/compactor.rs
+++ b/src/job/compactor.rs
@@ -15,7 +15,12 @@
use std::sync::Arc;
-use config::{cluster::LOCAL_NODE, get_config, meta::stream::FileKey, metrics};
+use config::{
+ cluster::LOCAL_NODE,
+ get_config,
+ meta::{cluster::CompactionJobType, stream::FileKey},
+ metrics,
+};
use tokio::{
sync::{mpsc, Mutex},
time,
@@ -94,6 +99,7 @@ pub async fn run() -> Result<(), anyhow::Error> {
}
tokio::task::spawn(async move { run_generate_job().await });
+ tokio::task::spawn(async move { run_generate_old_data_job().await });
tokio::task::spawn(async move { run_merge(tx).await });
tokio::task::spawn(async move { run_retention().await });
tokio::task::spawn(async move { run_delay_deletion().await });
@@ -138,12 +144,27 @@ async fn run_generate_job() -> Result<(), anyhow::Error> {
loop {
time::sleep(time::Duration::from_secs(get_config().compact.interval)).await;
log::debug!("[COMPACTOR] Running generate merge job");
- if let Err(e) = compact::run_generate_job().await {
+ if let Err(e) = compact::run_generate_job(CompactionJobType::Current).await {
log::error!("[COMPACTOR] run generate merge job error: {e}");
}
}
}
+/// Generate merging jobs for old data
+async fn run_generate_old_data_job() -> Result<(), anyhow::Error> {
+ loop {
+ // run every 1 hour at least
+ time::sleep(time::Duration::from_secs(
+ get_config().compact.old_data_interval,
+ ))
+ .await;
+ log::debug!("[COMPACTOR] Running generate merge job for old data");
+ if let Err(e) = compact::run_generate_job(CompactionJobType::Historical).await {
+ log::error!("[COMPACTOR] run generate merge job for old data error: {e}");
+ }
+ }
+}
+
/// Merge small files
async fn run_merge(tx: mpsc::Sender<(MergeSender, MergeBatch)>) -> Result<(), anyhow::Error> {
loop {
diff --git a/src/job/files/parquet.rs b/src/job/files/parquet.rs
index e98d96ffc..1a766fe4c 100644
--- a/src/job/files/parquet.rs
+++ b/src/job/files/parquet.rs
@@ -298,6 +298,8 @@ async fn move_files(
let org_id = columns[1].to_string();
let stream_type = StreamType::from(columns[2]);
let stream_name = columns[3].to_string();
+ // let _thread_id = columns[4].to_string();
+ let prefix_date = format!("{}-{}-{}", columns[5], columns[6], columns[7]);
// log::debug!("[INGESTER:JOB:{thread_id}] check deletion for partition: {}", prefix);
@@ -372,6 +374,42 @@ async fn move_files(
return Ok(());
}
+ // check data retention
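+ // if the file's WAL partition date is older than the retention cutoff,
+ // delete it locally instead of uploading it to object storage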
+ let stream_settings = infra::schema::get_settings(&org_id, &stream_name, stream_type)
+ .await
+ .unwrap_or_default();
+ let mut stream_data_retention_days = cfg.compact.data_retention_days;
+ if stream_settings.data_retention > 0 {
+ stream_data_retention_days = stream_settings.data_retention;
+ }
+ if stream_data_retention_days > 0 {
+ let date =
+ config::utils::time::now() - Duration::try_days(stream_data_retention_days).unwrap();
+ let stream_data_retention_end = date.format("%Y-%m-%d").to_string();
+ if prefix_date < stream_data_retention_end {
+ for file in files {
+ log::warn!(
+ "[INGESTER:JOB:{thread_id}] the file [{}/{}/{}] was exceed the data retention, just delete file: {}",
+ &org_id,
+ stream_type,
+ &stream_name,
+ file.key,
+ );
+ if let Err(e) = tokio::fs::remove_file(wal_dir.join(&file.key)).await {
+ log::error!(
+ "[INGESTER:JOB:{thread_id}] Failed to remove parquet file from disk: {}, {}",
+ file.key,
+ e
+ );
+ }
+ // delete metadata from cache
+ WAL_PARQUET_METADATA.write().await.remove(&file.key);
+ PROCESSING_FILES.write().await.remove(&file.key);
+ }
+ return Ok(());
+ }
+ }
+
// log::debug!("[INGESTER:JOB:{thread_id}] start processing for partition: {}", prefix);
let wal_dir = wal_dir.clone();
diff --git a/src/service/alerts/scheduler.rs b/src/service/alerts/scheduler.rs
index 1e1dc6bdf..dc5020ff2 100644
--- a/src/service/alerts/scheduler.rs
+++ b/src/service/alerts/scheduler.rs
@@ -22,7 +22,11 @@ use config::{
stream::StreamType,
usage::{TriggerData, TriggerDataStatus, TriggerDataType},
},
- utils::{json, rand::get_rand_num_within},
+ utils::{
+ json,
+ rand::get_rand_num_within,
+ time::{hour_micros, second_micros},
+ },
};
use cron::Schedule;
use futures::future::try_join_all;
@@ -86,11 +90,8 @@ fn get_max_considerable_delay(frequency: i64) -> i64 {
// If the delay is more than this, the alert will be skipped.
// The maximum delay is the lowest of 1 hour or 20% of the frequency.
// E.g. if the frequency is 5 mins, the maximum delay is 1 min.
- let frequency = Duration::try_seconds(frequency)
- .unwrap()
- .num_microseconds()
- .unwrap();
- let max_delay = Duration::try_hours(1).unwrap().num_microseconds().unwrap();
+ let frequency = second_micros(frequency);
+ let max_delay = hour_micros(1);
// limit.alert_considerable_delay is in percentage, convert into float
let considerable_delay = get_config().limit.alert_considerable_delay as f64 * 0.01;
let max_considerable_delay = (frequency as f64 * considerable_delay) as i64;
diff --git a/src/service/compact/file_list.rs b/src/service/compact/file_list.rs
index ba6c85e9f..f0eb6af85 100644
--- a/src/service/compact/file_list.rs
+++ b/src/service/compact/file_list.rs
@@ -20,7 +20,12 @@ use std::{
use bytes::Buf;
use chrono::{DateTime, Datelike, Duration, TimeZone, Timelike, Utc};
-use config::{cluster::LOCAL_NODE, ider, meta::stream::FileKey, utils::json};
+use config::{
+ cluster::LOCAL_NODE,
+ ider,
+ meta::stream::FileKey,
+ utils::{json, time::hour_micros},
+};
use hashbrown::HashMap;
use infra::{dist_lock, schema::STREAM_SCHEMAS_LATEST, storage};
use tokio::sync::{RwLock, Semaphore};
@@ -106,7 +111,7 @@ pub async fn run_merge(offset: i64) -> Result<(), anyhow::Error> {
// offset
let mut is_waiting_streams = false;
for (key, val) in offsets {
- if (val - Duration::try_hours(1).unwrap().num_microseconds().unwrap()) < offset {
+ if (val - hour_micros(1)) < offset {
log::info!("[COMPACT] file_list is waiting for stream: {key}, offset: {val}");
is_waiting_streams = true;
break;
@@ -162,7 +167,7 @@ pub async fn run_merge(offset: i64) -> Result<(), anyhow::Error> {
merge_file_list(offset).await?;
// write new sync offset
- offset = offset_time_hour + Duration::try_hours(1).unwrap().num_microseconds().unwrap();
+ offset = offset_time_hour + hour_micros(1);
db::compact::file_list::set_offset(offset).await
}
diff --git a/src/service/compact/file_list_deleted.rs b/src/service/compact/file_list_deleted.rs
index ebb0e3acc..48e324fee 100644
--- a/src/service/compact/file_list_deleted.rs
+++ b/src/service/compact/file_list_deleted.rs
@@ -16,7 +16,8 @@
use std::io::{BufRead, BufReader};
use bytes::Buf;
-use chrono::{DateTime, Duration, TimeZone, Utc};
+use chrono::{DateTime, TimeZone, Utc};
+use config::utils::time::hour_micros;
use futures::future::try_join_all;
use hashbrown::HashMap;
use infra::{file_list as infra_file_list, storage};
@@ -201,7 +202,7 @@ async fn query_deleted_from_s3(
let entry = files.entry(file).or_insert_with(Vec::new);
entry.extend(records);
}
- cur_time += Duration::try_hours(1).unwrap().num_microseconds().unwrap();
+ cur_time += hour_micros(1);
}
Ok(files)
}
diff --git a/src/service/compact/merge.rs b/src/service/compact/merge.rs
index 2f7c306d6..cf30cbc8a 100644
--- a/src/service/compact/merge.rs
+++ b/src/service/compact/merge.rs
@@ -34,6 +34,7 @@ use config::{
},
record_batch_ext::{concat_batches, merge_record_batches},
schema_ext::SchemaExt,
+ time::hour_micros,
},
FILE_EXT_PARQUET,
};
@@ -143,26 +144,18 @@ pub async fn generate_job_by_stream(
)
.unwrap()
.timestamp_micros();
- // 1. if step_secs less than 1 hour, must wait for at least max_file_retention_time
- // 2. if step_secs greater than 1 hour, must wait for at least 3 * max_file_retention_time
+ // must wait for at least 3 * max_file_retention_time
// -- first period: the last hour local file upload to storage, write file list
// -- second period, the last hour file list upload to storage
// -- third period, we can do the merge, so, at least 3 times of
// max_file_retention_time
- if (cfg.compact.step_secs < 3600
- && time_now.timestamp_micros() - offset
+ if offset >= time_now_hour
+ || time_now.timestamp_micros() - offset
<= Duration::try_seconds(cfg.limit.max_file_retention_time as i64)
.unwrap()
.num_microseconds()
- .unwrap())
- || (cfg.compact.step_secs >= 3600
- && (offset >= time_now_hour
- || time_now.timestamp_micros() - offset
- <= Duration::try_seconds(cfg.limit.max_file_retention_time as i64)
- .unwrap()
- .num_microseconds()
- .unwrap()
- * 3))
+ .unwrap()
+ * 3
{
return Ok(()); // the time is future, just wait
}
@@ -184,11 +177,9 @@ pub async fn generate_job_by_stream(
}
// write new offset
- let offset = offset
- + Duration::try_seconds(cfg.compact.step_secs)
- .unwrap()
- .num_microseconds()
- .unwrap();
+ let offset = offset + hour_micros(1);
+ // align the offset to the start of the hour (zero out minutes and seconds)
+ let offset = offset - offset % hour_micros(1);
db::compact::files::set_offset(
org_id,
stream_type,
@@ -201,6 +192,113 @@ pub async fn generate_job_by_stream(
Ok(())
}
+/// Generate merging jobs for old data by stream
+/// 1. get old data by hour
+/// 2. check if other node is processing
+/// 3. create job or return
+pub async fn generate_old_data_job_by_stream(
+ org_id: &str,
+ stream_type: StreamType,
+ stream_name: &str,
+) -> Result<(), anyhow::Error> {
+ // get last compacted offset
+ let (offset, node) = db::compact::files::get_offset(org_id, stream_type, stream_name).await;
+ if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some() {
+ return Ok(()); // other node is processing
+ }
+
+ if node.is_empty() || LOCAL_NODE.uuid.ne(&node) {
+ let lock_key = format!("/compact/merge/{}/{}/{}", org_id, stream_type, stream_name);
+ let locker = dist_lock::lock(&lock_key, 0, None).await?;
+ // check the working node again, maybe another node locked it first
+ let (offset, node) = db::compact::files::get_offset(org_id, stream_type, stream_name).await;
+ if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some()
+ {
+ dist_lock::unlock(&locker).await?;
+ return Ok(()); // other node is processing
+ }
+ // set to current node
+ let ret = db::compact::files::set_offset(
+ org_id,
+ stream_type,
+ stream_name,
+ offset,
+ Some(&LOCAL_NODE.uuid.clone()),
+ )
+ .await;
+ dist_lock::unlock(&locker).await?;
+ drop(locker);
+ ret?;
+ }
+
+ if offset == 0 {
+ return Ok(()); // no data
+ }
+
+ let cfg = get_config();
+ let stream_settings = infra::schema::get_settings(org_id, stream_name, stream_type)
+ .await
+ .unwrap_or_default();
+ let mut stream_data_retention_days = cfg.compact.data_retention_days;
+ if stream_settings.data_retention > 0 {
+ stream_data_retention_days = stream_settings.data_retention;
+ }
+ if stream_data_retention_days > cfg.compact.old_data_max_days {
+ stream_data_retention_days = cfg.compact.old_data_max_days;
+ }
+ if stream_data_retention_days == 0 {
+ return Ok(()); // no need to check old data
+ }
+
+ // get old data by hour; treat anything before `offset - 2 hours` as old data
+ let end_time = offset - hour_micros(2);
+ let start_time = end_time
+ - Duration::try_days(stream_data_retention_days as i64)
+ .unwrap()
+ .num_microseconds()
+ .unwrap();
+ let hours = infra_file_list::query_old_data_hours(
+ org_id,
+ stream_type,
+ stream_name,
+ Some((start_time, end_time)),
+ )
+ .await?;
+
+ // generate merging job
+ for hour in hours {
+ let column = hour.split('/').collect::<Vec<&str>>();
+ if column.len() != 4 {
+ return Err(anyhow::anyhow!(
+ "Unexpected hour format in {}, Expected format YYYY/MM/DD/HH",
+ hour
+ ));
+ }
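+ // convert the hour partition (YYYY/MM/DD/HH) into a microsecond timestamp used as the job offset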
+ let offset = DateTime::parse_from_rfc3339(&format!(
+ "{}-{}-{}T{}:00:00Z",
+ column[0], column[1], column[2], column[3]
+ ))?
+ .with_timezone(&Utc);
+ let offset = offset.timestamp_micros();
+ log::debug!(
+ "[COMPACTOR] generate_old_data_job_by_stream [{}/{}/{}] hours: {}, offset: {}",
+ org_id,
+ stream_type,
+ stream_name,
+ hour,
+ offset
+ );
+ if let Err(e) = infra_file_list::add_job(org_id, stream_type, stream_name, offset).await {
+ return Err(anyhow::anyhow!(
+ "[COMAPCT] add file_list_jobs for old data failed: {}",
+ e
+ ));
+ }
+ }
+
+ Ok(())
+}
+
/// compactor run steps on a stream:
/// 3. get a cluster lock for compactor stream
/// 4. read last compacted offset: year/month/day/hour
@@ -277,17 +375,11 @@ pub async fn merge_by_stream(
// get current hour(day) all files
let (partition_offset_start, partition_offset_end) =
if partition_time_level == PartitionTimeLevel::Daily {
- (
- offset_time_day,
- offset_time_day + Duration::try_hours(24).unwrap().num_microseconds().unwrap() - 1,
- )
+ (offset_time_day, offset_time_day + hour_micros(24) - 1)
} else {
- (
- offset_time_hour,
- offset_time_hour + Duration::try_hours(1).unwrap().num_microseconds().unwrap() - 1,
- )
+ (offset_time_hour, offset_time_hour + hour_micros(1) - 1)
};
- let mut files = file_list::query(
+ let files = file_list::query(
org_id,
stream_name,
stream_type,
@@ -298,31 +390,6 @@ pub async fn merge_by_stream(
.await
.map_err(|e| anyhow::anyhow!("query file list failed: {}", e))?;
- // check lookback files
- if cfg.compact.lookback_hours > 0 {
- let lookback_offset = Duration::try_hours(cfg.compact.lookback_hours)
- .unwrap()
- .num_microseconds()
- .unwrap();
- let lookback_offset_start = partition_offset_start - lookback_offset;
- let mut lookback_offset_end = partition_offset_end - lookback_offset;
- if lookback_offset_end > partition_offset_start {
- // the lookback period is overlap with current period
- lookback_offset_end = partition_offset_start;
- }
- let lookback_files = file_list::query(
- org_id,
- stream_name,
- stream_type,
- partition_time_level,
- lookback_offset_start,
- lookback_offset_end,
- )
- .await
- .map_err(|e| anyhow::anyhow!("query lookback file list failed: {}", e))?;
- files.extend(lookback_files);
- }
-
log::debug!(
"[COMPACTOR] merge_by_stream [{}/{}/{}] time range: [{},{}], files: {}",
org_id,
@@ -399,13 +466,6 @@ pub async fn merge_by_stream(
}
new_file_size += file.meta.original_size;
new_file_list.push(file.clone());
- // metrics
- metrics::COMPACT_MERGED_FILES
- .with_label_values(&[&org_id, stream_type.to_string().as_str()])
- .inc();
- metrics::COMPACT_MERGED_BYTES
- .with_label_values(&[&org_id, stream_type.to_string().as_str()])
- .inc_by(file.meta.original_size as u64);
}
if new_file_list.len() > 1 {
batch_groups.push(MergeBatch {
@@ -450,22 +510,14 @@ pub async fn merge_by_stream(
let new_file_name = std::mem::take(&mut new_file.key);
let new_file_meta = std::mem::take(&mut new_file.meta);
let new_file_list = batch_groups.get(batch_id).unwrap().files.as_slice();
-
if new_file_name.is_empty() {
- if new_file_list.is_empty() {
- // no file need to merge
- break;
- } else {
- // delete files from file_list and continue
- files_with_size.retain(|f| !&new_file_list.contains(f));
- continue;
- }
+ continue;
}
// delete small files keys & write big files keys, use transaction
let mut events = Vec::with_capacity(new_file_list.len() + 1);
events.push(FileKey {
- key: new_file_name.clone(),
+ key: new_file_name,
meta: new_file_meta,
deleted: false,
segment_ids: None,
@@ -536,16 +588,12 @@ pub async fn merge_by_stream(
.inc_by(time);
metrics::COMPACT_DELAY_HOURS
.with_label_values(&[org_id, stream_name, stream_type.to_string().as_str()])
- .set(
- (time_now_hour - offset_time_hour)
- / Duration::try_hours(1).unwrap().num_microseconds().unwrap(),
- );
+ .set((time_now_hour - offset_time_hour) / hour_micros(1));
Ok(())
}
-/// merge some small files into one big file, upload to storage, returns the big
-/// file key and merged files
+/// merge small files into a big file, upload it to storage, and return the big file key and the merged files
pub async fn merge_files(
thread_id: usize,
org_id: &str,
diff --git a/src/service/compact/mod.rs b/src/service/compact/mod.rs
index 9fee1461b..0d7b2acb4 100644
--- a/src/service/compact/mod.rs
+++ b/src/service/compact/mod.rs
@@ -18,7 +18,7 @@ use config::{
cluster::LOCAL_NODE,
get_config,
meta::{
- cluster::Role,
+ cluster::{CompactionJobType, Role},
stream::{PartitionTimeLevel, StreamType, ALL_STREAM_TYPES},
},
};
@@ -63,10 +63,12 @@ pub async fn run_retention() -> Result<(), anyhow::Error> {
continue; // not this node
}
- let schema = infra::schema::get(&org_id, &stream_name, stream_type).await?;
- let stream = super::stream::stream_res(&stream_name, stream_type, schema, None);
- let stream_data_retention_end = if stream.settings.data_retention > 0 {
- let date = now - Duration::try_days(stream.settings.data_retention).unwrap();
+ let stream_settings =
+ infra::schema::get_settings(&org_id, &stream_name, stream_type)
+ .await
+ .unwrap_or_default();
+ let stream_data_retention_end = if stream_settings.data_retention > 0 {
+ let date = now - Duration::try_days(stream_settings.data_retention).unwrap();
date.format("%Y-%m-%d").to_string()
} else {
data_lifecycle_end.clone()
@@ -137,7 +139,7 @@ pub async fn run_retention() -> Result<(), anyhow::Error> {
}
/// Generate job for compactor
-pub async fn run_generate_job() -> Result<(), anyhow::Error> {
+pub async fn run_generate_job(job_type: CompactionJobType) -> Result<(), anyhow::Error> {
let orgs = db::schema::list_organizations_from_cache().await;
for org_id in orgs {
// check backlist
@@ -191,16 +193,37 @@ pub async fn run_generate_job() -> Result<(), anyhow::Error> {
continue;
}
- if let Err(e) =
- merge::generate_job_by_stream(&org_id, stream_type, &stream_name).await
- {
- log::error!(
- "[COMPACTOR] generate_job_by_stream [{}/{}/{}] error: {}",
- org_id,
- stream_type,
- stream_name,
- e
- );
+ match job_type {
+ CompactionJobType::Current => {
+ if let Err(e) =
+ merge::generate_job_by_stream(&org_id, stream_type, &stream_name).await
+ {
+ log::error!(
+ "[COMPACTOR] generate_job_by_stream [{}/{}/{}] error: {}",
+ org_id,
+ stream_type,
+ stream_name,
+ e
+ );
+ }
+ }
+ CompactionJobType::Historical => {
+ if let Err(e) = merge::generate_old_data_job_by_stream(
+ &org_id,
+ stream_type,
+ &stream_name,
+ )
+ .await
+ {
+ log::error!(
+ "[COMPACTOR] generate_old_data_job_by_stream [{}/{}/{}] error: {}",
+ org_id,
+ stream_type,
+ stream_name,
+ e
+ );
+ }
+ }
}
}
}
@@ -234,7 +257,7 @@ pub async fn run_merge(
.unwrap_or_default();
let partition_time_level =
unwrap_partition_time_level(stream_setting.partition_time_level, stream_type);
- if partition_time_level == PartitionTimeLevel::Daily || cfg.compact.step_secs < 3600 {
+ if partition_time_level == PartitionTimeLevel::Daily {
// check if this stream need process by this node
let Some(node_name) =
get_node_from_consistent_hash(&stream_name, &Role::Compactor, None).await
diff --git a/src/service/compact/retention.rs b/src/service/compact/retention.rs
index 1f64e279b..ae96b6665 100644
--- a/src/service/compact/retention.rs
+++ b/src/service/compact/retention.rs
@@ -51,6 +51,15 @@ pub async fn delete_by_stream(
return Ok(()); // created_at is after lifecycle_end, just skip
}
+ log::debug!(
+ "[COMPACT] delete_by_stream {}/{}/{}/{},{}",
+ org_id,
+ stream_type,
+ stream_name,
+ lifecycle_start,
+ lifecycle_end
+ );
+
// Hack for 1970-01-01
if lifecycle_start.le("1970-01-01") {
let lifecycle_end = created_at + Duration::try_days(1).unwrap();
@@ -83,7 +92,7 @@ pub async fn delete_all(
let locker = dist_lock::lock(&lock_key, 0, None).await?;
let node = db::compact::retention::get_stream(org_id, stream_type, stream_name, None).await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some() {
- log::error!("[COMPACT] stream {org_id}/{stream_type}/{stream_name} is deleting by {node}");
+ log::warn!("[COMPACT] stream {org_id}/{stream_type}/{stream_name} is deleting by {node}");
dist_lock::unlock(&locker).await?;
return Ok(()); // not this node, just skip
}
@@ -179,7 +188,7 @@ pub async fn delete_by_date(
db::compact::retention::get_stream(org_id, stream_type, stream_name, Some(date_range))
.await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some() {
- log::error!(
+ log::warn!(
"[COMPACT] stream {org_id}/{stream_type}/{stream_name}/{:?} is deleting by {node}",
date_range
);
diff --git a/src/service/db/compact/retention.rs b/src/service/db/compact/retention.rs
index c6c554fda..653c23ef4 100644
--- a/src/service/db/compact/retention.rs
+++ b/src/service/db/compact/retention.rs
@@ -15,12 +15,16 @@
use std::sync::Arc;
-use config::{meta::stream::StreamType, RwHashSet};
+use config::{
+ meta::stream::StreamType,
+ utils::time::{hour_micros, now_micros},
+ RwHashMap,
+};
use once_cell::sync::Lazy;
use crate::service::db;
-static CACHE: Lazy<RwHashSet<String>> = Lazy::new(Default::default);
+static CACHE: Lazy<RwHashMap<String, i64>> = Lazy::new(Default::default);
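+// maps the delete key to the time (in microseconds) the task was queued,
+// so duplicate delete tasks are suppressed for one hour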
#[inline]
fn mk_key(
@@ -47,12 +51,14 @@ pub async fn delete_stream(
let key = mk_key(org_id, stream_type, stream_name, date_range);
// write in cache
- if CACHE.contains(&key) {
- return Ok(()); // already in cache, just skip
+ if let Some(v) = CACHE.get(&key) {
+ if v.value() + hour_micros(1) > now_micros() {
+ return Ok(()); // already in cache, don't create same task in one hour
+ }
}
let db_key = format!("/compact/delete/{key}");
- CACHE.insert(key);
+ CACHE.insert(key, now_micros());
Ok(db::put(&db_key, "OK".into(), db::NEED_WATCH, None).await?)
}
@@ -92,7 +98,7 @@ pub fn is_deleting_stream(
stream_name: &str,
date_range: Option<(&str, &str)>,
) -> bool {
- CACHE.contains(&mk_key(org_id, stream_type, stream_name, date_range))
+ CACHE.contains_key(&mk_key(org_id, stream_type, stream_name, date_range))
}
pub async fn delete_stream_done(
@@ -102,9 +108,7 @@ pub async fn delete_stream_done(
date_range: Option<(&str, &str)>,
) -> Result<(), anyhow::Error> {
let key = mk_key(org_id, stream_type, stream_name, date_range);
- db::delete_if_exists(&format!("/compact/delete/{key}"), false, db::NEED_WATCH)
- .await
- .map_err(|e| anyhow::anyhow!(e))?;
+ db::delete_if_exists(&format!("/compact/delete/{key}"), false, db::NEED_WATCH).await?;
// remove in cache
CACHE.remove(&key);
@@ -128,19 +132,19 @@ pub async fn watch() -> Result<(), anyhow::Error> {
let cluster_coordinator = db::get_coordinator().await;
let mut events = cluster_coordinator.watch(key).await?;
let events = Arc::get_mut(&mut events).unwrap();
- log::info!("Start watching stream deleting");
+ log::info!("Start watching compact deleting");
loop {
let ev = match events.recv().await {
Some(ev) => ev,
None => {
- log::error!("watch_stream_deleting: event channel closed");
+ log::error!("watch_compact_deleting: event channel closed");
break;
}
};
match ev {
db::Event::Put(ev) => {
let item_key = ev.key.strip_prefix(key).unwrap();
- CACHE.insert(item_key.to_string());
+ CACHE.insert(item_key.to_string(), now_micros());
}
db::Event::Delete(ev) => {
let item_key = ev.key.strip_prefix(key).unwrap();
@@ -157,7 +161,7 @@ pub async fn cache() -> Result<(), anyhow::Error> {
let ret = db::list(key).await?;
for (item_key, _) in ret {
let item_key = item_key.strip_prefix(key).unwrap();
- CACHE.insert(item_key.to_string());
+ CACHE.insert(item_key.to_string(), now_micros());
}
Ok(())
}
diff --git a/src/service/db/file_list/remote.rs b/src/service/db/file_list/remote.rs
index b1f016405..2458bfe4f 100644
--- a/src/service/db/file_list/remote.rs
+++ b/src/service/db/file_list/remote.rs
@@ -20,8 +20,11 @@ use std::{
};
use bytes::Buf;
-use chrono::{DateTime, Duration, TimeZone, Utc};
-use config::{meta::stream::FileKey, utils::json};
+use chrono::{DateTime, TimeZone, Utc};
+use config::{
+ meta::stream::FileKey,
+ utils::{json, time::hour_micros},
+};
use futures::future::try_join_all;
use infra::{cache::stats, file_list as infra_file_list, storage};
use once_cell::sync::Lazy;
@@ -218,7 +221,7 @@ pub async fn cache_time_range(time_min: i64, time_max: i64) -> Result<(), anyhow
let offset_time: DateTime<Utc> = Utc.timestamp_nanos(cur_time * 1000);
let file_list_prefix = offset_time.format("%Y/%m/%d/%H/").to_string();
cache(&file_list_prefix, false).await?;
- cur_time += Duration::try_hours(1).unwrap().num_microseconds().unwrap();
+ cur_time += hour_micros(1);
}
Ok(())
}
diff --git a/src/service/file_list.rs b/src/service/file_list.rs
index 890afefb6..e294d7f93 100644
--- a/src/service/file_list.rs
+++ b/src/service/file_list.rs
@@ -75,10 +75,16 @@ pub async fn query_by_ids(trace_id: &str, ids: &[i64]) -> Result<Vec<FileKey>> {
// 1. first query from local cache
let (mut files, ids) = if !cfg.common.local_mode && cfg.common.meta_store_external {
let ids_set: HashSet<_> = ids.iter().cloned().collect();
- let cached_files = file_list::LOCAL_CACHE
- .query_by_ids(ids)
- .await
- .unwrap_or_default();
+ let cached_files = match file_list::LOCAL_CACHE.query_by_ids(ids).await {
+ Ok(files) => files,
+ Err(e) => {
+ log::error!(
+ "[trace_id {trace_id}] file_list query cache failed: {:?}",
+ e
+ );
+ Vec::new()
+ }
+ };
let cached_ids = cached_files
.iter()
.map(|(id, ..)| *id)