Compare commits

...

14 Commits

Author SHA1 Message Date
Taiming Liu e54ff6171a update: double usage reporting channel buffer size to accomondate large number of messages 2024-10-30 19:14:52 -07:00
Taiming Liu 03fd7215ff update: reduce default ZO_USAGE_PUBLISH_INTERVAL to 60s to avoid long delays 2024-10-30 19:03:51 -07:00
Taiming Liu f307c9476e fix: remove RwLock from in-memory USAGE_QUEUER 2024-10-30 19:03:51 -07:00
Taiming Liu f81a7657e3 feat: channel-based usage queue for async usage reporting 2024-10-30 19:03:51 -07:00
Taiming Liu 3966498e55
fix: parquet move_files split prefix to more columns for prefix_date (#4958) 2024-10-31 08:43:04 +08:00
Hengfei Yang 3955ed02f2
feat: add jobs for compact old data (#4952) 2024-10-31 00:33:09 +08:00
dependabot[bot] fe32354aa4
chore(deps): bump elliptic from 6.5.7 to 6.6.0 in /web (#4953) 2024-10-30 23:09:01 +08:00
Omkar Kesarkhane c4d164c230
feat: added Logstash data source in logs (#4957) 2024-10-30 22:13:36 +08:00
Huaijin 757c9dcb7b
fix: add search_type to search_multi (#4955) 2024-10-30 21:21:43 +08:00
Neha P 5306ad899f
test: clone alert (#4931)
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enhanced alert creation and cloning functionality with improved
validation checks.
- Added support for comprehensive search capabilities in the alert list.
- Introduced functionality to verify user interactions with search
history.

- **Bug Fixes**
	- Improved error handling and user feedback during alert cloning.
	- Enhanced validation for required fields in alert creation.

- **Tests**
- New test cases added for alert management and search history
functionalities.
- Updated existing tests to ensure better coverage and error handling,
including the addition of new scenarios for invalid inputs.

- **Chores**
- Cleaned up the test suite by removing unused code and improving
structure.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->

---------

Co-authored-by: Sai Nikhil <nikhil@openobserve.ai>
2024-10-30 17:53:27 +05:30
Subhra264 51e32f4b35
chore: don't ignore cargo test failure in unit tests ci (#4951)
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **Bug Fixes**
- Improved error handling during coverage testing to ensure failures are
no longer ignored.

- **New Features**
- Streamlined command execution flow with a default command set to
`cmd_check` when no arguments are provided.

- **Documentation**
	- Enhanced user feedback for unrecognized commands.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2024-10-30 16:11:27 +05:30
Abhay Padamani 273330405c
feat: color palette for dashboard chart (#4774)
- #4837 

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

## Release Notes

- **New Features**
- Introduced a new color configuration option for dashboard panels,
allowing users to customize colors via a new dropdown component.
- Added a utility for managing color palettes and calculating color
values based on data metrics.

- **Enhancements**
- Improved SQL and PromQL data conversion processes to dynamically apply
color schemes and enhance tooltip formatting.
- Enhanced the configuration options for dashboard panels with
additional color settings.

- **Dependencies**
	- Added new dependencies for improved data visualization capabilities.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2024-10-30 15:25:00 +05:30
Huaijin 58ccd13abe
fix: unit test for join order (#4948) 2024-10-30 16:14:36 +08:00
Abhay Padamani 8cf9680886
fix: dashboard compare against issue (#4943)
- #4944 

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

- **New Features**
- Enhanced loading process for panel data with improved error handling
and caching mechanisms.
- Updated time calculation functions to allow for more accurate
timestamp computations based on specified offsets.
- Streamlined naming process for series in chart rendering based on time
range gaps.

- **Bug Fixes**
- Improved handling of time range gaps in data conversion functions,
ensuring consistent and accurate data representation.

- **Documentation**
- Updated function signatures to reflect changes in parameters and
enhance clarity for developers.
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2024-10-30 11:09:17 +05:30
46 changed files with 2197 additions and 1056 deletions

View File

@ -29,7 +29,6 @@ _cov_test() {
cargo llvm-cov test \
--verbose \
--ignore-filename-regex job \
--ignore-run-fail \
"$@"
}

View File

@ -235,6 +235,19 @@ pub struct PanelConfig {
table_dynamic_columns: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
mappings: Option<Vec<Mapping>>,
#[serde(skip_serializing_if = "Option::is_none")]
color: Option<ColorCfg>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "camelCase")]
pub struct ColorCfg {
#[serde(skip_serializing_if = "Option::is_none")]
mode: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
fixed_color: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
series_by: Option<String>,
}
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
@ -308,7 +321,7 @@ pub struct Field {
#[serde(rename_all = "camelCase")]
pub struct Config {
#[serde(rename = "type")]
typee: String,
typee: String,
#[serde(skip_serializing_if = "Option::is_none")]
value: Option<Value>,
}
@ -473,7 +486,7 @@ pub struct LegendWidth {
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct LabelOption {
#[serde(skip_serializing_if = "Option::is_none")]
pub position: Option<LabelPosition>,
pub position: Option<LabelPosition>,
#[serde(skip_serializing_if = "Option::is_none")]
pub rotate: Option<f64>,
}
@ -497,5 +510,5 @@ pub enum LabelPosition {
Top,
Inside,
InsideTop,
InsideBottom
}
InsideBottom,
}

View File

@ -662,7 +662,7 @@ pub struct Common {
pub usage_batch_size: usize,
#[env_config(
name = "ZO_USAGE_PUBLISH_INTERVAL",
default = 600,
default = 60,
help = "duration in seconds after last reporting usage will be published"
)]
// in seconds
@ -1044,18 +1044,6 @@ pub struct Limit {
help = "Maximum size of a single enrichment table in mb"
)]
pub max_enrichment_table_size: usize,
#[env_config(
name = "ZO_USE_UPPER_BOUND_FOR_MAX_TS",
default = false,
help = "use upper bound for max tx"
)]
pub use_upper_bound_for_max_ts: bool,
#[env_config(
name = "ZO_BUFFER_FOR_MAX_TS",
default = 60,
help = "buffer for upper bound in mins"
)]
pub upper_bound_for_max_ts: i64,
#[env_config(name = "ZO_SHORT_URL_RETENTION_DAYS", default = 30)] // days
pub short_url_retention_days: i64,
}
@ -1066,18 +1054,22 @@ pub struct Compact {
pub enabled: bool,
#[env_config(name = "ZO_COMPACT_INTERVAL", default = 60)] // seconds
pub interval: u64,
#[env_config(name = "ZO_COMPACT_OLD_DATA_INTERVAL", default = 3600)] // seconds
pub old_data_interval: u64,
#[env_config(name = "ZO_COMPACT_STRATEGY", default = "file_time")] // file_size, file_time
pub strategy: String,
#[env_config(name = "ZO_COMPACT_LOOKBACK_HOURS", default = 0)] // hours
pub lookback_hours: i64,
#[env_config(name = "ZO_COMPACT_STEP_SECS", default = 3600)] // seconds
pub step_secs: i64,
#[env_config(name = "ZO_COMPACT_SYNC_TO_DB_INTERVAL", default = 600)] // seconds
pub sync_to_db_interval: u64,
#[env_config(name = "ZO_COMPACT_MAX_FILE_SIZE", default = 512)] // MB
pub max_file_size: usize,
#[env_config(name = "ZO_COMPACT_DATA_RETENTION_DAYS", default = 3650)] // days
pub data_retention_days: i64,
#[env_config(name = "ZO_COMPACT_OLD_DATA_MAX_DAYS", default = 7)] // days
pub old_data_max_days: i64,
#[env_config(name = "ZO_COMPACT_OLD_DATA_MIN_RECORDS", default = 100)] // records
pub old_data_min_records: i64,
#[env_config(name = "ZO_COMPACT_OLD_DATA_MIN_FILES", default = 10)] // files
pub old_data_min_files: i64,
#[env_config(name = "ZO_COMPACT_DELETE_FILES_DELAY_HOURS", default = 2)] // hours
pub delete_files_delay_hours: i64,
#[env_config(name = "ZO_COMPACT_BLOCKED_ORGS", default = "")] // use comma to split
@ -1098,7 +1090,7 @@ pub struct Compact {
pub job_run_timeout: i64,
#[env_config(
name = "ZO_COMPACT_JOB_CLEAN_WAIT_TIME",
default = 86400, // 1 day
default = 7200, // 2 hours
help = "Clean the jobs which are finished more than this time"
)]
pub job_clean_wait_time: i64,
@ -1458,6 +1450,11 @@ pub fn init() -> Config {
panic!("disk cache config error: {e}");
}
// check compact config
if let Err(e) = check_compact_config(&mut cfg) {
panic!("compact config error: {e}");
}
// check etcd config
if let Err(e) = check_etcd_config(&mut cfg) {
panic!("etcd config error: {e}");
@ -1561,34 +1558,6 @@ fn check_common_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
));
}
// check compact_max_file_size to MB
cfg.compact.max_file_size *= 1024 * 1024;
if cfg.compact.interval == 0 {
cfg.compact.interval = 60;
}
if cfg.compact.pending_jobs_metric_interval == 0 {
cfg.compact.pending_jobs_metric_interval = 300;
}
// check compact_step_secs, min value is 600s
if cfg.compact.step_secs == 0 {
cfg.compact.step_secs = 3600;
} else if cfg.compact.step_secs <= 600 {
cfg.compact.step_secs = 600;
}
if cfg.compact.data_retention_days > 0 && cfg.compact.data_retention_days < 3 {
return Err(anyhow::anyhow!(
"Data retention is not allowed to be less than 3 days."
));
}
if cfg.compact.delete_files_delay_hours < 1 {
return Err(anyhow::anyhow!(
"Delete files delay is not allowed to be less than 1 hour."
));
}
if cfg.compact.batch_size < 1 {
cfg.compact.batch_size = 100;
}
// If the default scrape interval is less than 5s, raise an error
if cfg.common.default_scrape_interval < 5 {
return Err(anyhow::anyhow!(
@ -1870,6 +1839,48 @@ fn check_disk_cache_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
Ok(())
}
fn check_compact_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
if cfg.compact.data_retention_days > 0 && cfg.compact.data_retention_days < 3 {
return Err(anyhow::anyhow!(
"Data retention is not allowed to be less than 3 days."
));
}
if cfg.compact.interval < 1 {
cfg.compact.interval = 60;
}
// check compact_max_file_size to MB
if cfg.compact.max_file_size < 1 {
cfg.compact.max_file_size = 512;
}
cfg.compact.max_file_size *= 1024 * 1024;
if cfg.compact.delete_files_delay_hours < 1 {
cfg.compact.delete_files_delay_hours = 2;
}
if cfg.compact.old_data_interval < 1 {
cfg.compact.old_data_interval = 3600;
}
if cfg.compact.old_data_max_days < 1 {
cfg.compact.old_data_max_days = 7;
}
if cfg.compact.old_data_min_records < 1 {
cfg.compact.old_data_min_records = 100;
}
if cfg.compact.old_data_min_files < 1 {
cfg.compact.old_data_min_files = 10;
}
if cfg.compact.batch_size < 1 {
cfg.compact.batch_size = 100;
}
if cfg.compact.pending_jobs_metric_interval == 0 {
cfg.compact.pending_jobs_metric_interval = 300;
}
Ok(())
}
fn check_sns_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
// Validate endpoint URL if provided
if !cfg.sns.endpoint.is_empty()

View File

@ -216,3 +216,9 @@ pub fn get_internal_grpc_token() -> String {
cfg.grpc.internal_grpc_token.clone()
}
}
// CompactionJobType is used to distinguish between current and historical compaction jobs.
pub enum CompactionJobType {
Current,
Historical,
}

View File

@ -1076,7 +1076,10 @@ mod search_history_utils {
let query = SearchHistoryQueryBuilder::new()
.with_org_id(&Some("org123".to_string()))
.build(SEARCH_STREAM_NAME);
assert_eq!(query, "SELECT * FROM usage WHERE event='Search' AND org_id = 'org123'");
assert_eq!(
query,
"SELECT * FROM usage WHERE event='Search' AND org_id = 'org123'"
);
}
#[test]

View File

