diff --git a/.gitignore b/.gitignore index 38a16f994..34f16131e 100644 --- a/.gitignore +++ b/.gitignore @@ -4,6 +4,7 @@ report.json # Generated by Cargo # will have compiled files and executables +/examples/ /target/ /bin/ diff --git a/Cargo.lock b/Cargo.lock index 578cd80b1..989f220eb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -373,9 +373,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6" [[package]] name = "arrow" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b556d39f9d19e363833a0fe65d591cd0e2ecc0977589a78179b592bea8dc945" +checksum = "f3724c874f1517cf898cd1c3ad18ab5071edf893c48e73139ab1e16cf0f2affe" dependencies = [ "ahash 0.8.3", "arrow-arith", @@ -396,9 +396,9 @@ dependencies = [ [[package]] name = "arrow-arith" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85c61b9235694b48f60d89e0e8d6cb478f39c65dd14b0fe1c3f04379b7d50068" +checksum = "e958823b8383ca14d0a2e973de478dd7674cd9f72837f8c41c132a0fda6a4e5e" dependencies = [ "arrow-array", "arrow-buffer", @@ -411,9 +411,9 @@ dependencies = [ [[package]] name = "arrow-array" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a1e6e839764618a911cc460a58ebee5ad3d42bc12d9a5e96a29b7cc296303aa1" +checksum = "db670eab50e76654065b5aed930f4367101fcddcb2223802007d1e0b4d5a2579" dependencies = [ "ahash 0.8.3", "arrow-buffer", @@ -428,9 +428,9 @@ dependencies = [ [[package]] name = "arrow-buffer" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03a21d232b1bc1190a3fdd2f9c1e39b7cd41235e95a0d44dd4f522bc5f495748" +checksum = "9f0e01c931882448c0407bd32311a624b9f099739e94e786af68adc97016b5f2" dependencies = [ "half", "num", @@ -438,9 +438,9 @@ dependencies = [ [[package]] name = "arrow-cast" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "83dcdb1436cac574f1c1b30fda91c53c467534337bef4064bbd4ea2d6fbc6e04" +checksum = "4bf35d78836c93f80d9362f3ccb47ff5e2c5ecfc270ff42cdf1ef80334961d44" dependencies = [ "arrow-array", "arrow-buffer", @@ -454,9 +454,9 @@ dependencies = [ [[package]] name = "arrow-csv" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a01677ae9458f5af9e35e1aa6ba97502f539e621db0c6672566403f97edd0448" +checksum = "0a6aa7c2531d89d01fed8c469a9b1bf97132a0bdf70b4724fe4bbb4537a50880" dependencies = [ "arrow-array", "arrow-buffer", @@ -473,9 +473,9 @@ dependencies = [ [[package]] name = "arrow-data" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "14e3e69c9fd98357eeeab4aa0f626ecf7ecf663e68e8fc04eac87c424a414477" +checksum = "ea50db4d1e1e4c2da2bfdea7b6d2722eef64267d5ab680d815f7ae42428057f5" dependencies = [ "arrow-buffer", "arrow-schema", @@ -485,9 +485,9 @@ dependencies = [ [[package]] name = "arrow-ipc" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "64cac2706acbd796965b6eaf0da30204fe44aacf70273f8cb3c9b7d7f3d4c190" +checksum = "a4042fe6585155d1ec28a8e4937ec901a3ca7a19a22b9f6cd3f551b935cd84f5" dependencies = [ "arrow-array", "arrow-buffer", @@ -501,9 +501,9 @@ dependencies = [ [[package]] name = "arrow-json" -version = "31.0.0" +version = "33.0.0" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "7790e8b7df2d8ef5ac802377ac256cf2fb80cbf7d44b82d6464e20ace6232a5a" +checksum = "7c907c4ab4f26970a3719dc06e78e8054a01d0c96da3664d23b941e201b33d2b" dependencies = [ "arrow-array", "arrow-buffer", @@ -513,15 +513,16 @@ dependencies = [ "chrono", "half", "indexmap", + "lexical-core", "num", "serde_json", ] [[package]] name = "arrow-ord" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7ee6e1b761dfffaaf7b5bbe68c113a576a3a802146c5c0b9fcec781e30d80a3" +checksum = "e131b447242a32129efc7932f58ed8931b42f35d8701c1a08f9f524da13b1d3c" dependencies = [ "arrow-array", "arrow-buffer", @@ -533,9 +534,9 @@ dependencies = [ [[package]] name = "arrow-row" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e65bfedf782fc92721e796fdd26ae7343c98ba9a9243d62def9e4e1c4c1cf0b" +checksum = "b591ef70d76f4ac28dd7666093295fece0e5f9298f49af51ea49c001e1635bb6" dependencies = [ "ahash 0.8.3", "arrow-array", @@ -548,18 +549,18 @@ dependencies = [ [[package]] name = "arrow-schema" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73ca49d010b27e2d73f70c1d1f90c1b378550ed0f4ad379c4dea0c997d97d723" +checksum = "eb327717d87eb94be5eff3b0cb8987f54059d343ee5235abf7f143c85f54cfc8" dependencies = [ "serde", ] [[package]] name = "arrow-select" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "976cbaeb1a85c09eea81f3f9c149c758630ff422ed0238624c5c3f4704b6a53c" +checksum = "79d3c389d1cea86793934f31594f914c8547d82e91e3411d4833ad0aac3266a7" dependencies = [ "arrow-array", "arrow-buffer", @@ -570,9 +571,9 @@ dependencies = [ [[package]] name = "arrow-string" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d4882762f8f48a9218946c016553d38b04b4fe8202038dad4141b3b887b7da8" +checksum = "30ee67790496dd310ddbf5096870324431e89aa76453e010020ac29b1184d356" dependencies = [ "arrow-array", "arrow-buffer", @@ -1664,9 +1665,9 @@ dependencies = [ [[package]] name = "datafusion" -version = "17.0.0" +version = "19.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6d90cae91414aaeda37ae8022a23ef1124ca8efc08ac7d7770274249f7cf148" +checksum = "12d462c103bd1cfd24f8e8a199986d89582af6280528e085c393c4be2ff25da7" dependencies = [ "ahash 0.8.3", "arrow", @@ -1711,9 +1712,9 @@ dependencies = [ [[package]] name = "datafusion-common" -version = "17.0.0" +version = "19.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b21c4b8e8b7815e86d79d25da16854fee6d4d1b386572e802a248b7d43188e86" +checksum = "b5babdbcf102862b1f1828c1ab41094e39ba881d5ece4cee2d481d528148f592" dependencies = [ "arrow", "chrono", @@ -1725,9 +1726,9 @@ dependencies = [ [[package]] name = "datafusion-expr" -version = "17.0.0" +version = "19.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db8c07b051fbaf01657a3eb910a76b042ecfed0350a40412f70cf6b949bd5328" +checksum = "90f0c34e87fa541a59d378dc7ee7c9c3dd1fcfa793eab09561b8b4cb35e1827a" dependencies = [ "ahash 0.8.3", "arrow", @@ -1738,9 +1739,9 @@ dependencies = [ [[package]] name = "datafusion-optimizer" -version = "17.0.0" +version = "19.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"a2ce4d34a808cd2e4c4864cdc759dd1bd22dcac2b8af38aa570e30fd54577c4d" +checksum = "7d0c6d912b7b7e4637d85947222455cd948ea193ca454ebf649e7265fd10b048" dependencies = [ "arrow", "async-trait", @@ -1755,9 +1756,9 @@ dependencies = [ [[package]] name = "datafusion-physical-expr" -version = "17.0.0" +version = "19.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a38afa11a09505c24bd7e595039d7914ec39329ba490209413ef2d37895c8220" +checksum = "8000e8f8efafb810ff2943323bb48bd722ac5bb919fe302a66b832ed9c25245f" dependencies = [ "ahash 0.8.3", "arrow", @@ -1786,9 +1787,9 @@ dependencies = [ [[package]] name = "datafusion-row" -version = "17.0.0" +version = "19.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9172411b25ff4aa97f8e99884898595a581636d93cc96c12f96dbe3bf51cd7e5" +checksum = "4e900f05d7e5666e8ab714a96a28cb6f143e62aa1d501ba1199024f8635c726c" dependencies = [ "arrow", "datafusion-common", @@ -1798,9 +1799,9 @@ dependencies = [ [[package]] name = "datafusion-sql" -version = "17.0.0" +version = "19.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7fbe5e61563ced2f6992a60afea568ff3de69e32940bbf07db06fc5c9d8cd866" +checksum = "096f293799e8ae883e0f79f8ebaa51e4292e690ba45e0269b48ca9bd79f57094" dependencies = [ "arrow-schema", "datafusion-common", @@ -1964,12 +1965,12 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "flatbuffers" -version = "22.9.29" +version = "23.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ce016b9901aef3579617931fbb2df8fc9a9f7cb95a16eb8acc8148209bb9e70" +checksum = "77f5399c2c9c50ae9418e522842ad362f61ee48b346ac106807bd355a8a7c619" dependencies = [ "bitflags", - "thiserror", + "rustc_version", ] [[package]] @@ -3298,9 +3299,9 @@ dependencies = [ [[package]] name = "parquet" -version = "31.0.0" +version = "33.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6b4ee1ffc0778395c9783a5c74f2cad2fb1a128ade95a965212d31b7b13e3d45" +checksum = "b1b076829801167d889795cd1957989055543430fa1469cb1f6e32b789bfc764" dependencies = [ "ahash 0.8.3", "arrow-array", diff --git a/Cargo.toml b/Cargo.toml index 32bf6d4f8..51ab47693 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,8 +30,8 @@ actix-web-rust-embed-responder = {version = "2.2.0", default-features = false, f ahash = {version = "0.8.3", features = ["serde"]} anyhow = "1.0" argon2 = {version = "0.4.1", features = ["alloc", "password-hash"]} -arrow = {version = "31.0", features = ["simd", "ipc_compression"]} -arrow-schema = {version = "31.0", features = ["serde"]} +arrow = {version = "33.0", features = ["simd", "ipc_compression"]} +arrow-schema = {version = "33.0", features = ["serde"]} async-trait = "0.1.57" async_once = "0.2.6" awc = "3.1.0" @@ -41,8 +41,8 @@ bytes = "1.1" chrono = "0.4" clap = { version = "4.1.6", default-features = false, features = ["std", "help", "usage", "suggestions", "cargo"] } dashmap = {version = "5.4.0", features = ["serde"]} -datafusion = {version = "17.0", features = ["simd"]} -datafusion-common = "17.0" +datafusion = {version = "19.0", features = ["simd"]} +datafusion-common = "19.0" dotenv_config = "0.1.5" dotenvy = "0.15.6" env_logger = "0.9" @@ -65,7 +65,7 @@ opentelemetry = {version = "0.17", features = ["rt-tokio"]} opentelemetry-otlp = {version = "0.10", features = ["http-proto", "serialize", "reqwest-client"]} opentelemetry-proto = {version = "0.1", features = ["gen-tonic", "traces", 
"with-serde", "build-server"]} parking_lot = "0.12" -parquet = {version = "31.0", features = ["arrow", "async"]} +parquet = {version = "33.0", features = ["arrow", "async"]} prometheus = "0.13.3" prost = "0.11.2" rand = "0.8.5" diff --git a/README.md b/README.md index e6e9e1473..5402b1cf1 100644 --- a/README.md +++ b/README.md @@ -75,4 +75,3 @@ Check the [contributing guide](./CONTRIBUTING.md) ## Ingestion ![Home](./screenshots/zo_ingestion.png) - diff --git a/coverage.sh b/coverage.sh index 877353710..6419a04de 100755 --- a/coverage.sh +++ b/coverage.sh @@ -17,9 +17,9 @@ echo "line_cov $line_cov" # enable threshold #COVERAGE_THRESHOLD=80 -FUNC_COV_THRESHOLD=60 -LINE_COV_THRESHOLD=53 -REGION_COV_THRESHOLD=40 +FUNC_COV_THRESHOLD=55 +LINE_COV_THRESHOLD=50 +REGION_COV_THRESHOLD=35 # clean up # find ./target -name llvm-cov-target -type d|xargs rm -fR diff --git a/src/common/utils.rs b/src/common/utils.rs index fe760d80c..75b5ec18f 100644 --- a/src/common/utils.rs +++ b/src/common/utils.rs @@ -39,7 +39,7 @@ pub fn get_stream_file_num_v1(file_name: &str) -> u32 { } pub fn get_file_name_v1(org_id: &str, stream_name: &str, suffix: u32) -> String { - // creates file name like "./data/olympics/olympics#2022#09#13#13_1.json" + // creates file name like "./data/zincobserve/olympics/olympics#2022#09#13#13_1.json" format!( "{}{}/{}/{}/{}_{}{}", &CONFIG.common.data_wal_dir, @@ -101,7 +101,7 @@ mod test_utils { for i in 0..suffix_nums.len() { let suffix = increment_stream_file_num_v1(&format!( - "./data/WAL/nexus/logs/olympics/1663064862606912_{}.json", + "./data/zincobserve/WAL/nexus/logs/olympics/1663064862606912_{}.json", suffix_nums[i] )); assert_eq!(suffix, suffix_nums[i] + 1); @@ -113,13 +113,14 @@ mod test_utils { let file_key = get_file_name_v1(&"nexus".to_owned(), &"Olympics".to_owned(), 2); assert_eq!( file_key.as_str(), - "./data/wal/nexus/logs/Olympics/Olympics_2.json" + "./data/zincobserve/wal/nexus/logs/Olympics/Olympics_2.json" ); } #[test] fn test_get_stream_file_num_v1() { - let file_key = get_stream_file_num_v1("./data/WAL/logs/nexus/Olympics/Olympics_2.json"); + let file_key = + get_stream_file_num_v1("./data/zincobserve/WAL/logs/nexus/Olympics/Olympics_2.json"); assert_eq!(file_key, 2); } diff --git a/src/handler/http/request/stream/mod.rs b/src/handler/http/request/stream/mod.rs index 860992265..5b47a1ff1 100644 --- a/src/handler/http/request/stream/mod.rs +++ b/src/handler/http/request/stream/mod.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. 
-use actix_web::{get, http, post, web, HttpRequest, HttpResponse, Responder}; +use actix_web::{delete, get, http, post, web, HttpRequest, HttpResponse, Responder}; use std::collections::HashMap; use std::io::Error; use std::io::ErrorKind; @@ -55,21 +55,11 @@ async fn schema( ) } }; - match stream_type { - Some(stream_type_loc) => { - stream::get_stream(org_id.as_str(), stream_name.as_str(), stream_type_loc).await - } - /* Some(StreamType::Logs) => { - stream::get_stream(org_id.as_str(), stream_name.as_str(), StreamType::Logs).await - } - Some(StreamType::Metrics) => { - stream::get_stream(org_id.as_str(), stream_name.as_str(), StreamType::Metrics).await - } - Some(StreamType::Traces) => { - stream::get_stream(org_id.as_str(), stream_name.as_str(), StreamType::Traces).await - } */ - None => stream::get_stream(org_id.as_str(), stream_name.as_str(), StreamType::Logs).await, - } + let stream_type = match stream_type { + Some(v) => v, + None => StreamType::Logs, + }; + stream::get_stream(&org_id, &stream_name, stream_type).await } #[utoipa::path( @@ -109,26 +99,52 @@ async fn settings( } }; - match stream_type { - Some(steam_type_loc) => { - stream::save_stream_settings( - org_id.as_str(), - stream_name.as_str(), - steam_type_loc, - settings.into_inner(), + let stream_type = match stream_type { + Some(v) => v, + None => StreamType::Logs, + }; + stream::save_stream_settings(&org_id, &stream_name, stream_type, settings.into_inner()).await +} + +#[utoipa::path( + context_path = "/api", + tag = "Streams", + operation_id = "StreamDelete", + security( + ("Authorization"= []) + ), + params( + ("org_id" = String, Path, description = "Organization name"), + ("stream_name" = String, Path, description = "Stream name"), + ), + responses( + (status = 200, description="Success", content_type = "application/json", body = Stream), + (status = 400, description="Failure", content_type = "application/json", body = HttpResponse), + ) +)] +#[delete("/{org_id}/{stream_name}")] +async fn delete( + path: web::Path<(String, String)>, + req: HttpRequest, +) -> Result<HttpResponse, Error> { + let (org_id, stream_name) = path.into_inner(); + let query = web::Query::<HashMap<String, String>>::from_query(req.query_string()).unwrap(); + let stream_type = match get_stream_type_from_request(&query) { + Ok(v) => v, + Err(e) => { + return Ok( + HttpResponse::BadRequest().json(meta::http::HttpResponse::error( + http::StatusCode::BAD_REQUEST.into(), + Some(e.to_string()), + )), ) - .await } - None => { - stream::save_stream_settings( - org_id.as_str(), - stream_name.as_str(), - StreamType::Logs, - settings.into_inner(), - ) - .await - } - } + }; + let stream_type = match stream_type { + Some(v) => v, + None => StreamType::Logs, + }; + stream::delete_stream(&org_id, &stream_name, stream_type).await } #[utoipa::path( @@ -175,21 +191,7 @@ async fn list(org_id: web::Path<String>, req: HttpRequest) -> impl Responder { None => false, }; - match stream_type { - /* Some(StreamType::Logs) => { - stream::list_streams(org_id.as_str(), Some(StreamType::Logs), fetch_schema).await - } - Some(StreamType::Metrics) => { - stream::list_streams(org_id.as_str(), Some(StreamType::Metrics), fetch_schema).await - } - Some(StreamType::Traces) => { - stream::list_streams(org_id.as_str(), Some(StreamType::Traces), fetch_schema).await - } */ - Some(stream_type_loc) => { - stream::list_streams(org_id.as_str(), Some(stream_type_loc), fetch_schema).await - } - None => stream::list_streams(org_id.as_str(), None, fetch_schema).await, - } + stream::list_streams(org_id.as_str(), stream_type, fetch_schema).await } 
#[get("/{org_id}/")] diff --git a/src/handler/http/router/mod.rs b/src/handler/http/router/mod.rs index 27e4e8e07..625660816 100644 --- a/src/handler/http/router/mod.rs +++ b/src/handler/http/router/mod.rs @@ -95,6 +95,7 @@ pub fn get_service_routes(cfg: &mut web::ServiceConfig) { .service(search::around) .service(stream::schema) .service(stream::settings) + .service(stream::delete) .service(stream::list) .service(stream::org_index) .service(functions::save_function) diff --git a/src/handler/http/router/openapi.rs b/src/handler/http/router/openapi.rs index 3b0a9e68c..a37ca1dfc 100644 --- a/src/handler/http/router/openapi.rs +++ b/src/handler/http/router/openapi.rs @@ -26,6 +26,7 @@ use crate::meta; request::stream::list, request::stream::schema, request::stream::settings, + request::stream::delete, request::ingest::bulk, request::ingest::multi, request::ingest::json, diff --git a/src/infra/cache/file_list.rs b/src/infra/cache/file_list.rs index cb23b7402..021d501da 100644 --- a/src/infra/cache/file_list.rs +++ b/src/infra/cache/file_list.rs @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -use chrono::{Datelike, TimeZone, Timelike, Utc}; +use chrono::{Datelike, Duration, TimeZone, Timelike, Utc}; use dashmap::DashMap; use crate::meta::{common::FileMeta, StreamType}; @@ -377,73 +377,37 @@ pub async fn get_file_list( ) -> Result, anyhow::Error> { let mut files = Vec::new(); let mut keys = Vec::new(); - let mut key = "".to_string(); if time_min > 0 && time_max > 0 { let time_min = Utc.timestamp_nanos(time_min * 1000); let time_max = Utc.timestamp_nanos(time_max * 1000); - if time_min.year() == time_max.year() { - key.push_str(&time_min.format("%Y").to_string()); - key.push('/'); - if time_min.month() == time_max.month() { - key.push_str(&time_min.format("%m").to_string()); - key.push('/'); - if time_min.day() == time_max.day() { - key.push_str(&time_min.format("%d").to_string()); - key.push('/'); - if time_min.hour() == time_max.hour() { - key.push_str(&time_min.format("%H").to_string()); - key.push('/'); - keys.push(key); - } else { - for i in time_min.hour()..(time_max.hour() + 1) { - let mut k = key.clone(); - if i < 10 { - k.push_str(&format!("0{}", i)) - } else { - k.push_str(&format!("{}", i)) - } - k.push('/'); - keys.push(k); - } - } - } else { - for i in time_min.day()..(time_max.day() + 1) { - let mut k = key.clone(); - if i < 10 { - k.push_str(&format!("0{}", i)) - } else { - k.push_str(&format!("{}", i)) - } - k.push('/'); - keys.push(k); - } - } - } else { - for i in time_min.month()..(time_max.month() + 1) { - let mut k = key.clone(); - if i < 10 { - k.push_str(&format!("0{}", i)) - } else { - k.push_str(&format!("{}", i)) - } - k.push('/'); - keys.push(k); - } + if time_min + Duration::hours(48) >= time_max { + // less than 48 hours, generate keys by hours + let mut time_min = Utc + .with_ymd_and_hms( + time_min.year(), + time_min.month(), + time_min.day(), + time_min.hour(), + 0, + 0, + ) + .unwrap(); + while time_min <= time_max { + keys.push(time_min.format("%Y/%m/%d/%H/").to_string()); + time_min += Duration::hours(1); } } else { - for i in time_min.year()..(time_max.year() + 1) { - let mut k = key.clone(); - if i < 10 { - k.push_str(&format!("0{}", i)) - } else { - k.push_str(&format!("{}", i)) - } - k.push('/'); - keys.push(k); + // more than 48 hours, generate keys by days + let mut time_min = Utc + .with_ymd_and_hms(time_min.year(), time_min.month(), time_min.day(), 0, 0, 0) + .unwrap(); + while 
time_min <= time_max { keys.push(time_min.format("%Y/%m/%d/").to_string()); time_min += Duration::days(1); } } } else { - keys.push(key); + keys.push("".to_string()); } for key in keys { diff --git a/src/infra/cache/stats.rs b/src/infra/cache/stats.rs index c1e2b323d..5a5fddf91 100644 --- a/src/infra/cache/stats.rs +++ b/src/infra/cache/stats.rs @@ -16,6 +16,7 @@ use dashmap::DashMap; use crate::meta::common::FileMeta; use crate::meta::stream::StreamStats; +use crate::meta::StreamType; lazy_static! { static ref STATS: DashMap<String, StreamStats> = DashMap::with_capacity(2); @@ -23,20 +24,38 @@ lazy_static! { const STREAM_STATS_MEM_SIZE: usize = std::mem::size_of::<StreamStats>(); +#[inline] pub fn get_stats() -> DashMap<String, StreamStats> { STATS.clone() } -pub fn get_stream_stats(org_id: &str, stream_name: &str, stream_type: &str) -> Option<StreamStats> { +#[inline] +pub fn get_stream_stats(org_id: &str, stream_name: &str, stream_type: StreamType) -> StreamStats { let key = format!("{}/{}/{}", org_id, stream_type, stream_name); - STATS.get(&key).map(|v| *v.value()) + match STATS.get(&key).map(|v| *v.value()) { + Some(v) => v, + None => StreamStats::default(), + } } -pub fn set_stream_stats(org_id: &str, stream_name: &str, stream_type: &str, val: StreamStats) { +#[inline] +pub fn remove_stream_stats(org_id: &str, stream_name: &str, stream_type: StreamType) { + let key = format!("{}/{}/{}", org_id, stream_type, stream_name); + STATS.remove(&key); +} + +#[inline] +pub fn set_stream_stats( + org_id: &str, + stream_name: &str, + stream_type: StreamType, + val: StreamStats, +) { let key = format!("{}/{}/{}", org_id, stream_type, stream_name); STATS.insert(key, val); } +#[inline] pub fn incr_stream_stats(key: &str, val: FileMeta) -> Result<(), anyhow::Error> { // eg: files/default/olympics/2022/10/03/10/6982652937134804993_1.parquet let columns = key.split('/').collect::<Vec<&str>>(); @@ -69,6 +88,7 @@ pub fn incr_stream_stats(key: &str, val: FileMeta) -> Result<(), anyhow::Error> Ok(()) } +#[inline] pub fn decr_stream_stats(key: &str, val: FileMeta) -> Result<(), anyhow::Error> { // eg: files/default/olympics/2022/10/03/10/6982652937134804993_1.parquet let columns = key.split('/').collect::<Vec<&str>>(); @@ -83,6 +103,9 @@ pub fn decr_stream_stats(key: &str, val: FileMeta) -> Result<(), anyhow::Error> let stream_type = columns[2].to_string(); let stream_name = columns[3].to_string(); let key = format!("{}/{}/{}", org_id, stream_type, stream_name); + if !STATS.contains_key(&key) { + return Ok(()); + } let mut stats = STATS.entry(key).or_default(); stats.doc_num -= val.records; stats.file_num -= 1; @@ -92,10 +115,34 @@ pub fn decr_stream_stats(key: &str, val: FileMeta) -> Result<(), anyhow::Error> Ok(()) } +#[inline] +pub fn reset_stream_stats( + org_id: &str, + stream_name: &str, + stream_type: StreamType, + val: FileMeta, +) -> Result<(), anyhow::Error> { + let key = format!("{}/{}/{}", org_id, stream_type, stream_name); + if !STATS.contains_key(&key) { + return Ok(()); + } + let mut stats = STATS.entry(key).or_default(); + if val.min_ts != 0 { + stats.doc_time_min = val.min_ts; + } + if val.max_ts != 0 { + stats.doc_time_max = val.max_ts; + } + + Ok(()) +} + +#[inline] pub fn get_stream_stats_len() -> usize { STATS.len() } +#[inline] pub fn get_stream_stats_in_memory_size() -> usize { STATS .iter() @@ -121,9 +168,9 @@ mod tests { compressed_size: 3.00, }; - let _ = set_stream_stats("nexus", "default", "logs", val); - let stats = get_stream_stats("nexus", "default", "logs"); - assert_eq!(stats, Some(val)); + let _ = set_stream_stats("nexus", "default", StreamType::Logs, 
val); + let stats = get_stream_stats("nexus", "default", StreamType::Logs); + assert_eq!(stats, val); let file_meta = FileMeta { min_ts: 1667978841110, @@ -136,11 +183,11 @@ mod tests { let file_key = "files/nexus/logs/default/2022/10/03/10/6982652937134804993_1.parquet"; let _ = incr_stream_stats(file_key, file_meta); - let stats = get_stream_stats("nexus", "default", "logs"); - assert_eq!(stats.unwrap().doc_num, 5300); + let stats = get_stream_stats("nexus", "default", StreamType::Logs); + assert_eq!(stats.doc_num, 5300); let _ = decr_stream_stats(file_key, file_meta); - let stats = get_stream_stats("nexus", "default", "logs"); - assert_eq!(stats.unwrap().doc_num, 5000); + let stats = get_stream_stats("nexus", "default", StreamType::Logs); + assert_eq!(stats.doc_num, 5000); } } diff --git a/src/infra/config.rs b/src/infra/config.rs index 07c96379a..62cabdc5d 100644 --- a/src/infra/config.rs +++ b/src/infra/config.rs @@ -186,8 +186,10 @@ pub struct Limit { pub file_move_thread_num: usize, #[env_config(name = "ZO_QUERY_THREAD_NUM", default = 0)] pub query_thread_num: usize, - #[env_config(name = "ZO_TS_ALLOWED_UPTO", default = 5)] // in hours - in past - pub allowed_upto: i64, + #[env_config(name = "ZO_INGEST_ALLOWED_UPTO", default = 5)] // in hours - in past + pub ingest_allowed_upto: i64, + #[env_config(name = "ZO_DATA_LIFECYCLE", default = 0)] // in days + pub data_lifecycle: i64, #[env_config(name = "ZO_METRICS_LEADER_PUSH_INTERVAL", default = 15)] pub metrics_leader_push_interval: u64, #[env_config(name = "ZO_METRICS_LEADER_ELECTION_INTERVAL", default = 30)] @@ -270,6 +272,8 @@ pub struct Sled { #[derive(Clone, Debug, EnvConfig)] pub struct S3 { + #[env_config(name = "ZO_S3_PROVIDER", default = "")] + pub provider: String, #[env_config(name = "ZO_S3_SERVER_URL", default = "")] pub server_url: String, #[env_config(name = "ZO_S3_REGION_NAME", default = "")] @@ -299,6 +303,10 @@ pub fn init() -> Config { if cfg.limit.file_push_interval == 0 { cfg.limit.file_push_interval = 10; } + if cfg.limit.data_lifecycle > 0 && cfg.limit.data_lifecycle < 3 { + panic!("data lifecyle disallow set period less than 3 days."); + } + // HACK instance_name if cfg.common.instance_name.is_empty() { cfg.common.instance_name = hostname().unwrap(); @@ -331,6 +339,11 @@ pub fn init() -> Config { if let Err(e) = check_etcd_config(&mut cfg) { panic!("etcd config error: {}", e); } + + // check s3 config + if let Err(e) = check_s3_config(&mut cfg) { + panic!("s3 config error: {}", e); + } cfg } @@ -408,6 +421,13 @@ fn check_memory_cache_config(cfg: &mut Config) -> Result<(), anyhow::Error> { Ok(()) } +fn check_s3_config(cfg: &mut Config) -> Result<(), anyhow::Error> { + if cfg.s3.provider.is_empty() && cfg.s3.server_url.contains(".googleapis.com") { + cfg.s3.provider = "gcs".to_string(); + } + Ok(()) +} + #[inline] pub fn get_parquet_compression() -> parquet::basic::Compression { match CONFIG.common.parquet_compression.to_lowercase().as_str() { diff --git a/src/infra/storage/local.rs b/src/infra/storage/local.rs index bb3ecf474..fbbac7479 100644 --- a/src/infra/storage/local.rs +++ b/src/infra/storage/local.rs @@ -60,12 +60,17 @@ impl FileStorage for Local { } } - async fn del(&self, file: &str) -> Result<(), anyhow::Error> { - let file = format!("{}{}", CONFIG.common.data_stream_dir, file); - match fs::remove_file(file) { - Ok(_) => Ok(()), - Err(e) => Err(anyhow::anyhow!(e)), + async fn del(&self, files: &[&str]) -> Result<(), anyhow::Error> { + if files.is_empty() { + return Ok(()); } + for file in files { + let 
file = format!("{}{}", CONFIG.common.data_stream_dir, file); + if let Err(e) = fs::remove_file(file) { + return Err(anyhow::anyhow!(e)); + } + } + Ok(()) } } @@ -87,7 +92,7 @@ mod test_util { let resp = local.list("").await; assert!(resp.unwrap().contains(&file_name.to_string())); - let resp = local.del(file_name).await; + let resp = local.del(&[file_name]).await; assert!(resp.is_ok()); } } diff --git a/src/infra/storage/mod.rs b/src/infra/storage/mod.rs index 563e1742a..4f9a676ef 100644 --- a/src/infra/storage/mod.rs +++ b/src/infra/storage/mod.rs @@ -26,7 +26,7 @@ pub trait FileStorage: Sync + 'static { async fn list(&self, prefix: &str) -> Result, anyhow::Error>; async fn get(&self, file: &str) -> Result; async fn put(&self, file: &str, data: Bytes) -> Result<(), anyhow::Error>; - async fn del(&self, file: &str) -> Result<(), anyhow::Error>; + async fn del(&self, files: &[&str]) -> Result<(), anyhow::Error>; } lazy_static! { diff --git a/src/infra/storage/s3.rs b/src/infra/storage/s3.rs index cd1fae2be..0baaabe4a 100644 --- a/src/infra/storage/s3.rs +++ b/src/infra/storage/s3.rs @@ -16,6 +16,7 @@ use async_once::AsyncOnce; use async_trait::async_trait; use aws_config::default_provider::credentials::DefaultCredentialsChain; use aws_config::{timeout::TimeoutConfig, SdkConfig}; +use aws_sdk_s3::model::{Delete, ObjectIdentifier}; use aws_sdk_s3::{Client, Credentials, Region}; use std::{sync::Arc, time::Duration}; @@ -103,7 +104,65 @@ impl FileStorage for S3 { } } - async fn del(&self, file: &str) -> Result<(), anyhow::Error> { + async fn del(&self, files: &[&str]) -> Result<(), anyhow::Error> { + if files.is_empty() { + return Ok(()); + } + + // Hack for GCS + if CONFIG.s3.provider.eq("gcs") { + for file in files { + if let Err(e) = self.del_for_gcs(file).await { + return Err(anyhow::anyhow!(e)); + } + tokio::task::yield_now().await; // yield to other tasks + } + return Ok(()); + } + + let step = 100; + let mut start = 0; + let files_len = files.len(); + while start < files_len { + let s3config = S3CONFIG.get().await.clone().unwrap(); + let client = Client::new(&s3config); + let result = client + .delete_objects() + .bucket(&CONFIG.s3.bucket_name) + .delete( + Delete::builder() + .set_objects(Some( + files[start..(start + step).min(files_len)] + .iter() + .map(|file| { + ObjectIdentifier::builder() + .set_key(Some(file.to_string())) + .build() + }) + .collect::>(), + )) + .build(), + ) + .send() + .await; + match result { + Ok(_output) => { + log::info!("s3 File delete success: {:?}", files); + } + Err(err) => { + log::error!("s3 File delete error: {:?}", err); + return Err(anyhow::anyhow!(err)); + } + } + start += step; + tokio::task::yield_now().await; // yield to other tasks + } + Ok(()) + } +} + +impl S3 { + async fn del_for_gcs(&self, file: &str) -> Result<(), anyhow::Error> { let s3config = S3CONFIG.get().await.clone().unwrap(); let client = Client::new(&s3config); let result = client @@ -114,11 +173,11 @@ impl FileStorage for S3 { .await; match result { Ok(_output) => { - log::info!("s3 File delete success: {}", file); + log::info!("s3[GCS] File delete success: {}", file); Ok(()) } Err(err) => { - log::error!("s3 File delete error: {:?}", err); + log::error!("s3[GCS] File delete error: {:?}", err); Err(anyhow::anyhow!(err)) } } diff --git a/src/job/compact.rs b/src/job/compact.rs index abb3cdc98..c67c822e7 100644 --- a/src/job/compact.rs +++ b/src/job/compact.rs @@ -22,17 +22,36 @@ pub async fn run() -> Result<(), anyhow::Error> { if !is_compactor(&super::cluster::LOCAL_NODE_ROLE) { 
return Ok(()); } + if !CONFIG.compact.enabled { return Ok(()); } - // should run it every hour + + tokio::task::spawn(async move { run_delete().await }); + tokio::task::spawn(async move { run_merge().await }); + + Ok(()) +} + +async fn run_delete() -> Result<(), anyhow::Error> { let mut interval = time::interval(time::Duration::from_secs(CONFIG.compact.interval)); interval.tick().await; // trigger the first run loop { interval.tick().await; - let ret = service::compact::run().await; + let ret = service::compact::run_delete().await; if ret.is_err() { - log::error!("[COMPACTOR] run error: {}", ret.err().unwrap()); + log::error!("[COMPACTOR] run data delete error: {}", ret.err().unwrap()); + } + } +} +async fn run_merge() -> Result<(), anyhow::Error> { + let mut interval = time::interval(time::Duration::from_secs(CONFIG.compact.interval)); + interval.tick().await; // trigger the first run + loop { + interval.tick().await; + let ret = service::compact::run_merge().await; + if ret.is_err() { + log::error!("[COMPACTOR] run data merge error: {}", ret.err().unwrap()); } } } diff --git a/src/job/files/disk.rs b/src/job/files/disk.rs index 1495ea102..852438834 100644 --- a/src/job/files/disk.rs +++ b/src/job/files/disk.rs @@ -94,6 +94,25 @@ async fn move_files_to_storage() -> Result<(), anyhow::Error> { } log::info!("[JOB] convert disk file: {}", file); + // check if we are allowed to ingest or just delete the file + if db::compact::delete::is_deleting_stream(&org_id, &stream_name, stream_type, None) { + log::info!( + "[JOB] the stream [{}/{}/{}] is deleting, just delete file: {}", + &org_id, + stream_type, + &stream_name, + file + ); + if let Err(e) = fs::remove_file(&local_file) { + log::error!( + "[JOB] Failed to remove disk file from disk: {}, {}", + local_file, + e + ); + } + continue; + } + let mut partitions = file_name.split('_').collect::>(); partitions.retain(|&x| x.contains('=')); let mut partition_key = String::from(""); diff --git a/src/job/files/memory.rs b/src/job/files/memory.rs index 6bdaf9ac4..265b62978 100644 --- a/src/job/files/memory.rs +++ b/src/job/files/memory.rs @@ -73,6 +73,21 @@ async fn move_files_to_storage() -> Result<(), anyhow::Error> { log::info!("[JOB] convert memory file: {}", file); + // check if we are allowed to ingest or just delete the file + if db::compact::delete::is_deleting_stream(&org_id, &stream_name, stream_type, None) { + log::info!( + "[JOB] the stream [{}/{}/{}] is deleting, just delete file: {}", + &org_id, + stream_type, + &stream_name, + file + ); + if file_lock::FILES.write().unwrap().remove(&file).is_none() { + log::error!("[JOB] Failed to remove memory file: {}", file) + } + continue; + } + let mut partitions = file_name.split('_').collect::>(); partitions.retain(|&x| x.contains('=')); let mut partition_key = String::from(""); @@ -108,12 +123,11 @@ async fn move_files_to_storage() -> Result<(), anyhow::Error> { Ok(ret) => match ret { Ok((path, key, meta, _stream_type)) => { match db::file_list::local::set(&key, meta, false).await { - Ok(_) => match file_lock::FILES.write().unwrap().remove(&path) { - Some(_) => {} - None => { + Ok(_) => { + if file_lock::FILES.write().unwrap().remove(&path).is_none() { log::error!("[JOB] Failed to remove memory file: {}", path) } - }, + } Err(e) => log::error!( "[JOB] Failed write memory file meta:{}, error: {}", path, diff --git a/src/job/mod.rs b/src/job/mod.rs index 819535c20..2310fd795 100644 --- a/src/job/mod.rs +++ b/src/job/mod.rs @@ -69,6 +69,7 @@ pub async fn init() -> Result<(), anyhow::Error> { 
tokio::task::spawn(async move { db::functions::watch().await }); tokio::task::spawn(async move { db::user::watch().await }); tokio::task::spawn(async move { db::schema::watch().await }); + tokio::task::spawn(async move { db::compact::delete::watch().await }); tokio::task::spawn(async move { db::watch_prom_cluster_leader().await }); tokio::task::spawn(async move { db::alerts::watch().await }); tokio::task::spawn(async move { db::triggers::watch().await }); @@ -76,6 +77,7 @@ pub async fn init() -> Result<(), anyhow::Error> { db::functions::cache().await?; db::user::cache().await?; db::schema::cache().await?; + db::compact::delete::cache().await?; db::cache_prom_cluster_leader().await?; db::alerts::cache().await?; db::triggers::cache().await?; diff --git a/src/service/compact/delete.rs b/src/service/compact/delete.rs new file mode 100644 index 000000000..3379e81dd --- /dev/null +++ b/src/service/compact/delete.rs @@ -0,0 +1,267 @@ +use chrono::{Duration, TimeZone, Utc}; +use std::collections::{HashMap, HashSet}; +use std::io::Write; +use tokio::time; + +use crate::common::json; +use crate::common::utils::is_local_disk_storage; +use crate::infra::config::CONFIG; +use crate::infra::{cache, ider, storage}; +use crate::meta::common::{FileKey, FileMeta}; +use crate::meta::StreamType; +use crate::service::{db, file_list}; + +pub async fn delete_all( + org_id: &str, + stream_name: &str, + stream_type: StreamType, +) -> Result<(), anyhow::Error> { + // println!("delete_all: {}/{}/{}", org_id, stream_type, stream_name); + if is_local_disk_storage() { + let data_dir = format!( + "{}/files/{}/{}/{}", + CONFIG.common.data_stream_dir, org_id, stream_type, stream_name + ); + let path = std::path::Path::new(&data_dir); + if path.exists() { + std::fs::remove_dir_all(path)?; + } + } else { + // delete files from s3 + // first fetch file list from local cache + let files = file_list::get_file_list(org_id, stream_name, Some(stream_type), 0, 0).await?; + let storage = &storage::DEFAULT; + match storage + .del(&files.iter().map(|v| v.as_str()).collect::<Vec<_>>()) + .await + { + Ok(_) => {} + Err(e) => { + log::error!("[COMPACT] delete file failed: {}", e); + } + } + + // at the end, fetch a file list from s3 to guarantee there is no file + let prefix = format!("files/{}/{}/{}/", org_id, stream_type, stream_name); + loop { + let files = storage.list(&prefix).await?; + if files.is_empty() { + break; + } + match storage + .del(&files.iter().map(|v| v.as_str()).collect::<Vec<_>>()) + .await + { + Ok(_) => {} + Err(e) => { + log::error!("[COMPACT] delete file failed: {}", e); + } + } + tokio::task::yield_now().await; // yield to other tasks + } + } + + // delete from file list + delete_from_file_list(org_id, stream_name, stream_type, (0, 0)).await?; + + // mark delete done + db::compact::delete::delete_stream_done(org_id, stream_name, stream_type, None).await +} + +pub async fn delete_by_date( + org_id: &str, + stream_name: &str, + stream_type: StreamType, + date_range: (&str, &str), +) -> Result<(), anyhow::Error> { + // println!( + // "delete_by_date: {}/{}/{}, {:?}", + // org_id, stream_type, stream_name, date_range + // ); + let mut date_start = Utc.datetime_from_str( + format!("{}T00:00:00Z", date_range.0).as_str(), + "%Y-%m-%dT%H:%M:%SZ", + )?; + let date_end = Utc.datetime_from_str( + format!("{}T23:59:59Z", date_range.1).as_str(), + "%Y-%m-%dT%H:%M:%SZ", + )?; + let time_range = { (date_start.timestamp_micros(), date_end.timestamp_micros()) }; + + if is_local_disk_storage() { + while date_start <= date_end { + let data_dir 
= format!( + "{}/files/{}/{}/{}/{}", + CONFIG.common.data_stream_dir, + org_id, + stream_type, + stream_name, + date_start.format("%Y/%m/%d") + ); + let path = std::path::Path::new(&data_dir); + if path.exists() { + std::fs::remove_dir_all(path)?; + } + date_start += Duration::days(1); + } + } else { + // delete files from s3 + // first fetch file list from local cache + let files = file_list::get_file_list( + org_id, + stream_name, + Some(stream_type), + time_range.0, + time_range.1, + ) + .await?; + let storage = &storage::DEFAULT; + match storage + .del(&files.iter().map(|v| v.as_str()).collect::<Vec<_>>()) + .await + { + Ok(_) => {} + Err(e) => { + log::error!("[COMPACT] delete file failed: {}", e); + } + } + + // at the end, fetch a file list from s3 to guarantee there is no file + while date_start <= date_end { + let prefix = format!( + "files/{}/{}/{}/{}/", + org_id, + stream_type, + stream_name, + date_start.format("%Y/%m/%d") + ); + loop { + let files = storage.list(&prefix).await?; + if files.is_empty() { + break; + } + match storage + .del(&files.iter().map(|v| v.as_str()).collect::<Vec<_>>()) + .await + { + Ok(_) => {} + Err(e) => { + log::error!("[COMPACT] delete file failed: {}", e); + } + } + tokio::task::yield_now().await; // yield to other tasks + } + date_start += Duration::days(1); + } + } + + // update metadata + cache::stats::reset_stream_stats( + org_id, + stream_name, + stream_type, + FileMeta { + min_ts: time_range.1, + ..Default::default() + }, + )?; + + // delete from file list + delete_from_file_list(org_id, stream_name, stream_type, time_range).await?; + + // mark delete done + db::compact::delete::delete_stream_done(org_id, stream_name, stream_type, Some(date_range)) + .await +} + +async fn delete_from_file_list( + org_id: &str, + stream_name: &str, + stream_type: StreamType, + time_range: (i64, i64), +) -> Result<(), anyhow::Error> { + let files = file_list::get_file_list( + org_id, + stream_name, + Some(stream_type), + time_range.0, + time_range.1, + ) + .await?; + if files.is_empty() { + return Ok(()); + } + + let mut file_list_days: HashSet<String> = HashSet::new(); + let mut hours_files: HashMap<String, Vec<FileKey>> = HashMap::with_capacity(24); + for file in files { + let columns: Vec<_> = file.split('/').collect(); + let day_key = format!("{}-{}-{}", columns[4], columns[5], columns[6]); + file_list_days.insert(day_key); + let hour_key = format!( + "{}/{}/{}/{}", + columns[4], columns[5], columns[6], columns[7] + ); + let entry = hours_files.entry(hour_key).or_default(); + entry.push(FileKey { + key: file, + meta: FileMeta::default(), + deleted: true, + }); + } + + let storage = &storage::DEFAULT; + for (key, items) in hours_files { + // upload the new file_list to storage + let new_file_list_key = format!("file_list/{}/{}.json.zst", key, ider::generate()); + let mut buf = zstd::Encoder::new(Vec::new(), 3)?; + for file in items.iter() { + let mut write_buf = json::to_vec(&file)?; + write_buf.push(b'\n'); + buf.write_all(&write_buf)?; + } + let compressed_bytes = buf.finish().unwrap(); + storage + .put(&new_file_list_key, compressed_bytes.into()) + .await?; + + // set to local cache & send broadcast + // retry 10 times + for _ in 0..9 { + // set to local cache + let mut cache_success = true; + for event in &items { + if let Err(e) = db::file_list::progress(&event.key, event.meta, event.deleted).await + { + cache_success = false; + log::error!( + "[COMPACT] delete_from_file_list set local cache failed, retrying: {}", + e + ); + time::sleep(time::Duration::from_secs(1)).await; + break; + } + } + if 
!cache_success { + continue; + } + // send broadcast to other nodes + if let Err(e) = db::file_list::broadcast::send(&items).await { + log::error!( + "[COMPACT] delete_from_file_list send broadcast failed, retrying: {}", + e + ); + time::sleep(time::Duration::from_secs(1)).await; + continue; + } + break; + } + } + + // mark file list need to do merge again + for key in file_list_days { + db::compact::file_list::set_delete(&key).await?; + } + + Ok(()) +} diff --git a/src/service/compact/file_list.rs b/src/service/compact/file_list.rs index 1a5cbce97..a06acdcae 100644 --- a/src/service/compact/file_list.rs +++ b/src/service/compact/file_list.rs @@ -25,11 +25,17 @@ use crate::infra::storage; use crate::meta::common::FileKey; use crate::service::db; +pub async fn run(offset: i64) -> Result<(), anyhow::Error> { + run_merge(offset).await?; + run_delete().await?; + Ok(()) +} + /// check all streams done compact in this hour /// merge all small file list keys in this hour to a single file and upload to storage /// delete all small file list keys in this hour from storage /// node should load new file list from storage -pub async fn run(offset: i64) -> Result<(), anyhow::Error> { +pub async fn run_merge(offset: i64) -> Result<(), anyhow::Error> { let mut offset = offset; if offset == 0 { // get earilest date from schema @@ -83,6 +89,28 @@ pub async fn run(offset: i64) -> Result<(), anyhow::Error> { db::compact::file_list::set_offset(offset).await } +pub async fn run_delete() -> Result<(), anyhow::Error> { + let days = db::compact::file_list::list_delete().await?; + if days.is_empty() { + return Ok(()); // no delete + } + + for day in days { + let mut t = + Utc.datetime_from_str(format!("{}T00:00:00Z", day).as_str(), "%Y-%m-%dT%H:%M:%SZ")?; + for _hour in 0..24 { + let offset = t.timestamp_micros(); + merge_file_list(offset).await?; + t += Duration::hours(1); + } + + // delete day + db::compact::file_list::del_delete(&day).await?; + } + + Ok(()) +} + /// merge and delete the small file list keys in this hour from etcd /// upload new file list into storage async fn merge_file_list(offset: i64) -> Result<(), anyhow::Error> { @@ -101,6 +129,7 @@ async fn merge_file_list(offset: i64) -> Result<(), anyhow::Error> { let offset = Utc.timestamp_nanos(offset * 1000); let offset_prefix = offset.format("/%Y/%m/%d/%H/").to_string(); let key = format!("file_list{}", offset_prefix); + // println!("merge_file_list: key: {}", key); let storage = &storage::DEFAULT; let file_list = storage.list(&key).await?; if file_list.len() <= 1 { @@ -144,6 +173,7 @@ async fn merge_file_list(offset: i64) -> Result<(), anyhow::Error> { let id = ider::generate(); let file_name = format!("file_list{}{}.json.zst", offset_prefix, id); let mut buf = zstd::Encoder::new(Vec::new(), 3)?; + let mut has_content = false; for (_, item) in filter_file_keys.iter() { if item.deleted { continue; @@ -151,20 +181,29 @@ async fn merge_file_list(offset: i64) -> Result<(), anyhow::Error> { let val = json::to_vec(&item)?; buf.write_all(val.as_slice())?; buf.write_all(b"\n")?; + has_content = true; } let compressed_bytes = buf.finish().unwrap(); - match storage.put(&file_name, compressed_bytes.into()).await { - Ok(_) => { - log::info!("[COMPACT] merge file list success, new file: {}", file_name); - // delete all small file list keys in this hour from storage - for file in file_list { - storage.del(&file).await?; + let new_file_ok = if has_content { + match storage.put(&file_name, compressed_bytes.into()).await { + Ok(_) => { + log::info!("[COMPACT] merge 
file list success, new file: {}", file_name); + true + } + Err(err) => { + log::error!("[COMPACT] upload file list failed: {}", err); + false } } - Err(err) => { - log::error!("[COMPACT] upload file list failed: {}", err); - } + } else { + true + }; + if new_file_ok { + // delete all small file list keys in this hour from storage + storage + .del(&file_list.iter().map(|v| v.as_str()).collect::<Vec<_>>()) + .await?; } if locker.is_some() { diff --git a/src/service/compact/lifecycle.rs b/src/service/compact/lifecycle.rs new file mode 100644 index 000000000..dfbc79205 --- /dev/null +++ b/src/service/compact/lifecycle.rs @@ -0,0 +1,34 @@ +use chrono::{DateTime, TimeZone, Utc}; + +use crate::infra::cache; +use crate::meta::StreamType; +use crate::service::db; + +pub async fn delete_by_stream( + lifecycle_end: &str, + org_id: &str, + stream_name: &str, + stream_type: StreamType, +) -> Result<(), anyhow::Error> { + // get schema + let stats = cache::stats::get_stream_stats(org_id, stream_name, stream_type); + let created_at = stats.doc_time_min; + if created_at == 0 { + return Ok(()); // no data, just skip + } + let created_at: DateTime<Utc> = Utc.timestamp_nanos(created_at * 1000); + let lifecycle_start = created_at.format("%Y-%m-%d").to_string(); + let lifecycle_start = lifecycle_start.as_str(); + if lifecycle_start.ge(lifecycle_end) { + return Ok(()); // created_at is after lifecycle_end, just skip + } + + // delete files + db::compact::delete::delete_stream( + org_id, + stream_name, + stream_type, + Some((lifecycle_start, lifecycle_end)), + ) + .await +} diff --git a/src/service/compact/merge.rs b/src/service/compact/merge.rs index f7ae54aaf..58053cc7c 100644 --- a/src/service/compact/merge.rs +++ b/src/service/compact/merge.rs @@ -262,7 +262,7 @@ pub async fn merge_by_stream( if !db_success { merge_success = false; // delete the file just upload to storage - match storage.del(&new_file_name).await { + match storage.del(&[&new_file_name]).await { Ok(_) => {} Err(e) => { log::error!("[COMPACT] delete file failed: {}", e); @@ -272,13 +272,13 @@ pub async fn merge_by_stream( } // delete small files from storage - for file in &new_file_list { - tokio::task::yield_now().await; // yield to other tasks - match storage.del(file).await { - Ok(_) => {} - Err(e) => { - log::error!("[COMPACT] delete file failed: {}", e); - } + match storage + .del(&new_file_list.iter().map(|v| v.as_str()).collect::<Vec<_>>()) + .await + { + Ok(_) => {} + Err(e) => { + log::error!("[COMPACT] delete file failed: {}", e); } } // delete files from file list diff --git a/src/service/compact/mod.rs b/src/service/compact/mod.rs index 258be8af3..4fb3d6ed2 100644 --- a/src/service/compact/mod.rs +++ b/src/service/compact/mod.rs @@ -13,13 +13,93 @@ // limitations under the License. 
use crate::infra::cache; +use crate::infra::config::CONFIG; use crate::meta::StreamType; use crate::service::db; +mod delete; mod file_list; +mod lifecycle; mod merge; -/// compactor run steps: +/// compactor delete run steps: +pub async fn run_delete() -> Result<(), anyhow::Error> { + // check data lifecycle date + if CONFIG.limit.data_lifecycle > 0 { + let now = chrono::Utc::now(); + let date = now - chrono::Duration::days(CONFIG.limit.data_lifecycle); + let data_lifecycle_end = date.format("%Y-%m-%d").to_string(); + + let orgs = cache::file_list::get_all_organization()?; + let stream_types = [ + StreamType::Logs, + StreamType::Metrics, + StreamType::Traces, + StreamType::Metadata, + ]; + for org_id in orgs { + for stream_type in stream_types { + let streams = cache::file_list::get_all_stream(&org_id, stream_type)?; + for stream_name in streams { + if let Err(e) = lifecycle::delete_by_stream( + &data_lifecycle_end, + &org_id, + &stream_name, + stream_type, + ) + .await + { + log::error!( + "[COMPACTOR] lifecycle: delete_by_stream [{}/{}/{}] error: {}", + org_id, + stream_type, + stream_name, + e + ); + } + } + } + } + } + + // delete files + let jobs = db::compact::delete::list().await?; + for job in jobs { + let columns = job.split('/').collect::<Vec<&str>>(); + let org_id = columns[0].to_string(); + let stream_type = StreamType::from(columns[1]); + let stream_name = columns[2].to_string(); + let retention = columns[3].to_string(); + tokio::task::yield_now().await; // yield to other tasks + + let ret = if retention.eq("all") { + delete::delete_all(&org_id, &stream_name, stream_type).await + } else { + let date_range = retention.split(',').collect::<Vec<&str>>(); + delete::delete_by_date( + &org_id, + &stream_name, + stream_type, + (date_range[0], date_range[1]), + ) + .await + }; + + if let Err(e) = ret { + log::error!( + "[COMPACTOR] delete: delete [{}/{}/{}] error: {}", + org_id, + stream_type, + stream_name, + e + ); + } + } + + Ok(()) +} + +/// compactor merge run steps: /// 1. get all organization /// 2. range streams by organization & stream_type /// 3. get a cluster lock for compactor stream /// 4. read last compacted offset: year/month/day/hour /// 5. read current hour all files /// 6. compact small files to big files -- COMPACTOR_MAX_FILE_SIZE /// 7. write to storage /// 8. delete small files keys & write big files keys, use transaction /// 9. delete small files from storage /// 10. update last compacted offset /// 11. release cluster lock /// 12. compact file list from storage -pub async fn run() -> Result<(), anyhow::Error> { +pub async fn run_merge() -> Result<(), anyhow::Error> { // get last file_list compact offset let last_file_list_offset = db::compact::file_list::get_offset().await?; @@ -47,6 +127,17 @@ for stream_type in stream_types { let streams = cache::file_list::get_all_stream(&org_id, stream_type)?; for stream_name in streams { + // check if we are allowed to merge or just skip + if db::compact::delete::is_deleting_stream(&org_id, &stream_name, stream_type, None) + { + log::info!( + "[COMPACTOR] the stream [{}/{}/{}] is deleting, just skip", + &org_id, + stream_type, + &stream_name, + ); + continue; + } + tokio::task::yield_now().await; // yield to other tasks if let Err(e) = merge::merge_by_stream( last_file_list_offset, @@ -96,7 +187,7 @@ mod tests { false, ) .unwrap(); - let resp = run().await; + let resp = run_merge().await; assert!(resp.is_ok()); } } diff --git a/src/service/db/compact.rs b/src/service/db/compact.rs index b7dbeb7eb..f74ec8ddc 100644 --- a/src/service/db/compact.rs +++ b/src/service/db/compact.rs @@ -12,5 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+pub mod delete; pub mod file_list; pub mod files; diff --git a/src/service/db/compact/delete.rs b/src/service/db/compact/delete.rs new file mode 100644 index 000000000..9e266f31e --- /dev/null +++ b/src/service/db/compact/delete.rs @@ -0,0 +1,136 @@ +use dashmap::DashSet; +use std::sync::Arc; + +use crate::{infra::db::Event, meta::StreamType}; + +lazy_static! { + static ref CACHE: DashSet<String> = DashSet::new(); +} + +// delete data from stream +// if date_range is empty, delete all data +// date_range is a tuple of (start, end), eg: (20230102, 20230103) +#[inline] +pub async fn delete_stream( + org_id: &str, + stream_name: &str, + stream_type: StreamType, + date_range: Option<(&str, &str)>, +) -> Result<(), anyhow::Error> { + let db = &crate::infra::db::DEFAULT; + let key = match date_range { + Some((start, end)) => format!( + "{}/{}/{}/{},{}", + org_id, stream_type, stream_name, start, end + ), + None => format!("{}/{}/{}/all", org_id, stream_type, stream_name), + }; + + // write in cache + if CACHE.contains(&key) { + return Ok(()); // already in cache, just skip + } + CACHE.insert(key.to_string()); + + let db_key = format!("/compact/delete/{}", key); + db.put(&db_key, "OK".into()).await?; + + Ok(()) +} + +// check if stream is deleting from cache +#[inline] +pub fn is_deleting_stream( + org_id: &str, + stream_name: &str, + stream_type: StreamType, + date_range: Option<(&str, &str)>, +) -> bool { + let key = match date_range { + Some((start, end)) => format!( + "{}/{}/{}/{},{}", + org_id, stream_type, stream_name, start, end + ), + None => format!("{}/{}/{}/all", org_id, stream_type, stream_name), + }; + CACHE.contains(&key) } + +#[inline] +pub async fn delete_stream_done( + org_id: &str, + stream_name: &str, + stream_type: StreamType, + date_range: Option<(&str, &str)>, +) -> Result<(), anyhow::Error> { + let db = &crate::infra::db::DEFAULT; + let key = match date_range { + Some((start, end)) => format!( + "{}/{}/{}/{},{}", + org_id, stream_type, stream_name, start, end + ), + None => format!("{}/{}/{}/all", org_id, stream_type, stream_name), + }; + let db_key = format!("/compact/delete/{}", key); + if let Err(e) = db.delete(&db_key, false).await { + if !e.to_string().contains("not exists") { + return Err(anyhow::anyhow!(e)); + } + } + + // remove in cache + CACHE.remove(&key); + + Ok(()) +} + +pub async fn list() -> Result<Vec<String>, anyhow::Error> { + let mut items = Vec::new(); + let db = &crate::infra::db::DEFAULT; + let key = "/compact/delete/"; + let ret = db.list(key).await?; + for (item_key, _) in ret { + let item_key = item_key.strip_prefix(key).unwrap(); + items.push(item_key.to_string()); + } + Ok(items) } + +pub async fn watch() -> Result<(), anyhow::Error> { + let db = &crate::infra::db::DEFAULT; + let key = "/compact/delete/"; + let mut events = db.watch(key).await?; + let events = Arc::get_mut(&mut events).unwrap(); + log::info!("[TRACE] Start watching stream deleting"); + loop { + let ev = match events.recv().await { + Some(ev) => ev, + None => { + log::error!("watch_stream_deleting: event channel closed"); + break; + } + }; + match ev { + Event::Put(ev) => { + let item_key = ev.key.strip_prefix(key).unwrap(); + CACHE.insert(item_key.to_string()); + } + Event::Delete(ev) => { + let item_key = ev.key.strip_prefix(key).unwrap(); + CACHE.remove(item_key); + } + } + } + Ok(()) } + +pub async fn cache() -> Result<(), anyhow::Error> { + let db = &crate::infra::db::DEFAULT; + let key = "/compact/delete/"; + let ret = db.list(key).await?; + for (item_key, _) in ret { + let item_key = 
item_key.strip_prefix(key).unwrap(); + CACHE.insert(item_key.to_string()); + } + Ok(()) +} diff --git a/src/service/db/compact/file_list.rs b/src/service/db/compact/file_list.rs index 0e7d75e60..1dd1044f9 100644 --- a/src/service/db/compact/file_list.rs +++ b/src/service/db/compact/file_list.rs @@ -30,6 +30,36 @@ pub async fn set_offset(offset: i64) -> Result<(), anyhow::Error> { Ok(()) } +pub async fn set_delete(key: &str) -> Result<(), anyhow::Error> { + let db = &crate::infra::db::DEFAULT; + let key = format!("/compact/file_list/delete/{}", key); + db.put(&key, "OK".into()).await?; + Ok(()) +} + +pub async fn del_delete(key: &str) -> Result<(), anyhow::Error> { + let db = &crate::infra::db::DEFAULT; + let key = format!("/compact/file_list/delete/{}", key); + if let Err(e) = db.delete(&key, false).await { + if !e.to_string().contains("not exists") { + return Err(anyhow::anyhow!(e)); + } + } + Ok(()) +} + +pub async fn list_delete() -> Result, anyhow::Error> { + let mut items = Vec::new(); + let db = &crate::infra::db::DEFAULT; + let key = "/compact/file_list/delete/"; + let ret = db.list(key).await?; + for (item_key, _item_value) in ret { + let item_key = item_key.strip_prefix(key).unwrap(); + items.push(item_key.to_string()); + } + Ok(items) +} + #[cfg(test)] mod tests { @@ -42,5 +72,10 @@ mod tests { let _ = set_offset(off_set).await; let resp = get_offset().await; assert_eq!(resp.unwrap(), off_set); + + let delete_day = "2023-03-03"; + let _ = set_delete(delete_day).await; + let deletes = list_delete().await.unwrap(); + assert_eq!([delete_day.to_string()].to_vec(), deletes); } } diff --git a/src/service/db/compact/files.rs b/src/service/db/compact/files.rs index 786992048..d3e838530 100644 --- a/src/service/db/compact/files.rs +++ b/src/service/db/compact/files.rs @@ -41,6 +41,21 @@ pub async fn set_offset( Ok(()) } +pub async fn del_offset( + org_id: &str, + stream_name: &str, + stream_type: StreamType, +) -> Result<(), anyhow::Error> { + let db = &crate::infra::db::DEFAULT; + let key = format!("/compact/files/{}/{}/{}", org_id, stream_type, stream_name); + if let Err(e) = db.delete(&key, false).await { + if !e.to_string().contains("not exists") { + return Err(anyhow::anyhow!(e)); + } + } + Ok(()) +} + pub async fn list_offset() -> Result, anyhow::Error> { let mut items = Vec::new(); let db = &crate::infra::db::DEFAULT; diff --git a/src/service/db/mod.rs b/src/service/db/mod.rs index 9648a004e..1362cee70 100644 --- a/src/service/db/mod.rs +++ b/src/service/db/mod.rs @@ -17,11 +17,9 @@ use std::sync::Arc; use tracing::info_span; use crate::common::json; -use crate::infra::cache::stats; use crate::infra::config::METRIC_CLUSTER_LEADER; use crate::infra::db::Event; use crate::meta::prom::ClusterLeader; -use crate::meta::stream::StreamStats; pub mod alerts; pub mod compact; @@ -33,25 +31,6 @@ pub mod triggers; pub mod udf; pub mod user; -pub async fn get_stream_stats( - org_id: &str, - stream_name: &str, - stream_type: &str, -) -> Result { - match stats::get_stream_stats(org_id, stream_name, stream_type) { - Some(stats) => Ok(stats), - None => { - let key = format!("/stats/{}/{}/{}", org_id, stream_type, stream_name); - let db = &crate::infra::db::DEFAULT; - let value = match db.get(&key).await { - Ok(val) => serde_json::from_slice(&val).unwrap(), - Err(_) => StreamStats::default(), - }; - Ok(value) - } - } -} - pub async fn set_prom_cluster_info( cluster: &str, members: Vec, diff --git a/src/service/db/schema.rs b/src/service/db/schema.rs index d896b5555..2f28c2085 100644 --- 
a/src/service/db/schema.rs +++ b/src/service/db/schema.rs @@ -18,6 +18,7 @@ use serde_json::Value; use std::sync::Arc; use crate::common::json; +use crate::infra::cache; use crate::infra::config::STREAM_SCHEMAS; use crate::infra::db::Event; use crate::meta::stream::StreamSchema; @@ -60,7 +61,6 @@ pub async fn get( Ok(value) } -#[tracing::instrument(name = "db:schema:get_versions")] pub async fn get_versions( org_id: &str, stream_name: &str, @@ -98,8 +98,8 @@ pub async fn set( min_ts: Option<i64>, ) -> Result<(), anyhow::Error> { let db = &crate::infra::db::DEFAULT; - let key = format!("/schema/{}/{}/{}", org_id, stream_type, stream_name); let mut versions: Vec<Schema>; + let key = format!("/schema/{}/{}/{}", org_id, stream_type, stream_name); let map_key = key.strip_prefix("/schema/").unwrap(); if STREAM_SCHEMAS.contains_key(map_key) { versions = STREAM_SCHEMAS.get(map_key).unwrap().value().clone(); @@ -130,9 +130,12 @@ pub async fn set( metadata.get("created_at").unwrap().clone(), ); } else { - let curr_ts = Utc::now().timestamp_micros().to_string(); - metadata.insert("start_dt".to_string(), curr_ts.clone()); - metadata.insert("created_at".to_string(), curr_ts); + let min_ts = match min_ts { + Some(v) => v, + None => Utc::now().timestamp_micros(), + }; + metadata.insert("start_dt".to_string(), min_ts.to_string()); + metadata.insert("created_at".to_string(), min_ts.to_string()); } let _ = db .put( @@ -146,6 +149,23 @@ pub async fn set( Ok(()) } +pub async fn delete( + org_id: &str, + stream_name: &str, + stream_type: Option<StreamType>, +) -> Result<(), anyhow::Error> { + let stream_type = match stream_type { + Some(v) => v, + None => StreamType::Logs, + }; + let key = format!("/schema/{}/{}/{}", org_id, stream_type, stream_name); + let db = &crate::infra::db::DEFAULT; + match db.delete(&key, false).await { + Ok(_) => Ok(()), + Err(e) => Err(anyhow::anyhow!(e)), + } +} + #[tracing::instrument(name = "db:schema:list")] pub async fn list( org_id: &str, @@ -252,7 +272,17 @@ pub async fn watch() -> Result<(), anyhow::Error> { } Event::Delete(ev) => { let item_key = ev.key.strip_prefix(key).unwrap(); + let columns = item_key.split('/').collect::<Vec<&str>>(); + let org_id = columns[0]; + let stream_type = StreamType::from(columns[1]); + let stream_name = columns[2]; STREAM_SCHEMAS.remove(item_key); + cache::stats::remove_stream_stats(org_id, stream_name, stream_type); + if let Err(e) = + super::compact::files::del_offset(org_id, stream_name, stream_type).await + { + log::error!("del_offset: {}", e); + } } } } diff --git a/src/service/logs/bulk.rs b/src/service/logs/bulk.rs index 33dae6000..b110d1ae6 100644 --- a/src/service/logs/bulk.rs +++ b/src/service/logs/bulk.rs @@ -36,6 +36,7 @@ use crate::meta::ingestion::{ IngestionResponse, RecordStatus, StreamData, StreamSchemaChk, StreamStatus, }; use crate::meta::StreamType; +use crate::service::db; use crate::service::schema::stream_schema_exists; use crate::{common::time::parse_timestamp_micro_from_value, meta::alert::Trigger}; @@ -55,7 +56,9 @@ pub async fn ingest( )), ); } - let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros(); + + let mut min_ts = + (Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros(); #[cfg(feature = "zo_functions")] let lua = Lua::new(); #[cfg(feature = "zo_functions")] @@ -92,6 +95,17 @@ pub async fn ingest( next_line_is_data = true; stream_name = super::get_stream_name(&value); + // check if we are allowed to ingest + if db::compact::delete::is_deleting_stream(org_id, &stream_name, 
StreamType::Logs, None) + { + return Ok( + HttpResponse::InternalServerError().json(MetaHttpResponse::error( + http::StatusCode::INTERNAL_SERVER_ERROR.into(), + Some(format!("stream [{}] is being deleted", stream_name)), + )), + ); + } + // Start Register Transfoms for stream #[cfg(feature = "zo_functions")] let key = format!("{}/{}/{}", &org_id, StreamType::Logs, &stream_name); @@ -197,7 +211,7 @@ pub async fn ingest( None => Utc::now().timestamp_micros(), }; // check ingestion time - let earlest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.allowed_upto); + let earlest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.ingest_allowed_upto); if timestamp < earlest_time.timestamp_micros() { status.failed += 1; // to old data, just discard status.error = super::get_upto_discard_error(); diff --git a/src/service/logs/json.rs b/src/service/logs/json.rs index b843e30d7..97e82726b 100644 --- a/src/service/logs/json.rs +++ b/src/service/logs/json.rs @@ -37,6 +37,7 @@ use crate::meta::functions::Transform; use crate::meta::http::HttpResponse as MetaHttpResponse; use crate::meta::ingestion::{IngestionResponse, RecordStatus, StreamStatus}; use crate::meta::StreamType; +use crate::service::db; use crate::service::schema::stream_schema_exists; pub async fn ingest( @@ -57,7 +58,18 @@ pub async fn ingest( ); } - let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros(); + // check if we are allowed to ingest + if db::compact::delete::is_deleting_stream(org_id, stream_name, StreamType::Logs, None) { + return Ok( + HttpResponse::InternalServerError().json(MetaHttpResponse::error( + http::StatusCode::INTERNAL_SERVER_ERROR.into(), + Some(format!("stream [{}] is being deleted", stream_name)), + )), + ); + } + + let mut min_ts = + (Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros(); #[cfg(feature = "zo_functions")] let lua = Lua::new(); #[cfg(feature = "zo_functions")] @@ -159,7 +171,7 @@ pub async fn ingest( None => Utc::now().timestamp_micros(), }; // check ingestion time - let earlest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.allowed_upto); + let earlest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.ingest_allowed_upto); if timestamp < earlest_time.timestamp_micros() { stream_status.status.failed += 1; // to old data, just discard stream_status.status.error = super::get_upto_discard_error(); diff --git a/src/service/logs/mod.rs b/src/service/logs/mod.rs index 86f98a3bc..d2b62df2d 100644 --- a/src/service/logs/mod.rs +++ b/src/service/logs/mod.rs @@ -41,7 +41,7 @@ pub mod multi; pub(crate) fn get_upto_discard_error() -> String { format!( "too old data, by default only last {} hours data can be ingested. 
Data dscarded.", - CONFIG.limit.allowed_upto + CONFIG.limit.ingest_allowed_upto ) } diff --git a/src/service/logs/multi.rs b/src/service/logs/multi.rs index f0629fde7..291ca5ddc 100644 --- a/src/service/logs/multi.rs +++ b/src/service/logs/multi.rs @@ -36,6 +36,7 @@ use crate::meta::functions::Transform; use crate::meta::http::HttpResponse as MetaHttpResponse; use crate::meta::ingestion::{IngestionResponse, RecordStatus, StreamStatus}; use crate::meta::StreamType; +use crate::service::db; use crate::service::logs::StreamMeta; use crate::service::schema::stream_schema_exists; @@ -57,7 +58,18 @@ pub async fn ingest( ); } - let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros(); + // check if we are allowed to ingest + if db::compact::delete::is_deleting_stream(org_id, stream_name, StreamType::Logs, None) { + return Ok( + HttpResponse::InternalServerError().json(MetaHttpResponse::error( + http::StatusCode::INTERNAL_SERVER_ERROR.into(), + Some(format!("stream [{}] is being deleted", stream_name)), + )), + ); + } + + let mut min_ts = + (Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros(); #[cfg(feature = "zo_functions")] let lua = Lua::new(); #[cfg(feature = "zo_functions")] @@ -162,7 +174,7 @@ pub async fn ingest( None => Utc::now().timestamp_micros(), }; // check ingestion time - let earlest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.allowed_upto); + let earlest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.ingest_allowed_upto); if timestamp < earlest_time.timestamp_micros() { stream_status.status.failed += 1; // to old data, just discard stream_status.status.error = super::get_upto_discard_error(); diff --git a/src/service/metrics/mod.rs b/src/service/metrics/mod.rs index 523c6d317..b882ffc59 100644 --- a/src/service/metrics/mod.rs +++ b/src/service/metrics/mod.rs @@ -71,7 +71,8 @@ pub async fn prometheus_write_proto( ); } - let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros(); + let mut min_ts = + (Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros(); let dedup_enabled = CONFIG.common.metrics_dedup_enabled; let election_interval = CONFIG.limit.metrics_leader_election_interval * 1000000; let mut last_received: i64 = 0; @@ -204,6 +205,22 @@ pub async fn prometheus_write_proto( if entry.is_empty() { continue; } + + // check if we are allowed to ingest + if db::compact::delete::is_deleting_stream( + org_id, + &metric_name, + StreamType::Metrics, + None, + ) { + return Ok(HttpResponse::InternalServerError().json( + meta::http::HttpResponse::error( + http::StatusCode::INTERNAL_SERVER_ERROR.into(), + Some(format!("stream [{}] is being deleted", metric_name)), + ), + )); + } + write_buf.clear(); for row in &entry { write_buf.put(row.as_bytes()); diff --git a/src/service/search/cache.rs b/src/service/search/cache.rs index b9b9f9b22..8b1d83ced 100644 --- a/src/service/search/cache.rs +++ b/src/service/search/cache.rs @@ -66,11 +66,9 @@ pub async fn search( let _guard3 = span3.enter(); // fetch all schema versions, get latest schema - let schema_versions = - db::schema::get_versions(&sql.org_id, &sql.stream_name, Some(stream_type)).await?; - let schema_latest = schema_versions.last().unwrap(); + let schema = db::schema::get(&sql.org_id, &sql.stream_name, Some(stream_type)).await?; let schema = Arc::new( - schema_latest + schema .to_owned() .with_metadata(std::collections::HashMap::new()), ); diff --git a/src/service/search/storage.rs 
b/src/service/search/storage.rs index 3ce64c588..7dc454237 100644 --- a/src/service/search/storage.rs +++ b/src/service/search/storage.rs @@ -94,6 +94,9 @@ pub async fn search( // fetch all schema versions, group files by version let schema_versions = db::schema::get_versions(&sql.org_id, &sql.stream_name, Some(stream_type)).await?; + if schema_versions.is_empty() { + return Err(anyhow::anyhow!("stream not found")); + } let schema_latest = schema_versions.last().unwrap(); let schema_latest_id = schema_versions.len() - 1; let mut files_group: HashMap> = diff --git a/src/service/stream.rs b/src/service/stream.rs index ad5597549..5b4042c7c 100644 --- a/src/service/stream.rs +++ b/src/service/stream.rs @@ -21,6 +21,8 @@ use tracing::info_span; use crate::common::json; use crate::common::utils::is_local_disk_storage; +use crate::infra::cache::stats; +use crate::infra::config::STREAM_SCHEMAS; use crate::meta::http::HttpResponse as MetaHttpResponse; use crate::meta::stream::{ListStream, Stream, StreamProperty, StreamSettings, StreamStats}; use crate::meta::StreamType; @@ -40,9 +42,7 @@ pub async fn get_stream( let schema = db::schema::get(org_id, stream_name, Some(stream_type)) .await .unwrap(); - let mut stats = db::get_stream_stats(org_id, stream_name, &stream_type.to_string()) - .await - .unwrap(); + let mut stats = stats::get_stream_stats(org_id, stream_name, stream_type); stats = transform_stats(&mut stats); if schema != Schema::empty() { let stream = stream_res(stream_name, stream_type, schema, Some(stats)); @@ -76,13 +76,11 @@ pub async fn get_streams( .unwrap(); let mut indices_res = Vec::new(); for stream_loc in indices { - let mut stats = db::get_stream_stats( + let mut stats = stats::get_stream_stats( org_id, stream_loc.stream_name.as_str(), - &stream_loc.stream_type.to_string(), - ) - .await - .unwrap(); + stream_loc.stream_type, + ); if stats.eq(&StreamStats::default()) { indices_res.push(stream_res( stream_loc.stream_name.as_str(), @@ -171,6 +169,17 @@ pub async fn save_stream_settings( ) -> Result { let loc_span = info_span!("service:streams:set_partition_keys"); let _guard = loc_span.enter(); + + // check if we are allowed to ingest + if db::compact::delete::is_deleting_stream(org_id, stream_name, stream_type, None) { + return Ok( + HttpResponse::InternalServerError().json(MetaHttpResponse::error( + http::StatusCode::INTERNAL_SERVER_ERROR.into(), + Some(format!("stream [{}] is being deleted", stream_name)), + )), + ); + } + let schema = db::schema::get(org_id, stream_name, Some(stream_type)) .await .unwrap(); @@ -195,6 +204,67 @@ pub async fn save_stream_settings( ))) } +pub async fn delete_stream( + org_id: &str, + stream_name: &str, + stream_type: StreamType, +) -> Result { + let loc_span = info_span!("service:streams:delete_stream"); + let _guard = loc_span.enter(); + let schema = db::schema::get_versions(org_id, stream_name, Some(stream_type)) + .await + .unwrap(); + if schema.is_empty() { + return Ok(HttpResponse::NotFound().json(MetaHttpResponse::error( + StatusCode::NOT_FOUND.into(), + Some("stream not found".to_owned()), + ))); + } + + // create delete for compactor + if let Err(e) = db::compact::delete::delete_stream(org_id, stream_name, stream_type, None).await + { + return Ok( + HttpResponse::InternalServerError().json(MetaHttpResponse::error( + StatusCode::INTERNAL_SERVER_ERROR.into(), + Some(format!("failed to delete stream: {}", e)), + )), + ); + } + + // delete stream schema + if let Err(e) = db::schema::delete(org_id, stream_name, Some(stream_type)).await { + 
return Ok( + HttpResponse::InternalServerError().json(MetaHttpResponse::error( + StatusCode::INTERNAL_SERVER_ERROR.into(), + Some(format!("failed to delete stream: {}", e)), + )), + ); + } + + // delete stream schema cache + let key = format!("{}/{}/{}", org_id, stream_type, stream_name); + STREAM_SCHEMAS.remove(&key); + + // delete stream stats cache + stats::remove_stream_stats(org_id, stream_name, stream_type); + + // delete stream compaction offset + if let Err(e) = db::compact::files::del_offset(org_id, stream_name, stream_type).await { + return Ok( + HttpResponse::InternalServerError().json(MetaHttpResponse::error( + StatusCode::INTERNAL_SERVER_ERROR.into(), + Some(format!("failed to delete stream: {}", e)), + )), + ); + }; + + Ok(HttpResponse::Ok().json(MetaHttpResponse::message( + StatusCode::OK.into(), + "stream deleted".to_owned(), + ))) +} + pub fn get_stream_setting_fts_fields(schema: &Schema) -> Result, anyhow::Error> { let mut full_text_search_keys = vec![]; let settings = schema.metadata.get("settings"); diff --git a/src/service/traces/mod.rs b/src/service/traces/mod.rs index 40a6843b6..d033a30c5 100644 --- a/src/service/traces/mod.rs +++ b/src/service/traces/mod.rs @@ -74,7 +74,8 @@ pub async fn handle_trace_request( let mut data_buf: AHashMap> = AHashMap::new(); let mut traces_schema_map: AHashMap = AHashMap::new(); - let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros(); + let mut min_ts = + (Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros(); let mut service_name: String = traces_stream_name.to_string(); let res_spans = request.resource_spans; for res_span in res_spans { diff --git a/src/service/traces/otlp_http.rs b/src/service/traces/otlp_http.rs index 92806c225..88d0e07b1 100644 --- a/src/service/traces/otlp_http.rs +++ b/src/service/traces/otlp_http.rs @@ -76,7 +76,8 @@ pub async fn traces_json( let mut trace_meta_coll: AHashMap>> = AHashMap::new(); - let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros(); + let mut min_ts = + (Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros(); let mut data_buf: AHashMap> = AHashMap::new(); let mut traces_schema_map: AHashMap = AHashMap::new(); let mut service_name: String = traces_stream_name.to_string(); diff --git a/tests/integration_test.rs b/tests/integration_test.rs index d2fe7f4d0..42162b531 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -704,7 +704,7 @@ mod tests { .set_payload(body_str) .to_request(); let resp = test::call_service(&app, req).await; - //println!("{:?}", resp); + // println!("{:?}", resp); assert!(resp.status().is_success()); } @@ -783,7 +783,7 @@ mod tests { .append_header(auth) .to_request(); let resp = test::call_service(&app, req).await; - //println!("{:?}", resp); + // println!("{:?}", resp); assert!(resp.status().is_success()); } async fn e2e_cache_status() { @@ -802,7 +802,7 @@ mod tests { .append_header(auth) .to_request(); let resp = test::call_service(&app, req).await; - //println!("{:?}", resp); + // println!("{:?}", resp); assert!(resp.status().is_success()); } diff --git a/web/src/components/alerts/add.vue b/web/src/components/alerts/add.vue index f7840da48..940804cf0 100644 --- a/web/src/components/alerts/add.vue +++ b/web/src/components/alerts/add.vue @@ -270,7 +270,7 @@ import { useI18n } from "vue-i18n"; import { useStore } from "vuex"; import { useQuasar } from "quasar"; -import IndexService from "../../services/index"; 
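On the web side, the old `index` service is renamed to `stream`, and every caller (the alerts form, the functions form, the logstream schema page, and the logs view) now imports `streamService` instead of `IndexService`. The sketch below mirrors the `nameList()` call pattern those components use; it is illustrative only, and assumes a component with access to the Vuex store and the same relative path as in this diff.

```ts
// Sketch only: how the updated components fetch stream names after the rename.
import streamService from "../../services/stream";

function getStreamList(store: any) {
  // Third argument toggles the `fetchSchema` query parameter on the streams API.
  return streamService
    .nameList(store.state.selectedOrganization.identifier, "logs", true)
    .then((res: any) => res.data);
}
```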
+import streamService from "../../services/stream"; import { Parser } from "node-sql-parser"; import segment from "../../services/segment_analytics"; @@ -479,7 +479,7 @@ export default defineComponent({ this.formData = this.modelValue; } - IndexService.nameList( + streamService.nameList( this.store.state.selectedOrganization.identifier, "", true diff --git a/web/src/components/functions/add.vue b/web/src/components/functions/add.vue index 5df275378..7a8a095a3 100644 --- a/web/src/components/functions/add.vue +++ b/web/src/components/functions/add.vue @@ -145,7 +145,7 @@ import { useI18n } from "vue-i18n"; import { useStore } from "vuex"; import { useQuasar } from "quasar"; -import IndexService from "../../services/index"; +import streamService from "../../services/stream"; import { update } from "plotly.js"; import segment from "../../services/segment_analytics"; @@ -296,7 +296,7 @@ end`; this.formData = this.modelValue; } - IndexService.nameList( + streamService.nameList( this.store.state.selectedOrganization.identifier, "", false diff --git a/web/src/components/logstream/schema.vue b/web/src/components/logstream/schema.vue index f5d2d0b5c..81ecae739 100644 --- a/web/src/components/logstream/schema.vue +++ b/web/src/components/logstream/schema.vue @@ -139,7 +139,7 @@ import { defineComponent, ref } from "vue"; import { useI18n } from "vue-i18n"; import { useStore } from "vuex"; import { useQuasar, date, format } from "quasar"; -import indexService from "../../services/index"; +import streamService from "../../services/stream"; import segment from "../../services/segment_analytics"; const defaultValue: any = () => { @@ -173,7 +173,7 @@ export default defineComponent({ message: "Please wait while loading stats...", }); - await indexService + await streamService .schema( store.state.selectedOrganization.identifier, indexData.value.name, @@ -256,7 +256,7 @@ export default defineComponent({ settings.partition_keys.concat(added_part_keys); } - await indexService + await streamService .updateSettings( store.state.selectedOrganization.identifier, indexData.value.name, diff --git a/web/src/plugins/logs/Index.vue b/web/src/plugins/logs/Index.vue index 596b2eeb8..e42f87d82 100644 --- a/web/src/plugins/logs/Index.vue +++ b/web/src/plugins/logs/Index.vue @@ -146,7 +146,7 @@ import useLogs from "../../composables/useLogs"; import { deepKeys, byString } from "../../utils/json"; import { Parser } from "node-sql-parser"; -import indexService from "../../services/index"; +import streamService from "../../services/stream"; import searchService from "../../services/search"; import TransformService from "../../services/jstransform"; import { useLocalLogsObj } from "../../utils/zincutils"; @@ -293,7 +293,7 @@ export default defineComponent({ function getStreamList() { try { - indexService + streamService .nameList(store.state.selectedOrganization.identifier, "logs", true) .then((res) => { searchObj.data.streamResults = res.data; diff --git a/web/src/services/index.ts b/web/src/services/stream.ts similarity index 73% rename from web/src/services/index.ts rename to web/src/services/stream.ts index 8e7cfd4a1..fac56ede6 100644 --- a/web/src/services/index.ts +++ b/web/src/services/stream.ts @@ -10,11 +10,11 @@ // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and -// limitations under the License. +// limitations under the License. 
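The renamed `stream.ts` service below also gains a `delete` method that issues `DELETE /api/{org}/{stream}`, which `LogStream.vue` calls from its new delete confirmation dialog. A hedged usage sketch follows; the organization identifier and stream name are placeholders, and the helper name is not part of this diff.

```ts
// Sketch only: calling the new delete endpoint and reacting to the 200 response
// the backend returns when a stream is marked for deletion.
import streamService from "./stream";

async function removeStream(orgIdentifier: string, streamName: string) {
  const res: any = await streamService.delete(orgIdentifier, streamName);
  if (res.data.code == 200) {
    // e.g. refresh the stream list and notify the user
  }
  return res.data;
}
```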
import http from "./http"; -var index = { +const stream = { nameList: (org_identifier: string, type: string, schema: boolean) => { let url = `/api/${org_identifier}/streams`; @@ -23,7 +23,10 @@ var index = { } if (schema) { - url += url.indexOf("?")>0 ? "&fetchSchema="+schema: "?fetchSchema="+schema; + url += + url.indexOf("?") > 0 + ? "&fetchSchema=" + schema + : "?fetchSchema=" + schema; } return http().get(url); }, @@ -35,7 +38,12 @@ var index = { } return http().get(url); }, - updateSettings: (org_identifier: string, stream_name: string, type: string,data: any) => { + updateSettings: ( + org_identifier: string, + stream_name: string, + type: string, + data: any + ) => { let url = `/api/${org_identifier}/${stream_name}/settings`; if (type != "") { @@ -43,8 +51,9 @@ var index = { } return http().post(url, data); }, + delete: (org_identifier: string, stream_name: string) => { + return http().delete(`/api/${org_identifier}/${stream_name}`); + }, }; - - -export default index; +export default stream; diff --git a/web/src/views/LogStream.vue b/web/src/views/LogStream.vue index 888063867..b09c245f3 100644 --- a/web/src/views/LogStream.vue +++ b/web/src/views/LogStream.vue @@ -37,6 +37,17 @@ @@ -121,6 +132,7 @@ no-caps class="no-border" color="primary" + @click="deleteStream" > {{ t("logStream.ok") }} @@ -138,7 +150,7 @@ import { useQuasar, type QTableProps } from "quasar"; import { useI18n } from "vue-i18n"; import QTablePagination from "../components/shared/grid/Pagination.vue"; -import indexService from "../services/index"; +import streamService from "../services/stream"; import SchemaIndex from "../components/logstream/schema.vue"; import NoData from "../components/shared/grid/NoData.vue"; import segment from "../services/segment_analytics"; @@ -151,7 +163,7 @@ export default defineComponent({ setup(props, { emit }) { const store = useStore(); const { t } = useI18n(); - const q = useQuasar(); + const $q = useQuasar(); const router = useRouter(); const logStream = ref([]); const showIndexSchemaDialog = ref(false); @@ -210,15 +222,16 @@ export default defineComponent({ align: "center", }, ]); + let deleteStreamName = ""; const getLogStream = () => { if (store.state.selectedOrganization != null) { - const dismiss = q.notify({ + const dismiss = $q.notify({ spinner: true, message: "Please wait while loading streams...", }); - indexService + streamService .nameList(store.state.selectedOrganization.identifier, "logs", false) .then((res) => { let counter = 1; @@ -257,7 +270,7 @@ export default defineComponent({ }) .catch((err) => { dismiss(); - q.notify({ + $q.notify({ type: "negative", message: "Error while pulling stream.", timeout: 2000, @@ -312,8 +325,29 @@ export default defineComponent({ maxRecordToReturn.value = val; }; - const deleteIndex = (props: any) => { + const confirmDeleteAction = (props: any) => { confirmDelete.value = true; + deleteStreamName = props.row.name; + }; + + const deleteStream = () => { + streamService + .delete(store.state.selectedOrganization.identifier, deleteStreamName) + .then((res: any) => { + if (res.data.code == 200) { + $q.notify({ + color: "positive", + message: "Stream deleted successfully.", + }); + getLogStream(); + } + }) + .catch((err: any) => { + $q.notify({ + color: "negative", + message: "Error while deleting stream.", + }); + }); }; onActivated(() => { @@ -339,7 +373,8 @@ export default defineComponent({ pagination, resultTotal, listSchema, - deleteIndex, + deleteStream, + confirmDeleteAction, confirmDelete, schemaData, perPageOptions,