feat: stream delete (#193)

* feat: update datafusion to 19

* feat: add stream deleting

* feat: data delete

* feat: stream delete done for local mode

* feat: data delete work on s3

* feat: data delete works with gcs

* feat: improve data delete

* feat: improve data delete

* feat: data delete done with merge file list

* fix: comment some unused code

* test: change coverage

* docs: update readme
Hengfei Yang 2023-03-06 18:44:42 +08:00 committed by GitHub
parent 8e20a370cb
commit 3cca9ad12b
47 changed files with 1248 additions and 294 deletions

.gitignore

@ -4,6 +4,7 @@ report.json
# Generated by Cargo
# will have compiled files and executables
/examples/
/target/
/bin/

Cargo.lock

@ -373,9 +373,9 @@ checksum = "8da52d66c7071e2e3fa2a1e5c6d088fec47b593032b254f5e980de8ea54454d6"
[[package]]
name = "arrow"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b556d39f9d19e363833a0fe65d591cd0e2ecc0977589a78179b592bea8dc945"
checksum = "f3724c874f1517cf898cd1c3ad18ab5071edf893c48e73139ab1e16cf0f2affe"
dependencies = [
"ahash 0.8.3",
"arrow-arith",
@ -396,9 +396,9 @@ dependencies = [
[[package]]
name = "arrow-arith"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "85c61b9235694b48f60d89e0e8d6cb478f39c65dd14b0fe1c3f04379b7d50068"
checksum = "e958823b8383ca14d0a2e973de478dd7674cd9f72837f8c41c132a0fda6a4e5e"
dependencies = [
"arrow-array",
"arrow-buffer",
@ -411,9 +411,9 @@ dependencies = [
[[package]]
name = "arrow-array"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1e6e839764618a911cc460a58ebee5ad3d42bc12d9a5e96a29b7cc296303aa1"
checksum = "db670eab50e76654065b5aed930f4367101fcddcb2223802007d1e0b4d5a2579"
dependencies = [
"ahash 0.8.3",
"arrow-buffer",
@ -428,9 +428,9 @@ dependencies = [
[[package]]
name = "arrow-buffer"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "03a21d232b1bc1190a3fdd2f9c1e39b7cd41235e95a0d44dd4f522bc5f495748"
checksum = "9f0e01c931882448c0407bd32311a624b9f099739e94e786af68adc97016b5f2"
dependencies = [
"half",
"num",
@ -438,9 +438,9 @@ dependencies = [
[[package]]
name = "arrow-cast"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "83dcdb1436cac574f1c1b30fda91c53c467534337bef4064bbd4ea2d6fbc6e04"
checksum = "4bf35d78836c93f80d9362f3ccb47ff5e2c5ecfc270ff42cdf1ef80334961d44"
dependencies = [
"arrow-array",
"arrow-buffer",
@ -454,9 +454,9 @@ dependencies = [
[[package]]
name = "arrow-csv"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a01677ae9458f5af9e35e1aa6ba97502f539e621db0c6672566403f97edd0448"
checksum = "0a6aa7c2531d89d01fed8c469a9b1bf97132a0bdf70b4724fe4bbb4537a50880"
dependencies = [
"arrow-array",
"arrow-buffer",
@ -473,9 +473,9 @@ dependencies = [
[[package]]
name = "arrow-data"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14e3e69c9fd98357eeeab4aa0f626ecf7ecf663e68e8fc04eac87c424a414477"
checksum = "ea50db4d1e1e4c2da2bfdea7b6d2722eef64267d5ab680d815f7ae42428057f5"
dependencies = [
"arrow-buffer",
"arrow-schema",
@ -485,9 +485,9 @@ dependencies = [
[[package]]
name = "arrow-ipc"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64cac2706acbd796965b6eaf0da30204fe44aacf70273f8cb3c9b7d7f3d4c190"
checksum = "a4042fe6585155d1ec28a8e4937ec901a3ca7a19a22b9f6cd3f551b935cd84f5"
dependencies = [
"arrow-array",
"arrow-buffer",
@ -501,9 +501,9 @@ dependencies = [
[[package]]
name = "arrow-json"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7790e8b7df2d8ef5ac802377ac256cf2fb80cbf7d44b82d6464e20ace6232a5a"
checksum = "7c907c4ab4f26970a3719dc06e78e8054a01d0c96da3664d23b941e201b33d2b"
dependencies = [
"arrow-array",
"arrow-buffer",
@ -513,15 +513,16 @@ dependencies = [
"chrono",
"half",
"indexmap",
"lexical-core",
"num",
"serde_json",
]
[[package]]
name = "arrow-ord"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7ee6e1b761dfffaaf7b5bbe68c113a576a3a802146c5c0b9fcec781e30d80a3"
checksum = "e131b447242a32129efc7932f58ed8931b42f35d8701c1a08f9f524da13b1d3c"
dependencies = [
"arrow-array",
"arrow-buffer",
@ -533,9 +534,9 @@ dependencies = [
[[package]]
name = "arrow-row"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e65bfedf782fc92721e796fdd26ae7343c98ba9a9243d62def9e4e1c4c1cf0b"
checksum = "b591ef70d76f4ac28dd7666093295fece0e5f9298f49af51ea49c001e1635bb6"
dependencies = [
"ahash 0.8.3",
"arrow-array",
@ -548,18 +549,18 @@ dependencies = [
[[package]]
name = "arrow-schema"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73ca49d010b27e2d73f70c1d1f90c1b378550ed0f4ad379c4dea0c997d97d723"
checksum = "eb327717d87eb94be5eff3b0cb8987f54059d343ee5235abf7f143c85f54cfc8"
dependencies = [
"serde",
]
[[package]]
name = "arrow-select"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "976cbaeb1a85c09eea81f3f9c149c758630ff422ed0238624c5c3f4704b6a53c"
checksum = "79d3c389d1cea86793934f31594f914c8547d82e91e3411d4833ad0aac3266a7"
dependencies = [
"arrow-array",
"arrow-buffer",
@ -570,9 +571,9 @@ dependencies = [
[[package]]
name = "arrow-string"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d4882762f8f48a9218946c016553d38b04b4fe8202038dad4141b3b887b7da8"
checksum = "30ee67790496dd310ddbf5096870324431e89aa76453e010020ac29b1184d356"
dependencies = [
"arrow-array",
"arrow-buffer",
@ -1664,9 +1665,9 @@ dependencies = [
[[package]]
name = "datafusion"
version = "17.0.0"
version = "19.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a6d90cae91414aaeda37ae8022a23ef1124ca8efc08ac7d7770274249f7cf148"
checksum = "12d462c103bd1cfd24f8e8a199986d89582af6280528e085c393c4be2ff25da7"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1711,9 +1712,9 @@ dependencies = [
[[package]]
name = "datafusion-common"
version = "17.0.0"
version = "19.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b21c4b8e8b7815e86d79d25da16854fee6d4d1b386572e802a248b7d43188e86"
checksum = "b5babdbcf102862b1f1828c1ab41094e39ba881d5ece4cee2d481d528148f592"
dependencies = [
"arrow",
"chrono",
@ -1725,9 +1726,9 @@ dependencies = [
[[package]]
name = "datafusion-expr"
version = "17.0.0"
version = "19.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "db8c07b051fbaf01657a3eb910a76b042ecfed0350a40412f70cf6b949bd5328"
checksum = "90f0c34e87fa541a59d378dc7ee7c9c3dd1fcfa793eab09561b8b4cb35e1827a"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1738,9 +1739,9 @@ dependencies = [
[[package]]
name = "datafusion-optimizer"
version = "17.0.0"
version = "19.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2ce4d34a808cd2e4c4864cdc759dd1bd22dcac2b8af38aa570e30fd54577c4d"
checksum = "7d0c6d912b7b7e4637d85947222455cd948ea193ca454ebf649e7265fd10b048"
dependencies = [
"arrow",
"async-trait",
@ -1755,9 +1756,9 @@ dependencies = [
[[package]]
name = "datafusion-physical-expr"
version = "17.0.0"
version = "19.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a38afa11a09505c24bd7e595039d7914ec39329ba490209413ef2d37895c8220"
checksum = "8000e8f8efafb810ff2943323bb48bd722ac5bb919fe302a66b832ed9c25245f"
dependencies = [
"ahash 0.8.3",
"arrow",
@ -1786,9 +1787,9 @@ dependencies = [
[[package]]
name = "datafusion-row"
version = "17.0.0"
version = "19.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9172411b25ff4aa97f8e99884898595a581636d93cc96c12f96dbe3bf51cd7e5"
checksum = "4e900f05d7e5666e8ab714a96a28cb6f143e62aa1d501ba1199024f8635c726c"
dependencies = [
"arrow",
"datafusion-common",
@ -1798,9 +1799,9 @@ dependencies = [
[[package]]
name = "datafusion-sql"
version = "17.0.0"
version = "19.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fbe5e61563ced2f6992a60afea568ff3de69e32940bbf07db06fc5c9d8cd866"
checksum = "096f293799e8ae883e0f79f8ebaa51e4292e690ba45e0269b48ca9bd79f57094"
dependencies = [
"arrow-schema",
"datafusion-common",
@ -1964,12 +1965,12 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "flatbuffers"
version = "22.9.29"
version = "23.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ce016b9901aef3579617931fbb2df8fc9a9f7cb95a16eb8acc8148209bb9e70"
checksum = "77f5399c2c9c50ae9418e522842ad362f61ee48b346ac106807bd355a8a7c619"
dependencies = [
"bitflags",
"thiserror",
"rustc_version",
]
[[package]]
@ -3298,9 +3299,9 @@ dependencies = [
[[package]]
name = "parquet"
version = "31.0.0"
version = "33.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6b4ee1ffc0778395c9783a5c74f2cad2fb1a128ade95a965212d31b7b13e3d45"
checksum = "b1b076829801167d889795cd1957989055543430fa1469cb1f6e32b789bfc764"
dependencies = [
"ahash 0.8.3",
"arrow-array",

Cargo.toml

@ -30,8 +30,8 @@ actix-web-rust-embed-responder = {version = "2.2.0", default-features = false, f
ahash = {version = "0.8.3", features = ["serde"]}
anyhow = "1.0"
argon2 = {version = "0.4.1", features = ["alloc", "password-hash"]}
arrow = {version = "31.0", features = ["simd", "ipc_compression"]}
arrow-schema = {version = "31.0", features = ["serde"]}
arrow = {version = "33.0", features = ["simd", "ipc_compression"]}
arrow-schema = {version = "33.0", features = ["serde"]}
async-trait = "0.1.57"
async_once = "0.2.6"
awc = "3.1.0"
@ -41,8 +41,8 @@ bytes = "1.1"
chrono = "0.4"
clap = { version = "4.1.6", default-features = false, features = ["std", "help", "usage", "suggestions", "cargo"] }
dashmap = {version = "5.4.0", features = ["serde"]}
datafusion = {version = "17.0", features = ["simd"]}
datafusion-common = "17.0"
datafusion = {version = "19.0", features = ["simd"]}
datafusion-common = "19.0"
dotenv_config = "0.1.5"
dotenvy = "0.15.6"
env_logger = "0.9"
@ -65,7 +65,7 @@ opentelemetry = {version = "0.17", features = ["rt-tokio"]}
opentelemetry-otlp = {version = "0.10", features = ["http-proto", "serialize", "reqwest-client"]}
opentelemetry-proto = {version = "0.1", features = ["gen-tonic", "traces", "with-serde", "build-server"]}
parking_lot = "0.12"
parquet = {version = "31.0", features = ["arrow", "async"]}
parquet = {version = "33.0", features = ["arrow", "async"]}
prometheus = "0.13.3"
prost = "0.11.2"
rand = "0.8.5"

README.md

@ -75,4 +75,3 @@ Check the [contributing guide](./CONTRIBUTING.md)
## Ingestion
![Home](./screenshots/zo_ingestion.png)


@ -17,9 +17,9 @@ echo "line_cov $line_cov"
# enable threshold
#COVERAGE_THRESHOLD=80
FUNC_COV_THRESHOLD=60
LINE_COV_THRESHOLD=53
REGION_COV_THRESHOLD=40
FUNC_COV_THRESHOLD=55
LINE_COV_THRESHOLD=50
REGION_COV_THRESHOLD=35
# clean up
# find ./target -name llvm-cov-target -type d|xargs rm -fR


@ -39,7 +39,7 @@ pub fn get_stream_file_num_v1(file_name: &str) -> u32 {
}
pub fn get_file_name_v1(org_id: &str, stream_name: &str, suffix: u32) -> String {
// creates file name like "./data/olympics/olympics#2022#09#13#13_1.json"
// creates file name like "./data/zincobserve/olympics/olympics#2022#09#13#13_1.json"
format!(
"{}{}/{}/{}/{}_{}{}",
&CONFIG.common.data_wal_dir,
@ -101,7 +101,7 @@ mod test_utils {
for i in 0..suffix_nums.len() {
let suffix = increment_stream_file_num_v1(&format!(
"./data/WAL/nexus/logs/olympics/1663064862606912_{}.json",
"./data/zincobserve/WAL/nexus/logs/olympics/1663064862606912_{}.json",
suffix_nums[i]
));
assert_eq!(suffix, suffix_nums[i] + 1);
@ -113,13 +113,14 @@ mod test_utils {
let file_key = get_file_name_v1(&"nexus".to_owned(), &"Olympics".to_owned(), 2);
assert_eq!(
file_key.as_str(),
"./data/wal/nexus/logs/Olympics/Olympics_2.json"
"./data/zincobserve/wal/nexus/logs/Olympics/Olympics_2.json"
);
}
#[test]
fn test_get_stream_file_num_v1() {
let file_key = get_stream_file_num_v1("./data/WAL/logs/nexus/Olympics/Olympics_2.json");
let file_key =
get_stream_file_num_v1("./data/zincobserve/WAL/logs/nexus/Olympics/Olympics_2.json");
assert_eq!(file_key, 2);
}


@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use actix_web::{get, http, post, web, HttpRequest, HttpResponse, Responder};
use actix_web::{delete, get, http, post, web, HttpRequest, HttpResponse, Responder};
use std::collections::HashMap;
use std::io::Error;
use std::io::ErrorKind;
@ -55,21 +55,11 @@ async fn schema(
)
}
};
match stream_type {
Some(stream_type_loc) => {
stream::get_stream(org_id.as_str(), stream_name.as_str(), stream_type_loc).await
}
/* Some(StreamType::Logs) => {
stream::get_stream(org_id.as_str(), stream_name.as_str(), StreamType::Logs).await
}
Some(StreamType::Metrics) => {
stream::get_stream(org_id.as_str(), stream_name.as_str(), StreamType::Metrics).await
}
Some(StreamType::Traces) => {
stream::get_stream(org_id.as_str(), stream_name.as_str(), StreamType::Traces).await
} */
None => stream::get_stream(org_id.as_str(), stream_name.as_str(), StreamType::Logs).await,
}
let stream_type = match stream_type {
Some(v) => v,
None => StreamType::Logs,
};
stream::get_stream(&org_id, &stream_name, stream_type).await
}
#[utoipa::path(
@ -109,26 +99,52 @@ async fn settings(
}
};
match stream_type {
Some(steam_type_loc) => {
stream::save_stream_settings(
org_id.as_str(),
stream_name.as_str(),
steam_type_loc,
settings.into_inner(),
let stream_type = match stream_type {
Some(v) => v,
None => StreamType::Logs,
};
stream::save_stream_settings(&org_id, &stream_name, stream_type, settings.into_inner()).await
}
#[utoipa::path(
context_path = "/api",
tag = "Streams",
operation_id = "StreamDelete",
security(
("Authorization"= [])
),
params(
("org_id" = String, Path, description = "Organization name"),
("stream_name" = String, Path, description = "Stream name"),
),
responses(
(status = 200, description="Success", content_type = "application/json", body = Stream),
(status = 400, description="Failure", content_type = "application/json", body = HttpResponse),
)
)]
#[delete("/{org_id}/{stream_name}")]
async fn delete(
path: web::Path<(String, String)>,
req: HttpRequest,
) -> Result<HttpResponse, Error> {
let (org_id, stream_name) = path.into_inner();
let query = web::Query::<HashMap<String, String>>::from_query(req.query_string()).unwrap();
let stream_type = match get_stream_type_from_request(&query) {
Ok(v) => v,
Err(e) => {
return Ok(
HttpResponse::BadRequest().json(meta::http::HttpResponse::error(
http::StatusCode::BAD_REQUEST.into(),
Some(e.to_string()),
)),
)
.await
}
None => {
stream::save_stream_settings(
org_id.as_str(),
stream_name.as_str(),
StreamType::Logs,
settings.into_inner(),
)
.await
}
}
};
let stream_type = match stream_type {
Some(v) => v,
None => StreamType::Logs,
};
stream::delete_stream(&org_id, &stream_name, stream_type).await
}
#[utoipa::path(
@ -175,21 +191,7 @@ async fn list(org_id: web::Path<String>, req: HttpRequest) -> impl Responder {
None => false,
};
match stream_type {
/* Some(StreamType::Logs) => {
stream::list_streams(org_id.as_str(), Some(StreamType::Logs), fetch_schema).await
}
Some(StreamType::Metrics) => {
stream::list_streams(org_id.as_str(), Some(StreamType::Metrics), fetch_schema).await
}
Some(StreamType::Traces) => {
stream::list_streams(org_id.as_str(), Some(StreamType::Traces), fetch_schema).await
} */
Some(stream_type_loc) => {
stream::list_streams(org_id.as_str(), Some(stream_type_loc), fetch_schema).await
}
None => stream::list_streams(org_id.as_str(), None, fetch_schema).await,
}
stream::list_streams(org_id.as_str(), stream_type, fetch_schema).await
}
#[get("/{org_id}/")]
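The new delete handler registers DELETE /api/{org_id}/{stream_name} and resolves the stream type from the query string via get_stream_type_from_request, defaulting to logs. Below is a minimal sketch of exercising it with awc (already a workspace dependency); the host, port, credentials, and the "type" query parameter value are placeholders and assumptions, not something defined by this commit:

    use awc::Client;

    #[actix_web::main]
    async fn main() {
        let client = Client::default();
        // Hypothetical local server and basic-auth credentials; adjust to your deployment.
        let resp = client
            .delete("http://localhost:5080/api/default/olympics?type=logs")
            .insert_header(("Authorization", "Basic <base64-credentials>"))
            .send()
            .await;
        match resp {
            Ok(r) => println!("stream delete returned: {}", r.status()),
            Err(e) => eprintln!("request failed: {}", e),
        }
    }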


@ -95,6 +95,7 @@ pub fn get_service_routes(cfg: &mut web::ServiceConfig) {
.service(search::around)
.service(stream::schema)
.service(stream::settings)
.service(stream::delete)
.service(stream::list)
.service(stream::org_index)
.service(functions::save_function)


@ -26,6 +26,7 @@ use crate::meta;
request::stream::list,
request::stream::schema,
request::stream::settings,
request::stream::delete,
request::ingest::bulk,
request::ingest::multi,
request::ingest::json,


@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use chrono::{Datelike, TimeZone, Timelike, Utc};
use chrono::{Datelike, Duration, TimeZone, Timelike, Utc};
use dashmap::DashMap;
use crate::meta::{common::FileMeta, StreamType};
@ -377,73 +377,37 @@ pub async fn get_file_list(
) -> Result<Vec<String>, anyhow::Error> {
let mut files = Vec::new();
let mut keys = Vec::new();
let mut key = "".to_string();
if time_min > 0 && time_max > 0 {
let time_min = Utc.timestamp_nanos(time_min * 1000);
let time_max = Utc.timestamp_nanos(time_max * 1000);
if time_min.year() == time_max.year() {
key.push_str(&time_min.format("%Y").to_string());
key.push('/');
if time_min.month() == time_max.month() {
key.push_str(&time_min.format("%m").to_string());
key.push('/');
if time_min.day() == time_max.day() {
key.push_str(&time_min.format("%d").to_string());
key.push('/');
if time_min.hour() == time_max.hour() {
key.push_str(&time_min.format("%H").to_string());
key.push('/');
keys.push(key);
} else {
for i in time_min.hour()..(time_max.hour() + 1) {
let mut k = key.clone();
if i < 10 {
k.push_str(&format!("0{}", i))
} else {
k.push_str(&format!("{}", i))
}
k.push('/');
keys.push(k);
}
}
} else {
for i in time_min.day()..(time_max.day() + 1) {
let mut k = key.clone();
if i < 10 {
k.push_str(&format!("0{}", i))
} else {
k.push_str(&format!("{}", i))
}
k.push('/');
keys.push(k);
}
}
} else {
for i in time_min.month()..(time_max.month() + 1) {
let mut k = key.clone();
if i < 10 {
k.push_str(&format!("0{}", i))
} else {
k.push_str(&format!("{}", i))
}
k.push('/');
keys.push(k);
}
if time_min + Duration::hours(48) >= time_max {
// less than 48 hours, generate keys by hours
let mut time_min = Utc
.with_ymd_and_hms(
time_min.year(),
time_min.month(),
time_min.day(),
time_min.hour(),
0,
0,
)
.unwrap();
while time_min <= time_max {
keys.push(time_min.format("%Y/%m/%d/%H/").to_string());
time_min += Duration::hours(1);
}
} else {
for i in time_min.year()..(time_max.year() + 1) {
let mut k = key.clone();
if i < 10 {
k.push_str(&format!("0{}", i))
} else {
k.push_str(&format!("{}", i))
}
k.push('/');
keys.push(k);
// more than 48 hours, generate keys by days
let mut time_min = Utc
.with_ymd_and_hms(time_min.year(), time_min.month(), time_min.day(), 0, 0, 0)
.unwrap();
while time_min <= time_max {
keys.push(time_min.format("%Y/%m/%d/").to_string());
time_min += Duration::days(1);
}
}
} else {
keys.push(key);
keys.push("".to_string());
}
for key in keys {
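The rewritten key generation walks the requested time range directly: for spans of 48 hours or less it emits one prefix per hour, otherwise one prefix per day. As an illustration with a hypothetical range, a query from 2022-09-13 13:05 to 2022-09-13 15:40 scans the hourly prefixes

    2022/09/13/13/
    2022/09/13/14/
    2022/09/13/15/

while a multi-week range falls back to daily prefixes such as 2022/09/01/ and 2022/09/02/.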


@ -16,6 +16,7 @@ use dashmap::DashMap;
use crate::meta::common::FileMeta;
use crate::meta::stream::StreamStats;
use crate::meta::StreamType;
lazy_static! {
static ref STATS: DashMap<String, StreamStats> = DashMap::with_capacity(2);
@ -23,20 +24,38 @@ lazy_static! {
const STREAM_STATS_MEM_SIZE: usize = std::mem::size_of::<StreamStats>();
#[inline]
pub fn get_stats() -> DashMap<String, StreamStats> {
STATS.clone()
}
pub fn get_stream_stats(org_id: &str, stream_name: &str, stream_type: &str) -> Option<StreamStats> {
#[inline]
pub fn get_stream_stats(org_id: &str, stream_name: &str, stream_type: StreamType) -> StreamStats {
let key = format!("{}/{}/{}", org_id, stream_type, stream_name);
STATS.get(&key).map(|v| *v.value())
match STATS.get(&key).map(|v| *v.value()) {
Some(v) => v,
None => StreamStats::default(),
}
}
pub fn set_stream_stats(org_id: &str, stream_name: &str, stream_type: &str, val: StreamStats) {
#[inline]
pub fn remove_stream_stats(org_id: &str, stream_name: &str, stream_type: StreamType) {
let key = format!("{}/{}/{}", org_id, stream_type, stream_name);
STATS.remove(&key);
}
#[inline]
pub fn set_stream_stats(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
val: StreamStats,
) {
let key = format!("{}/{}/{}", org_id, stream_type, stream_name);
STATS.insert(key, val);
}
#[inline]
pub fn incr_stream_stats(key: &str, val: FileMeta) -> Result<(), anyhow::Error> {
// eg: files/default/olympics/2022/10/03/10/6982652937134804993_1.parquet
let columns = key.split('/').collect::<Vec<&str>>();
@ -69,6 +88,7 @@ pub fn incr_stream_stats(key: &str, val: FileMeta) -> Result<(), anyhow::Error>
Ok(())
}
#[inline]
pub fn decr_stream_stats(key: &str, val: FileMeta) -> Result<(), anyhow::Error> {
// eg: files/default/olympics/2022/10/03/10/6982652937134804993_1.parquet
let columns = key.split('/').collect::<Vec<&str>>();
@ -83,6 +103,9 @@ pub fn decr_stream_stats(key: &str, val: FileMeta) -> Result<(), anyhow::Error>
let stream_type = columns[2].to_string();
let stream_name = columns[3].to_string();
let key = format!("{}/{}/{}", org_id, stream_type, stream_name);
if !STATS.contains_key(&key) {
return Ok(());
}
let mut stats = STATS.entry(key).or_default();
stats.doc_num -= val.records;
stats.file_num -= 1;
@ -92,10 +115,34 @@ pub fn decr_stream_stats(key: &str, val: FileMeta) -> Result<(), anyhow::Error>
Ok(())
}
#[inline]
pub fn reset_stream_stats(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
val: FileMeta,
) -> Result<(), anyhow::Error> {
let key = format!("{}/{}/{}", org_id, stream_type, stream_name);
if !STATS.contains_key(&key) {
return Ok(());
}
let mut stats = STATS.entry(key).or_default();
if val.min_ts != 0 {
stats.doc_time_min = val.min_ts;
}
if val.max_ts != 0 {
stats.doc_time_max = val.max_ts;
}
Ok(())
}
#[inline]
pub fn get_stream_stats_len() -> usize {
STATS.len()
}
#[inline]
pub fn get_stream_stats_in_memory_size() -> usize {
STATS
.iter()
@ -121,9 +168,9 @@ mod tests {
compressed_size: 3.00,
};
let _ = set_stream_stats("nexus", "default", "logs", val);
let stats = get_stream_stats("nexus", "default", "logs");
assert_eq!(stats, Some(val));
let _ = set_stream_stats("nexus", "default", StreamType::Logs, val);
let stats = get_stream_stats("nexus", "default", StreamType::Logs);
assert_eq!(stats, val);
let file_meta = FileMeta {
min_ts: 1667978841110,
@ -136,11 +183,11 @@ mod tests {
let file_key = "files/nexus/logs/default/2022/10/03/10/6982652937134804993_1.parquet";
let _ = incr_stream_stats(file_key, file_meta);
let stats = get_stream_stats("nexus", "default", "logs");
assert_eq!(stats.unwrap().doc_num, 5300);
let stats = get_stream_stats("nexus", "default", StreamType::Logs);
assert_eq!(stats.doc_num, 5300);
let _ = decr_stream_stats(file_key, file_meta);
let stats = get_stream_stats("nexus", "default", "logs");
assert_eq!(stats.unwrap().doc_num, 5000);
let stats = get_stream_stats("nexus", "default", StreamType::Logs);
assert_eq!(stats.doc_num, 5000);
}
}


@ -186,8 +186,10 @@ pub struct Limit {
pub file_move_thread_num: usize,
#[env_config(name = "ZO_QUERY_THREAD_NUM", default = 0)]
pub query_thread_num: usize,
#[env_config(name = "ZO_TS_ALLOWED_UPTO", default = 5)] // in hours - in past
pub allowed_upto: i64,
#[env_config(name = "ZO_INGEST_ALLOWED_UPTO", default = 5)] // in hours - in past
pub ingest_allowed_upto: i64,
#[env_config(name = "ZO_DATA_LIFECYCLE", default = 0)] // in days
pub data_lifecycle: i64,
#[env_config(name = "ZO_METRICS_LEADER_PUSH_INTERVAL", default = 15)]
pub metrics_leader_push_interval: u64,
#[env_config(name = "ZO_METRICS_LEADER_ELECTION_INTERVAL", default = 30)]
@ -270,6 +272,8 @@ pub struct Sled {
#[derive(Clone, Debug, EnvConfig)]
pub struct S3 {
#[env_config(name = "ZO_S3_PROVIDER", default = "")]
pub provider: String,
#[env_config(name = "ZO_S3_SERVER_URL", default = "")]
pub server_url: String,
#[env_config(name = "ZO_S3_REGION_NAME", default = "")]
@ -299,6 +303,10 @@ pub fn init() -> Config {
if cfg.limit.file_push_interval == 0 {
cfg.limit.file_push_interval = 10;
}
if cfg.limit.data_lifecycle > 0 && cfg.limit.data_lifecycle < 3 {
panic!("data lifecycle cannot be set to less than 3 days.");
}
// HACK instance_name
if cfg.common.instance_name.is_empty() {
cfg.common.instance_name = hostname().unwrap();
@ -331,6 +339,11 @@ pub fn init() -> Config {
if let Err(e) = check_etcd_config(&mut cfg) {
panic!("etcd config error: {}", e);
}
// check s3 config
if let Err(e) = check_s3_config(&mut cfg) {
panic!("s3 config error: {}", e);
}
cfg
}
@ -408,6 +421,13 @@ fn check_memory_cache_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
Ok(())
}
fn check_s3_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
if cfg.s3.provider.is_empty() && cfg.s3.server_url.contains(".googleapis.com") {
cfg.s3.provider = "gcs".to_string();
}
Ok(())
}
#[inline]
pub fn get_parquet_compression() -> parquet::basic::Compression {
match CONFIG.common.parquet_compression.to_lowercase().as_str() {
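The new limits are all driven by environment variables. A hypothetical deployment snippet using the names from this diff (values are examples only; ZO_DATA_LIFECYCLE must be 0 or at least 3, otherwise init() panics):

    ZO_INGEST_ALLOWED_UPTO=5   # hours in the past accepted at ingestion (renamed from ZO_TS_ALLOWED_UPTO)
    ZO_DATA_LIFECYCLE=30       # delete data older than 30 days; 0 disables lifecycle deletion
    ZO_S3_PROVIDER=gcs         # optional; inferred when ZO_S3_SERVER_URL contains .googleapis.com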


@ -60,12 +60,17 @@ impl FileStorage for Local {
}
}
async fn del(&self, file: &str) -> Result<(), anyhow::Error> {
let file = format!("{}{}", CONFIG.common.data_stream_dir, file);
match fs::remove_file(file) {
Ok(_) => Ok(()),
Err(e) => Err(anyhow::anyhow!(e)),
async fn del(&self, files: &[&str]) -> Result<(), anyhow::Error> {
if files.is_empty() {
return Ok(());
}
for file in files {
let file = format!("{}{}", CONFIG.common.data_stream_dir, file);
if let Err(e) = fs::remove_file(file) {
return Err(anyhow::anyhow!(e));
}
}
Ok(())
}
}
@ -87,7 +92,7 @@ mod test_util {
let resp = local.list("").await;
assert!(resp.unwrap().contains(&file_name.to_string()));
let resp = local.del(file_name).await;
let resp = local.del(&[file_name]).await;
assert!(resp.is_ok());
}
}


@ -26,7 +26,7 @@ pub trait FileStorage: Sync + 'static {
async fn list(&self, prefix: &str) -> Result<Vec<String>, anyhow::Error>;
async fn get(&self, file: &str) -> Result<Bytes, anyhow::Error>;
async fn put(&self, file: &str, data: Bytes) -> Result<(), anyhow::Error>;
async fn del(&self, file: &str) -> Result<(), anyhow::Error>;
async fn del(&self, files: &[&str]) -> Result<(), anyhow::Error>;
}
lazy_static! {
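With this change FileStorage::del takes a slice of keys instead of a single file, so callers can batch deletions. A minimal call-site sketch (the file keys are made up, and a surrounding async fn returning anyhow::Result is assumed):

    let storage = &crate::infra::storage::DEFAULT;
    storage
        .del(&[
            "files/default/logs/olympics/2023/03/01/10/a.parquet",
            "files/default/logs/olympics/2023/03/01/10/b.parquet",
        ])
        .await?;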


@ -16,6 +16,7 @@ use async_once::AsyncOnce;
use async_trait::async_trait;
use aws_config::default_provider::credentials::DefaultCredentialsChain;
use aws_config::{timeout::TimeoutConfig, SdkConfig};
use aws_sdk_s3::model::{Delete, ObjectIdentifier};
use aws_sdk_s3::{Client, Credentials, Region};
use std::{sync::Arc, time::Duration};
@ -103,7 +104,65 @@ impl FileStorage for S3 {
}
}
async fn del(&self, file: &str) -> Result<(), anyhow::Error> {
async fn del(&self, files: &[&str]) -> Result<(), anyhow::Error> {
if files.is_empty() {
return Ok(());
}
// Hack for GCS
if CONFIG.s3.provider.eq("gcs") {
for file in files {
if let Err(e) = self.del_for_gcs(file).await {
return Err(anyhow::anyhow!(e));
}
tokio::task::yield_now().await; // yield to other tasks
}
return Ok(());
}
let step = 100;
let mut start = 0;
let files_len = files.len();
while start < files_len {
let s3config = S3CONFIG.get().await.clone().unwrap();
let client = Client::new(&s3config);
let result = client
.delete_objects()
.bucket(&CONFIG.s3.bucket_name)
.delete(
Delete::builder()
.set_objects(Some(
files[start..(start + step).min(files_len)]
.iter()
.map(|file| {
ObjectIdentifier::builder()
.set_key(Some(file.to_string()))
.build()
})
.collect::<Vec<_>>(),
))
.build(),
)
.send()
.await;
match result {
Ok(_output) => {
log::info!("s3 File delete success: {:?}", files);
}
Err(err) => {
log::error!("s3 File delete error: {:?}", err);
return Err(anyhow::anyhow!(err));
}
}
start += step;
tokio::task::yield_now().await; // yield to other tasks
}
Ok(())
}
}
impl S3 {
async fn del_for_gcs(&self, file: &str) -> Result<(), anyhow::Error> {
let s3config = S3CONFIG.get().await.clone().unwrap();
let client = Client::new(&s3config);
let result = client
@ -114,11 +173,11 @@ impl FileStorage for S3 {
.await;
match result {
Ok(_output) => {
log::info!("s3 File delete success: {}", file);
log::info!("s3[GCS] File delete success: {}", file);
Ok(())
}
Err(err) => {
log::error!("s3 File delete error: {:?}", err);
log::error!("s3[GCS] File delete error: {:?}", err);
Err(anyhow::anyhow!(err))
}
}


@ -22,17 +22,36 @@ pub async fn run() -> Result<(), anyhow::Error> {
if !is_compactor(&super::cluster::LOCAL_NODE_ROLE) {
return Ok(());
}
if !CONFIG.compact.enabled {
return Ok(());
}
// should run it every hour
tokio::task::spawn(async move { run_delete().await });
tokio::task::spawn(async move { run_merge().await });
Ok(())
}
async fn run_delete() -> Result<(), anyhow::Error> {
let mut interval = time::interval(time::Duration::from_secs(CONFIG.compact.interval));
interval.tick().await; // trigger the first run
loop {
interval.tick().await;
let ret = service::compact::run().await;
let ret = service::compact::run_delete().await;
if ret.is_err() {
log::error!("[COMPACTOR] run error: {}", ret.err().unwrap());
log::error!("[COMPACTOR] run data delete error: {}", ret.err().unwrap());
}
}
}
async fn run_merge() -> Result<(), anyhow::Error> {
let mut interval = time::interval(time::Duration::from_secs(CONFIG.compact.interval));
interval.tick().await; // trigger the first run
loop {
interval.tick().await;
let ret = service::compact::run_merge().await;
if ret.is_err() {
log::error!("[COMPACTOR] run data merge error: {}", ret.err().unwrap());
}
}
}


@ -94,6 +94,25 @@ async fn move_files_to_storage() -> Result<(), anyhow::Error> {
}
log::info!("[JOB] convert disk file: {}", file);
// check if we are allowed to ingest or just delete the file
if db::compact::delete::is_deleting_stream(&org_id, &stream_name, stream_type, None) {
log::info!(
"[JOB] the stream [{}/{}/{}] is deleting, just delete file: {}",
&org_id,
stream_type,
&stream_name,
file
);
if let Err(e) = fs::remove_file(&local_file) {
log::error!(
"[JOB] Failed to remove disk file from disk: {}, {}",
local_file,
e
);
}
continue;
}
let mut partitions = file_name.split('_').collect::<Vec<&str>>();
partitions.retain(|&x| x.contains('='));
let mut partition_key = String::from("");


@ -73,6 +73,21 @@ async fn move_files_to_storage() -> Result<(), anyhow::Error> {
log::info!("[JOB] convert memory file: {}", file);
// check if we are allowed to ingest or just delete the file
if db::compact::delete::is_deleting_stream(&org_id, &stream_name, stream_type, None) {
log::info!(
"[JOB] the stream [{}/{}/{}] is deleting, just delete file: {}",
&org_id,
stream_type,
&stream_name,
file
);
if file_lock::FILES.write().unwrap().remove(&file).is_none() {
log::error!("[JOB] Failed to remove memory file: {}", file)
}
continue;
}
let mut partitions = file_name.split('_').collect::<Vec<&str>>();
partitions.retain(|&x| x.contains('='));
let mut partition_key = String::from("");
@ -108,12 +123,11 @@ async fn move_files_to_storage() -> Result<(), anyhow::Error> {
Ok(ret) => match ret {
Ok((path, key, meta, _stream_type)) => {
match db::file_list::local::set(&key, meta, false).await {
Ok(_) => match file_lock::FILES.write().unwrap().remove(&path) {
Some(_) => {}
None => {
Ok(_) => {
if file_lock::FILES.write().unwrap().remove(&path).is_none() {
log::error!("[JOB] Failed to remove memory file: {}", path)
}
},
}
Err(e) => log::error!(
"[JOB] Failed write memory file meta:{}, error: {}",
path,


@ -69,6 +69,7 @@ pub async fn init() -> Result<(), anyhow::Error> {
tokio::task::spawn(async move { db::functions::watch().await });
tokio::task::spawn(async move { db::user::watch().await });
tokio::task::spawn(async move { db::schema::watch().await });
tokio::task::spawn(async move { db::compact::delete::watch().await });
tokio::task::spawn(async move { db::watch_prom_cluster_leader().await });
tokio::task::spawn(async move { db::alerts::watch().await });
tokio::task::spawn(async move { db::triggers::watch().await });
@ -76,6 +77,7 @@ pub async fn init() -> Result<(), anyhow::Error> {
db::functions::cache().await?;
db::user::cache().await?;
db::schema::cache().await?;
db::compact::delete::cache().await?;
db::cache_prom_cluster_leader().await?;
db::alerts::cache().await?;
db::triggers::cache().await?;


@ -0,0 +1,267 @@
use chrono::{Duration, TimeZone, Utc};
use std::collections::{HashMap, HashSet};
use std::io::Write;
use tokio::time;
use crate::common::json;
use crate::common::utils::is_local_disk_storage;
use crate::infra::config::CONFIG;
use crate::infra::{cache, ider, storage};
use crate::meta::common::{FileKey, FileMeta};
use crate::meta::StreamType;
use crate::service::{db, file_list};
pub async fn delete_all(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
) -> Result<(), anyhow::Error> {
// println!("delete_all: {}/{}/{}", org_id, stream_type, stream_name);
if is_local_disk_storage() {
let data_dir = format!(
"{}/files/{}/{}/{}",
CONFIG.common.data_stream_dir, org_id, stream_type, stream_name
);
let path = std::path::Path::new(&data_dir);
if path.exists() {
std::fs::remove_dir_all(path)?;
}
} else {
// delete files from s3
// first fetch file list from local cache
let files = file_list::get_file_list(org_id, stream_name, Some(stream_type), 0, 0).await?;
let storage = &storage::DEFAULT;
match storage
.del(&files.iter().map(|v| v.as_str()).collect::<Vec<_>>())
.await
{
Ok(_) => {}
Err(e) => {
log::error!("[COMPACT] delete file failed: {}", e);
}
}
// at the end, fetch the file list from s3 again to guarantee no files are left
let prefix = format!("files/{}/{}/{}/", org_id, stream_type, stream_name);
loop {
let files = storage.list(&prefix).await?;
if files.is_empty() {
break;
}
match storage
.del(&files.iter().map(|v| v.as_str()).collect::<Vec<_>>())
.await
{
Ok(_) => {}
Err(e) => {
log::error!("[COMPACT] delete file failed: {}", e);
}
}
tokio::task::yield_now().await; // yield to other tasks
}
}
// delete from file list
delete_from_file_list(org_id, stream_name, stream_type, (0, 0)).await?;
// mark delete done
db::compact::delete::delete_stream_done(org_id, stream_name, stream_type, None).await
}
pub async fn delete_by_date(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
date_range: (&str, &str),
) -> Result<(), anyhow::Error> {
// println!(
// "delete_by_date: {}/{}/{}, {:?}",
// org_id, stream_type, stream_name, date_range
// );
let mut date_start = Utc.datetime_from_str(
format!("{}T00:00:00Z", date_range.0).as_str(),
"%Y-%m-%dT%H:%M:%SZ",
)?;
let date_end = Utc.datetime_from_str(
format!("{}T23:59:59Z", date_range.1).as_str(),
"%Y-%m-%dT%H:%M:%SZ",
)?;
let time_range = { (date_start.timestamp_micros(), date_end.timestamp_micros()) };
if is_local_disk_storage() {
while date_start <= date_end {
let data_dir = format!(
"{}/files/{}/{}/{}/{}",
CONFIG.common.data_stream_dir,
org_id,
stream_type,
stream_name,
date_start.format("%Y/%m/%d")
);
let path = std::path::Path::new(&data_dir);
if path.exists() {
std::fs::remove_dir_all(path)?;
}
date_start += Duration::days(1);
}
} else {
// delete files from s3
// first fetch file list from local cache
let files = file_list::get_file_list(
org_id,
stream_name,
Some(stream_type),
time_range.0,
time_range.1,
)
.await?;
let storage = &storage::DEFAULT;
match storage
.del(&files.iter().map(|v| v.as_str()).collect::<Vec<_>>())
.await
{
Ok(_) => {}
Err(e) => {
log::error!("[COMPACT] delete file failed: {}", e);
}
}
// at the end, fetch the file list from s3 again to guarantee no files are left
while date_start <= date_end {
let prefix = format!(
"files/{}/{}/{}/{}/",
org_id,
stream_type,
stream_name,
date_start.format("%Y/%m/%d")
);
loop {
let files = storage.list(&prefix).await?;
if files.is_empty() {
break;
}
match storage
.del(&files.iter().map(|v| v.as_str()).collect::<Vec<_>>())
.await
{
Ok(_) => {}
Err(e) => {
log::error!("[COMPACT] delete file failed: {}", e);
}
}
tokio::task::yield_now().await; // yield to other tasks
}
date_start += Duration::days(1);
}
}
// update metadata
cache::stats::reset_stream_stats(
org_id,
stream_name,
stream_type,
FileMeta {
min_ts: time_range.1,
..Default::default()
},
)?;
// delete from file list
delete_from_file_list(org_id, stream_name, stream_type, time_range).await?;
// mark delete done
db::compact::delete::delete_stream_done(org_id, stream_name, stream_type, Some(date_range))
.await
}
async fn delete_from_file_list(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
time_range: (i64, i64),
) -> Result<(), anyhow::Error> {
let files = file_list::get_file_list(
org_id,
stream_name,
Some(stream_type),
time_range.0,
time_range.1,
)
.await?;
if files.is_empty() {
return Ok(());
}
let mut file_list_days: HashSet<String> = HashSet::new();
let mut hours_files: HashMap<String, Vec<FileKey>> = HashMap::with_capacity(24);
for file in files {
let columns: Vec<_> = file.split('/').collect();
let day_key = format!("{}-{}-{}", columns[4], columns[5], columns[6]);
file_list_days.insert(day_key);
let hour_key = format!(
"{}/{}/{}/{}",
columns[4], columns[5], columns[6], columns[7]
);
let entry = hours_files.entry(hour_key).or_default();
entry.push(FileKey {
key: file,
meta: FileMeta::default(),
deleted: true,
});
}
let storage = &storage::DEFAULT;
for (key, items) in hours_files {
// upload the new file_list to storage
let new_file_list_key = format!("file_list/{}/{}.json.zst", key, ider::generate());
let mut buf = zstd::Encoder::new(Vec::new(), 3)?;
for file in items.iter() {
let mut write_buf = json::to_vec(&file)?;
write_buf.push(b'\n');
buf.write_all(&write_buf)?;
}
let compressed_bytes = buf.finish().unwrap();
storage
.put(&new_file_list_key, compressed_bytes.into())
.await?;
// set to local cache & send broadcast
// retry up to 9 times
for _ in 0..9 {
// set to local cache
let mut cache_success = true;
for event in &items {
if let Err(e) = db::file_list::progress(&event.key, event.meta, event.deleted).await
{
cache_success = false;
log::error!(
"[COMPACT] delete_from_file_list set local cache failed, retrying: {}",
e
);
time::sleep(time::Duration::from_secs(1)).await;
break;
}
}
if !cache_success {
continue;
}
// send broadcast to other nodes
if let Err(e) = db::file_list::broadcast::send(&items).await {
log::error!(
"[COMPACT] delete_from_file_list send broadcast failed, retrying: {}",
e
);
time::sleep(time::Duration::from_secs(1)).await;
continue;
}
break;
}
}
// mark file list need to do merge again
for key in file_list_days {
db::compact::file_list::set_delete(&key).await?;
}
Ok(())
}


@ -25,11 +25,17 @@ use crate::infra::storage;
use crate::meta::common::FileKey;
use crate::service::db;
pub async fn run(offset: i64) -> Result<(), anyhow::Error> {
run_merge(offset).await?;
run_delete().await?;
Ok(())
}
/// check all streams done compact in this hour
/// merge all small file list keys in this hour to a single file and upload to storage
/// delete all small file list keys in this hour from storage
/// node should load new file list from storage
pub async fn run(offset: i64) -> Result<(), anyhow::Error> {
pub async fn run_merge(offset: i64) -> Result<(), anyhow::Error> {
let mut offset = offset;
if offset == 0 {
// get earliest date from schema
@ -83,6 +89,28 @@ pub async fn run(offset: i64) -> Result<(), anyhow::Error> {
db::compact::file_list::set_offset(offset).await
}
pub async fn run_delete() -> Result<(), anyhow::Error> {
let days = db::compact::file_list::list_delete().await?;
if days.is_empty() {
return Ok(()); // no delete
}
for day in days {
let mut t =
Utc.datetime_from_str(format!("{}T00:00:00Z", day).as_str(), "%Y-%m-%dT%H:%M:%SZ")?;
for _hour in 0..24 {
let offset = t.timestamp_micros();
merge_file_list(offset).await?;
t += Duration::hours(1);
}
// delete day
db::compact::file_list::del_delete(&day).await?;
}
Ok(())
}
/// merge and delete the small file list keys in this hour from etcd
/// upload new file list into storage
async fn merge_file_list(offset: i64) -> Result<(), anyhow::Error> {
@ -101,6 +129,7 @@ async fn merge_file_list(offset: i64) -> Result<(), anyhow::Error> {
let offset = Utc.timestamp_nanos(offset * 1000);
let offset_prefix = offset.format("/%Y/%m/%d/%H/").to_string();
let key = format!("file_list{}", offset_prefix);
// println!("merge_file_list: key: {}", key);
let storage = &storage::DEFAULT;
let file_list = storage.list(&key).await?;
if file_list.len() <= 1 {
@ -144,6 +173,7 @@ async fn merge_file_list(offset: i64) -> Result<(), anyhow::Error> {
let id = ider::generate();
let file_name = format!("file_list{}{}.json.zst", offset_prefix, id);
let mut buf = zstd::Encoder::new(Vec::new(), 3)?;
let mut has_content = false;
for (_, item) in filter_file_keys.iter() {
if item.deleted {
continue;
@ -151,20 +181,29 @@ async fn merge_file_list(offset: i64) -> Result<(), anyhow::Error> {
let val = json::to_vec(&item)?;
buf.write_all(val.as_slice())?;
buf.write_all(b"\n")?;
has_content = true;
}
let compressed_bytes = buf.finish().unwrap();
match storage.put(&file_name, compressed_bytes.into()).await {
Ok(_) => {
log::info!("[COMPACT] merge file list success, new file: {}", file_name);
// delete all small file list keys in this hour from storage
for file in file_list {
storage.del(&file).await?;
let new_file_ok = if has_content {
match storage.put(&file_name, compressed_bytes.into()).await {
Ok(_) => {
log::info!("[COMPACT] merge file list success, new file: {}", file_name);
true
}
Err(err) => {
log::error!("[COMPACT] upload file list failed: {}", err);
false
}
}
Err(err) => {
log::error!("[COMPACT] upload file list failed: {}", err);
}
} else {
true
};
if new_file_ok {
// delete all small file list keys in this hour from storage
storage
.del(&file_list.iter().map(|v| v.as_str()).collect::<Vec<_>>())
.await?;
}
if locker.is_some() {


@ -0,0 +1,34 @@
use chrono::{DateTime, TimeZone, Utc};
use crate::infra::cache;
use crate::meta::StreamType;
use crate::service::db;
pub async fn delete_by_stream(
lifecycle_end: &str,
org_id: &str,
stream_name: &str,
stream_type: StreamType,
) -> Result<(), anyhow::Error> {
// get stream stats
let stats = cache::stats::get_stream_stats(org_id, stream_name, stream_type);
let created_at = stats.doc_time_min;
if created_at == 0 {
return Ok(()); // no data, just skip
}
let created_at: DateTime<Utc> = Utc.timestamp_nanos(created_at * 1000);
let lifecycle_start = created_at.format("%Y-%m-%d").to_string();
let lifecycle_start = lifecycle_start.as_str();
if lifecycle_start.ge(lifecycle_end) {
return Ok(()); // created_at is after lifecycle_end, just skip
}
// delete files
db::compact::delete::delete_stream(
org_id,
stream_name,
stream_type,
Some((lifecycle_start, lifecycle_end)),
)
.await
}
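As a worked example of the date comparison above (all dates hypothetical): with ZO_DATA_LIFECYCLE=30 and the compactor running on 2023-03-06, lifecycle_end is "2023-02-04"; a stream whose oldest document (doc_time_min) is from 2023-01-10 gets lifecycle_start "2023-01-10", which sorts before "2023-02-04", so the range ("2023-01-10", "2023-02-04") is queued for deletion, while a stream whose oldest data is from 2023-02-20 is skipped.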


@ -262,7 +262,7 @@ pub async fn merge_by_stream(
if !db_success {
merge_success = false;
// delete the file just upload to storage
match storage.del(&new_file_name).await {
match storage.del(&[&new_file_name]).await {
Ok(_) => {}
Err(e) => {
log::error!("[COMPACT] delete file failed: {}", e);
@ -272,13 +272,13 @@ pub async fn merge_by_stream(
}
// delete small files from storage
for file in &new_file_list {
tokio::task::yield_now().await; // yield to other tasks
match storage.del(file).await {
Ok(_) => {}
Err(e) => {
log::error!("[COMPACT] delete file failed: {}", e);
}
match storage
.del(&new_file_list.iter().map(|v| v.as_str()).collect::<Vec<_>>())
.await
{
Ok(_) => {}
Err(e) => {
log::error!("[COMPACT] delete file failed: {}", e);
}
}
// delete files from file list


@ -13,13 +13,93 @@
// limitations under the License.
use crate::infra::cache;
use crate::infra::config::CONFIG;
use crate::meta::StreamType;
use crate::service::db;
mod delete;
mod file_list;
mod lifecycle;
mod merge;
/// compactor run steps:
/// compactor delete run steps:
pub async fn run_delete() -> Result<(), anyhow::Error> {
// check the data lifecycle date
if CONFIG.limit.data_lifecycle > 0 {
let now = chrono::Utc::now();
let date = now - chrono::Duration::days(CONFIG.limit.data_lifecycle);
let data_lifecycle_end = date.format("%Y-%m-%d").to_string();
let orgs = cache::file_list::get_all_organization()?;
let stream_types = [
StreamType::Logs,
StreamType::Metrics,
StreamType::Traces,
StreamType::Metadata,
];
for org_id in orgs {
for stream_type in stream_types {
let streams = cache::file_list::get_all_stream(&org_id, stream_type)?;
for stream_name in streams {
if let Err(e) = lifecycle::delete_by_stream(
&data_lifecycle_end,
&org_id,
&stream_name,
stream_type,
)
.await
{
log::error!(
"[COMPACTOR] lifecycle: delete_by_stream [{}/{}/{}] error: {}",
org_id,
stream_type,
stream_name,
e
);
}
}
}
}
}
// delete files
let jobs = db::compact::delete::list().await?;
for job in jobs {
let columns = job.split('/').collect::<Vec<&str>>();
let org_id = columns[0].to_string();
let stream_type = StreamType::from(columns[1]);
let stream_name = columns[2].to_string();
let retention = columns[3].to_string();
tokio::task::yield_now().await; // yield to other tasks
let ret = if retention.eq("all") {
delete::delete_all(&org_id, &stream_name, stream_type).await
} else {
let date_range = retention.split(',').collect::<Vec<&str>>();
delete::delete_by_date(
&org_id,
&stream_name,
stream_type,
(date_range[0], date_range[1]),
)
.await
};
if let Err(e) = ret {
log::error!(
"[COMPACTOR] delete: delete [{}/{}/{}] error: {}",
org_id,
stream_type,
stream_name,
e
);
}
}
Ok(())
}
/// compactor merge run steps:
/// 1. get all organization
/// 2. range streams by organization & stream_type
/// 3. get a cluster lock for compactor stream
@ -32,7 +112,7 @@ mod merge;
/// 10. update last compacted offset
/// 11. release cluster lock
/// 12. compact file list from storage
pub async fn run() -> Result<(), anyhow::Error> {
pub async fn run_merge() -> Result<(), anyhow::Error> {
// get last file_list compact offset
let last_file_list_offset = db::compact::file_list::get_offset().await?;
@ -47,6 +127,17 @@ pub async fn run() -> Result<(), anyhow::Error> {
for stream_type in stream_types {
let streams = cache::file_list::get_all_stream(&org_id, stream_type)?;
for stream_name in streams {
// check if we are allowed to merge or just skip
if db::compact::delete::is_deleting_stream(&org_id, &stream_name, stream_type, None)
{
log::info!(
"[COMPACTOR] the stream [{}/{}/{}] is deleting, just skip",
&org_id,
stream_type,
&stream_name,
);
continue;
}
tokio::task::yield_now().await; // yield to other tasks
if let Err(e) = merge::merge_by_stream(
last_file_list_offset,
@ -96,7 +187,7 @@ mod tests {
false,
)
.unwrap();
let resp = run().await;
let resp = run_merge().await;
assert!(resp.is_ok());
}
}


@ -12,5 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod delete;
pub mod file_list;
pub mod files;


@ -0,0 +1,136 @@
use dashmap::DashSet;
use std::sync::Arc;
use crate::{infra::db::Event, meta::StreamType};
lazy_static! {
static ref CACHE: DashSet<String> = DashSet::new();
}
// delete data from stream
// if date_range is empty, delete all data
// date_range is a tuple of (start, end), eg: (20230102, 20230103)
#[inline]
pub async fn delete_stream(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
date_range: Option<(&str, &str)>,
) -> Result<(), anyhow::Error> {
let db = &crate::infra::db::DEFAULT;
let key = match date_range {
Some((start, end)) => format!(
"{}/{}/{}/{},{}",
org_id, stream_type, stream_name, start, end
),
None => format!("{}/{}/{}/all", org_id, stream_type, stream_name),
};
// write in cache
if CACHE.contains(&key) {
return Ok(()); // already in cache, just skip
}
CACHE.insert(key.to_string());
let db_key = format!("/compact/delete/{}", key);
db.put(&db_key, "OK".into()).await?;
Ok(())
}
// check if stream is deleting from cache
#[inline]
pub fn is_deleting_stream(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
date_range: Option<(&str, &str)>,
) -> bool {
let key = match date_range {
Some((start, end)) => format!(
"{}/{}/{}/{},{}",
org_id, stream_type, stream_name, start, end
),
None => format!("{}/{}/{}/all", org_id, stream_type, stream_name),
};
CACHE.contains(&key)
}
#[inline]
pub async fn delete_stream_done(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
date_range: Option<(&str, &str)>,
) -> Result<(), anyhow::Error> {
let db = &crate::infra::db::DEFAULT;
let key = match date_range {
Some((start, end)) => format!(
"{}/{}/{}/{},{}",
org_id, stream_type, stream_name, start, end
),
None => format!("{}/{}/{}/all", org_id, stream_type, stream_name),
};
let db_key = format!("/compact/delete/{}", key);
if let Err(e) = db.delete(&db_key, false).await {
if !e.to_string().contains("not exists") {
return Err(anyhow::anyhow!(e));
}
}
// remove in cache
CACHE.remove(&key);
Ok(())
}
pub async fn list() -> Result<Vec<String>, anyhow::Error> {
let mut items = Vec::new();
let db = &crate::infra::db::DEFAULT;
let key = "/compact/delete/";
let ret = db.list(key).await?;
for (item_key, _) in ret {
let item_key = item_key.strip_prefix(key).unwrap();
items.push(item_key.to_string());
}
Ok(items)
}
pub async fn watch() -> Result<(), anyhow::Error> {
let db = &crate::infra::db::DEFAULT;
let key = "/compact/delete/";
let mut events = db.watch(key).await?;
let events = Arc::get_mut(&mut events).unwrap();
log::info!("[TRACE] Start watching stream deleting");
loop {
let ev = match events.recv().await {
Some(ev) => ev,
None => {
log::error!("watch_stream_deleting: event channel closed");
break;
}
};
match ev {
Event::Put(ev) => {
let item_key = ev.key.strip_prefix(key).unwrap();
CACHE.insert(item_key.to_string());
}
Event::Delete(ev) => {
let item_key = ev.key.strip_prefix(key).unwrap();
CACHE.remove(item_key);
}
}
}
Ok(())
}
pub async fn cache() -> Result<(), anyhow::Error> {
let db = &crate::infra::db::DEFAULT;
let key = "/compact/delete/";
let ret = db.list(key).await?;
for (item_key, _) in ret {
let item_key = item_key.strip_prefix(key).unwrap();
CACHE.insert(item_key.to_string());
}
Ok(())
}
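For illustration, with a hypothetical org default and logs stream olympics, the markers written under /compact/delete/ look like

    /compact/delete/default/logs/olympics/all                      (delete the whole stream)
    /compact/delete/default/logs/olympics/2023-01-10,2023-02-04    (delete a date range)

The compactor's run_delete lists these keys with the prefix stripped, performs the deletion, and delete_stream_done then removes both the db key and the cache entry.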


@ -30,6 +30,36 @@ pub async fn set_offset(offset: i64) -> Result<(), anyhow::Error> {
Ok(())
}
pub async fn set_delete(key: &str) -> Result<(), anyhow::Error> {
let db = &crate::infra::db::DEFAULT;
let key = format!("/compact/file_list/delete/{}", key);
db.put(&key, "OK".into()).await?;
Ok(())
}
pub async fn del_delete(key: &str) -> Result<(), anyhow::Error> {
let db = &crate::infra::db::DEFAULT;
let key = format!("/compact/file_list/delete/{}", key);
if let Err(e) = db.delete(&key, false).await {
if !e.to_string().contains("not exists") {
return Err(anyhow::anyhow!(e));
}
}
Ok(())
}
pub async fn list_delete() -> Result<Vec<String>, anyhow::Error> {
let mut items = Vec::new();
let db = &crate::infra::db::DEFAULT;
let key = "/compact/file_list/delete/";
let ret = db.list(key).await?;
for (item_key, _item_value) in ret {
let item_key = item_key.strip_prefix(key).unwrap();
items.push(item_key.to_string());
}
Ok(items)
}
#[cfg(test)]
mod tests {
@ -42,5 +72,10 @@ mod tests {
let _ = set_offset(off_set).await;
let resp = get_offset().await;
assert_eq!(resp.unwrap(), off_set);
let delete_day = "2023-03-03";
let _ = set_delete(delete_day).await;
let deletes = list_delete().await.unwrap();
assert_eq!([delete_day.to_string()].to_vec(), deletes);
}
}


@ -41,6 +41,21 @@ pub async fn set_offset(
Ok(())
}
pub async fn del_offset(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
) -> Result<(), anyhow::Error> {
let db = &crate::infra::db::DEFAULT;
let key = format!("/compact/files/{}/{}/{}", org_id, stream_type, stream_name);
if let Err(e) = db.delete(&key, false).await {
if !e.to_string().contains("not exists") {
return Err(anyhow::anyhow!(e));
}
}
Ok(())
}
pub async fn list_offset() -> Result<Vec<(String, i64)>, anyhow::Error> {
let mut items = Vec::new();
let db = &crate::infra::db::DEFAULT;


@ -17,11 +17,9 @@ use std::sync::Arc;
use tracing::info_span;
use crate::common::json;
use crate::infra::cache::stats;
use crate::infra::config::METRIC_CLUSTER_LEADER;
use crate::infra::db::Event;
use crate::meta::prom::ClusterLeader;
use crate::meta::stream::StreamStats;
pub mod alerts;
pub mod compact;
@ -33,25 +31,6 @@ pub mod triggers;
pub mod udf;
pub mod user;
pub async fn get_stream_stats(
org_id: &str,
stream_name: &str,
stream_type: &str,
) -> Result<StreamStats, anyhow::Error> {
match stats::get_stream_stats(org_id, stream_name, stream_type) {
Some(stats) => Ok(stats),
None => {
let key = format!("/stats/{}/{}/{}", org_id, stream_type, stream_name);
let db = &crate::infra::db::DEFAULT;
let value = match db.get(&key).await {
Ok(val) => serde_json::from_slice(&val).unwrap(),
Err(_) => StreamStats::default(),
};
Ok(value)
}
}
}
pub async fn set_prom_cluster_info(
cluster: &str,
members: Vec<String>,


@ -18,6 +18,7 @@ use serde_json::Value;
use std::sync::Arc;
use crate::common::json;
use crate::infra::cache;
use crate::infra::config::STREAM_SCHEMAS;
use crate::infra::db::Event;
use crate::meta::stream::StreamSchema;
@ -60,7 +61,6 @@ pub async fn get(
Ok(value)
}
#[tracing::instrument(name = "db:schema:get_versions")]
pub async fn get_versions(
org_id: &str,
stream_name: &str,
@ -98,8 +98,8 @@ pub async fn set(
min_ts: Option<i64>,
) -> Result<(), anyhow::Error> {
let db = &crate::infra::db::DEFAULT;
let key = format!("/schema/{}/{}/{}", org_id, stream_type, stream_name);
let mut versions: Vec<Schema>;
let key = format!("/schema/{}/{}/{}", org_id, stream_type, stream_name);
let map_key = key.strip_prefix("/schema/").unwrap();
if STREAM_SCHEMAS.contains_key(map_key) {
versions = STREAM_SCHEMAS.get(map_key).unwrap().value().clone();
@ -130,9 +130,12 @@ pub async fn set(
metadata.get("created_at").unwrap().clone(),
);
} else {
let curr_ts = Utc::now().timestamp_micros().to_string();
metadata.insert("start_dt".to_string(), curr_ts.clone());
metadata.insert("created_at".to_string(), curr_ts);
let min_ts = match min_ts {
Some(v) => v,
None => Utc::now().timestamp_micros(),
};
metadata.insert("start_dt".to_string(), min_ts.to_string());
metadata.insert("created_at".to_string(), min_ts.to_string());
}
let _ = db
.put(
@ -146,6 +149,23 @@ pub async fn set(
Ok(())
}
pub async fn delete(
org_id: &str,
stream_name: &str,
stream_type: Option<StreamType>,
) -> Result<(), anyhow::Error> {
let stream_type = match stream_type {
Some(v) => v,
None => StreamType::Logs,
};
let key = format!("/schema/{}/{}/{}", org_id, stream_type, stream_name);
let db = &crate::infra::db::DEFAULT;
match db.delete(&key, false).await {
Ok(_) => Ok(()),
Err(e) => Err(anyhow::anyhow!(e)),
}
}
#[tracing::instrument(name = "db:schema:list")]
pub async fn list(
org_id: &str,
@ -252,7 +272,17 @@ pub async fn watch() -> Result<(), anyhow::Error> {
}
Event::Delete(ev) => {
let item_key = ev.key.strip_prefix(key).unwrap();
let columns = item_key.split('/').collect::<Vec<&str>>();
let org_id = columns[0];
let stream_type = StreamType::from(columns[1]);
let stream_name = columns[2];
STREAM_SCHEMAS.remove(item_key);
cache::stats::remove_stream_stats(org_id, stream_name, stream_type);
if let Err(e) =
super::compact::files::del_offset(org_id, stream_name, stream_type).await
{
log::error!("del_offset: {}", e);
}
}
}
}


@ -36,6 +36,7 @@ use crate::meta::ingestion::{
IngestionResponse, RecordStatus, StreamData, StreamSchemaChk, StreamStatus,
};
use crate::meta::StreamType;
use crate::service::db;
use crate::service::schema::stream_schema_exists;
use crate::{common::time::parse_timestamp_micro_from_value, meta::alert::Trigger};
@ -55,7 +56,9 @@ pub async fn ingest(
)),
);
}
let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros();
let mut min_ts =
(Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros();
#[cfg(feature = "zo_functions")]
let lua = Lua::new();
#[cfg(feature = "zo_functions")]
@ -92,6 +95,17 @@ pub async fn ingest(
next_line_is_data = true;
stream_name = super::get_stream_name(&value);
// check if we are allowed to ingest
if db::compact::delete::is_deleting_stream(org_id, &stream_name, StreamType::Logs, None)
{
return Ok(
HttpResponse::InternalServerError().json(MetaHttpResponse::error(
http::StatusCode::INTERNAL_SERVER_ERROR.into(),
Some(format!("stream [{}] is being deleted", stream_name)),
)),
);
}
// Start Register Transfoms for stream
#[cfg(feature = "zo_functions")]
let key = format!("{}/{}/{}", &org_id, StreamType::Logs, &stream_name);
@ -197,7 +211,7 @@ pub async fn ingest(
None => Utc::now().timestamp_micros(),
};
// check ingestion time
let earliest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.allowed_upto);
let earliest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.ingest_allowed_upto);
if timestamp < earliest_time.timestamp_micros() {
status.failed += 1; // data too old, just discard
status.error = super::get_upto_discard_error();
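
For clarity, a self-contained sketch of the ingestion window math used above, assuming a hypothetical ingest_allowed_upto of 5 hours (the real value comes from CONFIG.limit):

use chrono::{Duration, Utc};

fn main() {
    let ingest_allowed_upto: i64 = 5; // hours; placeholder for CONFIG.limit.ingest_allowed_upto
    // min_ts starts at now + 5h and is lowered to the smallest timestamp actually
    // ingested (mirrors the initialisation in the handlers above).
    let min_ts = (Utc::now() + Duration::hours(ingest_allowed_upto)).timestamp_micros();
    // Records with timestamps before earliest_time are discarded with get_upto_discard_error().
    let earliest_time = Utc::now() + Duration::hours(0 - ingest_allowed_upto);
    let timestamp = Utc::now().timestamp_micros();
    assert!(timestamp >= earliest_time.timestamp_micros());
    assert!(timestamp <= min_ts);
}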

View File

@ -37,6 +37,7 @@ use crate::meta::functions::Transform;
use crate::meta::http::HttpResponse as MetaHttpResponse;
use crate::meta::ingestion::{IngestionResponse, RecordStatus, StreamStatus};
use crate::meta::StreamType;
use crate::service::db;
use crate::service::schema::stream_schema_exists;
pub async fn ingest(
@ -57,7 +58,18 @@ pub async fn ingest(
);
}
let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros();
// check if we are allowed to ingest
if db::compact::delete::is_deleting_stream(org_id, stream_name, StreamType::Logs, None) {
return Ok(
HttpResponse::InternalServerError().json(MetaHttpResponse::error(
http::StatusCode::INTERNAL_SERVER_ERROR.into(),
Some(format!("stream [{}] is being deleted", stream_name)),
)),
);
}
let mut min_ts =
(Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros();
#[cfg(feature = "zo_functions")]
let lua = Lua::new();
#[cfg(feature = "zo_functions")]
@ -159,7 +171,7 @@ pub async fn ingest(
None => Utc::now().timestamp_micros(),
};
// check ingestion time
let earliest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.allowed_upto);
let earliest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.ingest_allowed_upto);
if timestamp < earliest_time.timestamp_micros() {
stream_status.status.failed += 1; // data too old, just discard
stream_status.status.error = super::get_upto_discard_error();

View File

@ -41,7 +41,7 @@ pub mod multi;
pub(crate) fn get_upto_discard_error() -> String {
format!(
"too old data, by default only last {} hours data can be ingested. Data dscarded.",
CONFIG.limit.allowed_upto
CONFIG.limit.ingest_allowed_upto
)
}
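
A tiny worked example of the message this helper produces, assuming a hypothetical limit of 5 hours:

fn main() {
    let ingest_allowed_upto = 5; // placeholder for CONFIG.limit.ingest_allowed_upto
    let msg = format!(
        "data is too old; by default only the last {} hours of data can be ingested. Data discarded.",
        ingest_allowed_upto
    );
    assert_eq!(
        msg,
        "data is too old; by default only the last 5 hours of data can be ingested. Data discarded."
    );
}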

View File

@ -36,6 +36,7 @@ use crate::meta::functions::Transform;
use crate::meta::http::HttpResponse as MetaHttpResponse;
use crate::meta::ingestion::{IngestionResponse, RecordStatus, StreamStatus};
use crate::meta::StreamType;
use crate::service::db;
use crate::service::logs::StreamMeta;
use crate::service::schema::stream_schema_exists;
@ -57,7 +58,18 @@ pub async fn ingest(
);
}
let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros();
// check if we are allowed to ingest
if db::compact::delete::is_deleting_stream(org_id, stream_name, StreamType::Logs, None) {
return Ok(
HttpResponse::InternalServerError().json(MetaHttpResponse::error(
http::StatusCode::INTERNAL_SERVER_ERROR.into(),
Some(format!("stream [{}] is being deleted", stream_name)),
)),
);
}
let mut min_ts =
(Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros();
#[cfg(feature = "zo_functions")]
let lua = Lua::new();
#[cfg(feature = "zo_functions")]
@ -162,7 +174,7 @@ pub async fn ingest(
None => Utc::now().timestamp_micros(),
};
// check ingestion time
let earliest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.allowed_upto);
let earliest_time = Utc::now() + Duration::hours(0 - CONFIG.limit.ingest_allowed_upto);
if timestamp < earliest_time.timestamp_micros() {
stream_status.status.failed += 1; // data too old, just discard
stream_status.status.error = super::get_upto_discard_error();

View File

@ -71,7 +71,8 @@ pub async fn prometheus_write_proto(
);
}
let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros();
let mut min_ts =
(Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros();
let dedup_enabled = CONFIG.common.metrics_dedup_enabled;
let election_interval = CONFIG.limit.metrics_leader_election_interval * 1000000;
let mut last_received: i64 = 0;
@ -204,6 +205,22 @@ pub async fn prometheus_write_proto(
if entry.is_empty() {
continue;
}
// check if we are allowed to ingest
if db::compact::delete::is_deleting_stream(
org_id,
&metric_name,
StreamType::Metrics,
None,
) {
return Ok(HttpResponse::InternalServerError().json(
meta::http::HttpResponse::error(
http::StatusCode::INTERNAL_SERVER_ERROR.into(),
Some(format!("stream [{}] is being deleted", metric_name)),
),
));
}
write_buf.clear();
for row in &entry {
write_buf.put(row.as_bytes());

View File

@ -66,11 +66,9 @@ pub async fn search(
let _guard3 = span3.enter();
// fetch all schema versions, get latest schema
let schema_versions =
db::schema::get_versions(&sql.org_id, &sql.stream_name, Some(stream_type)).await?;
let schema_latest = schema_versions.last().unwrap();
let schema = db::schema::get(&sql.org_id, &sql.stream_name, Some(stream_type)).await?;
let schema = Arc::new(
schema_latest
schema
.to_owned()
.with_metadata(std::collections::HashMap::new()),
);

View File

@ -94,6 +94,9 @@ pub async fn search(
// fetch all schema versions, group files by version
let schema_versions =
db::schema::get_versions(&sql.org_id, &sql.stream_name, Some(stream_type)).await?;
if schema_versions.is_empty() {
return Err(anyhow::anyhow!("stream not found"));
}
let schema_latest = schema_versions.last().unwrap();
let schema_latest_id = schema_versions.len() - 1;
let mut files_group: HashMap<usize, Vec<String>> =

View File

@ -21,6 +21,8 @@ use tracing::info_span;
use crate::common::json;
use crate::common::utils::is_local_disk_storage;
use crate::infra::cache::stats;
use crate::infra::config::STREAM_SCHEMAS;
use crate::meta::http::HttpResponse as MetaHttpResponse;
use crate::meta::stream::{ListStream, Stream, StreamProperty, StreamSettings, StreamStats};
use crate::meta::StreamType;
@ -40,9 +42,7 @@ pub async fn get_stream(
let schema = db::schema::get(org_id, stream_name, Some(stream_type))
.await
.unwrap();
let mut stats = db::get_stream_stats(org_id, stream_name, &stream_type.to_string())
.await
.unwrap();
let mut stats = stats::get_stream_stats(org_id, stream_name, stream_type);
stats = transform_stats(&mut stats);
if schema != Schema::empty() {
let stream = stream_res(stream_name, stream_type, schema, Some(stats));
@ -76,13 +76,11 @@ pub async fn get_streams(
.unwrap();
let mut indices_res = Vec::new();
for stream_loc in indices {
let mut stats = db::get_stream_stats(
let mut stats = stats::get_stream_stats(
org_id,
stream_loc.stream_name.as_str(),
&stream_loc.stream_type.to_string(),
)
.await
.unwrap();
stream_loc.stream_type,
);
if stats.eq(&StreamStats::default()) {
indices_res.push(stream_res(
stream_loc.stream_name.as_str(),
@ -171,6 +169,17 @@ pub async fn save_stream_settings(
) -> Result<HttpResponse, Error> {
let loc_span = info_span!("service:streams:set_partition_keys");
let _guard = loc_span.enter();
// reject the settings update if the stream is being deleted
if db::compact::delete::is_deleting_stream(org_id, stream_name, stream_type, None) {
return Ok(
HttpResponse::InternalServerError().json(MetaHttpResponse::error(
http::StatusCode::INTERNAL_SERVER_ERROR.into(),
Some(format!("stream [{}] is being deleted", stream_name)),
)),
);
}
let schema = db::schema::get(org_id, stream_name, Some(stream_type))
.await
.unwrap();
@ -195,6 +204,67 @@ pub async fn save_stream_settings(
)))
}
pub async fn delete_stream(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
) -> Result<HttpResponse, Error> {
let loc_span = info_span!("service:streams:delete_stream");
let _guard = loc_span.enter();
let schema = db::schema::get_versions(org_id, stream_name, Some(stream_type))
.await
.unwrap();
if schema.is_empty() {
return Ok(HttpResponse::NotFound().json(MetaHttpResponse::error(
StatusCode::NOT_FOUND.into(),
Some("stream not found".to_owned()),
)));
}
// create a delete marker for the compactor
if let Err(e) = db::compact::delete::delete_stream(org_id, stream_name, stream_type, None).await
{
return Ok(
HttpResponse::InternalServerError().json(MetaHttpResponse::error(
StatusCode::INTERNAL_SERVER_ERROR.into(),
Some(format!("failed to delete stream: {}", e)),
)),
);
}
// delete stream schema
if let Err(e) = db::schema::delete(org_id, stream_name, Some(stream_type)).await {
return Ok(
HttpResponse::InternalServerError().json(MetaHttpResponse::error(
StatusCode::INTERNAL_SERVER_ERROR.into(),
Some(format!("failed to delete stream: {}", e)),
)),
);
}
// delete stream schema cache
let key = format!("{}/{}/{}", org_id, stream_type, stream_name);
STREAM_SCHEMAS.remove(&key);
// delete stream stats cache
stats::remove_stream_stats(org_id, stream_name, stream_type);
// delete stream compaction offset
if let Err(e) = db::compact::files::del_offset(org_id, stream_name, stream_type).await {
return Ok(
HttpResponse::InternalServerError().json(MetaHttpResponse::error(
StatusCode::INTERNAL_SERVER_ERROR.into(),
Some(format!("failed to delete stream: {}", e)),
)),
);
};
Ok(HttpResponse::Ok().json(MetaHttpResponse::message(
StatusCode::OK.into(),
"stream deleted".to_owned(),
)))
}
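
For completeness, a hedged sketch of exercising the new delete endpoint over HTTP. reqwest and tokio are assumed as client-side dependencies (they are not part of this diff), and the host, port, credentials, org and stream name are placeholders; the route mirrors the DELETE call added to the frontend stream service later in this diff:

use reqwest::Client;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let resp = Client::new()
        .delete("http://localhost:5080/api/org1/app") // placeholder host, org and stream
        .basic_auth("root@example.com", Some("password")) // placeholder credentials
        .send()
        .await?;
    // On success the handler above replies 200 with "stream deleted".
    println!("{} {}", resp.status(), resp.text().await?);
    Ok(())
}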
pub fn get_stream_setting_fts_fields(schema: &Schema) -> Result<Vec<String>, anyhow::Error> {
let mut full_text_search_keys = vec![];
let settings = schema.metadata.get("settings");

View File

@ -74,7 +74,8 @@ pub async fn handle_trace_request(
let mut data_buf: AHashMap<String, Vec<String>> = AHashMap::new();
let mut traces_schema_map: AHashMap<String, Schema> = AHashMap::new();
let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros();
let mut min_ts =
(Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros();
let mut service_name: String = traces_stream_name.to_string();
let res_spans = request.resource_spans;
for res_span in res_spans {

View File

@ -76,7 +76,8 @@ pub async fn traces_json(
let mut trace_meta_coll: AHashMap<String, Vec<serde_json::Map<String, Value>>> =
AHashMap::new();
let mut min_ts = (Utc::now() + Duration::hours(CONFIG.limit.allowed_upto)).timestamp_micros();
let mut min_ts =
(Utc::now() + Duration::hours(CONFIG.limit.ingest_allowed_upto)).timestamp_micros();
let mut data_buf: AHashMap<String, Vec<String>> = AHashMap::new();
let mut traces_schema_map: AHashMap<String, Schema> = AHashMap::new();
let mut service_name: String = traces_stream_name.to_string();

View File

@ -704,7 +704,7 @@ mod tests {
.set_payload(body_str)
.to_request();
let resp = test::call_service(&app, req).await;
//println!("{:?}", resp);
// println!("{:?}", resp);
assert!(resp.status().is_success());
}
@ -783,7 +783,7 @@ mod tests {
.append_header(auth)
.to_request();
let resp = test::call_service(&app, req).await;
//println!("{:?}", resp);
// println!("{:?}", resp);
assert!(resp.status().is_success());
}
async fn e2e_cache_status() {
@ -802,7 +802,7 @@ mod tests {
.append_header(auth)
.to_request();
let resp = test::call_service(&app, req).await;
//println!("{:?}", resp);
// println!("{:?}", resp);
assert!(resp.status().is_success());
}

View File

@ -270,7 +270,7 @@ import { useI18n } from "vue-i18n";
import { useStore } from "vuex";
import { useQuasar } from "quasar";
import IndexService from "../../services/index";
import streamService from "../../services/stream";
import { Parser } from "node-sql-parser";
import segment from "../../services/segment_analytics";
@ -479,7 +479,7 @@ export default defineComponent({
this.formData = this.modelValue;
}
IndexService.nameList(
streamService.nameList(
this.store.state.selectedOrganization.identifier,
"",
true

View File

@ -145,7 +145,7 @@ import { useI18n } from "vue-i18n";
import { useStore } from "vuex";
import { useQuasar } from "quasar";
import IndexService from "../../services/index";
import streamService from "../../services/stream";
import { update } from "plotly.js";
import segment from "../../services/segment_analytics";
@ -296,7 +296,7 @@ end`;
this.formData = this.modelValue;
}
IndexService.nameList(
streamService.nameList(
this.store.state.selectedOrganization.identifier,
"",
false

View File

@ -139,7 +139,7 @@ import { defineComponent, ref } from "vue";
import { useI18n } from "vue-i18n";
import { useStore } from "vuex";
import { useQuasar, date, format } from "quasar";
import indexService from "../../services/index";
import streamService from "../../services/stream";
import segment from "../../services/segment_analytics";
const defaultValue: any = () => {
@ -173,7 +173,7 @@ export default defineComponent({
message: "Please wait while loading stats...",
});
await indexService
await streamService
.schema(
store.state.selectedOrganization.identifier,
indexData.value.name,
@ -256,7 +256,7 @@ export default defineComponent({
settings.partition_keys.concat(added_part_keys);
}
await indexService
await streamService
.updateSettings(
store.state.selectedOrganization.identifier,
indexData.value.name,

View File

@ -146,7 +146,7 @@ import useLogs from "../../composables/useLogs";
import { deepKeys, byString } from "../../utils/json";
import { Parser } from "node-sql-parser";
import indexService from "../../services/index";
import streamService from "../../services/stream";
import searchService from "../../services/search";
import TransformService from "../../services/jstransform";
import { useLocalLogsObj } from "../../utils/zincutils";
@ -293,7 +293,7 @@ export default defineComponent({
function getStreamList() {
try {
indexService
streamService
.nameList(store.state.selectedOrganization.identifier, "logs", true)
.then((res) => {
searchObj.data.streamResults = res.data;

View File

@ -10,11 +10,11 @@
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// limitations under the License.
import http from "./http";
var index = {
const stream = {
nameList: (org_identifier: string, type: string, schema: boolean) => {
let url = `/api/${org_identifier}/streams`;
@ -23,7 +23,10 @@ var index = {
}
if (schema) {
url += url.indexOf("?")>0 ? "&fetchSchema="+schema: "?fetchSchema="+schema;
url +=
url.indexOf("?") > 0
? "&fetchSchema=" + schema
: "?fetchSchema=" + schema;
}
return http().get(url);
},
@ -35,7 +38,12 @@ var index = {
}
return http().get(url);
},
updateSettings: (org_identifier: string, stream_name: string, type: string,data: any) => {
updateSettings: (
org_identifier: string,
stream_name: string,
type: string,
data: any
) => {
let url = `/api/${org_identifier}/${stream_name}/settings`;
if (type != "") {
@ -43,8 +51,9 @@ var index = {
}
return http().post(url, data);
},
delete: (org_identifier: string, stream_name: string) => {
return http().delete(`/api/${org_identifier}/${stream_name}`);
},
};
export default index;
export default stream;

View File

@ -37,6 +37,17 @@
</template>
<template #body-cell-actions="props">
<q-td :props="props">
<q-btn
icon="img:/src/assets/images/common/delete_icon.svg"
:title="t('logStream.delete')"
class="q-ml-xs iconHoverBtn"
padding="sm"
unelevated
size="sm"
round
flat
@click="confirmDeleteAction(props)"
/>
<q-btn
icon="img:/src/assets/images/common/list_icon.svg"
:title="t('logStream.schemaHeader')"
@ -47,7 +58,7 @@
round
flat
@click="listSchema(props)"
></q-btn>
/>
</q-td>
</template>
@ -121,6 +132,7 @@
no-caps
class="no-border"
color="primary"
@click="deleteStream"
>
{{ t("logStream.ok") }}
</q-btn>
@ -138,7 +150,7 @@ import { useQuasar, type QTableProps } from "quasar";
import { useI18n } from "vue-i18n";
import QTablePagination from "../components/shared/grid/Pagination.vue";
import indexService from "../services/index";
import streamService from "../services/stream";
import SchemaIndex from "../components/logstream/schema.vue";
import NoData from "../components/shared/grid/NoData.vue";
import segment from "../services/segment_analytics";
@ -151,7 +163,7 @@ export default defineComponent({
setup(props, { emit }) {
const store = useStore();
const { t } = useI18n();
const q = useQuasar();
const $q = useQuasar();
const router = useRouter();
const logStream = ref([]);
const showIndexSchemaDialog = ref(false);
@ -210,15 +222,16 @@ export default defineComponent({
align: "center",
},
]);
let deleteStreamName = "";
const getLogStream = () => {
if (store.state.selectedOrganization != null) {
const dismiss = q.notify({
const dismiss = $q.notify({
spinner: true,
message: "Please wait while loading streams...",
});
indexService
streamService
.nameList(store.state.selectedOrganization.identifier, "logs", false)
.then((res) => {
let counter = 1;
@ -257,7 +270,7 @@ export default defineComponent({
})
.catch((err) => {
dismiss();
q.notify({
$q.notify({
type: "negative",
message: "Error while pulling stream.",
timeout: 2000,
@ -312,8 +325,29 @@ export default defineComponent({
maxRecordToReturn.value = val;
};
const deleteIndex = (props: any) => {
const confirmDeleteAction = (props: any) => {
confirmDelete.value = true;
deleteStreamName = props.row.name;
};
const deleteStream = () => {
streamService
.delete(store.state.selectedOrganization.identifier, deleteStreamName)
.then((res: any) => {
if (res.data.code == 200) {
$q.notify({
color: "positive",
message: "Stream deleted successfully.",
});
getLogStream();
}
})
.catch((err: any) => {
$q.notify({
color: "negative",
message: "Error while deleting stream.",
});
});
};
onActivated(() => {
@ -339,7 +373,8 @@ export default defineComponent({
pagination,
resultTotal,
listSchema,
deleteIndex,
deleteStream,
confirmDeleteAction,
confirmDelete,
schemaData,
perPageOptions,