@ -13,7 +13,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use chrono::{DateTime, Datelike, NaiveDateTime, TimeZone, Utc};
use chrono::{DateTime, Datelike, Duration, NaiveDateTime, TimeZone, Utc};
use once_cell::sync::Lazy;
use crate::utils::json;
@ -46,6 +46,19 @@ pub fn now_micros() -> i64 {
Utc::now().timestamp_micros()
}
#[inline(always)]
pub fn hour_micros(n: i64) -> i64 {
Duration::try_hours(n).unwrap().num_microseconds().unwrap()
}
#[inline(always)]
pub fn second_micros(n: i64) -> i64 {
Duration::try_seconds(n)
.unwrap()
.num_microseconds()
.unwrap()
}
#[inline(always)]
pub fn parse_i64_to_timestamp_micros(v: i64) -> i64 {
if v == 0 {

View File

@ -546,8 +546,10 @@ async fn oo_validator_internal(
path_prefix: &str,
) -> Result<ServiceRequest, (Error, ServiceRequest)> {
if auth_info.auth.starts_with("Basic") {
let decoded = base64::decode(auth_info.auth.strip_prefix("Basic").unwrap().trim())
.expect("Failed to decode base64 string");
let decoded = match base64::decode(auth_info.auth.strip_prefix("Basic").unwrap().trim()) {
Ok(val) => val,
Err(_) => return Err((ErrorUnauthorized("Unauthorized Access"), req)),
};
let (username, password) = match get_user_details(decoded) {
Some(value) => value,
@ -562,14 +564,16 @@ async fn oo_validator_internal(
if chrono::Utc::now().timestamp() - auth_tokens.request_time > auth_tokens.expires_in {
Err((ErrorUnauthorized("Unauthorized Access"), req))
} else {
let decoded = base64::decode(
let decoded = match base64::decode(
auth_tokens
.auth_ext
.strip_prefix("auth_ext")
.unwrap()
.trim(),
)
.expect("Failed to decode base64 string");
) {
Ok(val) => val,
Err(_) => return Err((ErrorUnauthorized("Unauthorized Access"), req)),
};
let (username, password) = match get_user_details(decoded) {
Some(value) => value,
None => return Err((ErrorUnauthorized("Unauthorized Access"), req)),
@ -584,8 +588,10 @@ async fn oo_validator_internal(
#[cfg(feature = "enterprise")]
pub async fn get_user_email_from_auth_str(auth_str: &str) -> Option<String> {
if auth_str.starts_with("Basic") {
let decoded = base64::decode(auth_str.strip_prefix("Basic").unwrap().trim())
.expect("Failed to decode base64 string");
let decoded = match base64::decode(auth_str.strip_prefix("Basic").unwrap().trim()) {
Ok(val) => val,
Err(_) => return None,
};
match get_user_details(decoded) {
Some(value) => Some(value.0),
@ -599,14 +605,16 @@ pub async fn get_user_email_from_auth_str(auth_str: &str) -> Option<String> {
if chrono::Utc::now().timestamp() - auth_tokens.request_time > auth_tokens.expires_in {
None
} else {
let decoded = base64::decode(
let decoded = match base64::decode(
auth_tokens
.auth_ext
.strip_prefix("auth_ext")
.unwrap()
.trim(),
)
.expect("Failed to decode base64 string");
) {
Ok(val) => val,
Err(_) => return None,
};
match get_user_details(decoded) {
Some(value) => Some(value.0),
None => None,
@ -618,9 +626,10 @@ pub async fn get_user_email_from_auth_str(auth_str: &str) -> Option<String> {
}
fn get_user_details(decoded: String) -> Option<(String, String)> {
let credentials = String::from_utf8(decoded.into())
.map_err(|_| ())
.expect("Failed to decode base64 string");
let credentials = match String::from_utf8(decoded.into()).map_err(|_| ()) {
Ok(val) => val,
Err(_) => return None,
};
let parts: Vec<&str> = credentials.splitn(2, ':').collect();
if parts.len() != 2 {
return None;

View File

@ -275,6 +275,9 @@ pub async fn search_multi(
}
}
// add search type to request
req.search_type = search_type;
metrics::QUERY_PENDING_NUMS
.with_label_values(&[&org_id])
.inc();

View File

@ -547,6 +547,6 @@ mod tests {
.uri("/proxy/org1/https://cloud.openobserve.ai/assets/flUhRq6tzZclQEJ-Vdg-IuiaDsNa.fd84f88b.woff")
.to_request();
let resp = call_service(&mut app, req).await;
assert_eq!(resp.status().as_u16(), 200);
assert_eq!(resp.status().as_u16(), 404);
}
}

View File

@ -56,6 +56,7 @@ impl From<DataFusionError> for Error {
};
}
if err.contains("parquet not found") {
log::error!("[Datafusion] Parquet file not found: {}", err);
return Error::ErrorCode(ErrorCodes::SearchParquetFileNotFound);
}
if err.contains("Invalid function ") {

View File

@ -85,6 +85,13 @@ pub trait FileList: Sync + Send + 'static {
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<FileId>>;
async fn query_old_data_hours(
&self,
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<String>>;
async fn query_deleted(
&self,
org_id: &str,
@ -225,11 +232,7 @@ pub async fn list() -> Result<Vec<(String, FileMeta)>> {
}
#[inline]
#[tracing::instrument(
name = "infra:file_list:query_db",
skip_all,
fields(org_id = org_id, stream_name = stream_name)
)]
#[tracing::instrument(name = "infra:file_list:db:query")]
pub async fn query(
org_id: &str,
stream_type: StreamType,
@ -238,11 +241,7 @@ pub async fn query(
time_range: Option<(i64, i64)>,
flattened: Option<bool>,
) -> Result<Vec<(String, FileMeta)>> {
if let Some((start, end)) = time_range {
if start > end || start == 0 || end == 0 {
return Err(Error::Message("[file_list] invalid time range".to_string()));
}
}
validate_time_range(time_range)?;
CLIENT
.query(
org_id,
@ -262,27 +261,33 @@ pub async fn query_by_ids(ids: &[i64]) -> Result<Vec<(i64, String, FileMeta)>> {
}
#[inline]
#[tracing::instrument(
name = "infra:file_list:db:query_ids",
skip_all,
fields(org_id = org_id, stream_name = stream_name)
)]
#[tracing::instrument(name = "infra:file_list:db:query_ids")]
pub async fn query_ids(
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<FileId>> {
if let Some((start, end)) = time_range {
if start > end || start == 0 || end == 0 {
return Err(Error::Message("[file_list] invalid time range".to_string()));
}
}
validate_time_range(time_range)?;
CLIENT
.query_ids(org_id, stream_type, stream_name, time_range)
.await
}
#[inline]
#[tracing::instrument(name = "infra:file_list:db:query_old_data_hours")]
pub async fn query_old_data_hours(
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<String>> {
validate_time_range(time_range)?;
CLIENT
.query_old_data_hours(org_id, stream_type, stream_name, time_range)
.await
}
#[inline]
pub async fn query_deleted(org_id: &str, time_max: i64, limit: i64) -> Result<Vec<(String, bool)>> {
CLIENT.query_deleted(org_id, time_max, limit).await
@ -440,6 +445,24 @@ pub async fn local_cache_gc() -> Result<()> {
Ok(())
}
fn validate_time_range(time_range: Option<(i64, i64)>) -> Result<()> {
if let Some((start, end)) = time_range {
if start > end || start == 0 || end == 0 {
return Err(Error::Message("[file_list] invalid time range".to_string()));
}
}
Ok(())
}
fn calculate_max_ts_upper_bound(time_end: i64, stream_type: StreamType) -> i64 {
let ts = super::schema::unwrap_partition_time_level(None, stream_type).duration();
if ts > 0 {
time_end + ts
} else {
time_end + PartitionTimeLevel::Hourly.duration()
}
}
#[derive(Debug, Clone, PartialEq, sqlx::FromRow)]
pub struct FileRecord {
#[sqlx(default)]

View File

@ -363,11 +363,8 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.await
} else {
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound =
time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
sqlx::query_as::<_, super::FileRecord>(
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
sqlx::query_as::<_, super::FileRecord>(
r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
@ -380,20 +377,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
} else {
sqlx::query_as::<_, super::FileRecord>(
r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
WHERE stream = ? AND max_ts >= ? AND min_ts <= ?;
"#,
)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
};
let time = start.elapsed().as_secs_f64();
DB_QUERY_TIME
@ -495,12 +478,10 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
let stream_key = stream_key.clone();
tasks.push(tokio::task::spawn(async move {
let pool = CLIENT.clone();
let cfg = get_config();
DB_QUERY_NUMS
.with_label_values(&["select", "file_list"])
.inc();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound = time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let query = "SELECT id, records, original_size FROM file_list WHERE stream = ? AND max_ts >= ? AND max_ts <= ? AND min_ts <= ?;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
@ -509,15 +490,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
} else {
let query = "SELECT id, records, original_size FROM file_list WHERE stream = ? AND max_ts >= ? AND min_ts <= ?;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
}));
}
@ -540,6 +512,56 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
Ok(rets)
}
async fn query_old_data_hours(
&self,
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<String>> {
if let Some((start, end)) = time_range {
if start == 0 && end == 0 {
return Ok(Vec::new());
}
}
let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
let pool = CLIENT.clone();
DB_QUERY_NUMS
.with_label_values(&["select", "file_list"])
.inc();
let start = std::time::Instant::now();
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let sql = r#"
SELECT date
FROM file_list
WHERE stream = ? AND max_ts >= ? AND max_ts <= ? AND min_ts <= ? AND records < ?
GROUP BY date HAVING count(*) >= ?;
"#;
let ret = sqlx::query(sql)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.bind(cfg.compact.old_data_min_records)
.bind(cfg.compact.old_data_min_files)
.fetch_all(&pool)
.await?;
let time = start.elapsed().as_secs_f64();
DB_QUERY_TIME
.with_label_values(&["query_old_data_hours", "file_list"])
.observe(time);
Ok(ret
.into_iter()
.map(|r| r.try_get::<String, &str>("date").unwrap_or_default())
.collect())
}
async fn query_deleted(
&self,
org_id: &str,

View File

@ -376,37 +376,20 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.fetch_all(&pool).await
} else {
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound =
time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
let sql = r#"
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let sql = r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;
"#;
sqlx::query_as::<_, super::FileRecord>(sql)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.fetch_all(&pool)
.await
} else {
let sql = r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;
"#;
sqlx::query_as::<_, super::FileRecord>(sql)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
sqlx::query_as::<_, super::FileRecord>(sql)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.fetch_all(&pool)
.await
};
let time = start.elapsed().as_secs_f64();
DB_QUERY_TIME
@ -508,29 +491,18 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
let stream_key = stream_key.clone();
tasks.push(tokio::task::spawn(async move {
let pool = CLIENT.clone();
let cfg = get_config();
DB_QUERY_NUMS
.with_label_values(&["select", "file_list"])
.inc();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound = time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.fetch_all(&pool)
.await
} else {
let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.fetch_all(&pool)
.await
}));
}
@ -553,6 +525,56 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
Ok(rets)
}
async fn query_old_data_hours(
&self,
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<String>> {
if let Some((start, end)) = time_range {
if start == 0 && end == 0 {
return Ok(Vec::new());
}
}
let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
let pool = CLIENT.clone();
DB_QUERY_NUMS
.with_label_values(&["select", "file_list"])
.inc();
let start = std::time::Instant::now();
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let sql = r#"
SELECT date
FROM file_list
WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4 AND records < $5
GROUP BY date HAVING count(*) >= $6;
"#;
let ret = sqlx::query(sql)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.bind(cfg.compact.old_data_min_records)
.bind(cfg.compact.old_data_min_files)
.fetch_all(&pool)
.await?;
let time = start.elapsed().as_secs_f64();
DB_QUERY_TIME
.with_label_values(&["query_old_data_hours", "file_list"])
.observe(time);
Ok(ret
.into_iter()
.map(|r| r.try_get::<String, &str>("date").unwrap_or_default())
.collect())
}
async fn query_deleted(
&self,
org_id: &str,

View File

@ -326,11 +326,8 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.await
} else {
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound =
time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
sqlx::query_as::<_, super::FileRecord>(
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
sqlx::query_as::<_, super::FileRecord>(
r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
@ -343,20 +340,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
} else {
sqlx::query_as::<_, super::FileRecord>(
r#"
SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, compressed_size, flattened
FROM file_list
WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;
"#,
)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
};
Ok(ret?
.iter()
@ -446,9 +429,7 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
let stream_key = stream_key.clone();
tasks.push(tokio::task::spawn(async move {
let pool = CLIENT_RO.clone();
let cfg = get_config();
if cfg.limit.use_upper_bound_for_max_ts {
let max_ts_upper_bound = time_end + cfg.limit.upper_bound_for_max_ts * 60 * 1_000_000;
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
@ -457,15 +438,6 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
.bind(time_end)
.fetch_all(&pool)
.await
} else {
let query = "SELECT id, records, original_size FROM file_list WHERE stream = $1 AND max_ts >= $2 AND min_ts <= $3;";
sqlx::query_as::<_, super::FileId>(query)
.bind(stream_key)
.bind(time_start)
.bind(time_end)
.fetch_all(&pool)
.await
}
}));
}
@ -484,6 +456,47 @@ SELECT stream, date, file, deleted, min_ts, max_ts, records, original_size, comp
Ok(rets)
}
async fn query_old_data_hours(
&self,
org_id: &str,
stream_type: StreamType,
stream_name: &str,
time_range: Option<(i64, i64)>,
) -> Result<Vec<String>> {
if let Some((start, end)) = time_range {
if start == 0 && end == 0 {
return Ok(Vec::new());
}
}
let stream_key = format!("{org_id}/{stream_type}/{stream_name}");
let pool = CLIENT_RO.clone();
let (time_start, time_end) = time_range.unwrap_or((0, 0));
let cfg = get_config();
let max_ts_upper_bound = super::calculate_max_ts_upper_bound(time_end, stream_type);
let sql = r#"
SELECT date
FROM file_list
WHERE stream = $1 AND max_ts >= $2 AND max_ts <= $3 AND min_ts <= $4 AND records < $5
GROUP BY date HAVING count(*) >= $6;
"#;
let ret = sqlx::query(sql)
.bind(stream_key)
.bind(time_start)
.bind(max_ts_upper_bound)
.bind(time_end)
.bind(cfg.compact.old_data_min_records)
.bind(cfg.compact.old_data_min_files)
.fetch_all(&pool)
.await?;
Ok(ret
.into_iter()
.map(|r| r.try_get::<String, &str>("date").unwrap_or_default())
.collect())
}
async fn query_deleted(
&self,
org_id: &str,

View File

@ -15,7 +15,12 @@
use std::sync::Arc;
use config::{cluster::LOCAL_NODE, get_config, meta::stream::FileKey, metrics};
use config::{
cluster::LOCAL_NODE,
get_config,
meta::{cluster::CompactionJobType, stream::FileKey},
metrics,
};
use tokio::{
sync::{mpsc, Mutex},
time,
@ -94,6 +99,7 @@ pub async fn run() -> Result<(), anyhow::Error> {
}
tokio::task::spawn(async move { run_generate_job().await });
tokio::task::spawn(async move { run_generate_old_data_job().await });
tokio::task::spawn(async move { run_merge(tx).await });
tokio::task::spawn(async move { run_retention().await });
tokio::task::spawn(async move { run_delay_deletion().await });
@ -138,12 +144,27 @@ async fn run_generate_job() -> Result<(), anyhow::Error> {
loop {
time::sleep(time::Duration::from_secs(get_config().compact.interval)).await;
log::debug!("[COMPACTOR] Running generate merge job");
if let Err(e) = compact::run_generate_job().await {
if let Err(e) = compact::run_generate_job(CompactionJobType::Current).await {
log::error!("[COMPACTOR] run generate merge job error: {e}");
}
}
}
/// Generate merging jobs for old data
async fn run_generate_old_data_job() -> Result<(), anyhow::Error> {
loop {
// run every 1 hour at least
time::sleep(time::Duration::from_secs(
get_config().compact.old_data_interval,
))
.await;
log::debug!("[COMPACTOR] Running generate merge job for old data");
if let Err(e) = compact::run_generate_job(CompactionJobType::Historical).await {
log::error!("[COMPACTOR] run generate merge job for old data error: {e}");
}
}
}
/// Merge small files
async fn run_merge(tx: mpsc::Sender<(MergeSender, MergeBatch)>) -> Result<(), anyhow::Error> {
loop {

View File

@ -291,13 +291,15 @@ async fn move_files(
return Ok(());
}
let columns = prefix.splitn(5, '/').collect::<Vec<&str>>();
let columns = prefix.splitn(9, '/').collect::<Vec<&str>>();
// eg: files/default/logs/olympics/0/2023/08/21/08/8b8a5451bbe1c44b/
// eg: files/default/traces/default/0/2023/09/04/05/default/service_name=ingester/
// let _ = columns[0].to_string(); // files/
let org_id = columns[1].to_string();
let stream_type = StreamType::from(columns[2]);
let stream_name = columns[3].to_string();
// let _thread_id = columns[4].to_string();
let prefix_date = format!("{}-{}-{}", columns[5], columns[6], columns[7]);
// log::debug!("[INGESTER:JOB:{thread_id}] check deletion for partition: {}", prefix);
@ -372,6 +374,42 @@ async fn move_files(
return Ok(());
}
// check data retention
let stream_settings = infra::schema::get_settings(&org_id, &stream_name, stream_type)
.await
.unwrap_or_default();
let mut stream_data_retention_days = cfg.compact.data_retention_days;
if stream_settings.data_retention > 0 {
stream_data_retention_days = stream_settings.data_retention;
}
if stream_data_retention_days > 0 {
let date =
config::utils::time::now() - Duration::try_days(stream_data_retention_days).unwrap();
let stream_data_retention_end = date.format("%Y-%m-%d").to_string();
if prefix_date < stream_data_retention_end {
for file in files {
log::warn!(
"[INGESTER:JOB:{thread_id}] the file [{}/{}/{}] was exceed the data retention, just delete file: {}",
&org_id,
stream_type,
&stream_name,
file.key,
);
if let Err(e) = tokio::fs::remove_file(wal_dir.join(&file.key)).await {
log::error!(
"[INGESTER:JOB:{thread_id}] Failed to remove parquet file from disk: {}, {}",
file.key,
e
);
}
// delete metadata from cache
WAL_PARQUET_METADATA.write().await.remove(&file.key);
PROCESSING_FILES.write().await.remove(&file.key);
}
return Ok(());
}
}
// log::debug!("[INGESTER:JOB:{thread_id}] start processing for partition: {}", prefix);
let wal_dir = wal_dir.clone();

View File

@ -16,7 +16,7 @@
use config::{cluster::LOCAL_NODE, get_config};
use tokio::time;
use crate::service::{compact::stats::update_stats_from_file_list, db, usage};
use crate::service::{compact::stats::update_stats_from_file_list, db};
pub async fn run() -> Result<(), anyhow::Error> {
// tokio::task::spawn(async move { usage_report_stats().await });
@ -25,25 +25,6 @@ pub async fn run() -> Result<(), anyhow::Error> {
Ok(())
}
async fn _usage_report_stats() -> Result<(), anyhow::Error> {
let cfg = get_config();
if !LOCAL_NODE.is_compactor() || !cfg.common.usage_enabled {
return Ok(());
}
// should run it every 10 minutes
let mut interval = time::interval(time::Duration::from_secs(
cfg.limit.calculate_stats_interval,
));
interval.tick().await; // trigger the first run
loop {
interval.tick().await;
if let Err(e) = usage::stats::publish_stats().await {
log::error!("[STATS] run publish stats error: {}", e);
}
}
}
// get stats from file_list to update stream_stats
async fn file_list_update_stats() -> Result<(), anyhow::Error> {
let cfg = get_config();

View File

@ -22,7 +22,11 @@ use config::{
stream::StreamType,
usage::{TriggerData, TriggerDataStatus, TriggerDataType},
},
utils::{json, rand::get_rand_num_within},
utils::{
json,
rand::get_rand_num_within,
time::{hour_micros, second_micros},
},
};
use cron::Schedule;
use futures::future::try_join_all;
@ -86,11 +90,8 @@ fn get_max_considerable_delay(frequency: i64) -> i64 {
// If the delay is more than this, the alert will be skipped.
// The maximum delay is the lowest of 1 hour or 20% of the frequency.
// E.g. if the frequency is 5 mins, the maximum delay is 1 min.
let frequency = Duration::try_seconds(frequency)
.unwrap()
.num_microseconds()
.unwrap();
let max_delay = Duration::try_hours(1).unwrap().num_microseconds().unwrap();
let frequency = second_micros(frequency);
let max_delay = hour_micros(1);
// limit.alert_considerable_delay is in percentage, convert into float
let considerable_delay = get_config().limit.alert_considerable_delay as f64 * 0.01;
let max_considerable_delay = (frequency as f64 * considerable_delay) as i64;

View File

@ -20,7 +20,12 @@ use std::{
use bytes::Buf;
use chrono::{DateTime, Datelike, Duration, TimeZone, Timelike, Utc};
use config::{cluster::LOCAL_NODE, ider, meta::stream::FileKey, utils::json};
use config::{
cluster::LOCAL_NODE,
ider,
meta::stream::FileKey,
utils::{json, time::hour_micros},
};
use hashbrown::HashMap;
use infra::{dist_lock, schema::STREAM_SCHEMAS_LATEST, storage};
use tokio::sync::{RwLock, Semaphore};
@ -106,7 +111,7 @@ pub async fn run_merge(offset: i64) -> Result<(), anyhow::Error> {
// offset
let mut is_waiting_streams = false;
for (key, val) in offsets {
if (val - Duration::try_hours(1).unwrap().num_microseconds().unwrap()) < offset {
if (val - hour_micros(1)) < offset {
log::info!("[COMPACT] file_list is waiting for stream: {key}, offset: {val}");
is_waiting_streams = true;
break;
@ -162,7 +167,7 @@ pub async fn run_merge(offset: i64) -> Result<(), anyhow::Error> {
merge_file_list(offset).await?;
// write new sync offset
offset = offset_time_hour + Duration::try_hours(1).unwrap().num_microseconds().unwrap();
offset = offset_time_hour + hour_micros(1);
db::compact::file_list::set_offset(offset).await
}

View File

@ -16,7 +16,8 @@
use std::io::{BufRead, BufReader};
use bytes::Buf;
use chrono::{DateTime, Duration, TimeZone, Utc};
use chrono::{DateTime, TimeZone, Utc};
use config::utils::time::hour_micros;
use futures::future::try_join_all;
use hashbrown::HashMap;
use infra::{file_list as infra_file_list, storage};
@ -201,7 +202,7 @@ async fn query_deleted_from_s3(
let entry = files.entry(file).or_insert_with(Vec::new);
entry.extend(records);
}
cur_time += Duration::try_hours(1).unwrap().num_microseconds().unwrap();
cur_time += hour_micros(1);
}
Ok(files)
}

View File

@ -34,6 +34,7 @@ use config::{
},
record_batch_ext::{concat_batches, merge_record_batches},
schema_ext::SchemaExt,
time::hour_micros,
},
FILE_EXT_PARQUET,
};
@ -143,26 +144,18 @@ pub async fn generate_job_by_stream(
)
.unwrap()
.timestamp_micros();
// 1. if step_secs less than 1 hour, must wait for at least max_file_retention_time
// 2. if step_secs greater than 1 hour, must wait for at least 3 * max_file_retention_time
// must wait for at least 3 * max_file_retention_time
// -- first period: the last hour local file upload to storage, write file list
// -- second period, the last hour file list upload to storage
// -- third period, we can do the merge, so, at least 3 times of
// max_file_retention_time
if (cfg.compact.step_secs < 3600
&& time_now.timestamp_micros() - offset
if offset >= time_now_hour
|| time_now.timestamp_micros() - offset
<= Duration::try_seconds(cfg.limit.max_file_retention_time as i64)
.unwrap()
.num_microseconds()
.unwrap())
|| (cfg.compact.step_secs >= 3600
&& (offset >= time_now_hour
|| time_now.timestamp_micros() - offset
<= Duration::try_seconds(cfg.limit.max_file_retention_time as i64)
.unwrap()
.num_microseconds()
.unwrap()
* 3))
.unwrap()
* 3
{
return Ok(()); // the time is future, just wait
}
@ -184,11 +177,9 @@ pub async fn generate_job_by_stream(
}
// write new offset
let offset = offset
+ Duration::try_seconds(cfg.compact.step_secs)
.unwrap()
.num_microseconds()
.unwrap();
let offset = offset + hour_micros(1);
// format to hour with zero minutes, seconds
let offset = offset - offset % hour_micros(1);
db::compact::files::set_offset(
org_id,
stream_type,
@ -201,6 +192,113 @@ pub async fn generate_job_by_stream(
Ok(())
}
/// Generate merging job by stream
/// 1. get old data by hour
/// 2. check if other node is processing
/// 3. create job or return
pub async fn generate_old_data_job_by_stream(
org_id: &str,
stream_type: StreamType,
stream_name: &str,
) -> Result<(), anyhow::Error> {
// get last compacted offset
let (offset, node) = db::compact::files::get_offset(org_id, stream_type, stream_name).await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some() {
return Ok(()); // other node is processing
}
if node.is_empty() || LOCAL_NODE.uuid.ne(&node) {
let lock_key = format!("/compact/merge/{}/{}/{}", org_id, stream_type, stream_name);
let locker = dist_lock::lock(&lock_key, 0, None).await?;
// check the working node again, maybe other node locked it first
let (offset, node) = db::compact::files::get_offset(org_id, stream_type, stream_name).await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some()
{
dist_lock::unlock(&locker).await?;
return Ok(()); // other node is processing
}
// set to current node
let ret = db::compact::files::set_offset(
org_id,
stream_type,
stream_name,
offset,
Some(&LOCAL_NODE.uuid.clone()),
)
.await;
dist_lock::unlock(&locker).await?;
drop(locker);
ret?;
}
if offset == 0 {
return Ok(()); // no data
}
let cfg = get_config();
let stream_settings = infra::schema::get_settings(org_id, stream_name, stream_type)
.await
.unwrap_or_default();
let mut stream_data_retention_days = cfg.compact.data_retention_days;
if stream_settings.data_retention > 0 {
stream_data_retention_days = stream_settings.data_retention;
}
if stream_data_retention_days > cfg.compact.old_data_max_days {
stream_data_retention_days = cfg.compact.old_data_max_days;
}
if stream_data_retention_days == 0 {
return Ok(()); // no need to check old data
}
// get old data by hour, `offset - 2 hours` as old data
let end_time = offset - hour_micros(2);
let start_time = end_time
- Duration::try_days(stream_data_retention_days as i64)
.unwrap()
.num_microseconds()
.unwrap();
let hours = infra_file_list::query_old_data_hours(
org_id,
stream_type,
stream_name,
Some((start_time, end_time)),
)
.await?;
// generate merging job
for hour in hours {
let column = hour.split('/').collect::<Vec<_>>();
if column.len() != 4 {
return Err(anyhow::anyhow!(
"Unexpected hour format in {}, Expected format YYYY/MM/DD/HH",
hour
));
}
let offset = DateTime::parse_from_rfc3339(&format!(
"{}-{}-{}T{}:00:00Z",
column[0], column[1], column[2], column[3]
))?
.with_timezone(&Utc);
let offset = offset.timestamp_micros();
log::debug!(
"[COMPACTOR] generate_old_data_job_by_stream [{}/{}/{}] hours: {}, offset: {}",
org_id,
stream_type,
stream_name,
hour,
offset
);
if let Err(e) = infra_file_list::add_job(org_id, stream_type, stream_name, offset).await {
return Err(anyhow::anyhow!(
"[COMAPCT] add file_list_jobs for old data failed: {}",
e
));
}
}
Ok(())
}
/// compactor run steps on a stream:
/// 3. get a cluster lock for compactor stream
/// 4. read last compacted offset: year/month/day/hour
@ -277,17 +375,11 @@ pub async fn merge_by_stream(
// get current hour(day) all files
let (partition_offset_start, partition_offset_end) =
if partition_time_level == PartitionTimeLevel::Daily {
(
offset_time_day,
offset_time_day + Duration::try_hours(24).unwrap().num_microseconds().unwrap() - 1,
)
(offset_time_day, offset_time_day + hour_micros(24) - 1)
} else {
(
offset_time_hour,
offset_time_hour + Duration::try_hours(1).unwrap().num_microseconds().unwrap() - 1,
)
(offset_time_hour, offset_time_hour + hour_micros(1) - 1)
};
let mut files = file_list::query(
let files = file_list::query(
org_id,
stream_name,
stream_type,
@ -298,31 +390,6 @@ pub async fn merge_by_stream(
.await
.map_err(|e| anyhow::anyhow!("query file list failed: {}", e))?;
// check lookback files
if cfg.compact.lookback_hours > 0 {
let lookback_offset = Duration::try_hours(cfg.compact.lookback_hours)
.unwrap()
.num_microseconds()
.unwrap();
let lookback_offset_start = partition_offset_start - lookback_offset;
let mut lookback_offset_end = partition_offset_end - lookback_offset;
if lookback_offset_end > partition_offset_start {
// the lookback period is overlap with current period
lookback_offset_end = partition_offset_start;
}
let lookback_files = file_list::query(
org_id,
stream_name,
stream_type,
partition_time_level,
lookback_offset_start,
lookback_offset_end,
)
.await
.map_err(|e| anyhow::anyhow!("query lookback file list failed: {}", e))?;
files.extend(lookback_files);
}
log::debug!(
"[COMPACTOR] merge_by_stream [{}/{}/{}] time range: [{},{}], files: {}",
org_id,
@ -399,13 +466,6 @@ pub async fn merge_by_stream(
}
new_file_size += file.meta.original_size;
new_file_list.push(file.clone());
// metrics
metrics::COMPACT_MERGED_FILES
.with_label_values(&[&org_id, stream_type.to_string().as_str()])
.inc();
metrics::COMPACT_MERGED_BYTES
.with_label_values(&[&org_id, stream_type.to_string().as_str()])
.inc_by(file.meta.original_size as u64);
}
if new_file_list.len() > 1 {
batch_groups.push(MergeBatch {
@ -450,22 +510,14 @@ pub async fn merge_by_stream(
let new_file_name = std::mem::take(&mut new_file.key);
let new_file_meta = std::mem::take(&mut new_file.meta);
let new_file_list = batch_groups.get(batch_id).unwrap().files.as_slice();
if new_file_name.is_empty() {
if new_file_list.is_empty() {
// no file need to merge
break;
} else {
// delete files from file_list and continue
files_with_size.retain(|f| !&new_file_list.contains(f));
continue;
}
continue;
}
// delete small files keys & write big files keys, use transaction
let mut events = Vec::with_capacity(new_file_list.len() + 1);
events.push(FileKey {
key: new_file_name.clone(),
key: new_file_name,
meta: new_file_meta,
deleted: false,
segment_ids: None,
@ -536,16 +588,12 @@ pub async fn merge_by_stream(
.inc_by(time);
metrics::COMPACT_DELAY_HOURS
.with_label_values(&[org_id, stream_name, stream_type.to_string().as_str()])
.set(
(time_now_hour - offset_time_hour)
/ Duration::try_hours(1).unwrap().num_microseconds().unwrap(),
);
.set((time_now_hour - offset_time_hour) / hour_micros(1));
Ok(())
}
/// merge some small files into one big file, upload to storage, returns the big
/// file key and merged files
/// merge small files into big file, upload to storage, returns the big file key and merged files
pub async fn merge_files(
thread_id: usize,
org_id: &str,

View File

@ -18,7 +18,7 @@ use config::{
cluster::LOCAL_NODE,
get_config,
meta::{
cluster::Role,
cluster::{CompactionJobType, Role},
stream::{PartitionTimeLevel, StreamType, ALL_STREAM_TYPES},
},
};
@ -63,10 +63,12 @@ pub async fn run_retention() -> Result<(), anyhow::Error> {
continue; // not this node
}
let schema = infra::schema::get(&org_id, &stream_name, stream_type).await?;
let stream = super::stream::stream_res(&stream_name, stream_type, schema, None);
let stream_data_retention_end = if stream.settings.data_retention > 0 {
let date = now - Duration::try_days(stream.settings.data_retention).unwrap();
let stream_settings =
infra::schema::get_settings(&org_id, &stream_name, stream_type)
.await
.unwrap_or_default();
let stream_data_retention_end = if stream_settings.data_retention > 0 {
let date = now - Duration::try_days(stream_settings.data_retention).unwrap();
date.format("%Y-%m-%d").to_string()
} else {
data_lifecycle_end.clone()
@ -137,7 +139,7 @@ pub async fn run_retention() -> Result<(), anyhow::Error> {
}
/// Generate job for compactor
pub async fn run_generate_job() -> Result<(), anyhow::Error> {
pub async fn run_generate_job(job_type: CompactionJobType) -> Result<(), anyhow::Error> {
let orgs = db::schema::list_organizations_from_cache().await;
for org_id in orgs {
// check backlist
@ -191,16 +193,37 @@ pub async fn run_generate_job() -> Result<(), anyhow::Error> {
continue;
}
if let Err(e) =
merge::generate_job_by_stream(&org_id, stream_type, &stream_name).await
{
log::error!(
"[COMPACTOR] generate_job_by_stream [{}/{}/{}] error: {}",
org_id,
stream_type,
stream_name,
e
);
match job_type {
CompactionJobType::Current => {
if let Err(e) =
merge::generate_job_by_stream(&org_id, stream_type, &stream_name).await
{
log::error!(
"[COMPACTOR] generate_job_by_stream [{}/{}/{}] error: {}",
org_id,
stream_type,
stream_name,
e
);
}
}
CompactionJobType::Historical => {
if let Err(e) = merge::generate_old_data_job_by_stream(
&org_id,
stream_type,
&stream_name,
)
.await
{
log::error!(
"[COMPACTOR] generate_old_data_job_by_stream [{}/{}/{}] error: {}",
org_id,
stream_type,
stream_name,
e
);
}
}
}
}
}
@ -234,7 +257,7 @@ pub async fn run_merge(
.unwrap_or_default();
let partition_time_level =
unwrap_partition_time_level(stream_setting.partition_time_level, stream_type);
if partition_time_level == PartitionTimeLevel::Daily || cfg.compact.step_secs < 3600 {
if partition_time_level == PartitionTimeLevel::Daily {
// check if this stream need process by this node
let Some(node_name) =
get_node_from_consistent_hash(&stream_name, &Role::Compactor, None).await

View File

@ -51,6 +51,15 @@ pub async fn delete_by_stream(
return Ok(()); // created_at is after lifecycle_end, just skip
}
log::debug!(
"[COMPACT] delete_by_stream {}/{}/{}/{},{}",
org_id,
stream_type,
stream_name,
lifecycle_start,
lifecycle_end
);
// Hack for 1970-01-01
if lifecycle_start.le("1970-01-01") {
let lifecycle_end = created_at + Duration::try_days(1).unwrap();
@ -83,7 +92,7 @@ pub async fn delete_all(
let locker = dist_lock::lock(&lock_key, 0, None).await?;
let node = db::compact::retention::get_stream(org_id, stream_type, stream_name, None).await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some() {
log::error!("[COMPACT] stream {org_id}/{stream_type}/{stream_name} is deleting by {node}");
log::warn!("[COMPACT] stream {org_id}/{stream_type}/{stream_name} is deleting by {node}");
dist_lock::unlock(&locker).await?;
return Ok(()); // not this node, just skip
}
@ -179,7 +188,7 @@ pub async fn delete_by_date(
db::compact::retention::get_stream(org_id, stream_type, stream_name, Some(date_range))
.await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some() {
log::error!(
log::warn!(
"[COMPACT] stream {org_id}/{stream_type}/{stream_name}/{:?} is deleting by {node}",
date_range
);

View File

@ -15,12 +15,16 @@
use std::sync::Arc;
use config::{meta::stream::StreamType, RwHashSet};
use config::{
meta::stream::StreamType,
utils::time::{hour_micros, now_micros},
RwHashMap,
};
use once_cell::sync::Lazy;
use crate::service::db;
static CACHE: Lazy<RwHashSet<String>> = Lazy::new(Default::default);
static CACHE: Lazy<RwHashMap<String, i64>> = Lazy::new(Default::default);
#[inline]
fn mk_key(
@ -47,12 +51,14 @@ pub async fn delete_stream(
let key = mk_key(org_id, stream_type, stream_name, date_range);
// write in cache
if CACHE.contains(&key) {
return Ok(()); // already in cache, just skip
if let Some(v) = CACHE.get(&key) {
if v.value() + hour_micros(1) > now_micros() {
return Ok(()); // already in cache, don't create same task in one hour
}
}
let db_key = format!("/compact/delete/{key}");
CACHE.insert(key);
CACHE.insert(key, now_micros());
Ok(db::put(&db_key, "OK".into(), db::NEED_WATCH, None).await?)
}
@ -92,7 +98,7 @@ pub fn is_deleting_stream(
stream_name: &str,
date_range: Option<(&str, &str)>,
) -> bool {
CACHE.contains(&mk_key(org_id, stream_type, stream_name, date_range))
CACHE.contains_key(&mk_key(org_id, stream_type, stream_name, date_range))
}
pub async fn delete_stream_done(
@ -102,9 +108,7 @@ pub async fn delete_stream_done(
date_range: Option<(&str, &str)>,
) -> Result<(), anyhow::Error> {
let key = mk_key(org_id, stream_type, stream_name, date_range);
db::delete_if_exists(&format!("/compact/delete/{key}"), false, db::NEED_WATCH)
.await
.map_err(|e| anyhow::anyhow!(e))?;
db::delete_if_exists(&format!("/compact/delete/{key}"), false, db::NEED_WATCH).await?;
// remove in cache
CACHE.remove(&key);
@ -128,19 +132,19 @@ pub async fn watch() -> Result<(), anyhow::Error> {
let cluster_coordinator = db::get_coordinator().await;
let mut events = cluster_coordinator.watch(key).await?;
let events = Arc::get_mut(&mut events).unwrap();
log::info!("Start watching stream deleting");
log::info!("Start watching compact deleting");
loop {
let ev = match events.recv().await {
Some(ev) => ev,
None => {
log::error!("watch_stream_deleting: event channel closed");
log::error!("watch_compact_deleting: event channel closed");
break;
}
};
match ev {
db::Event::Put(ev) => {
let item_key = ev.key.strip_prefix(key).unwrap();
CACHE.insert(item_key.to_string());
CACHE.insert(item_key.to_string(), now_micros());
}
db::Event::Delete(ev) => {
let item_key = ev.key.strip_prefix(key).unwrap();
@ -157,7 +161,7 @@ pub async fn cache() -> Result<(), anyhow::Error> {
let ret = db::list(key).await?;
for (item_key, _) in ret {
let item_key = item_key.strip_prefix(key).unwrap();
CACHE.insert(item_key.to_string());
CACHE.insert(item_key.to_string(), now_micros());
}
Ok(())
}

View File

@ -20,8 +20,11 @@ use std::{
};
use bytes::Buf;
use chrono::{DateTime, Duration, TimeZone, Utc};
use config::{meta::stream::FileKey, utils::json};
use chrono::{DateTime, TimeZone, Utc};
use config::{
meta::stream::FileKey,
utils::{json, time::hour_micros},
};
use futures::future::try_join_all;
use infra::{cache::stats, file_list as infra_file_list, storage};
use once_cell::sync::Lazy;
@ -218,7 +221,7 @@ pub async fn cache_time_range(time_min: i64, time_max: i64) -> Result<(), anyhow
let offset_time: DateTime<Utc> = Utc.timestamp_nanos(cur_time * 1000);
let file_list_prefix = offset_time.format("%Y/%m/%d/%H/").to_string();
cache(&file_list_prefix, false).await?;
cur_time += Duration::try_hours(1).unwrap().num_microseconds().unwrap();
cur_time += hour_micros(1);
}
Ok(())
}

View File

@ -75,10 +75,16 @@ pub async fn query_by_ids(trace_id: &str, ids: &[i64]) -> Result<Vec<FileKey>> {
// 1. first query from local cache
let (mut files, ids) = if !cfg.common.local_mode && cfg.common.meta_store_external {
let ids_set: HashSet<_> = ids.iter().cloned().collect();
let cached_files = file_list::LOCAL_CACHE
.query_by_ids(ids)
.await
.unwrap_or_default();
let cached_files = match file_list::LOCAL_CACHE.query_by_ids(ids).await {
Ok(files) => files,
Err(e) => {
log::error!(
"[trace_id {trace_id}] file_list query cache failed: {:?}",
e
);
Vec::new()
}
};
let cached_ids = cached_files
.iter()
.map(|(id, ..)| *id)

View File

@ -140,7 +140,7 @@ mod tests {
.unwrap();
let state = SessionStateBuilder::new()
.with_config(SessionConfig::new())
.with_config(SessionConfig::new().with_target_partitions(12))
.with_runtime_env(Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()))
.with_default_features()
.with_physical_optimizer_rule(Arc::new(JoinReorderRule::new()))

View File

@ -35,15 +35,51 @@ use o2_enterprise::enterprise::common::auditor;
use once_cell::sync::Lazy;
use proto::cluster_rpc;
use reqwest::Client;
use tokio::{sync::RwLock, time};
use tokio::{
sync::{mpsc, oneshot},
time,
};
pub mod ingestion_service;
pub mod stats;
pub static USAGE_DATA: Lazy<Arc<RwLock<Vec<UsageData>>>> =
Lazy::new(|| Arc::new(RwLock::new(vec![])));
pub static TRIGGERS_USAGE_DATA: Lazy<Arc<RwLock<Vec<TriggerData>>>> =
Lazy::new(|| Arc::new(RwLock::new(vec![])));
static USAGE_QUEUER: Lazy<Arc<UsageQueuer>> = Lazy::new(|| Arc::new(initialize_usage_queuer()));
fn initialize_usage_queuer() -> UsageQueuer {
let cfg = get_config();
let timeout = time::Duration::from_secs(cfg.common.usage_publish_interval.try_into().unwrap());
let batch_size = cfg.common.usage_batch_size;
let (msg_sender, msg_receiver) = mpsc::channel::<UsageMessage>(batch_size * 2);
tokio::task::spawn(async move { ingest_usage_job(msg_receiver, batch_size, timeout).await });
UsageQueuer::new(msg_sender)
}
pub async fn run() {
let cfg = get_config();
if !cfg.common.usage_enabled {
return;
}
// Force initialization and wait for the background task to be ready
let (ping_sender, ping_receiver) = oneshot::channel();
if let Err(e) = USAGE_QUEUER
.msg_sender
.send(UsageMessage::Ping(ping_sender))
.await
{
log::error!("Failed to initialize usage queuer: {e}");
return;
}
if let Err(e) = ping_receiver.await {
log::error!("Usage queuer initialization failed: {e}");
return;
}
log::debug!("Usage queuer initialized successfully");
}
pub async fn report_request_usage_stats(
stats: RequestStats,
@ -150,19 +186,23 @@ pub async fn report_request_usage_stats(
}
}
pub async fn publish_usage(mut usage: Vec<UsageData>) {
let mut usages = USAGE_DATA.write().await;
usages.append(&mut usage);
if usages.len() < get_config().common.usage_batch_size {
async fn publish_usage(usage: Vec<UsageData>) {
let cfg = get_config();
if !cfg.common.usage_enabled {
return;
}
let curr_usages = std::mem::take(&mut *usages);
// release the write lock
drop(usages);
ingest_usages(curr_usages).await
match USAGE_QUEUER
.enqueue(usage.into_iter().map(UsageBuffer::Usage).collect())
.await
{
Err(e) => {
log::error!("Failed to send usage data to background ingesting job: {e}")
}
Ok(()) => {
log::debug!("Successfully queued usage data to be ingested")
}
}
}
pub async fn publish_triggers_usage(trigger: TriggerData) {
@ -171,18 +211,17 @@ pub async fn publish_triggers_usage(trigger: TriggerData) {
return;
}
let mut usages = TRIGGERS_USAGE_DATA.write().await;
usages.push(trigger);
if usages.len() < cfg.common.usage_batch_size {
return;
match USAGE_QUEUER
.enqueue(vec![UsageBuffer::Trigger(trigger)])
.await
{
Err(e) => {
log::error!("Failed to send trigger usage data to background ingesting job: {e}")
}
Ok(()) => {
log::debug!("Successfully queued trigger usage data to be ingested")
}
}
let curr_usages = std::mem::take(&mut *usages);
// release the write lock
drop(usages);
ingest_trigger_usages(curr_usages).await
}
pub async fn flush() {
@ -190,44 +229,13 @@ pub async fn flush() {
#[cfg(feature = "enterprise")]
flush_audit().await;
// flush usage report
flush_usage().await;
// flush triggers usage report
flush_triggers_usage().await;
}
async fn flush_usage() {
if !get_config().common.usage_enabled {
return;
// shutdown usage_queuer
let (res_sender, res_receiver) = oneshot::channel();
if let Err(e) = USAGE_QUEUER.shutdown(res_sender).await {
log::error!("Error shutting down USAGE_QUEUER: {e}");
}
let mut usages = USAGE_DATA.write().await;
if usages.len() == 0 {
return;
}
let curr_usages = std::mem::take(&mut *usages);
// release the write lock
drop(usages);
ingest_usages(curr_usages).await
}
async fn flush_triggers_usage() {
if !get_config().common.usage_enabled {
return;
}
let mut usages = TRIGGERS_USAGE_DATA.write().await;
if usages.len() == 0 {
return;
}
let curr_usages = std::mem::take(&mut *usages);
// release the write lock
drop(usages);
ingest_trigger_usages(curr_usages).await
// wait for flush ingestion job
res_receiver.await.ok();
}
async fn ingest_usages(curr_usages: Vec<UsageData>) {
@ -311,10 +319,15 @@ async fn ingest_usages(curr_usages: Vec<UsageData>) {
);
if &cfg.common.usage_reporting_mode != "both" {
// on error in ingesting usage data, push back the data
let mut usages = USAGE_DATA.write().await;
let mut curr_usages = curr_usages.clone();
usages.append(&mut curr_usages);
drop(usages);
let curr_usages = curr_usages.clone();
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Usage).collect())
.await
{
log::error!(
"Error in pushing back un-ingested Usage data to UsageQueuer: {e}"
);
}
}
}
}
@ -322,10 +335,15 @@ async fn ingest_usages(curr_usages: Vec<UsageData>) {
log::error!("Error in ingesting usage data to external URL {:?}", e);
if &cfg.common.usage_reporting_mode != "both" {
// on error in ingesting usage data, push back the data
let mut usages = USAGE_DATA.write().await;
let mut curr_usages = curr_usages.clone();
usages.append(&mut curr_usages);
drop(usages);
let curr_usages = curr_usages.clone();
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Usage).collect())
.await
{
log::error!(
"Error in pushing back un-ingested Usage data to UsageQueuer: {e}"
);
}
}
}
}
@ -344,10 +362,12 @@ async fn ingest_usages(curr_usages: Vec<UsageData>) {
if let Err(e) = ingestion_service::ingest(&cfg.common.usage_org, req).await {
log::error!("Error in ingesting usage data {:?}", e);
// on error in ingesting usage data, push back the data
let mut usages = USAGE_DATA.write().await;
let mut curr_usages = curr_usages.clone();
usages.append(&mut curr_usages);
drop(usages);
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Usage).collect())
.await
{
log::error!("Error in pushing back un-ingested Usage data to UsageQueuer: {e}");
}
}
}
}
@ -370,57 +390,151 @@ async fn ingest_trigger_usages(curr_usages: Vec<TriggerData>) {
};
if let Err(e) = ingestion_service::ingest(&get_config().common.usage_org, req).await {
log::error!("Error in ingesting triggers usage data {:?}", e);
// on error in ingesting usage data, push back the data
let mut usages = TRIGGERS_USAGE_DATA.write().await;
let mut curr_usages = curr_usages.clone();
usages.append(&mut curr_usages);
drop(usages);
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Trigger).collect())
.await
{
log::error!("Error in pushing back un-ingested Usage data to UsageQueuer: {e}");
}
}
}
async fn publish_existing_usage() {
let mut usages = USAGE_DATA.write().await;
log::debug!("publishing usage reports,len: {}", usages.len());
if usages.is_empty() {
return;
}
let curr_usages = std::mem::take(&mut *usages);
// release the write lock
drop(usages);
ingest_usages(curr_usages).await
#[derive(Debug)]
struct UsageQueuer {
msg_sender: mpsc::Sender<UsageMessage>,
}
async fn publish_existing_triggers_usage() {
let mut usages = TRIGGERS_USAGE_DATA.write().await;
log::debug!("publishing triggers usage reports,len: {}", usages.len());
if usages.is_empty() {
return;
impl UsageQueuer {
fn new(msg_sender: mpsc::Sender<UsageMessage>) -> Self {
Self { msg_sender }
}
let curr_usages = std::mem::take(&mut *usages);
// release the write lock
drop(usages);
async fn enqueue(
&self,
usage_buf: Vec<UsageBuffer>,
) -> Result<(), mpsc::error::SendError<UsageMessage>> {
self.msg_sender.send(UsageMessage::Data(usage_buf)).await
}
ingest_trigger_usages(curr_usages).await
async fn shutdown(
&self,
res_sender: oneshot::Sender<()>,
) -> Result<(), mpsc::error::SendError<UsageMessage>> {
self.msg_sender
.send(UsageMessage::Shutdown(res_sender))
.await
}
}
pub async fn run() {
let cfg = get_config();
if !cfg.common.usage_enabled {
return;
#[derive(Debug)]
enum UsageMessage {
Data(Vec<UsageBuffer>),
Shutdown(oneshot::Sender<()>),
Ping(oneshot::Sender<()>),
}
#[derive(Debug)]
enum UsageBuffer {
Usage(UsageData),
Trigger(TriggerData),
}
#[derive(Debug)]
struct UsageReportRunner {
pending: Vec<UsageBuffer>,
batch_size: usize,
timeout: time::Duration,
last_processed: time::Instant,
}
impl UsageReportRunner {
fn new(batch_size: usize, timeout: time::Duration) -> Self {
Self {
pending: Vec::new(),
batch_size,
timeout,
last_processed: time::Instant::now(),
}
}
let mut usage_interval = time::interval(time::Duration::from_secs(
cfg.common.usage_publish_interval.try_into().unwrap(),
));
usage_interval.tick().await; // trigger the first run
fn push(&mut self, data: Vec<UsageBuffer>) {
self.pending.extend(data);
}
fn should_process(&self) -> bool {
self.pending.len() >= self.batch_size
|| (!self.pending.is_empty() && self.last_processed.elapsed() >= self.timeout)
}
fn take_batch(&mut self) -> Vec<UsageBuffer> {
self.last_processed = time::Instant::now();
std::mem::take(&mut self.pending)
}
}
/// Background job to collect and ingest UsageData and TriggerData.
/// Ingestion happens when either the batch_size or the timeout is exceeded, whichever satisfies the
/// first.
async fn ingest_usage_job(
mut msg_receiver: mpsc::Receiver<UsageMessage>,
batch_size: usize,
timeout: time::Duration,
) {
let mut usage_report_runner = UsageReportRunner::new(batch_size, timeout);
let mut interval = time::interval(timeout);
loop {
log::debug!("Usage ingestion loop running");
usage_interval.tick().await;
publish_existing_usage().await;
publish_existing_triggers_usage().await;
tokio::select! {
msg = msg_receiver.recv() => {
match msg {
Some(UsageMessage::Data(usage_buf)) => {
usage_report_runner.push(usage_buf);
if usage_report_runner.should_process() {
let buffered = usage_report_runner.take_batch();
ingest_buffered_usage(buffered).await;
}
}
Some(UsageMessage::Shutdown(res_sender)) => {
log::debug!("Received shutdown signal");
// process any remaining data before shutting down
if !usage_report_runner.pending.is_empty() {
let buffered = usage_report_runner.take_batch();
ingest_buffered_usage(buffered).await;
}
res_sender.send(()).ok();
break;
}
Some(UsageMessage::Ping(ping_sender)) => {
log::debug!("Received initialization ping");
ping_sender.send(()).ok();
}
None => break, // channel closed
}
}
_ = interval.tick() => {
if usage_report_runner.should_process() {
let buffered = usage_report_runner.take_batch();
ingest_buffered_usage(buffered).await;
}
}
}
}
}
async fn ingest_buffered_usage(usage_buffer: Vec<UsageBuffer>) {
log::debug!("Ingest {} buffered usage data", usage_buffer.len());
let (mut usage_data, mut trigger_data) = (Vec::new(), Vec::new());
for item in usage_buffer {
match item {
UsageBuffer::Usage(usage) => usage_data.push(usage),
UsageBuffer::Trigger(trigger) => trigger_data.push(trigger),
}
}
if !usage_data.is_empty() {
ingest_usages(usage_data).await;
}
if !trigger_data.is_empty() {
ingest_trigger_usages(trigger_data).await;
}
}

View File

@ -1,282 +0,0 @@
// Copyright 2024 OpenObserve Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::HashMap, sync::Arc};
use config::{
cluster::LOCAL_NODE,
get_config,
meta::{
stream::StreamType,
usage::{Stats, UsageEvent, STATS_STREAM, USAGE_STREAM},
},
utils::json,
};
use infra::dist_lock;
use once_cell::sync::Lazy;
use proto::cluster_rpc;
use reqwest::Client;
use super::ingestion_service;
use crate::{
common::infra::cluster::get_node_by_uuid,
service::{db, search as SearchService},
};
pub static CLIENT: Lazy<Arc<Client>> = Lazy::new(|| Arc::new(Client::new()));
pub async fn publish_stats() -> Result<(), anyhow::Error> {
let cfg = get_config();
let mut orgs = db::schema::list_organizations_from_cache().await;
orgs.retain(|org: &String| org != &cfg.common.usage_org);
for org_id in orgs {
// get the working node for the organization
let (_offset, node) = get_last_stats_offset(&org_id).await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some()
{
log::debug!("[STATS] for organization {org_id} are being calculated by {node}");
continue;
}
// get lock
let locker =
dist_lock::lock(&format!("/stats/publish_stats/org/{org_id}"), 0, None).await?;
let (last_query_ts, node) = get_last_stats_offset(&org_id).await;
if !node.is_empty() && LOCAL_NODE.uuid.ne(&node) && get_node_by_uuid(&node).await.is_some()
{
log::debug!("[STATS] for organization {org_id} are being calculated by {node}");
dist_lock::unlock(&locker).await?;
continue;
}
// set current node to lock the organization
let ret = if !node.is_empty() || LOCAL_NODE.uuid.ne(&node) {
set_last_stats_offset(&org_id, last_query_ts, Some(&LOCAL_NODE.uuid.clone())).await
} else {
Ok(())
};
// release lock
dist_lock::unlock(&locker).await?;
drop(locker);
ret?;
let current_ts = chrono::Utc::now().timestamp_micros();
let sql = format!(
"SELECT sum(num_records) as records, sum(size) as original_size, org_id, stream_type, stream_name FROM \"{USAGE_STREAM}\" where _timestamp between {last_query_ts} and {current_ts} and event = \'{}\' and org_id = \'{}\' group by org_id, stream_type, stream_name",
UsageEvent::Ingestion,
org_id
);
let query = config::meta::search::Query {
sql,
size: 100000000,
..Default::default()
};
let req = config::meta::search::Request {
query,
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
search_type: None,
index_type: "".to_string(),
};
// do search
match SearchService::search("", &cfg.common.usage_org, StreamType::Logs, None, &req).await {
Ok(res) => {
if !res.hits.is_empty() {
match report_stats(res.hits, &org_id, last_query_ts, current_ts).await {
Ok(_) => {
log::info!("report stats success");
set_last_stats_offset(
&org_id,
current_ts,
Some(&LOCAL_NODE.uuid.clone()),
)
.await?;
}
Err(err) => {
log::error!("report stats error: {:?} for {}", err, &org_id);
}
}
} else {
log::info!(
"no stats between time: {} and {} for {}",
last_query_ts,
current_ts,
&org_id
);
}
}
Err(err) => {
log::error!("calculate stats error: {:?} for {}", err, &org_id);
}
}
}
Ok(())
}
async fn get_last_stats(
org_id: &str,
stats_ts: i64,
) -> std::result::Result<Vec<json::Value>, anyhow::Error> {
let sql = format!(
"SELECT records, original_size, org_id, stream_type, stream_name, min_ts, max_ts FROM \"{STATS_STREAM}\" where _timestamp = {stats_ts} and org_id = \'{org_id}\'"
);
let query = config::meta::search::Query {
sql,
size: 100000000,
..Default::default()
};
let req = config::meta::search::Request {
query,
encoding: config::meta::search::RequestEncoding::Empty,
regions: vec![],
clusters: vec![],
timeout: 0,
search_type: None,
index_type: "".to_string(),
};
match SearchService::search(
"",
&get_config().common.usage_org,
StreamType::Logs,
None,
&req,
)
.await
{
Ok(res) => Ok(res.hits),
Err(err) => match &err {
infra::errors::Error::ErrorCode(infra::errors::ErrorCodes::SearchStreamNotFound(_)) => {
Ok(vec![])
}
_ => Err(err.into()),
},
}
}
async fn report_stats(
report_data: Vec<json::Value>,
org_id: &str,
last_query_ts: i64,
curr_ts: i64,
) -> Result<(), anyhow::Error> {
// get existing stats
let existing_stats = get_last_stats(org_id, last_query_ts).await?;
let mut report_data_map = to_map(report_data);
let curr_data: Vec<&Stats> = if !existing_stats.is_empty() {
let mut existing_stats_map = to_map(existing_stats);
for (key, value) in report_data_map.iter_mut() {
if let Some(existing_value) = existing_stats_map.remove(key) {
value.records += existing_value.records;
value.original_size += existing_value.original_size;
value._timestamp = curr_ts;
if value.min_ts == 0 && existing_value.min_ts != 0 {
value.min_ts = existing_value.min_ts;
} else {
value.min_ts = curr_ts;
}
value.max_ts = curr_ts;
} else {
value._timestamp = curr_ts;
}
}
// Add entries from existing_stats_map that aren't in report_data_map
for (key, mut value) in existing_stats_map {
value._timestamp = curr_ts;
report_data_map.entry(key).or_insert(value);
}
report_data_map.values().collect()
} else {
for (_, value) in report_data_map.iter_mut() {
value._timestamp = curr_ts;
value.min_ts = curr_ts;
value.max_ts = curr_ts;
}
report_data_map.values().collect()
};
let report_data: Vec<json::Value> = curr_data
.iter()
.map(|s| serde_json::to_value(s).unwrap())
.collect();
let req = cluster_rpc::UsageRequest {
stream_name: STATS_STREAM.to_owned(),
data: Some(cluster_rpc::UsageData::from(report_data)),
};
match ingestion_service::ingest(&get_config().common.usage_org, req).await {
Ok(_) => Ok(()),
Err(err) => Err(err),
}
}
async fn get_last_stats_offset(org_id: &str) -> (i64, String) {
let key = format!("/stats/last_updated/org/{org_id}");
let value = match db::get(&key).await {
Ok(ret) => String::from_utf8_lossy(&ret).to_string(),
Err(_) => String::from("0"),
};
if value.contains(';') {
let mut parts = value.split(';');
let offset: i64 = parts.next().unwrap().parse().unwrap();
let node = parts.next().unwrap().to_string();
(offset, node)
} else {
(value.parse().unwrap(), String::from(""))
}
}
pub async fn set_last_stats_offset(
org_id: &str,
offset: i64,
node: Option<&str>,
) -> Result<(), anyhow::Error> {
let val = if let Some(node) = node {
format!("{};{}", offset, node)
} else {
offset.to_string()
};
let key = format!("/stats/last_updated/org/{org_id}");
db::put(&key, val.into(), db::NO_NEED_WATCH, None).await?;
Ok(())
}
pub async fn _set_cache_expiry(offset: i64) -> Result<(), anyhow::Error> {
let key = "/stats/cache_expiry".to_string();
db::put(&key, offset.to_string().into(), db::NO_NEED_WATCH, None).await?;
Ok(())
}
fn to_map(data: Vec<json::Value>) -> HashMap<String, Stats> {
let mut map = HashMap::new();
for item in data {
let stats: Stats = json::from_value(item).unwrap();
let key = format!(
"{}/{}/{}",
stats.org_id, stats.stream_type, stats.stream_name
);
map.insert(key, stats);
}
map
}

View File

@ -169,27 +169,7 @@ test.describe("Alerts testcases", () => {
await expect(page.locator(".q-notification__message").getByText(/Please fill required fields/).first()).toBeVisible();
});
// test("should saved template successfully", async ({ page }) => {
// await page.waitForTimeout(2000);
// await page.locator('[data-test="alert-templates-tab"]').click({ force: true });
// await page.waitForTimeout(1000);
// await page.locator(
// '[data-test="alert-template-list-add-alert-btn"]').click({
// force: true });
// await page.waitForTimeout(100);
// await page.locator(
// '[data-test="add-template-name-input"]').fill("automationalert");
// const jsonString = '{"text": "{alert_name} is active"}';
// await page.click(".view-line")
// await page.keyboard.type(jsonString);
// await page.locator(
// '[data-test="add-template-submit-btn"]').click({ force: true });await expect(page.locator(
// ".q-notification__message").getByText(/Template Saved Successfully/).first()).toBeVisible();
// await page.waitForTimeout(1000);
// // await page.locator('[data-test="alert-template-list-automationalert-delete-template"]').click({ force: true });
// // await page.waitForTimeout(100);
// // await page.locator('[data-test="confirm-button"]').click();
// });
test("should display error when valid JSON not entered under template body", async ({ page }) => {
@ -239,6 +219,4 @@ test.describe("Alerts testcases", () => {
".q-notification__message").getByText(/Please fill required fields/).first()).toBeVisible();
})
});

View File

@ -423,7 +423,7 @@ test.describe("Sanity testcases", () => {
await page.waitForTimeout(100);
await page
.locator('[data-test="add-template-name-input"]')
.fill("sanitytemplates");
.fill("sanitytemp1");
const jsonString = '{"text": "{alert_name} is active"}';
await page.click(".view-line");
await page.keyboard.type(jsonString);
@ -434,6 +434,7 @@ test.describe("Sanity testcases", () => {
await expect(
page
.locator(".q-notification__message")
.getByText(/Template Saved Successfully/)
.first()
).toBeVisible();
@ -448,9 +449,9 @@ test.describe("Sanity testcases", () => {
await page.waitForTimeout(2000);
await page
.locator('[data-test="add-destination-name-input"]')
.fill("sanitydestinations");
.fill("sanitydest1");
await page.locator('[data-test="add-destination-template-select"]').click();
await page.getByText("sanitytemplate").click();
await page.getByText("sanitytemp1").click();
await page.locator('[data-test="add-destination-url-input"]').click();
await page
.locator('[data-test="add-destination-url-input"]')
@ -469,7 +470,7 @@ test.describe("Sanity testcases", () => {
await page
.locator('[data-test="add-alert-name-input"]')
.getByLabel("Name *")
.fill("sanityalerts");
.fill("sanityalert1");
await page
.locator('[data-test="add-alert-stream-type-select"]')
.getByText("arrow_drop_down")
@ -491,17 +492,43 @@ test.describe("Sanity testcases", () => {
.click();
await page
.locator(
'[data-test="add-alert-destination-sanitydestinations-select-item"]'
'[data-test="add-alert-destination-sanitydest1-select-item"]'
)
.click();
await page.locator('[data-test="add-alert-submit-btn"]').click();
// Clone the alert
await page.locator('[data-test="alert-clone"]').click(); // Ensure this selector is correct
await page.getByLabel('Alert Name').click();
await page.getByLabel('Alert Name').fill('test-clone');
await page.locator('[data-test="to-be-clone-alert-name"]').click()
await page.locator('[data-test="to-be-clone-alert-name"]').fill('test-clone');
await page.locator('[data-test="to-be-clone-stream-type"]').click();
await page.getByRole('option', { name: 'logs' }).locator('div').nth(2).click();
await page.locator('[data-test="to-be-clone-stream-name"]').click();
await page.locator('[data-test="to-be-clone-stream-name"]').fill('e2e_automate');
await page.waitForTimeout(2000);
await page.getByRole('option', { name: 'e2e_automate' }).click({force:true});
await page.waitForTimeout(2000);
await page.locator('[data-test="clone-alert-submit-btn"]').click();
await page.getByText('Alert Cloned Successfully').click();
// Delete the cloned alert
await page.getByRole('cell', { name: 'test-clone' }).click();
await page.locator('[data-test="alert-list-test-clone-delete-alert"]').click(); // Adjust the selector if necessary
await page.locator('[data-test="confirm-button"]').click();
// Delete the original alert
await page.locator('[data-test="alert-list-search-input"]').click();
await page.locator('[data-test="alert-list-search-input"]').fill("sani");
await page.waitForTimeout(2000);
await page
.locator('[data-test="alert-list-sanityalerts-delete-alert"]')
.locator('[data-test="alert-list-sanityalert1-delete-alert"]')
.click();
await page.locator('[data-test="confirm-button"]').click();
await page.waitForTimeout(2000);
await page.locator('[data-test="alert-destinations-tab"]').click();
await page.locator('[data-test="destination-list-search-input"]').click();
await page
@ -510,7 +537,7 @@ test.describe("Sanity testcases", () => {
await page.waitForTimeout(2000);
await page
.locator(
'[data-test="alert-destination-list-sanitydestinations-delete-destination"]'
'[data-test="alert-destination-list-sanitydest1-delete-destination"]'
)
.click();
await page.locator('[data-test="confirm-button"]').click();
@ -522,7 +549,7 @@ test.describe("Sanity testcases", () => {
await page.waitForTimeout(2000);
await page
.locator(
'[data-test="alert-template-list-sanitytemplates-delete-template"]'
'[data-test="alert-template-list-sanitytemp1-delete-template"]'
)
.click();
await page.locator('[data-test="confirm-button"]').click();
@ -965,6 +992,8 @@ test.describe("Sanity testcases", () => {
// Use a more specific locator for 'e2e_automate' by targeting its unique container or parent element
await page.locator('[data-test="logs-search-index-list"]').getByText('e2e_automate').click();
});
});
});

106
web/package-lock.json generated
View File

@ -19,6 +19,7 @@
"@tanstack/vue-table": "^8.19.3",
"@tanstack/vue-virtual": "^3.8.4",
"cron-parser": "^4.9.0",
"d3-scale": "^4.0.2",
"date-fns": "^3.6.0",
"date-fns-tz": "^3.1.3",
"dompurify": "^3.1.6",
@ -53,6 +54,7 @@
"@quasar/vite-plugin": "^1.7.0",
"@rushstack/eslint-patch": "^1.10.3",
"@types/axios": "^0.14.0",
"@types/d3-scale": "^4.0.8",
"@types/dompurify": "^3.0.5",
"@types/js-cookie": "^3.0.6",
"@types/jsdom": "^21.1.7",
@ -2511,6 +2513,21 @@
"integrity": "sha512-nl09VhutdjINdWyXxHWN/w9zlNCfr60JUqJbd24YXUuCwgeL0TpFSdElCwb6cxfB6ybE19Gjj4g0jsgkXxKv1Q==",
"license": "MIT"
},
"node_modules/@types/d3-scale": {
"version": "4.0.8",
"resolved": "https://registry.npmjs.org/@types/d3-scale/-/d3-scale-4.0.8.tgz",
"integrity": "sha512-gkK1VVTr5iNiYJ7vWDI+yUFFlszhNMtVeneJ6lUTKPjprsvLLI9/tgEGiXJOnlINJA8FyA88gfnQsHbybVZrYQ==",
"dev": true,
"dependencies": {
"@types/d3-time": "*"
}
},
"node_modules/@types/d3-time": {
"version": "3.0.3",
"resolved": "https://registry.npmjs.org/@types/d3-time/-/d3-time-3.0.3.tgz",
"integrity": "sha512-2p6olUZ4w3s+07q3Tm2dbiMZy5pCDfYwtLXXHUnVzXgQlZ/OyPtUz6OL382BkOuGlLXqfT+wqv8Fw2v8/0geBw==",
"dev": true
},
"node_modules/@types/dompurify": {
"version": "3.0.5",
"resolved": "https://registry.npmjs.org/@types/dompurify/-/dompurify-3.0.5.tgz",
@ -5843,6 +5860,81 @@
"url": "https://github.com/chalk/supports-color?sponsor=1"
}
},
"node_modules/d3-array": {
"version": "3.2.4",
"resolved": "https://registry.npmjs.org/d3-array/-/d3-array-3.2.4.tgz",
"integrity": "sha512-tdQAmyA18i4J7wprpYq8ClcxZy3SC31QMeByyCFyRt7BVHdREQZ5lpzoe5mFEYZUWe+oq8HBvk9JjpibyEV4Jg==",
"dependencies": {
"internmap": "1 - 2"
},
"engines": {
"node": ">=12"
}
},
"node_modules/d3-color": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/d3-color/-/d3-color-3.1.0.tgz",
"integrity": "sha512-zg/chbXyeBtMQ1LbD/WSoW2DpC3I0mpmPdW+ynRTj/x2DAWYrIY7qeZIHidozwV24m4iavr15lNwIwLxRmOxhA==",
"engines": {
"node": ">=12"
}
},
"node_modules/d3-format": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/d3-format/-/d3-format-3.1.0.tgz",
"integrity": "sha512-YyUI6AEuY/Wpt8KWLgZHsIU86atmikuoOmCfommt0LYHiQSPjvX2AcFc38PX0CBpr2RCyZhjex+NS/LPOv6YqA==",
"engines": {
"node": ">=12"
}
},
"node_modules/d3-interpolate": {
"version": "3.0.1",
"resolved": "https://registry.npmjs.org/d3-interpolate/-/d3-interpolate-3.0.1.tgz",
"integrity": "sha512-3bYs1rOD33uo8aqJfKP3JWPAibgw8Zm2+L9vBKEHJ2Rg+viTR7o5Mmv5mZcieN+FRYaAOWX5SJATX6k1PWz72g==",
"dependencies": {
"d3-color": "1 - 3"
},
"engines": {
"node": ">=12"
}
},
"node_modules/d3-scale": {
"version": "4.0.2",
"resolved": "https://registry.npmjs.org/d3-scale/-/d3-scale-4.0.2.tgz",
"integrity": "sha512-GZW464g1SH7ag3Y7hXjf8RoUuAFIqklOAq3MRl4OaWabTFJY9PN/E1YklhXLh+OQ3fM9yS2nOkCoS+WLZ6kvxQ==",
"dependencies": {
"d3-array": "2.10.0 - 3",
"d3-format": "1 - 3",
"d3-interpolate": "1.2.0 - 3",
"d3-time": "2.1.1 - 3",
"d3-time-format": "2 - 4"
},
"engines": {
"node": ">=12"
}
},
"node_modules/d3-time": {
"version": "3.1.0",
"resolved": "https://registry.npmjs.org/d3-time/-/d3-time-3.1.0.tgz",
"integrity": "sha512-VqKjzBLejbSMT4IgbmVgDjpkYrNWUYJnbCGo874u7MMKIWsILRX+OpX/gTk8MqjpT1A/c6HY2dCA77ZN0lkQ2Q==",
"dependencies": {
"d3-array": "2 - 3"
},
"engines": {
"node": ">=12"
}
},
"node_modules/d3-time-format": {
"version": "4.1.0",
"resolved": "https://registry.npmjs.org/d3-time-format/-/d3-time-format-4.1.0.tgz",
"integrity": "sha512-dJxPBlzC7NugB2PDLwo9Q8JiTR3M3e4/XANkreKSUxF8vvXKqm1Yfq4Q5dl8budlunRVlUUaDUgFt7eA8D6NLg==",
"dependencies": {
"d3-time": "1 - 3"
},
"engines": {
"node": ">=12"
}
},
"node_modules/dashdash": {
"version": "1.14.1",
"resolved": "https://registry.npmjs.org/dashdash/-/dashdash-1.14.1.tgz",
@ -6468,9 +6560,9 @@
}
},
"node_modules/elliptic": {
"version": "6.5.7",
"resolved": "https://registry.npmjs.org/elliptic/-/elliptic-6.5.7.tgz",
"integrity": "sha512-ESVCtTwiA+XhY3wyh24QqRGBoP3rEdDUl3EDUUo9tft074fi19IrdpH7hLCMMP3CIj7jb3W96rn8lt/BqIlt5Q==",
"version": "6.6.0",
"resolved": "https://registry.npmjs.org/elliptic/-/elliptic-6.6.0.tgz",
"integrity": "sha512-dpwoQcLc/2WLQvJvLRHKZ+f9FgOdjnq11rurqwekGQygGPsYSK29OMMD2WalatiqQ+XGFDglTNixpPfI+lpaAA==",
"dependencies": {
"bn.js": "^4.11.9",
"brorand": "^1.1.0",
@ -9032,6 +9124,14 @@
"node": ">= 0.4"
}
},
"node_modules/internmap": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/internmap/-/internmap-2.0.3.tgz",
"integrity": "sha512-5Hh7Y1wQbvY5ooGgPbDaL5iYLAPzMTUrjMulskHLH6wnv/A+1q5rgEaiuqEjB+oxGXIVZs1FF+R/KPN3ZSQYYg==",
"engines": {
"node": ">=12"
}
},
"node_modules/ipaddr.js": {
"version": "2.2.0",
"resolved": "https://registry.npmjs.org/ipaddr.js/-/ipaddr.js-2.2.0.tgz",

View File

@ -33,6 +33,7 @@
"@tanstack/vue-table": "^8.19.3",
"@tanstack/vue-virtual": "^3.8.4",
"cron-parser": "^4.9.0",
"d3-scale": "^4.0.2",
"date-fns": "^3.6.0",
"date-fns-tz": "^3.1.3",
"dompurify": "^3.1.6",
@ -67,6 +68,7 @@
"@quasar/vite-plugin": "^1.7.0",
"@rushstack/eslint-patch": "^1.10.3",
"@types/axios": "^0.14.0",
"@types/d3-scale": "^4.0.8",
"@types/dompurify": "^3.0.5",
"@types/js-cookie": "^3.0.6",
"@types/jsdom": "^21.1.7",

View File

@ -0,0 +1 @@
<svg id="Layer_1" xmlns="http://www.w3.org/2000/svg" viewBox="0 0 80 80" width="2500" height="2500"><style>.st0{fill:#f3bd19}.st1{fill:#231f20}.st2{fill:#3ebeb0}.st3{fill:#37a595}.st4{fill:none}</style><path class="st0" d="M41.1 41.9H15.6V12.5h7.7c9.9 0 17.8 8 17.8 17.8v11.6z"/><path class="st1" d="M41.1 67.5c-14.1 0-25.6-11.4-25.6-25.6h25.6v25.6z"/><path class="st2" d="M41.1 41.9h23.3v25.6H41.1z"/><path class="st3" d="M41.1 41.9h5.4v25.6h-5.4z"/><path class="st4" d="M0 0h80v80H0z"/></svg>

After

Width:  |  Height:  |  Size: 494 B

View File

@ -258,14 +258,16 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
</div>
<q-card-section>
<q-form @submit="submitForm">
<q-input v-model="toBeCloneAlertName" label="Alert Name" />
<q-input data-test="to-be-clone-alert-name" v-model="toBeCloneAlertName" label="Alert Name" />
<q-select
data-test="to-be-clone-stream-type"
v-model="toBeClonestreamType"
label="Stream Type"
:options="streamTypes"
@update:model-value="updateStreams()"
/>
<q-select
data-test="to-be-clone-stream-name"
v-model="toBeClonestreamName"
:loading="isFetchingStreams"
:disable="!toBeClonestreamType"
@ -280,7 +282,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
/>
<div class="flex justify-center q-mt-lg">
<q-btn
data-test="add-alert-cancel-btn"
data-test="clone-alert-cancel-btn"
v-close-popup="true"
class="q-mb-md text-bold"
:label="t('alerts.cancel')"
@ -289,7 +291,7 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
no-caps
/>
<q-btn
data-test="add-alert-submit-btn"
data-test="clone-alert-submit-btn"
:label="t('alerts.save')"
class="q-mb-md text-bold no-border q-ml-md"
color="secondary"

View File

@ -0,0 +1,280 @@
<!-- Copyright 2023 Zinc Labs Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
-->
<template>
<div>
<div style="display: flex; align-items: center">
<!-- dropdown to select color palette type/mode -->
<q-select
v-model="dashboardPanelData.data.config.color.mode"
:options="colorOptions"
outlined
dense
label="Color palette"
class="showLabelOnTop"
stack-label
:display-value="selectedOptionLabel"
@update:model-value="onColorModeChange"
style="width: 100%"
:popup-content-style="{ height: '300px', width: '200px' }"
>
<template v-slot:option="props">
<!-- label -->
<!-- sublabel -->
<!-- color palette as gradient -->
<q-item v-if="!props.opt.isGroup" v-bind="props.itemProps">
<q-item-section style="padding: 2px">
<q-item-label>{{ props.opt.label }}</q-item-label>
<q-item-label caption>
<div v-if="props.opt.subLabel">
<div style="font-weight: 200">
{{ props.opt.subLabel }}
</div>
</div>
<div
v-if="Array.isArray(props.opt.colorPalette)"
class="color-container"
>
<div
:style="{
background: `linear-gradient(to right, ${props.opt.colorPalette.join(
', ',
)})`,
width: '100%',
height: '8px',
borderRadius: '3px',
}"
></div>
</div>
</q-item-label>
</q-item-section>
</q-item>
<!-- Render non-selectable group headers -->
<q-item v-else>
<q-item-section>
<q-item-label v-html="props.opt.label" />
</q-item-section>
</q-item>
</template>
</q-select>
<!-- color picker for fixed and shades typed color mode -->
<div
class="color-input-wrapper"
v-if="
['fixed', 'shades'].includes(
dashboardPanelData.data.config.color.mode,
)
"
style="margin-top: 30px; margin-left: 5px"
>
<input
type="color"
v-model="dashboardPanelData.data.config.color.fixedColor[0]"
/>
</div>
</div>
<!-- color by button group -->
<div
class="q-pt-md"
v-if="dashboardPanelData.data.config.color.mode.startsWith('continuous')"
>
Color series by:
<div>
<q-btn-toggle
v-model="dashboardPanelData.data.config.color.seriesBy"
push
toggle-color="primary"
size="md"
:options="[
{ label: 'Last', value: 'last' },
{ label: 'Min', value: 'min' },
{ label: 'Max', value: 'max' },
]"
/>
</div>
</div>
</div>
</template>
<script lang="ts">
import useDashboardPanelData from "@/composables/useDashboardPanel";
import { classicColorPalette } from "@/utils/dashboard/colorPalette";
import { computed } from "vue";
import { onBeforeMount } from "vue";
import { defineComponent } from "vue";
export default defineComponent({
name: "ColorPaletteDropdown",
setup() {
const { dashboardPanelData, promqlMode } = useDashboardPanelData();
onBeforeMount(() => {
// on before mount need to check whether color object is there or not else use palette-classic-by-series as a default
if (!dashboardPanelData?.data?.config?.color) {
dashboardPanelData.data.config.color = {
mode: "palette-classic-by-series",
fixedColor: [],
seriesBy: "last",
};
}
});
const colorOptions = [
{
label: "<b>By Series</b>",
isGroup: true,
},
{
label: "Default Palette (By Series)",
subLabel: "Series with the same name will use the same color",
colorPalette: classicColorPalette,
value: "palette-classic-by-series",
},
{
label: "Palette-Classic",
subLabel:
"A random color will be used for each series, regardless of its name",
colorPalette: [
"#5470c6",
"#91cc75",
"#fac858",
"#ee6666",
"#73c0de",
"#3ba272",
"#fc8452",
"#9a60b4",
"#ea7ccc",
"#59c4e6",
"#edafda",
"#93b7e3",
"#a5e7f0",
],
value: "palette-classic",
},
{
label: "Single Color",
subLabel: "Set a specific color to all series",
value: "fixed",
},
{
label: "Shades Of Specific Color",
subLabel: "Different shades of specific color",
value: "shades",
},
{
label: "<b>By Value</b>",
isGroup: true,
},
{
label: "Green-Yellow-Red (By Value)",
colorPalette: ["green", "yellow", "red"],
value: "continuous-green-yellow-red",
},
{
label: "Red-Yellow-Green (By Value)",
colorPalette: ["red", "yellow", "green"],
value: "continuous-red-yellow-green",
},
{
label: "Temperature (By Value)",
colorPalette: ["#F6EADB", "#FBDBA2", "#FFC86D", "#FC8585"],
value: "continuous-temperature",
},
{
label: "Positive (By Value)",
colorPalette: ["#A7D1A7", "#7AB97A", "#4EA24E", "#228B22"],
value: "continuous-positive",
},
{
label: "Negative (By Value)",
colorPalette: ["#FFD4D4", "#FFADAD", "#F77272", "#F03030", "#B12E21"],
value: "continuous-negative",
},
{
label: "Light To Dark Blue (By Value)",
colorPalette: ["#B8CCE0", "#96AFCD", "#7392BA", "#5175A7", "#2E5894"],
value: "continuous-light-to-dark-blue",
},
];
const selectedOptionLabel = computed(() => {
const selectedOption = colorOptions.find(
(option) =>
option.value ===
(dashboardPanelData?.data?.config?.color?.mode ??
"palette-classic-by-series"),
);
return selectedOption
? selectedOption.label
: "Palette-Classic (By Series)";
});
const onColorModeChange = (value: any) => {
// if value.value is fixed or shades, assign ["#53ca53"] to fixedcolor as a default
if (["fixed", "shades"].includes(value.value)) {
dashboardPanelData.data.config.color.fixedColor = ["#53ca53"];
dashboardPanelData.data.config.color.seriesBy = "last";
} else if (
["palette-classic-by-series", "palette-classic"].includes(value.value)
) {
// do not store fixedcolor in config for palette-classic-by-series and palette-classic
dashboardPanelData.data.config.color.fixedColor = [];
} else {
// else assign sublabel to fixedcolor
dashboardPanelData.data.config.color.fixedColor = value.colorPalette;
}
dashboardPanelData.data.config.color.mode = value.value;
};
return {
dashboardPanelData,
promqlMode,
colorOptions,
onColorModeChange,
selectedOptionLabel,
};
},
});
</script>
<style lang="scss" scoped>
:deep(.selectedLabel span) {
text-transform: none !important;
}
.space {
margin-top: 10px;
margin-bottom: 10px;
}
.color-input-wrapper {
height: 25px;
width: 25px;
overflow: hidden;
border-radius: 50%;
display: inline-flex;
align-items: center;
position: relative;
}
.color-input-wrapper input[type="color"] {
position: absolute;
height: 4em;
width: 4em;
top: 50%;
left: 50%;
transform: translate(-50%, -50%);
overflow: hidden;
border: none;
margin: 0;
padding: 0;
}
.color-container {
display: flex;
height: 8px;
}
</style>

View File

@ -1196,6 +1196,10 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
/>
</div>
<div class="space"></div>
<ColorPaletteDropDown v-if="showColorPalette" />
<div class="space"></div>
<div class="space"></div>
<OverrideConfig
v-if="dashboardPanelData.data.type == 'table'"
@ -1214,6 +1218,7 @@ import ValueMapping from "./ValueMapping.vue";
import MarkLineConfig from "./MarkLineConfig.vue";
import CommonAutoComplete from "@/components/dashboards/addPanel/CommonAutoComplete.vue";
import CustomDateTimePicker from "@/components/CustomDateTimePicker.vue";
import ColorPaletteDropDown from "./ColorPaletteDropDown.vue";
import OverrideConfig from "./OverrideConfig.vue";
import LinearIcon from "@/components/icons/dashboards/LinearIcon.vue";
import NoSymbol from "@/components/icons/dashboards/NoSymbol.vue";
@ -1230,6 +1235,7 @@ export default defineComponent({
CommonAutoComplete,
MarkLineConfig,
CustomDateTimePicker,
ColorPaletteDropDown,
OverrideConfig,
LinearIcon,
NoSymbol,
@ -1623,6 +1629,22 @@ export default defineComponent({
].config.time_shift.splice(index, 1);
};
const showColorPalette = computed(() => {
return [
"area",
"area-stacked",
"bar",
"h-bar",
"line",
"scatter",
"stacked",
"h-stacked",
"pie",
"donut",
"gauge",
].includes(dashboardPanelData.data.type);
});
return {
t,
dashboardPanelData,
@ -1643,6 +1665,7 @@ export default defineComponent({
selectPromQlNameOption,
addTimeShift,
removeTimeShift,
showColorPalette,
};
},
});

View File

@ -125,6 +125,18 @@ along with this program. If not, see <http://www.gnu.org/licenses/>.
label="OTEL Collector"
content-class="tab_content"
/>
<q-route-tab
name="logstash"
:to="{
name: 'logstash',
query: {
org_identifier: store.state.selectedOrganization.identifier,
},
}"
:icon="'img:' + getImageURL('images/ingestion/logstash.svg')"
label="Logstash"
content-class="tab_content"
/>
<q-route-tab
v-if="showSyslog"
name="syslogNg"
@ -194,7 +206,7 @@ export default defineComponent({
const confirmUpdate = ref<boolean>(false);
const ingestiontabs = ref("");
const currentOrgIdentifier: any = ref(
store.state.selectedOrganization.identifier
store.state.selectedOrganization.identifier,
);
const ingestRoutes = [
"curl",

View File

@ -0,0 +1,77 @@
<!-- Copyright 2023 OpenObserve Inc.
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU Affero General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU Affero General Public License for more details.
You should have received a copy of the GNU Affero General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
-->
<template>
<div class="q-ma-md">
<CopyContent class="q-mt-sm" :content="content" />
</div>
</template>
<script lang="ts">
import { defineComponent, ref } from "vue";
import config from "../../../aws-exports";
import { useStore } from "vuex";
import { getImageURL } from "../../../utils/zincutils";
import CopyContent from "@/components/CopyContent.vue";
export default defineComponent({
name: "logstash-datasource",
props: {
currOrgIdentifier: {
type: String,
},
currUserEmail: {
type: String,
},
},
components: { CopyContent },
setup() {
const store = useStore();
const endpoint: any = ref({
url: "",
host: "",
port: "",
protocol: "",
tls: "",
});
const url = new URL(store.state.API_ENDPOINT);
endpoint.value = {
url: store.state.API_ENDPOINT,
host: url.hostname,
port: url.port || (url.protocol === "https:" ? "443" : "80"),
protocol: url.protocol.replace(":", ""),
tls: url.protocol === "https:" ? "On" : "Off",
};
const content = `output {
http {
url => "${endpoint.value.url}"
http_method => "post"
format => "json_batch"
headers => {
"Authorization" => "Basic [BASIC_PASSCODE]"
"Content-Type" => "application/json"
}
}
}`;
return {
store,
config,
endpoint,
content,
getImageURL,
};
},
});
</script>

View File

@ -49,6 +49,13 @@ import { convertOffsetToSeconds } from "@/utils/dashboard/convertDataIntoUnitVal
*/
const PANEL_DATA_LOADER_DEBOUNCE_TIME = 50;
const adjustTimestampByTimeRangeGap = (
timestamp: number,
timeRangeGapSeconds: number,
) => {
return timestamp - timeRangeGapSeconds * 1000;
};
export const usePanelDataLoader = (
panelSchema: any,
selectedTimeObj: any,
@ -446,11 +453,14 @@ export const usePanelDataLoader = (
if (it.config?.time_shift && it.config?.time_shift?.length > 0) {
// convert time shift to milliseconds
const timeShiftInMilliSecondsArray = it.config?.time_shift?.map(
(it: any) => convertOffsetToSeconds(it.offSet)
(it: any) => convertOffsetToSeconds(it.offSet, endISOTimestamp),
);
// append 0 seconds to the timeShiftInMilliSecondsArray at 0th index
timeShiftInMilliSecondsArray.unshift(0);
timeShiftInMilliSecondsArray.unshift({
seconds: 0,
periodAsStr: "",
});
const timeShiftQueries: any[] = [];
@ -460,9 +470,15 @@ export const usePanelDataLoader = (
const { query: query1, metadata: metadata1 } =
replaceQueryValue(
it.query,
startISOTimestamp - timeRangeGap * 1000,
endISOTimestamp - timeRangeGap * 1000,
panelSchema.value.queryType
adjustTimestampByTimeRangeGap(
startISOTimestamp,
timeRangeGap.seconds,
),
adjustTimestampByTimeRangeGap(
endISOTimestamp,
timeRangeGap.seconds,
),
panelSchema.value.queryType,
);
const { query: query2, metadata: metadata2 } =
@ -474,8 +490,14 @@ export const usePanelDataLoader = (
const metadata: any = {
originalQuery: it.query,
query: query,
startTime: startISOTimestamp - timeRangeGap * 1000,
endTime: endISOTimestamp - timeRangeGap * 1000,
startTime: adjustTimestampByTimeRangeGap(
startISOTimestamp,
timeRangeGap.seconds,
),
endTime: adjustTimestampByTimeRangeGap(
endISOTimestamp,
timeRangeGap.seconds,
),
queryType: panelSchema.value.queryType,
variables: [...(metadata1 || []), ...(metadata2 || [])],
timeRangeGap: timeRangeGap,
@ -486,8 +508,14 @@ export const usePanelDataLoader = (
metadata,
searchRequestObj: {
sql: query,
start_time: startISOTimestamp - timeRangeGap * 1000,
end_time: endISOTimestamp - timeRangeGap * 1000,
start_time: adjustTimestampByTimeRangeGap(
startISOTimestamp,
timeRangeGap.seconds,
),
end_time: adjustTimestampByTimeRangeGap(
endISOTimestamp,
timeRangeGap.seconds,
),
query_fn: null,
},
});
@ -629,7 +657,10 @@ export const usePanelDataLoader = (
endTime: endISOTimestamp,
queryType: panelSchema.value.queryType,
variables: [...(metadata1 || []), ...(metadata2 || [])],
timeRangeGap: 0,
timeRangeGap: {
seconds: 0,
periodAsStr: "",
},
};
const { traceparent, traceId } = generateTraceContext();
addTraceId(traceId);

View File

@ -35,6 +35,7 @@ import IngestMetrics from "@/components/ingestion/metrics/Index.vue";
import IngestTraces from "@/components/ingestion/traces/Index.vue";
import Recommended from "@/components/ingestion/Recommended.vue";
import Custom from "@/components/ingestion/Custom.vue";
import LogstashDatasource from "@/components/ingestion/logs/LogstashDatasource.vue";
import RUMWeb from "@/components/ingestion/recommended/FrontendRumConfig.vue";
import KubernetesConfig from "@/components/ingestion/recommended/KubernetesConfig.vue";
@ -116,6 +117,14 @@ const useIngestionRoutes = () => {
routeGuard(to, from, next);
},
},
{
path: "logstash",
name: "logstash",
component: LogstashDatasource,
beforeEnter(to: any, from: any, next: any) {
routeGuard(to, from, next);
},
},
],
},
{

View File

@ -87,6 +87,11 @@ const getDefaultDashboardPanelData: any = () => ({
wrap_table_cells: false,
table_transpose: false,
table_dynamic_columns: false,
color: {
mode: "palette-classic-by-series",
fixedColor: ["#53ca53"],
seriesBy: "last",
},
},
htmlContent: "",
markdownContent: "",

View File

@ -0,0 +1,282 @@
// Copyright 2023 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
import { scaleLinear } from "d3-scale";
export enum ColorModeWithoutMinMax {
PALETTE_CLASSIC_BY_SERIES = "palette-classic-by-series",
PALETTE_CLASSIC = "palette-classic",
FIXED = "fixed",
}
export const classicColorPalette = [
"#a2b6ff",
"#889ef9",
"#6e87df",
"#5470c6",
"#385aad",
"#c3ffa5",
"#aae58d",
"#91cc75",
"#79b35e",
"#619b48",
"#ffffa1",
"#fffb89",
"#ffe170",
"#fac858",
"#dfaf40",
"#ffb1ac",
"#ff9794",
"#ff7f7d",
"#ee6666",
"#d24d50",
"#c1ffff",
"#a6f3ff",
"#8dd9f8",
"#73c0de",
"#59a8c5",
"#89eeb9",
"#6fd4a1",
"#56bb89",
"#3ba272",
"#1c8a5c",
"#ffce98",
"#ffb580",
"#ff9c69",
"#fc8452",
"#e06c3c",
"#e6a7ff",
"#cc8fe6",
"#b377cd",
"#9a60b4",
"#824a9c",
"#ffc7ff",
"#ffaeff",
"#ff95e5",
"#ea7ccc",
"#d064b3",
];
const isValidHexColor = (color: string): boolean => {
const result = /^#?([a-f\d]{2})([a-f\d]{2})([a-f\d]{2})$/i.exec(color);
return result !== null;
};
export const shadeColor = (
color: string,
value: number,
min: number,
max: number,
): string | null => {
if (!isValidHexColor(color)) {
return null;
}
if (
typeof value !== "number" ||
typeof min !== "number" ||
typeof max !== "number"
) {
return null;
}
let percent = 0;
if (max === min) {
percent = 0;
} else {
percent = (value - min) / (max - min);
}
let num = parseInt(color.replace("#", ""), 16),
amt = Math.round(1.55 * percent * 100),
R = ((num >> 16) & 0xff) + amt,
G = ((num >> 8) & 0xff) + amt,
B = (num & 0xff) + amt;
let newColor = (
0x1000000 +
(R < 255 ? (R < 0 ? 0 : R) : 255) * 0x10000 +
(G < 255 ? (G < 0 ? 0 : G) : 255) * 0x100 +
(B < 255 ? (B < 0 ? 0 : B) : 255)
)
.toString(16)
.slice(1);
return "#" + newColor;
};
interface MetricResult {
values: [number, number][];
}
interface Metric {
result?: MetricResult[];
}
export const getMetricMinMaxValue = (
searchQueryData: Metric[],
): [number, number] => {
let min = Infinity;
let max = -Infinity;
const allValues = searchQueryData
.flatMap((metric) => metric.result ?? [])
.flatMap((result) => result.values ?? [])
.map(([, value]) => value)
.filter((value) => !Number.isNaN(value));
if (allValues.length > 0) {
min = Math.min(...allValues);
max = Math.max(...allValues);
}
return [min, max];
};
interface SQLData {
[key: string]: number | null | undefined;
}
export const getSQLMinMaxValue = (
yaxiskeys: string[],
searchQueryData: SQLData[][],
): [number, number] => {
// need min and max value for color
let min = Infinity;
let max = -Infinity;
searchQueryData[0]?.forEach((data: SQLData) => {
yaxiskeys?.forEach((key: string) => {
if (
data[key] !== undefined &&
!Number.isNaN(data[key]) &&
data[key] !== null
) {
min = Math.min(min, data[key]);
max = Math.max(max, data[key]);
}
});
});
return [min, max];
};
// need one function which will take some series name and will return hash which will be between 1 to 1000
const getSeriesHash = (seriesName: string) => {
// Initialize a hash variable
let hash = 0;
const classicColorPaletteLength = classicColorPalette.length;
// If the seriesName is empty, return 1 as a default hash value
if (seriesName.length === 0) return 1;
// Loop through each character in the series name
for (let i = 0; i < seriesName.length; i++) {
const char = seriesName.charCodeAt(i); // Get the Unicode value of the character
hash = (hash << 5) - hash + char; // Bitwise hash computation
hash |= 0; // Convert to 32-bit integer
}
// Ensure the hash is positive and between 0 to 99
return Math.abs(hash) % classicColorPaletteLength;
};
type SeriesBy = "last" | "min" | "max";
const getSeriesValueBasedOnSeriesBy = (
values: number[],
seriesBy: SeriesBy,
): number => {
switch (seriesBy) {
case "last":
return values[values.length - 1];
case "min":
return Math.min(...values);
case "max":
return Math.max(...values);
default:
return values[values.length - 1];
}
};
interface ColorConfig {
mode:
| "fixed"
| "shades"
| "palette-classic-by-series"
| "palette-classic"
| "continuous-green-yellow-red"
| "continuous-red-yellow-green"
| "continuous-temperature"
| "continuous-positive"
| "continuous-negative"
| "continuous-light-to-dark-blue";
fixedColor?: string[];
seriesBy?: SeriesBy;
}
function getDomainPartitions(
chartMin: number,
chartMax: number,
numPartitions: number,
) {
// If there's only one partition, return just chartMin and chartMax
if (numPartitions < 2) {
return [chartMin, chartMax];
}
const partitions = [];
const step = (chartMax - chartMin) / (numPartitions - 1);
for (let i = 0; i < numPartitions; i++) {
partitions.push(chartMin + i * step);
}
return partitions;
}
export const getSeriesColor = (
colorCfg: ColorConfig | null,
seriesName: string,
value: number[],
chartMin: number,
chartMax: number,
): string | null => {
if (!colorCfg) {
return classicColorPalette[getSeriesHash(seriesName)];
} else if (colorCfg.mode === "fixed") {
return colorCfg?.fixedColor?.[0] ?? "#53ca53";
} else if (colorCfg.mode === "shades") {
return shadeColor(
colorCfg?.fixedColor?.[0] ?? "#53ca53",
getSeriesValueBasedOnSeriesBy(value, "last"),
chartMin,
chartMax,
);
} else if (colorCfg.mode === "palette-classic-by-series") {
return classicColorPalette[getSeriesHash(seriesName)];
} else if (colorCfg.mode === "palette-classic") {
return null;
} else {
const d3ColorObj = scaleLinear(
getDomainPartitions(
chartMin,
chartMax,
colorCfg?.fixedColor?.length ?? classicColorPalette.length,
),
colorCfg?.fixedColor?.length ? colorCfg.fixedColor : classicColorPalette,
);
return d3ColorObj(
getSeriesValueBasedOnSeriesBy(value, colorCfg?.seriesBy ?? "last"),
);
}
};

View File

@ -1,3 +1,5 @@
import { date } from "quasar";
const units: any = {
bytes: [
{ unit: "B", divisor: 1 },
@ -281,43 +283,75 @@ export const isTimeStamp = (sample: any) => {
);
};
export function convertOffsetToSeconds(offset: string) {
const value = parseInt(offset.slice(0, -1)); // Extract the numeric part
const unit = offset.slice(-1); // Extract the last character (unit)
export function convertOffsetToSeconds(
offset: string,
endISOTimestamp: number,
) {
try {
const periodValue = parseInt(offset.slice(0, -1)); // Extract the numeric part
const period = offset.slice(-1); // Extract the last character (unit)
switch (unit) {
case "m": // Minutes
return value * 60 * 1000;
case "h": // Hours
return value * 60 * 60 * 1000;
case "d": // Days
return value * 24 * 60 * 60 * 1000;
case "w": // Weeks
return value * 7 * 24 * 60 * 60 * 1000;
case "M": // Months (approximate, using 30.44 days per month)
return value * 30.44 * 24 * 60 * 60 * 1000;
default:
return 0; // Return 0 if the unit is not recognized
}
}
export function convertSecondsToOffset(seconds: number): string {
const units = [
{ unit: "Months", factor: 30.44 * 24 * 60 * 60 * 1000 }, // Months (approximate)
{ unit: "Weeks", factor: 7 * 24 * 60 * 60 * 1000 }, // Weeks
{ unit: "Days", factor: 24 * 60 * 60 * 1000 }, // Days
{ unit: "Hours", factor: 60 * 60 * 1000 }, // Hours
{ unit: "Minutes", factor: 60 * 1000 }, // Minutes
];
for (const { unit, factor } of units) {
const value = Math.floor(seconds / factor);
if (value > 0) {
return `${value} ${unit}`;
if (isNaN(periodValue)) {
return {
seconds: 0,
periodAsStr: "",
};
}
}
return "0 Minutes"; // Return "0m" if seconds is 0 or less
const subtractObject: any = {};
let periodAsStr = periodValue.toString();
switch (period) {
case "s": // Seconds
subtractObject.seconds = periodValue;
periodAsStr += " Seconds ago";
break;
case "m": // Minutes
subtractObject.minutes = periodValue;
periodAsStr += " Minutes ago";
break;
case "h": // Hours
subtractObject.hours = periodValue;
periodAsStr += " Hours ago";
break;
case "d": // Days
subtractObject.days = periodValue;
periodAsStr += " Days ago";
break;
case "w": // Weeks
subtractObject.days = periodValue * 7;
periodAsStr += " Weeks ago";
break;
case "M": // Months (approximate, using 30 days per month)
subtractObject.months = periodValue;
periodAsStr += " Months ago";
break;
default:
return {
seconds: 0,
periodAsStr: "",
};
}
// subtract period from endISOTimestamp
const startTimeStamp = date.subtractFromDate(
endISOTimestamp,
subtractObject,
);
// return difference of seconds between endISOTimestamp and startTimeStamp
return {
seconds: endISOTimestamp - startTimeStamp.getTime(),
periodAsStr: periodAsStr,
};
} catch (error) {
console.log(error);
return {
seconds: 0,
periodAsStr: "",
};
}
}
// will find first valid mapped value based on given fieldToCheck

View File

@ -20,6 +20,11 @@ import {
} from "./convertDataIntoUnitValue";
import { toZonedTime } from "date-fns-tz";
import { calculateGridPositions } from "./calculateGridForSubPlot";
import {
ColorModeWithoutMinMax,
getMetricMinMaxValue,
getSeriesColor,
} from "./colorPalette";
let moment: any;
let momentInitialized = false;
@ -64,7 +69,7 @@ export const convertPromQLData = async (
searchQueryData: any,
store: any,
chartPanelRef: any,
hoveredSeriesState: any
hoveredSeriesState: any,
) => {
// console.time("convertPromQLData");
@ -85,7 +90,7 @@ export const convertPromQLData = async (
let isTimeSeriesFlag = true;
const legendPosition = getLegendPosition(
panelSchema?.config?.legends_position
panelSchema?.config?.legends_position,
);
// get the x axis key which will be timestamp
@ -94,8 +99,8 @@ export const convertPromQLData = async (
// add all series timestamp
searchQueryData.forEach((queryData: any) =>
queryData.result.forEach((result: any) =>
result.values.forEach((value: any) => xAxisData.add(value[0]))
)
result.values.forEach((value: any) => xAxisData.add(value[0])),
),
);
// sort the timestamp and make an array
@ -172,8 +177,8 @@ export const convertPromQLData = async (
? 30
: 50
: panelSchema.config?.axis_width == null
? 5
: "25",
? 5
: 25,
},
tooltip: {
show: true,
@ -209,7 +214,7 @@ export const convertPromQLData = async (
// get the current series index from name
const currentSeriesIndex = name.findIndex(
(it: any) =>
it.seriesName == hoveredSeriesState?.value?.hoveredSeriesName
it.seriesName == hoveredSeriesState?.value?.hoveredSeriesName,
);
// if hovered series index is not -1 then take it to very first position
@ -236,9 +241,9 @@ export const convertPromQLData = async (
it.data[1],
panelSchema.config?.unit,
panelSchema.config?.unit_custom,
panelSchema.config?.decimals
)
)} </strong>`
panelSchema.config?.decimals,
),
)} </strong>`,
);
// else normal text
else
@ -248,9 +253,9 @@ export const convertPromQLData = async (
it.data[1],
panelSchema.config?.unit,
panelSchema.config?.unit_custom,
panelSchema.config?.decimals
) ?? ""
)}`
panelSchema.config?.decimals,
) ?? "",
)}`,
);
}
});
@ -271,8 +276,8 @@ export const convertPromQLData = async (
name.value,
panelSchema.config?.unit,
panelSchema.config?.unit_custom,
panelSchema.config?.decimals
)
panelSchema.config?.decimals,
),
);
const date = new Date(name.value);
return `${formatDate(date)}`;
@ -305,8 +310,8 @@ export const convertPromQLData = async (
name,
panelSchema.config?.unit,
panelSchema.config?.unit_custom,
panelSchema.config?.decimals
)
panelSchema.config?.decimals,
),
);
},
},
@ -360,7 +365,7 @@ export const convertPromQLData = async (
gridDataForGauge = calculateGridPositions(
chartPanelRef.value.offsetWidth,
chartPanelRef.value.offsetHeight,
totalLength
totalLength,
);
//assign grid array to gauge chart options
@ -368,9 +373,21 @@ export const convertPromQLData = async (
}
const seriesPropsBasedOnChartType = getPropsByChartTypeForSeries(
panelSchema.type
panelSchema.type,
);
// if color type is shades, continuous then required to calculate min and max for chart.
let chartMin: any = Infinity;
let chartMax: any = -Infinity;
if (
!Object.values(ColorModeWithoutMinMax).includes(
panelSchema.config?.color?.mode,
)
) {
[chartMin, chartMax] = getMetricMinMaxValue(searchQueryData);
}
options.series = searchQueryData.map((it: any, index: number) => {
switch (panelSchema.type) {
case "bar":
@ -392,11 +409,29 @@ export const convertPromQLData = async (
seriesDataObj[value[0]] = value[1];
});
const seriesName = getPromqlLegendName(
metric.metric,
panelSchema.queries[index].config.promql_legend,
);
return {
name: getPromqlLegendName(
metric.metric,
panelSchema.queries[index].config.promql_legend
),
name: seriesName,
itemStyle: {
color: (() => {
try {
return getSeriesColor(
panelSchema?.config?.color,
seriesName,
metric.values.map((value: any) => value[1]),
chartMin,
chartMax,
);
} catch (error) {
console.warn("Failed to get series color:", error);
return undefined; // fallback to default color
}
})(),
},
// if utc then simply return the values by removing z from string
// else convert time from utc to zoned
// used slice to remove Z from isostring to pass as a utc
@ -424,7 +459,7 @@ export const convertPromQLData = async (
return {
name: JSON.stringify(metric.metric),
x: values.map((value: any) =>
moment(value[0] * 1000).toISOString(true)
moment(value[0] * 1000).toISOString(true),
),
y: values.map((value: any) => value[1]),
};
@ -440,6 +475,12 @@ export const convertPromQLData = async (
const series = it?.result?.map((metric: any) => {
const values = metric.values.sort((a: any, b: any) => a[0] - b[0]);
gaugeIndex++;
const seriesName = getPromqlLegendName(
metric.metric,
panelSchema.queries[index].config.promql_legend,
);
return {
...getPropsByChartTypeForSeries(panelSchema.type),
min: panelSchema?.queries[index]?.config?.min || 0,
@ -451,7 +492,7 @@ export const convertPromQLData = async (
radius: `${
Math.min(
gridDataForGauge.gridWidth,
gridDataForGauge.gridHeight
gridDataForGauge.gridHeight,
) /
2 -
5
@ -461,7 +502,7 @@ export const convertPromQLData = async (
width: `${
Math.min(
gridDataForGauge.gridWidth,
gridDataForGauge.gridHeight
gridDataForGauge.gridHeight,
) / 6
}`,
},
@ -470,7 +511,7 @@ export const convertPromQLData = async (
width: `${
Math.min(
gridDataForGauge.gridWidth,
gridDataForGauge.gridHeight
gridDataForGauge.gridHeight,
) / 6
}`,
},
@ -498,10 +539,7 @@ export const convertPromQLData = async (
],
data: [
{
name: getPromqlLegendName(
metric.metric,
panelSchema.queries[index].config.promql_legend
),
name: seriesName,
// taking first value for gauge
value: values[0][1],
detail: {
@ -510,11 +548,26 @@ export const convertPromQLData = async (
value,
panelSchema.config?.unit,
panelSchema.config?.unit_custom,
panelSchema.config?.decimals
panelSchema.config?.decimals,
);
return unitValue.value + unitValue.unit;
},
},
itemStyle: {
color: (() => {
const defaultColor = null;
if (!values?.[0]?.[1]) return defaultColor;
return (
getSeriesColor(
panelSchema?.config?.color,
seriesName,
values[0][1],
chartMin,
chartMax,
) ?? defaultColor
);
})(),
},
},
],
detail: {
@ -539,8 +592,8 @@ export const convertPromQLData = async (
value,
panelSchema.config?.unit,
panelSchema.config?.unit_custom,
panelSchema.config?.decimals
)
panelSchema.config?.decimals,
),
);
},
enterable: true,
@ -569,13 +622,13 @@ export const convertPromQLData = async (
case "matrix": {
const series = it?.result?.map((metric: any) => {
const values = metric.values.sort(
(a: any, b: any) => a[0] - b[0]
(a: any, b: any) => a[0] - b[0],
);
const unitValue = getUnitValue(
values[values.length - 1][1],
panelSchema.config?.unit,
panelSchema.config?.unit_custom,
panelSchema.config?.decimals
panelSchema.config?.decimals,
);
return {
...getPropsByChartTypeForSeries(panelSchema.type),
@ -665,7 +718,7 @@ export const convertPromQLData = async (
legendWidth =
Math.min(
chartPanelRef.value?.offsetWidth / 3,
calculateWidthText(maxValue) + 60
calculateWidthText(maxValue) + 60,
) ?? 20;
}

File diff suppressed because it is too large Load Diff