feat: add jobs for compact old data (#4952)

Hengfei Yang 2024-10-31 00:33:09 +08:00 committed by GitHub
parent fe32354aa4
commit 3955ed02f2
21 changed files with 594 additions and 315 deletions

View File

@ -1044,18 +1044,6 @@ pub struct Limit {
help = "Maximum size of a single enrichment table in mb"
)]
pub max_enrichment_table_size: usize,
#[env_config(
name = "ZO_USE_UPPER_BOUND_FOR_MAX_TS",
default = false,
help = "use upper bound for max tx"
)]
pub use_upper_bound_for_max_ts: bool,
#[env_config(
name = "ZO_BUFFER_FOR_MAX_TS",
default = 60,
help = "buffer for upper bound in mins"
)]
pub upper_bound_for_max_ts: i64,
#[env_config(name = "ZO_SHORT_URL_RETENTION_DAYS", default = 30)] // days
pub short_url_retention_days: i64,
}
@ -1066,18 +1054,22 @@ pub struct Compact {
pub enabled: bool,
#[env_config(name = "ZO_COMPACT_INTERVAL", default = 60)] // seconds
pub interval: u64,
#[env_config(name = "ZO_COMPACT_OLD_DATA_INTERVAL", default = 3600)] // seconds
pub old_data_interval: u64,
#[env_config(name = "ZO_COMPACT_STRATEGY", default = "file_time")] // file_size, file_time
pub strategy: String,
#[env_config(name = "ZO_COMPACT_LOOKBACK_HOURS", default = 0)] // hours
pub lookback_hours: i64,
#[env_config(name = "ZO_COMPACT_STEP_SECS", default = 3600)] // seconds
pub step_secs: i64,
#[env_config(name = "ZO_COMPACT_SYNC_TO_DB_INTERVAL", default = 600)] // seconds
pub sync_to_db_interval: u64,
#[env_config(name = "ZO_COMPACT_MAX_FILE_SIZE", default = 512)] // MB
pub max_file_size: usize,
#[env_config(name = "ZO_COMPACT_DATA_RETENTION_DAYS", default = 3650)] // days
pub data_retention_days: i64,
#[env_config(name = "ZO_COMPACT_OLD_DATA_MAX_DAYS", default = 7)] // days
pub old_data_max_days: i64,
#[env_config(name = "ZO_COMPACT_OLD_DATA_MIN_RECORDS", default = 100)] // records
pub old_data_min_records: i64,
#[env_config(name = "ZO_COMPACT_OLD_DATA_MIN_FILES", default = 10)] // files
pub old_data_min_files: i64,
#[env_config(name = "ZO_COMPACT_DELETE_FILES_DELAY_HOURS", default = 2)] // hours
pub delete_files_delay_hours: i64,
#[env_config(name = "ZO_COMPACT_BLOCKED_ORGS", default = "")] // use comma to split
@ -1098,7 +1090,7 @@ pub struct Compact {
pub job_run_timeout: i64,
#[env_config(
name = "ZO_COMPACT_JOB_CLEAN_WAIT_TIME",
default = 86400, // 1 day
default = 7200, // 2 hours
help = "Clean the jobs which are finished more than this time"
)]
pub job_clean_wait_time: i64,
@ -1458,6 +1450,11 @@ pub fn init() -> Config {
panic!("disk cache config error: {e}");
}
// check compact config
if let Err(e) = check_compact_config(&mut cfg) {
panic!("compact config error: {e}");
}
// check etcd config
if let Err(e) = check_etcd_config(&mut cfg) {
panic!("etcd config error: {e}");
@ -1561,34 +1558,6 @@ fn check_common_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
));
}
// check compact_max_file_size to MB
cfg.compact.max_file_size *= 1024 * 1024;
if cfg.compact.interval == 0 {
cfg.compact.interval = 60;
}
if cfg.compact.pending_jobs_metric_interval == 0 {
cfg.compact.pending_jobs_metric_interval = 300;
}
// check compact_step_secs, min value is 600s
if cfg.compact.step_secs == 0 {
cfg.compact.step_secs = 3600;
} else if cfg.compact.step_secs <= 600 {
cfg.compact.step_secs = 600;
}
if cfg.compact.data_retention_days > 0 && cfg.compact.data_retention_days < 3 {
return Err(anyhow::anyhow!(
"Data retention is not allowed to be less than 3 days."
));
}
if cfg.compact.delete_files_delay_hours < 1 {
return Err(anyhow::anyhow!(
"Delete files delay is not allowed to be less than 1 hour."
));
}
if cfg.compact.batch_size < 1 {
cfg.compact.batch_size = 100;
}
// If the default scrape interval is less than 5s, raise an error
if cfg.common.default_scrape_interval < 5 {
return Err(anyhow::anyhow!(
@ -1870,6 +1839,48 @@ fn check_disk_cache_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
Ok(())
}
fn check_compact_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
if cfg.compact.data_retention_days > 0 && cfg.compact.data_retention_days < 3 {
return Err(anyhow::anyhow!(
"Data retention is not allowed to be less than 3 days."
));
}
if cfg.compact.interval < 1 {
cfg.compact.interval = 60;
}
// check compact_max_file_size to MB
if cfg.compact.max_file_size < 1 {
cfg.compact.max_file_size = 512;
}
cfg.compact.max_file_size *= 1024 * 1024;
if cfg.compact.delete_files_delay_hours < 1 {
cfg.compact.delete_files_delay_hours = 2;
}
if cfg.compact.old_data_interval < 1 {
cfg.compact.old_data_interval = 3600;
}
if cfg.compact.old_data_max_days < 1 {
cfg.compact.old_data_max_days = 7;
}
if cfg.compact.old_data_min_records < 1 {
cfg.compact.old_data_min_records = 100;
}
if cfg.compact.old_data_min_files < 1 {
cfg.compact.old_data_min_files = 10;
}
if cfg.compact.batch_size < 1 {
cfg.compact.batch_size = 100;
}
if cfg.compact.pending_jobs_metric_interval == 0 {
cfg.compact.pending_jobs_metric_interval = 300;
}
Ok(())
}
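
For context, every knob added above is environment-driven via env_config. A hypothetical sketch (not part of this commit) of how the old-data settings could be resolved, using the variable names from the attributes above with purely illustrative values and a hand-rolled reader in place of env_config:

fn main() {
    // ZO_COMPACT_OLD_DATA_INTERVAL (default 3600 s), ZO_COMPACT_OLD_DATA_MAX_DAYS (default 7),
    // ZO_COMPACT_OLD_DATA_MIN_RECORDS (default 100), ZO_COMPACT_OLD_DATA_MIN_FILES (default 10)
    let old_data_interval: u64 = std::env::var("ZO_COMPACT_OLD_DATA_INTERVAL")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(3600);
    let old_data_max_days: i64 = std::env::var("ZO_COMPACT_OLD_DATA_MAX_DAYS")
        .ok()
        .and_then(|v| v.parse().ok())
        .unwrap_or(7);
    println!("old-data job: every {old_data_interval}s, at most {old_data_max_days} days back");
}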
fn check_sns_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
// Validate endpoint URL if provided
if !cfg.sns.endpoint.is_empty()

View File

@ -216,3 +216,9 @@ pub fn get_internal_grpc_token() -> String {
cfg.grpc.internal_grpc_token.clone()
}
}
// CompactionJobType is used to distinguish between current and historical compaction jobs.
pub enum CompactionJobType {
Current,
Historical,
}

View File

@ -13,7 +13,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use chrono::{DateTime, Datelike, NaiveDateTime, TimeZone, Utc};
use chrono::{DateTime, Datelike, Duration, NaiveDateTime, TimeZone, Utc};
use once_cell::sync::Lazy;
use crate::utils::json;
@ -46,6 +46,19 @@ pub fn now_micros() -> i64 {
Utc::now().timestamp_micros()
}
#[inline(always)]
pub fn hour_micros(n: i64) -> i64 {
Duration::try_hours(n).unwrap().num_microseconds().unwrap()
}
#[inline(always)]
pub fn second_micros(n: i64) -> i64 {
Duration::try_seconds(n)
.unwrap()
.num_microseconds()
.unwrap()
}
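
These two helpers replace the repeated Duration::try_hours(..)/try_seconds(..).num_microseconds() chains seen in the later hunks. A minimal usage sketch, assuming they stay exported from config::utils::time as those hunks import them:

use config::utils::time::{hour_micros, second_micros};

fn main() {
    // one hour and ninety seconds, both expressed in microseconds
    assert_eq!(hour_micros(1), 3_600_000_000);
    assert_eq!(second_micros(90), 90_000_000);
}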
#[inline(always)]
pub fn parse_i64_to_timestamp_micros(v: i64) -> i64 {
if v == 0 {

View File

@ -546,8 +546,10 @@ async fn oo_validator_internal(
path_prefix: &str,
) -> Result<ServiceRequest, (Error, ServiceRequest)> {
if auth_info.auth.starts_with("Basic") {
let decoded = base64::decode(auth_info.auth.strip_prefix("Basic").unwrap().trim())
.expect("Failed to decode base64 string");
let decoded = match base64::decode(auth_info.auth.strip_prefix("Basic").unwrap().trim()) {
Ok(val) => val,
Err(_) => return Err((ErrorUnauthorized("Unauthorized Access"), req)),
};
let (username, password) = match get_user_details(decoded) {
Some(value) => value,
@ -562,14 +564,16 @@ async fn oo_validator_internal(
if chrono::Utc::now().timestamp() - auth_tokens.request_time > auth_tokens.expires_in {
Err((ErrorUnauthorized("Unauthorized Access"), req))
} else {
let decoded = base64::decode(
let decoded = match base64::decode(
auth_tokens
.auth_ext
.strip_prefix("auth_ext")
.unwrap()
.trim(),
)
.expect("Failed to decode base64 string");
) {
Ok(val) => val,
Err(_) => return Err((ErrorUnauthorized("Unauthorized Access"), req)),
};
let (username, password) = match get_user_details(decoded) {
Some(value) => value,
None => return Err((ErrorUnauthorized("Unauthorized Access"), req)),
@ -584,8 +588,10 @@ async fn oo_validator_internal(
#[cfg(feature = "enterprise")]
pub async fn get_user_email_from_auth_str(auth_str: &str) -> Option<String> {
if auth_str.starts_with("Basic") {
let decoded = base64::decode(auth_str.strip_prefix("Basic").unwrap().trim())
.expect("Failed to decode base64 string");
let decoded = match base64::decode(auth_str.strip_prefix("Basic").unwrap().trim()) {
Ok(val) => val,
Err(_) => return None,
};
match get_user_details(decoded) {
Some(value) => Some(value.0),
@ -599,14 +605,16 @@ pub async fn get_user_email_from_auth_str(auth_str: &str) -> Option<String> {
if chrono::Utc::now().timestamp() - auth_tokens.request_time > auth_tokens.expires_in {
None
} else {
let decoded = base64::decode(
let decoded = match base64::decode(
auth_tokens
.auth_ext
.strip_prefix("auth_ext")
.unwrap()
.trim(),
)
.expect("Failed to decode base64 string");
) {
Ok(val) => val,
Err(_) => return None,
};
match get_user_details(decoded) {
Some(value) => Some(value.0),
None => None,
@ -618,9 +626,10 @@ pub async fn get_user_email_from_auth_str(auth_str: &str) -> Option<String> {
}
fn get_user_details(decoded: String) -> Option<(String, String)> {
let credentials = String::from_utf8(decoded.into())
.map_err(|_| ())
.expect("Failed to decode base64 string");
let credentials = match String::from_utf8(decoded.into()).map_err(|_| ()) {
Ok(val) => val,
Err(_) => return None,
};
let parts: Vec<&str> = credentials.splitn(2, ':').collect();
if parts.len() != 2 {
return None;

View File

@ -547,6 +547,6 @@ mod tests {
.uri("/proxy/org1/https://cloud.openobserve.ai/assets/flUhRq6tzZclQEJ-Vdg-IuiaDsNa.fd84f88b.woff")
.to_request();
let resp = call_service(&mut app, req).await;
assert_eq!(resp.status().as_u16(), 200);
assert_eq!(resp.status().as_u16(), 404);
}
}

View File

@ -56,6 +56,7 @@ impl From<DataFusionError> for Error {
};
}
if err.contains("parquet not found") {
log::error!("[Datafusion] Parquet file not found: {}", err);
return Error::ErrorCode(ErrorCodes::SearchParquetFileNotFound);
}
if err.contains("Invalid function ") {

View File

@ -85,6 +85,13 @@ pub trait FileList: Sync + Send + 'static {
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<FileId>>;
async fn query_old_data_hours(
&self,
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<String>>;
async fn query_deleted(
&self,
org_id: &str,
@ -225,11 +232,7 @@ pub async fn list() -> Result<Vec<(String, FileMeta)>> {
}
#[inline]
#[tracing::instrument(
name = "infra:file_list:query_db",
skip_all,
fields(org_id = org_id, stream_name = stream_name)
)]
#[tracing::instrument(name = "infra:file_list:db:query")]
pub async fn query(
org_id: &str,
stream_type: StreamType,
@ -238,11 +241,7 @@ pub async fn query(
time_range: Option<(i64, i64)>,
flattened: Option<bool>,
) -> Result<Vec<(String, FileMeta)>> {
if let Some((start, end)) = time_range {
if start > end || start == 0 || end == 0 {
return Err(Error::Message("[file_list] invalid time range".to_string()));
}
}
validate_time_range(time_range)?;
CLIENT
.query(
org_id,
@ -262,27 +261,33 @@ pub async fn query_by_ids(ids: &[i64]) -> Result<Vec<(i64, String, FileMeta)>> {
}
#[inline]
#[tracing::instrument(
name = "infra:file_list:db:query_ids",
skip_all,
fields(org_id = org_id, stream_name = stream_name)
)]
#[tracing::instrument(name = "infra:file_list:db:query_ids")]
pub async fn query_ids(
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<FileId>> {
if let Some((start, end)) = time_range {
if start > end || start == 0 || end == 0 {
return Err(Error::Message("[file_list] invalid time range".to_string()));
}
}
validate_time_range(time_range)?;
CLIENT
.query_ids(org_id, stream_type, stream_name, time_range)
.await
}
#[inline]
#[tracing::instrument(name = "infra:file_list:db:query_old_data_hours")]
pub async fn query_old_data_hours(
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<String>> {
validate_time_range(time_range)?;
CLIENT
.query_old_data_hours(org_id, stream_type, stream_name, time_range)
.await
}
#[inline]
pub async fn query_deleted(org_id: &str, time_max: i64, limit: i64) -> Result<Vec<(String, bool)>> {
CLIENT.query_deleted(org_id, time_max, limit).await
@ -440,6 +445,24 @@ pub async fn local_cache_gc() -> Result<()> {
Ok(())
}
fn validate_time_range(time_range: Option<(i64, i64)>) -> Result<()> {
if let Some((start, end)) = time_range {
if start > end || start == 0 || end == 0 {
return Err(Error::Message("[file_list] invalid time range".to_string()));
}
}
Ok(())
}
fn calculate_max_ts_upper_bound(time_end: i64, stream_type: StreamType) -> i64 {
let ts = super::schema::unwrap_partition_time_level(None, stream_type).duration();
if ts > 0 {
time_end + ts
} else {
time_end + PartitionTimeLevel::Hourly.duration()
}
}
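
With this helper both SQL paths now always bound max_ts instead of leaving the range open-ended, which is what the removed ZO_USE_UPPER_BOUND_FOR_MAX_TS flag used to gate. A rough numeric sketch of the bound, assuming the partition duration resolves to one hour in microseconds (consistent with the minutes-to-microseconds arithmetic it replaces):

fn main() {
    let time_end: i64 = 1_730_332_800_000_000; // 2024-10-31 00:00:00 UTC, in microseconds
    let hourly: i64 = 3_600_000_000;           // assumed value of the hourly partition duration
    assert_eq!(time_end + hourly, 1_730_336_400_000_000); // max_ts upper bound
}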
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
pub struct FileRecord {
#[sqlx(default)]

View File

@ -363,11 +363,8 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.await
} else {
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound =
time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
sqlx::query_as::<_, super::FileRecord>(
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
sqlx::query_as::<_, super::FileRecord>(
r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
@ -380,20 +377,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
} else {
sqlx::query_as::<_, super::FileRecord>(
r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
WHERE stream = ? AND max_ts >= ? AND min_ts <= ?;
"#,
)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
};
let time = start.elapsed().as_secs_f64();
DB_QUERY_TIME
@ -495,12 +478,10 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
let stream_key = stream_key.clone();
tasks.push(tokio::task::spawn(async move {
let pool = CLIENT.clone();
let cfg = get_config();
DB_QUERY_NUMS
.with_label_values(&["select", "file_list"])
.inc();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound = time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let query = "SELECT id, records, original_size FROM file_list WHERE stream = ? AND max_ts >= ? AND max_ts <= ? AND min_ts <= ?;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
@ -509,15 +490,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
} else {
let query = "SELECT id, records, original_size FROM file_list WHERE stream = ? AND max_ts >= ? AND min_ts <= ?;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
}));
}
@ -540,6 +512,56 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
Ok(rets)
}
async fn query_old_data_hours(
&self,
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<String>> {
if let Some((start, end)) = time_range {
if start == 0 && end == 0 {
return Ok(Vec::new());
}
}
let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
let pool = CLIENT.clone();
DB_QUERY_NUMS
.with_label_values(&["select", "file_list"])
.inc();
let start = std::time::Instant::now();
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let sql = r#"
SELECT date
FROM file_list
WHERE stream = ? AND max_ts >= ? AND max_ts <= ? AND min_ts <= ? AND records < ?
GROUP BY date HAVING count(*) >= ?;
"#;
let ret = sqlx::query(sql)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.bind(cfg.compact.old_data_min_records)
.bind(cfg.compact.old_data_min_files)
.fetch_all(&pool)
.await?;
let time = start.elapsed().as_secs_f64();
DB_QUERY_TIME
.with_label_values(&["query_old_data_hours", "file_list"])
.observe(time);
Ok(ret
.into_iter()
.map(|r| r.try_get::<String, &str>("date").unwrap_or_default())
.collect())
}
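
The GROUP BY/HAVING query above encodes the "old data hour" rule: an hour-shaped date partition qualifies when it holds at least old_data_min_files files and each counted file has fewer than old_data_min_records records. A self-contained sketch of the same rule applied to an in-memory list (the partition strings and numbers are made up; the real values come from the file_list table):

use std::collections::HashMap;

fn old_data_hours(files: &[(&str, i64)], min_records: i64, min_files: usize) -> Vec<String> {
    // count only the files that are "small", i.e. below the record threshold
    let mut small_files_per_hour: HashMap<&str, usize> = HashMap::new();
    for &(hour, records) in files {
        if records < min_records {
            *small_files_per_hour.entry(hour).or_default() += 1;
        }
    }
    // keep the hours that accumulated enough small files to be worth re-merging
    small_files_per_hour
        .into_iter()
        .filter(|(_, n)| *n >= min_files)
        .map(|(hour, _)| hour.to_string())
        .collect()
}

fn main() {
    let files = [
        ("2024/10/01/13", 20),
        ("2024/10/01/13", 35),
        ("2024/10/01/13", 12),
        ("2024/10/01/14", 5_000),
    ];
    // with min_records = 100 and min_files = 3, only 2024/10/01/13 qualifies
    println!("{:?}", old_data_hours(&files, 100, 3));
}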
async fn query_deleted(
&self,
org_id: &str,

View File

@ -376,37 +376,20 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.fetch_all(&pool).await
} else {
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound =
time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
let sql = r#"
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let sql = r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;
"#;
sqlx::query_as::<_, super::FileRecord>(sql)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.fetch_all(&pool)
.await
} else {
let sql = r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;
"#;
sqlx::query_as::<_, super::FileRecord>(sql)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
sqlx::query_as::<_, super::FileRecord>(sql)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.fetch_all(&pool)
.await
};
let time = start.elapsed().as_secs_f64();
DB_QUERY_TIME
@ -508,29 +491,18 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
let stream_key = stream_key.clone();
tasks.push(tokio::task::spawn(async move {
let pool = CLIENT.clone();
let cfg = get_config();
DB_QUERY_NUMS
.with_label_values(&["select", "file_list"])
.inc();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound = time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.fetch_all(&pool)
.await
} else {
let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.fetch_all(&pool)
.await
}));
}
@ -553,6 +525,56 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
Ok(rets)
}
async fn query_old_data_hours(
&self,
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<String>> {
if let Some((start, end)) = time_range {
if start == 0 && end == 0 {
return Ok(Vec::new());
}
}
let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
let pool = CLIENT.clone();
DB_QUERY_NUMS
.with_label_values(&["select", "file_list"])
.inc();
let start = std::time::Instant::now();
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let sql = r#"
SELECT date
FROM file_list
WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4 AND records < $5
GROUP BY date HAVING count(*) >= $6;
"#;
let ret = sqlx::query(sql)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.bind(cfg.compact.old_data_min_records)
.bind(cfg.compact.old_data_min_files)
.fetch_all(&pool)
.await?;
let time = start.elapsed().as_secs_f64();
DB_QUERY_TIME
.with_label_values(&["query_old_data_hours", "file_list"])
.observe(time);
Ok(ret
.into_iter()
.map(|r| r.try_get::<String, &str>("date").unwrap_or_default())
.collect())
}
async fn query_deleted(
&self,
org_id: &str,

View File

@ -326,11 +326,8 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.await
} else {
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound =
time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
sqlx::query_as::<_, super::FileRecord>(
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
sqlx::query_as::<_, super::FileRecord>(
r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
@ -343,20 +340,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
} else {
sqlx::query_as::<_, super::FileRecord>(
r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;
"#,
)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
};
Ok(ret?
.iter()
@ -446,9 +429,7 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
let stream_key = stream_key.clone();
tasks.push(tokio::task::spawn(async move {
let pool = CLIENT_RO.clone();
let cfg = get_config();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound = time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
@ -457,15 +438,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
} else {
let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
}));
}
@ -484,6 +456,47 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
Ok(rets)
}
async fn query_old_data_hours(
&self,
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<String>> {
if let Some((start, end)) = time_range {
if start == 0 && end == 0 {
return Ok(Vec::new());
}
}
let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
let pool = CLIENT_RO.clone();
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let sql = r#"
SELECT date
FROM file_list
WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4 AND records < $5
GROUP BY date HAVING count(*) >= $6;
"#;
let ret = sqlx::query(sql)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.bind(cfg.compact.old_data_min_records)
.bind(cfg.compact.old_data_min_files)
.fetch_all(&pool)
.await?;
Ok(ret
.into_iter()
.map(|r| r.try_get::<String, &str>("date").unwrap_or_default())
.collect())
}
async fn query_deleted(
&self,
org_id: &str,

View File

@ -15,7 +15,12 @@
use std::sync::Arc;
use config::{cluster::LOCAL_NODE, get_config, meta::stream::FileKey, metrics};
use config::{
cluster::LOCAL_NODE,
get_config,
meta::{cluster::CompactionJobType, stream::FileKey},
metrics,
};
use tokio::{
sync::{mpsc, Mutex},
time,
@ -94,6 +99,7 @@ pub async fn run() -> Result<(), anyhow::Error> {
}
tokio::task::spawn(async move { run_generate_job().await });
tokio::task::spawn(async move { run_generate_old_data_job().await });
tokio::task::spawn(async move { run_merge(tx).await });
tokio::task::spawn(async move { run_retention().await });
tokio::task::spawn(async move { run_delay_deletion().await });
@ -138,12 +144,27 @@ async fn run_generate_job() -> Result<(), anyhow::Error> {
loop {
time::sleep(time::Duration::from_secs(get_config().compact.interval)).await;
log::debug!("[COMPACTOR] Running generate merge job");
if let Err(e) = compact::run_generate_job().await {
if let Err(e) = compact::run_generate_job(CompactionJobType::Current).await {
log::error!("[COMPACTOR] run generate merge job error: {e}");
}
}
}
/// Generate merging jobs for old data
async fn run_generate_old_data_job() -> Result<(), anyhow::Error> {
loop {
// run every old_data_interval seconds (default: 1 hour)
time::sleep(time::Duration::from_secs(
get_config().compact.old_data_interval,
))
.await;
log::debug!("[COMPACTOR] Running generate merge job for old data");
if let Err(e) = compact::run_generate_job(CompactionJobType::Historical).await {
log::error!("[COMPACTOR] run generate merge job for old data error: {e}");
}
}
}
/// Merge small files
async fn run_merge(tx: mpsc::Sender<(MergeSender, MergeBatch)>) -> Result<(), anyhow::Error> {
loop {

View File

@ -298,6 +298,8 @@ async fn move_files(
let org_id = columns[1].to_string();
let stream_type = StreamType::from(columns[2]);
let stream_name = columns[3].to_string();
// let _thread_id = columns[4].to_string();
let prefix_date = format!("{}-{}-{}", columns[5], columns[6], columns[7]);
// log::debug!("[INGESTER:JOB:{thread_id}] check deletion for partition: {}", prefix);
@ -372,6 +374,42 @@ async fn move_files(
return Ok(());
}
// check data retention
let stream_settings = infra::schema::get_settings(&org_id, &stream_name, stream_type)
.await
.unwrap_or_default();
let mut stream_data_retention_days = cfg.compact.data_retention_days;
if stream_settings.data_retention > 0 {
stream_data_retention_days = stream_settings.data_retention;
}
if stream_data_retention_days > 0 {
let date =
config::utils::time::now() - Duration::try_days(stream_data_retention_days).unwrap();
let stream_data_retention_end = date.format("%Y-%m-%d").to_string();
if prefix_date < stream_data_retention_end {
for file in files {
log::warn!(
"[INGESTER:JOB:{thread_id}] the file [{}/{}/{}] was exceed the data retention, just delete file: {}",
&org_id,
stream_type,
&stream_name,
file.key,
);
if let Err(e) = tokio::fs::remove_file(wal_dir.join(&file.key)).await {
log::error!(
"[INGESTER:JOB:{thread_id}] Failed to remove parquet file from disk: {}, {}",
file.key,
e
);
}
// delete metadata from cache
WAL_PARQUET_METADATA.write().await.remove(&file.key);
PROCESSING_FILES.write().await.remove(&file.key);
}
return Ok(());
}
}
// log::debug!("[INGESTER:JOB:{thread_id}] start processing for partition: {}", prefix);
let wal_dir = wal_dir.clone();

View File

@ -22,7 +22,11 @@ use config::{
stream::StreamType,
usage::{TriggerData, TriggerDataStatus, TriggerDataType},
},
utils::{json, rand::get_rand_num_within},
utils::{
json,
rand::get_rand_num_within,
time::{hour_micros, second_micros},
},
};
use cron::Schedule;
use futures::future::try_join_all;
@ -86,11 +90,8 @@ fn get_max_considerable_delay(frequency: i64) -> i64 {
// If the delay is more than this, the alert will be skipped.
// The maximum delay is the lowest of 1 hour or 20% of the frequency.
// E.g. if the frequency is 5 mins, the maximum delay is 1 min.
let frequency = Duration::try_seconds(frequency)
.unwrap()
.num_microseconds()
.unwrap();
let max_delay = Duration::try_hours(1).unwrap().num_microseconds().unwrap();
let frequency = second_micros(frequency);
let max_delay = hour_micros(1);
// limit.alert_considerable_delay is in percentage, convert into float
let considerable_delay = get_config().limit.alert_considerable_delay as f64 * 0.01;
let max_considerable_delay = (frequency as f64 * considerable_delay) as i64;

View File

@ -20,7 +20,12 @@ use std::{
use bytes::Buf;
use chrono::{DateTime, Datelike, Duration, TimeZone, Timelike, Utc};
use config::{cluster::LOCAL_NODE, ider, meta::stream::FileKey, utils::json};
use config::{
cluster::LOCAL_NODE,
ider,
meta::stream::FileKey,
utils::{json, time::hour_micros},
};
use hashbrown::HashMap;
use infra::{dist_lock, schema::STREAM_SCHEMAS_LATEST, storage};
use tokio::sync::{RwLock, Semaphore};
@ -106,7 +111,7 @@ pub async fn run_merge(offset: i64) -> Result<(), anyhow::Error> {
// offset
let mut is_waiting_streams = false;
for (key, val) in offsets {
if (val - Duration::try_hours(1).unwrap().num_microseconds().unwrap()) < offset {
if (val - hour_micros(1)) < offset {
log::info!("[COMPACT] file_list is waiting for stream: {key}, offset: {val}");
is_waiting_streams = true;
break;
@ -162,7 +167,7 @@ pub async fn run_merge(offset: i64) -> Result<(), anyhow::Error> {
merge_file_list(offset).await?;
// write new sync offset
offset = offset_time_hour + Duration::try_hours(1).unwrap().num_microseconds().unwrap();
offset = offset_time_hour + hour_micros(1);
db::compact::file_list::set_offset(offset).await
}

View File

@ -16,7 +16,8 @@
use std::io::{BufRead, BufReader};
use bytes::Buf;
use chrono::{DateTime, Duration, TimeZone, Utc};
use chrono::{DateTime, TimeZone, Utc};
use config::utils::time::hour_micros;
use futures::future::try_join_all;
use hashbrown::HashMap;
use infra::{file_list as infra_file_list, storage};
@ -201,7 +202,7 @@ async fn query_deleted_from_s3(
let entry = files.entry(file).or_insert_with(Vec::new);
entry.extend(records);
}
cur_time += Duration::try_hours(1).unwrap().num_microseconds().unwrap();
cur_time += hour_micros(1);
}
Ok(files)
}

View File

@ -34,6 +34,7 @@ use config::{
},
record_batch_ext::{concat_batches, merge_record_batches},
schema_ext::SchemaExt,
time::hour_micros,
},
FILE_EXT_PARQUET,
};
@ -143,26 +144,18 @@ pub async fn generate_job_by_stream(
)
.unwrap()
.timestamp_micros();
// 1. if step_secs less than 1 hour, must wait for at least max_file_retention_time
// 2. if step_secs greater than 1 hour, must wait for at least 3 * max_file_retention_time
// must wait for at least 3 * max_file_retention_time
// -- first period: the last hour's local files are uploaded to storage and the file list is written
// -- second period: the last hour's file list is uploaded to storage
// -- third period: the merge can run, so we wait at least 3 times
// max_file_retention_time
if (cfg.compact.step_secs < 3600
&& time_now.timestamp_micros() - offset
if offset >= time_now_hour
|| time_now.timestamp_micros() - offset
<= Duration::try_seconds(cfg.limit.max_file_retention_time as i64)
.unwrap()
.num_microseconds()
.unwrap())
|| (cfg.compact.step_secs >= 3600
&& (offset >= time_now_hour
|| time_now.timestamp_micros() - offset
<= Duration::try_seconds(cfg.limit.max_file_retention_time as i64)
.unwrap()
.num_microseconds()
.unwrap()
* 3))
.unwrap()
* 3
{
return Ok(()); // the time is future, just wait
}
@ -184,11 +177,9 @@ pub async fn generate_job_by_stream(
}
// write new offset
let offset = offset
+ Duration::try_seconds(cfg.compact.step_secs)
.unwrap()
.num_microseconds()
.unwrap();
let offset = offset + hour_micros(1);
// truncate to the start of the hour (zero minutes and seconds)
let offset = offset - offset % hour_micros(1);
db::compact::files::set_offset(
org_id,
stream_type,
@ -201,6 +192,113 @@ pub async fn generate_job_by_stream(
Ok(())
}
/// Generate merging jobs for old data by stream
/// 1. get old data by hour
/// 2. check if other node is processing
/// 3. create job or return
pub async fn generate_old_data_job_by_stream(
org_id: &str,
stream_type: StreamType,
stream_name: &str,
) -> Result<(), anyhow::Error> {
// get last compacted offset
let (offset, node) = db::compact::files::get_offset(org_id, stream_type, stream_name).await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some() {
return Ok(()); // other node is processing
}
if node.is_empty() || LOCAL_NODE.uuid.ne(&node) {
let lock_key = format!("/compact/merge/{}/{}/{}", org_id, stream_type, stream_name);
let locker = dist_lock::lock(&lock_key, 0, None).await?;
// check the working node again, maybe other node locked it first
let (offset, node) = db::compact::files::get_offset(org_id, stream_type, stream_name).await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some()
{
dist_lock::unlock(&locker).await?;
return Ok(()); // other node is processing
}
// set to current node
let ret = db::compact::files::set_offset(
org_id,
stream_type,
stream_name,
offset,
Some(&LOCAL_NODE.uuid.clone()),
)
.await;
dist_lock::unlock(&locker).await?;
drop(locker);
ret?;
}
if offset == 0 {
return Ok(()); // no data
}
let cfg = get_config();
let stream_settings = infra::schema::get_settings(org_id, stream_name, stream_type)
.await
.unwrap_or_default();
let mut stream_data_retention_days = cfg.compact.data_retention_days;
if stream_settings.data_retention > 0 {
stream_data_retention_days = stream_settings.data_retention;
}
if stream_data_retention_days > cfg.compact.old_data_max_days {
stream_data_retention_days = cfg.compact.old_data_max_days;
}
if stream_data_retention_days == 0 {
return Ok(()); // no need to check old data
}
// get old data by hour; treat anything before `offset - 2 hours` as old data
let end_time = offset - hour_micros(2);
let start_time = end_time
- Duration::try_days(stream_data_retention_days as i64)
.unwrap()
.num_microseconds()
.unwrap();
let hours = infra_file_list::query_old_data_hours(
org_id,
stream_type,
stream_name,
Some((start_time, end_time)),
)
.await?;
// generate merging job
for hour in hours {
let column = hour.split('/').collect::<Vec<_>>();
if column.len() != 4 {
return Err(anyhow::anyhow!(
"Unexpected hour format in {}, Expected format YYYY/MM/DD/HH",
hour
));
}
let offset = DateTime::parse_from_rfc3339(&format!(
"{}-{}-{}T{}:00:00Z",
column[0], column[1], column[2], column[3]
))?
.with_timezone(&Utc);
let offset = offset.timestamp_micros();
log::debug!(
"[COMPACTOR] generate_old_data_job_by_stream [{}/{}/{}] hours: {}, offset: {}",
org_id,
stream_type,
stream_name,
hour,
offset
);
if let Err(e) = infra_file_list::add_job(org_id, stream_type, stream_name, offset).await {
return Err(anyhow::anyhow!(
"[COMAPCT] add file_list_jobs for old data failed: {}",
e
));
}
}
Ok(())
}
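
For a concrete feel of the hour-to-offset mapping used above, a standalone sketch that mirrors the parsing in generate_old_data_job_by_stream with a made-up hour partition (assumes chrono 0.4):

use chrono::{DateTime, Utc};

fn main() -> Result<(), chrono::ParseError> {
    let hour = "2024/10/01/13"; // illustrative value returned by query_old_data_hours
    let column: Vec<_> = hour.split('/').collect();
    let offset = DateTime::parse_from_rfc3339(&format!(
        "{}-{}-{}T{}:00:00Z",
        column[0], column[1], column[2], column[3]
    ))?
    .with_timezone(&Utc)
    .timestamp_micros();
    // 1_727_787_600_000_000 µs since epoch == 2024-10-01 13:00:00 UTC
    assert_eq!(offset, 1_727_787_600_000_000);
    Ok(())
}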
/// compactor run steps on a stream:
/// 3. get a cluster lock for compactor stream
/// 4. read last compacted offset: year/month/day/hour
@ -277,17 +375,11 @@ pub async fn merge_by_stream(
// get current hour(day) all files
let (partition_offset_start, partition_offset_end) =
if partition_time_level == PartitionTimeLevel::Daily {
(
offset_time_day,
offset_time_day + Duration::try_hours(24).unwrap().num_microseconds().unwrap() - 1,
)
(offset_time_day, offset_time_day + hour_micros(24) - 1)
} else {
(
offset_time_hour,
offset_time_hour + Duration::try_hours(1).unwrap().num_microseconds().unwrap() - 1,
)
(offset_time_hour, offset_time_hour + hour_micros(1) - 1)
};
let mut files = file_list::query(
let files = file_list::query(
org_id,
stream_name,
stream_type,
@ -298,31 +390,6 @@ pub async fn merge_by_stream(
.await
.map_err(|e| anyhow::anyhow!("query file list failed: {}", e))?;
// check lookback files
if cfg.compact.lookback_hours > 0 {
let lookback_offset = Duration::try_hours(cfg.compact.lookback_hours)
.unwrap()
.num_microseconds()
.unwrap();
let lookback_offset_start = partition_offset_start - lookback_offset;
let mut lookback_offset_end = partition_offset_end - lookback_offset;
if lookback_offset_end > partition_offset_start {
// the lookback period is overlap with current period
lookback_offset_end = partition_offset_start;
}
let lookback_files = file_list::query(
org_id,
stream_name,
stream_type,
partition_time_level,
lookback_offset_start,
lookback_offset_end,
)
.await
.map_err(|e| anyhow::anyhow!("query lookback file list failed: {}", e))?;
files.extend(lookback_files);
}
log::debug!(
"[COMPACTOR] merge_by_stream [{}/{}/{}] time range: [{},{}], files: {}",
org_id,
@ -399,13 +466,6 @@ pub async fn merge_by_stream(
}
new_file_size += file.meta.original_size;
new_file_list.push(file.clone());
// metrics
metrics::COMPACT_MERGED_FILES
.with_label_values(&[&org_id, stream_type.to_string().as_str()])
.inc();
metrics::COMPACT_MERGED_BYTES
.with_label_values(&[&org_id, stream_type.to_string().as_str()])
.inc_by(file.meta.original_size as u64);
}
if new_file_list.len() > 1 {
batch_groups.push(MergeBatch {
@ -450,22 +510,14 @@ pub async fn merge_by_stream(
let new_file_name = std::mem::take(&mut new_file.key);
let new_file_meta = std::mem::take(&mut new_file.meta);
let new_file_list = batch_groups.get(batch_id).unwrap().files.as_slice();
if new_file_name.is_empty() {
if new_file_list.is_empty() {
// no file need to merge
break;
} else {
// delete files from file_list and continue
files_with_size.retain(|f| !&new_file_list.contains(f));
continue;
}
continue;
}
// delete small files keys & write big files keys, use transaction
let mut events = Vec::with_capacity(new_file_list.len() + 1);
events.push(FileKey {
key: new_file_name.clone(),
key: new_file_name,
meta: new_file_meta,
deleted: false,
segment_ids: None,
@ -536,16 +588,12 @@ pub async fn merge_by_stream(
.inc_by(time);
metrics::COMPACT_DELAY_HOURS
.with_label_values(&[org_id, stream_name, stream_type.to_string().as_str()])
.set(
(time_now_hour - offset_time_hour)
/ Duration::try_hours(1).unwrap().num_microseconds().unwrap(),
);
.set((time_now_hour - offset_time_hour) / hour_micros(1));
Ok(())
}
/// merge some small files into one big file, upload to storage, returns the big
/// file key and merged files
/// merge small files into a big file, upload it to storage, and return the big file key and merged files
pub async fn merge_files(
thread_id: usize,
org_id: &str,

View File

@ -18,7 +18,7 @@ use config::{
cluster::LOCAL_NODE,
get_config,
meta::{
cluster::Role,
cluster::{CompactionJobType, Role},
stream::{PartitionTimeLevel, StreamType, ALL_STREAM_TYPES},
},
};
@ -63,10 +63,12 @@ pub async fn run_retention() -> Result<(), anyhow::Error> {
continue; // not this node
}
let schema = infra::schema::get(&org_id, &stream_name, stream_type).await?;
let stream = super::stream::stream_res(&stream_name, stream_type, schema, None);
let stream_data_retention_end = if stream.settings.data_retention > 0 {
let date = now - Duration::try_days(stream.settings.data_retention).unwrap();
let stream_settings =
infra::schema::get_settings(&org_id, &stream_name, stream_type)
.await
.unwrap_or_default();
let stream_data_retention_end = if stream_settings.data_retention > 0 {
let date = now - Duration::try_days(stream_settings.data_retention).unwrap();
date.format("%Y-%m-%d").to_string()
} else {
data_lifecycle_end.clone()
@ -137,7 +139,7 @@ pub async fn run_retention() -> Result<(), anyhow::Error> {
}
/// Generate job for compactor
pub async fn run_generate_job() -> Result<(), anyhow::Error> {
pub async fn run_generate_job(job_type: CompactionJobType) -> Result<(), anyhow::Error> {
let orgs = db::schema::list_organizations_from_cache().await;
for org_id in orgs {
// check backlist
@ -191,16 +193,37 @@ pub async fn run_generate_job() -> Result<(), anyhow::Error> {
continue;
}
if let Err(e) =
merge::generate_job_by_stream(&org_id, stream_type, &stream_name).await
{
log::error!(
"[COMPACTOR] generate_job_by_stream [{}/{}/{}] error: {}",
org_id,
stream_type,
stream_name,
e
);
match job_type {
CompactionJobType::Current => {
if let Err(e) =
merge::generate_job_by_stream(&org_id, stream_type, &stream_name).await
{
log::error!(
"[COMPACTOR] generate_job_by_stream [{}/{}/{}] error: {}",
org_id,
stream_type,
stream_name,
e
);
}
}
CompactionJobType::Historical => {
if let Err(e) = merge::generate_old_data_job_by_stream(
&org_id,
stream_type,
&stream_name,
)
.await
{
log::error!(
"[COMPACTOR] generate_old_data_job_by_stream [{}/{}/{}] error: {}",
org_id,
stream_type,
stream_name,
e
);
}
}
}
}
}
@ -234,7 +257,7 @@ pub async fn run_merge(
.unwrap_or_default();
let partition_time_level =
unwrap_partition_time_level(stream_setting.partition_time_level, stream_type);
if partition_time_level == PartitionTimeLevel::Daily || cfg.compact.step_secs < 3600 {
if partition_time_level == PartitionTimeLevel::Daily {
// check if this stream need process by this node
let Some(node_name) =
get_node_from_consistent_hash(&stream_name, &Role::Compactor, None).await

View File

@ -51,6 +51,15 @@ pub async fn delete_by_stream(
return Ok(()); // created_at is after lifecycle_end, just skip
}
log::debug!(
"[COMPACT] delete_by_stream {}/{}/{}/{},{}",
org_id,
stream_type,
stream_name,
lifecycle_start,
lifecycle_end
);
// Hack for 1970-01-01
if lifecycle_start.le("1970-01-01") {
let lifecycle_end = created_at + Duration::try_days(1).unwrap();
@ -83,7 +92,7 @@ pub async fn delete_all(
let locker = dist_lock::lock(&lock_key, 0, None).await?;
let node = db::compact::retention::get_stream(org_id, stream_type, stream_name, None).await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some() {
log::error!("[COMPACT] stream {org_id}/{stream_type}/{stream_name} is deleting by {node}");
log::warn!("[COMPACT] stream {org_id}/{stream_type}/{stream_name} is deleting by {node}");
dist_lock::unlock(&locker).await?;
return Ok(()); // not this node, just skip
}
@ -179,7 +188,7 @@ pub async fn delete_by_date(
db::compact::retention::get_stream(org_id, stream_type, stream_name, Some(date_range))
.await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some() {
log::error!(
log::warn!(
"[COMPACT] stream {org_id}/{stream_type}/{stream_name}/{:?} is deleting by {node}",
date_range
);

View File

@ -15,12 +15,16 @@
use std::sync::Arc;
use config::{meta::stream::StreamType, RwHashSet};
use config::{
meta::stream::StreamType,
utils::time::{hour_micros, now_micros},
RwHashMap,
};
use once_cell::sync::Lazy;
use crate::service::db;
static CACHE: Lazy<RwHashSet<String>> = Lazy::new(Default::default);
static CACHE: Lazy<RwHashMap<String, i64>> = Lazy::new(Default::default);
#[inline]
fn mk_key(
@ -47,12 +51,14 @@ pub async fn delete_stream(
let key = mk_key(org_id, stream_type, stream_name, date_range);
// write in cache
if CACHE.contains(&key) {
return Ok(()); // already in cache, just skip
if let Some(v) = CACHE.get(&key) {
if v.value() + hour_micros(1) > now_micros() {
return Ok(()); // already in cache, don't create same task in one hour
}
}
let db_key = format!("/compact/delete/{key}");
CACHE.insert(key);
CACHE.insert(key, now_micros());
Ok(db::put(&db_key, "OK".into(), db::NEED_WATCH, None).await?)
}
@ -92,7 +98,7 @@ pub fn is_deleting_stream(
stream_name: &str,
date_range: Option<(&str, &str)>,
) -> bool {
CACHE.contains(&mk_key(org_id, stream_type, stream_name, date_range))
CACHE.contains_key(&mk_key(org_id, stream_type, stream_name, date_range))
}
pub async fn delete_stream_done(
@ -102,9 +108,7 @@ pub async fn delete_stream_done(
date_range: Option<(&str, &str)>,
) -> Result<(), anyhow::Error> {
let key = mk_key(org_id, stream_type, stream_name, date_range);
db::delete_if_exists(&format!("/compact/delete/{key}"), false, db::NEED_WATCH)
.await
.map_err(|e| anyhow::anyhow!(e))?;
db::delete_if_exists(&format!("/compact/delete/{key}"), false, db::NEED_WATCH).await?;
// remove in cache
CACHE.remove(&key);
@ -128,19 +132,19 @@ pub async fn watch() -> Result<(), anyhow::Error> {
let cluster_coordinator = db::get_coordinator().await;
let mut events = cluster_coordinator.watch(key).await?;
let events = Arc::get_mut(&mut events).unwrap();
log::info!("Start watching stream deleting");
log::info!("Start watching compact deleting");
loop {
let ev = match events.recv().await {
Some(ev) => ev,
None => {
log::error!("watch_stream_deleting: event channel closed");
log::error!("watch_compact_deleting: event channel closed");
break;
}
};
match ev {
db::Event::Put(ev) => {
let item_key = ev.key.strip_prefix(key).unwrap();
CACHE.insert(item_key.to_string());
CACHE.insert(item_key.to_string(), now_micros());
}
db::Event::Delete(ev) => {
let item_key = ev.key.strip_prefix(key).unwrap();
@ -157,7 +161,7 @@ pub async fn cache() -> Result<(), anyhow::Error> {
let ret = db::list(key).await?;
for (item_key, _) in ret {
let item_key = item_key.strip_prefix(key).unwrap();
CACHE.insert(item_key.to_string());
CACHE.insert(item_key.to_string(), now_micros());
}
Ok(())
}

View File

@ -20,8 +20,11 @@ use std::{
};
use bytes::Buf;
use chrono::{DateTime, Duration, TimeZone, Utc};
use config::{meta::stream::FileKey, utils::json};
use chrono::{DateTime, TimeZone, Utc};
use config::{
meta::stream::FileKey,
utils::{json, time::hour_micros},
};
use futures::future::try_join_all;
use infra::{cache::stats, file_list as infra_file_list, storage};
use once_cell::sync::Lazy;
@ -218,7 +221,7 @@ pub async fn cache_time_range(time_min: i64, time_max: i64) -> Result<(), anyhow
let offset_time: DateTime<Utc> = Utc.timestamp_nanos(cur_time * 1000);
let file_list_prefix = offset_time.format("%Y/%m/%d/%H/").to_string();
cache(&file_list_prefix, false).await?;
cur_time += Duration::try_hours(1).unwrap().num_microseconds().unwrap();
cur_time += hour_micros(1);
}
Ok(())
}

View File

@ -75,10 +75,16 @@ pub async fn query_by_ids(trace_id: &str, ids: &[i64]) -> Result<Vec<FileKey>> {
// 1. first query from local cache
let (mut files, ids) = if !cfg.common.local_mode && cfg.common.meta_store_external {
let ids_set: HashSet<_> = ids.iter().cloned().collect();
let cached_files = file_list::LOCAL_CACHE
.query_by_ids(ids)
.await
.unwrap_or_default();
let cached_files = match file_list::LOCAL_CACHE.query_by_ids(ids).await {
Ok(files) => files,
Err(e) => {
log::error!(
"[trace_id {trace_id}] file_list query cache failed: {:?}",
e
);
Vec::new()
}
};
let cached_ids = cached_files
.iter()
.map(|(id, ..)| *id)