From d887b6eed216149a53a55ee161a3950999c4e5f0 Mon Sep 17 00:00:00 2001 From: Hengfei Yang Date: Sun, 4 Feb 2024 19:07:03 +0800 Subject: [PATCH] feat: impl hash partition (#2647) --- proto/cluster/metrics.proto | 6 ++ src/common/meta/stream.rs | 103 ++++++++++++++++++-- src/config/src/utils/hash/fnv.rs | 56 +++++++++++ src/config/src/utils/hash/mod.rs | 16 +++ src/config/src/utils/mod.rs | 1 + src/handler/grpc/request/metrics/querier.rs | 35 ++++++- src/service/compact/retention.rs | 2 + src/service/ingestion/mod.rs | 27 +++-- src/service/logs/mod.rs | 8 +- src/service/organization.rs | 2 +- src/service/promql/engine.rs | 6 +- src/service/promql/mod.rs | 2 +- src/service/promql/search/grpc/mod.rs | 2 +- src/service/promql/search/grpc/storage.rs | 21 +++- src/service/promql/search/grpc/wal.rs | 14 ++- src/service/schema.rs | 7 +- src/service/search/grpc/storage.rs | 23 ++++- src/service/search/grpc/wal.rs | 21 +++- src/service/search/mod.rs | 87 ++++++++++++----- src/service/search/sql.rs | 21 +++- src/service/stream.rs | 35 +++++-- src/service/traces/mod.rs | 4 +- src/service/traces/otlp_http.rs | 4 +- tests/integration_test.rs | 3 +- web/src/components/logstream/schema.vue | 21 ++-- 25 files changed, 438 insertions(+), 89 deletions(-) create mode 100644 src/config/src/utils/hash/fnv.rs create mode 100644 src/config/src/utils/hash/mod.rs diff --git a/proto/cluster/metrics.proto b/proto/cluster/metrics.proto index 377e18209..65974a48a 100644 --- a/proto/cluster/metrics.proto +++ b/proto/cluster/metrics.proto @@ -60,6 +60,12 @@ message MetricsWalFileRequest { string stream_name = 2; int64 start_time = 3; int64 end_time = 4; + repeated MetricsWalFileFilter filters = 5; +} + +message MetricsWalFileFilter { + string field = 1; + repeated string value = 2; } message MetricsWalFileResponse { diff --git a/src/common/meta/stream.rs b/src/common/meta/stream.rs index 3eaf1c513..88a9880db 100644 --- a/src/common/meta/stream.rs +++ b/src/common/meta/stream.rs @@ -13,12 +13,12 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, fmt::Display, sync::Arc}; use arrow_schema::Field; use config::{ meta::stream::{PartitionTimeLevel, StreamStats, StreamType}, - utils::json, + utils::{hash::fnv, json}, }; use datafusion::arrow::datatypes::Schema; use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer}; @@ -59,11 +59,11 @@ pub struct StreamSchema { pub schema: Schema, } -#[derive(Clone, Debug, Deserialize, ToSchema, Default)] +#[derive(Clone, Debug, Default, Deserialize, ToSchema)] pub struct StreamSettings { #[serde(skip_serializing_if = "Vec::is_empty")] #[serde(default)] - pub partition_keys: Vec, + pub partition_keys: Vec, #[serde(skip_serializing_if = "Option::None")] pub partition_time_level: Option, #[serde(skip_serializing_if = "Vec::is_empty")] @@ -83,7 +83,7 @@ impl Serialize for StreamSettings { let mut state = serializer.serialize_struct("stream_settings", 4)?; let mut part_keys = HashMap::new(); for (index, key) in self.partition_keys.iter().enumerate() { - part_keys.insert(format!("L{index}"), key.to_string()); + part_keys.insert(format!("L{index}"), key); } state.serialize_field("partition_keys", &part_keys)?; state.serialize_field( @@ -105,8 +105,18 @@ impl From<&str> for StreamSettings { if let Some(value) = settings.get("partition_keys") { let mut v: Vec<_> = value.as_object().unwrap().into_iter().collect(); v.sort_by(|a, b| a.0.cmp(b.0)); - for (_, value) in v { - partition_keys.push(value.as_str().unwrap().to_string()); + for (_, value) in v.iter() { + match value { + json::Value::String(v) => { + partition_keys.push(StreamPartition::new(v)); + } + json::Value::Object(v) => { + let val: StreamPartition = + json::from_value(json::Value::Object(v.to_owned())).unwrap(); + partition_keys.push(val); + } + _ => {} + } } } @@ -148,6 +158,65 @@ impl From<&str> for StreamSettings { } } +#[derive(Clone, Debug, Default, Hash, PartialEq, Serialize, Deserialize, ToSchema)] +pub struct StreamPartition { + pub field: String, + #[serde(default)] + pub types: StreamPartitionType, + #[serde(default)] + pub disabled: bool, +} + +impl StreamPartition { + pub fn new(field: &str) -> Self { + Self { + field: field.to_string(), + types: StreamPartitionType::Value, + disabled: false, + } + } + + pub fn new_hash(field: &str, buckets: u64) -> Self { + Self { + field: field.to_string(), + types: StreamPartitionType::Hash(std::cmp::max(16, buckets)), + disabled: false, + } + } + + pub fn get_partition_key(&self, value: &str) -> String { + format!("{}={}", self.field, self.get_partition_value(value)) + } + + pub fn get_partition_value(&self, value: &str) -> String { + match &self.types { + StreamPartitionType::Value => value.to_string(), + StreamPartitionType::Hash(n) => { + let h = fnv::new().sum64(value); + let bucket = h % n; + bucket.to_string() + } + } + } +} + +#[derive(Clone, Debug, Default, Hash, PartialEq, Serialize, Deserialize, ToSchema)] +#[serde(rename_all = "lowercase")] +pub enum StreamPartitionType { + #[default] + Value, // each value is a partition + Hash(u64), // partition with fixed bucket size by hash +} + +impl Display for StreamPartitionType { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + StreamPartitionType::Value => write!(f, "value"), + StreamPartitionType::Hash(_) => write!(f, "hash"), + } + } +} + #[derive(Clone, Debug, Serialize, Deserialize, ToSchema)] pub struct ListStream { pub list: Vec, @@ -212,7 +281,7 @@ impl ScanStats { #[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)] #[serde(rename_all = "lowercase")] pub struct PartitioningDetails { - pub partition_keys: Vec, + pub partition_keys: Vec, pub partition_time_level: Option, } @@ -240,4 +309,22 @@ mod tests { assert_eq!(params.stream_name, "stream_name"); assert_eq!(params.stream_type, StreamType::Logs); } + + #[test] + fn test_hash_partition() { + let part = StreamPartition::new("field"); + assert_eq!( + json::to_string(&part).unwrap(), + r#"{"field":"field","types":"value","disabled":false}"# + ); + let part = StreamPartition::new_hash("field", 32); + assert_eq!( + json::to_string(&part).unwrap(), + r#"{"field":"field","types":{"hash":32},"disabled":false}"# + ); + assert_eq!(part.get_partition_key("hello"), "field=11"); + assert_eq!(part.get_partition_key("world"), "field=19"); + assert_eq!(part.get_partition_key("foo"), "field=23"); + assert_eq!(part.get_partition_key("bar"), "field=26"); + } } diff --git a/src/config/src/utils/hash/fnv.rs b/src/config/src/utils/hash/fnv.rs new file mode 100644 index 000000000..5a067e1d5 --- /dev/null +++ b/src/config/src/utils/hash/fnv.rs @@ -0,0 +1,56 @@ +// Copyright 2023 Zinc Labs Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +// offset64 FNVa offset basis. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash +const OFFSET64: u64 = 14695981039346656037; +// prime64 FNVa prime value. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash +const PRIME64: u64 = 1099511628211; + +/// refer: https://github.com/allegro/bigcache/blob/main/fnv.go +#[derive(Default)] +pub struct Fnv64a {} + +pub fn new() -> Fnv64a { + Fnv64a::new() +} + +impl Fnv64a { + pub fn new() -> Fnv64a { + Fnv64a {} + } + + pub fn sum64(&self, key: &str) -> u64 { + let mut hash: u64 = OFFSET64; + for c in key.chars() { + hash ^= c as u64; + hash = hash.wrapping_mul(PRIME64); + } + hash + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_fnv64a() { + let fnv = Fnv64a::new(); + assert_eq!(fnv.sum64("hello"), 11831194018420276491); + assert_eq!(fnv.sum64("world"), 5717881983045765875); + assert_eq!(fnv.sum64("foo"), 15902901984413996407); + assert_eq!(fnv.sum64("bar"), 16101355973854746); + } +} diff --git a/src/config/src/utils/hash/mod.rs b/src/config/src/utils/hash/mod.rs new file mode 100644 index 000000000..9ef62daa1 --- /dev/null +++ b/src/config/src/utils/hash/mod.rs @@ -0,0 +1,16 @@ +// Copyright 2023 Zinc Labs Inc. +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +pub mod fnv; diff --git a/src/config/src/utils/mod.rs b/src/config/src/utils/mod.rs index 75d3050af..240368d75 100644 --- a/src/config/src/utils/mod.rs +++ b/src/config/src/utils/mod.rs @@ -18,6 +18,7 @@ pub mod base64; pub mod cgroup; pub mod file; pub mod flatten; +pub mod hash; pub mod json; pub mod parquet; pub mod rand; diff --git a/src/handler/grpc/request/metrics/querier.rs b/src/handler/grpc/request/metrics/querier.rs index 07aa3d803..25fd144fc 100644 --- a/src/handler/grpc/request/metrics/querier.rs +++ b/src/handler/grpc/request/metrics/querier.rs @@ -16,7 +16,7 @@ use arrow::ipc::writer::StreamWriter; use config::{ ider, - meta::stream::StreamType, + meta::stream::{FileKey, FileMeta, StreamType}, metrics, utils::{ file::{get_file_contents, scan_files}, @@ -31,7 +31,7 @@ use tonic::{Request, Response, Status}; use tracing_opentelemetry::OpenTelemetrySpanExt; use crate::{ - common::infra::wal, + common::{infra::wal, meta::stream::StreamParams}, handler::grpc::{ cluster_rpc::{ metrics_server::Metrics, MetricsQueryRequest, MetricsQueryResponse, MetricsWalFile, @@ -39,7 +39,7 @@ use crate::{ }, request::MetadataMap, }, - service::promql::search as SearchService, + service::{promql::search as SearchService, search::match_source}, }; pub struct Querier; @@ -96,6 +96,12 @@ impl Metrics for Querier { let end_time = req.get_ref().end_time; let org_id = &req.get_ref().org_id; let stream_name = &req.get_ref().stream_name; + let filters = req + .get_ref() + .filters + .iter() + .map(|f| (f.field.as_str(), f.value.clone())) + .collect::>(); let mut resp = MetricsWalFileResponse::default(); // get memtable records @@ -196,6 +202,29 @@ impl Metrics for Querier { wal::release_files(&[file.clone()]).await; continue; } + // filter by partition keys + let file_key = FileKey::new( + file, + FileMeta { + min_ts: file_min_ts, + max_ts: file_max_ts, + ..Default::default() + }, + false, + ); + if !match_source( + StreamParams::new(org_id, stream_name, StreamType::Metrics), + Some((start_time, end_time)), + filters.as_slice(), + &file_key, + true, + false, + ) + .await + { + wal::release_files(&[file.clone()]).await; + continue; + } // check time range by parquet metadata let source_file = wal_dir.to_string() + "/" + file; let Ok(body) = get_file_contents(&source_file) else { diff --git a/src/service/compact/retention.rs b/src/service/compact/retention.rs index 37ed77460..14808b56c 100644 --- a/src/service/compact/retention.rs +++ b/src/service/compact/retention.rs @@ -479,6 +479,7 @@ mod tests { #[tokio::test] async fn test_delete_by_stream() { + infra_file_list::create_table().await.unwrap(); let org_id = "test"; let stream_name = "test"; let stream_type = config::meta::stream::StreamType::Logs; @@ -490,6 +491,7 @@ mod tests { #[tokio::test] async fn test_delete_all() { + infra_file_list::create_table().await.unwrap(); let org_id = "test"; let stream_name = "test"; let stream_type = config::meta::stream::StreamType::Logs; diff --git a/src/service/ingestion/mod.rs b/src/service/ingestion/mod.rs index 70be8726f..1b9284ec8 100644 --- a/src/service/ingestion/mod.rs +++ b/src/service/ingestion/mod.rs @@ -45,7 +45,7 @@ use crate::{ meta::{ alerts::Alert, functions::{StreamTransform, VRLResultResolver, VRLRuntimeConfig}, - stream::{PartitioningDetails, SchemaRecords}, + stream::{PartitioningDetails, SchemaRecords, StreamPartition}, }, utils::functions::get_vrl_compiler_config, }, @@ -197,7 +197,7 @@ pub async fn evaluate_trigger(trigger: Option) { pub fn get_wal_time_key( timestamp: i64, - partition_keys: &Vec, + partition_keys: &Vec, time_level: PartitionTimeLevel, local_val: &Map, suffix: Option<&str>, @@ -219,13 +219,13 @@ pub fn get_wal_time_key( time_key.push_str("/default"); } for key in partition_keys { - match local_val.get(key) { + if key.disabled { + continue; + } + match local_val.get(&key.field) { Some(v) => { - let val = if v.is_string() { - format!("{}={}", key, v.as_str().unwrap()) - } else { - format!("{}={}", key, v) - }; + let val = get_string_value(v); + let val = key.get_partition_key(&val); time_key.push_str(&format!("/{}", format_partition_key(&val))); } None => continue, @@ -459,6 +459,7 @@ mod tests { use std::collections::HashMap; use super::*; + use crate::common::meta::stream::StreamPartition; #[test] fn test_format_partition_key() { @@ -472,7 +473,10 @@ mod tests { assert_eq!( get_wal_time_key( 1620000000, - &vec!["country".to_string(), "sport".to_string()], + &vec![ + StreamPartition::new("country"), + StreamPartition::new("sport") + ], PartitionTimeLevel::Hourly, &local_val, None @@ -523,7 +527,10 @@ mod tests { let keys = get_stream_partition_keys("olympics", &stream_schema_map).await; assert_eq!( keys.partition_keys, - vec!["country".to_string(), "sport".to_string()] + vec![ + StreamPartition::new("country"), + StreamPartition::new("sport") + ] ); } diff --git a/src/service/logs/mod.rs b/src/service/logs/mod.rs index c8720b737..8339f1425 100644 --- a/src/service/logs/mod.rs +++ b/src/service/logs/mod.rs @@ -29,7 +29,11 @@ use datafusion::arrow::datatypes::Schema; use super::ingestion::{get_string_value, TriggerAlertData}; use crate::{ - common::meta::{alerts::Alert, ingestion::RecordStatus, stream::SchemaRecords}, + common::meta::{ + alerts::Alert, + ingestion::RecordStatus, + stream::{SchemaRecords, StreamPartition}, + }, service::{ ingestion::get_wal_time_key, schema::check_for_schema, stream::unwrap_partition_time_level, }, @@ -266,7 +270,7 @@ fn set_parsing_error(parse_error: &mut String, field: &Field) { struct StreamMeta<'a> { org_id: String, stream_name: String, - partition_keys: &'a Vec, + partition_keys: &'a Vec, partition_time_level: &'a Option, stream_alerts_map: &'a HashMap>, } diff --git a/src/service/organization.rs b/src/service/organization.rs index a9ecd7b03..192329bbd 100644 --- a/src/service/organization.rs +++ b/src/service/organization.rs @@ -205,7 +205,7 @@ mod tests { #[tokio::test] async fn test_organization() { let org_id = "default"; - let user_id = "user1@example.com"; + let user_id = "user-org-1@example.com"; let init_user = "root@example.com"; let pwd = "Complexpass#123"; diff --git a/src/service/promql/engine.rs b/src/service/promql/engine.rs index e98e3784d..7bc08a086 100644 --- a/src/service/promql/engine.rs +++ b/src/service/promql/engine.rs @@ -411,17 +411,17 @@ impl Engine { // 1. Group by metrics (sets of label name-value pairs) let table_name = selector.name.as_ref().unwrap(); - let filters = selector + let mut filters = selector .matchers .matchers .iter() .filter(|mat| mat.op == MatchOp::Equal) - .map(|mat| (mat.name.as_str(), vec![mat.value.as_str()])) + .map(|mat| (mat.name.as_str(), vec![mat.value.to_string()])) .collect::>(); let ctxs = self .ctx .table_provider - .create_context(&self.ctx.org_id, table_name, (start, end), &filters) + .create_context(&self.ctx.org_id, table_name, (start, end), &mut filters) .await?; let mut tasks = Vec::new(); diff --git a/src/service/promql/mod.rs b/src/service/promql/mod.rs index fe86b9f91..f7f111d0f 100644 --- a/src/service/promql/mod.rs +++ b/src/service/promql/mod.rs @@ -50,7 +50,7 @@ pub trait TableProvider: Sync + Send + 'static { org_id: &str, stream_name: &str, time_range: (i64, i64), - filters: &[(&str, Vec<&str>)], + filters: &mut [(&str, Vec)], ) -> Result, ScanStats)>>; } diff --git a/src/service/promql/search/grpc/mod.rs b/src/service/promql/search/grpc/mod.rs index e9f94ea47..2f2517d3a 100644 --- a/src/service/promql/search/grpc/mod.rs +++ b/src/service/promql/search/grpc/mod.rs @@ -48,7 +48,7 @@ impl TableProvider for StorageProvider { org_id: &str, stream_name: &str, time_range: (i64, i64), - filters: &[(&str, Vec<&str>)], + filters: &mut [(&str, Vec)], ) -> datafusion::error::Result, ScanStats)>> { let mut resp = Vec::new(); // register storage table diff --git a/src/service/promql/search/grpc/storage.rs b/src/service/promql/search/grpc/storage.rs index c713bb99d..11be5ec92 100644 --- a/src/service/promql/search/grpc/storage.rs +++ b/src/service/promql/search/grpc/storage.rs @@ -26,13 +26,14 @@ use datafusion::{ error::{DataFusionError, Result}, prelude::SessionContext, }; +use hashbrown::HashMap; use infra::cache::file_data; use tokio::sync::Semaphore; use crate::{ common::meta::{ search::{SearchType, Session as SearchSession}, - stream::{ScanStats, StreamParams}, + stream::{ScanStats, StreamParams, StreamPartition}, }, service::{ db, file_list, @@ -50,7 +51,7 @@ pub(crate) async fn create_context( org_id: &str, stream_name: &str, time_range: (i64, i64), - filters: &[(&str, Vec<&str>)], + filters: &mut [(&str, Vec)], ) -> Result<(SessionContext, Arc, ScanStats)> { // check if we are allowed to search if db::compact::retention::is_deleting_stream(org_id, stream_name, StreamType::Metrics, None) { @@ -77,6 +78,20 @@ pub(crate) async fn create_context( let partition_time_level = stream::unwrap_partition_time_level(stream_settings.partition_time_level, stream_type); + // rewrite partition filters + let partition_keys: HashMap<&str, &StreamPartition> = stream_settings + .partition_keys + .iter() + .map(|v| (v.field.as_str(), v)) + .collect(); + for entry in filters.iter_mut() { + if let Some(partition_key) = partition_keys.get(entry.0) { + for val in entry.1.iter_mut() { + *val = partition_key.get_partition_value(val); + } + } + } + // get file list let mut files = get_file_list( session_id, @@ -159,7 +174,7 @@ async fn get_file_list( stream_name: &str, time_level: PartitionTimeLevel, time_range: (i64, i64), - filters: &[(&str, Vec<&str>)], + filters: &[(&str, Vec)], ) -> Result> { let (time_min, time_max) = time_range; let results = match file_list::query( diff --git a/src/service/promql/search/grpc/wal.rs b/src/service/promql/search/grpc/wal.rs index b4c66cb39..69bbc82f1 100644 --- a/src/service/promql/search/grpc/wal.rs +++ b/src/service/promql/search/grpc/wal.rs @@ -57,11 +57,11 @@ pub(crate) async fn create_context( org_id: &str, stream_name: &str, time_range: (i64, i64), - _filters: &[(&str, Vec<&str>)], + filters: &mut [(&str, Vec)], ) -> Result, ScanStats)>> { let mut resp = vec![]; // get file list - let files = get_file_list(session_id, org_id, stream_name, time_range).await?; + let files = get_file_list(session_id, org_id, stream_name, time_range, filters).await?; if files.is_empty() { return Ok(vec![( SessionContext::new(), @@ -200,6 +200,7 @@ async fn get_file_list( org_id: &str, stream_name: &str, time_range: (i64, i64), + filters: &[(&str, Vec)], ) -> Result> { let nodes = get_cached_online_ingester_nodes(); if nodes.is_none() && nodes.as_deref().unwrap().is_empty() { @@ -207,6 +208,14 @@ async fn get_file_list( } let nodes = nodes.unwrap(); + let mut req_filters = Vec::with_capacity(filters.len()); + for (k, v) in filters { + req_filters.push(cluster_rpc::MetricsWalFileFilter { + field: k.to_string(), + value: v.clone(), + }); + } + let mut tasks = Vec::new(); for node in nodes { let session_id = session_id.to_string(); @@ -217,6 +226,7 @@ async fn get_file_list( stream_name: stream_name.to_string(), start_time: time_range.0, end_time: time_range.1, + filters: req_filters.clone(), }; let grpc_span = info_span!("promql:search:grpc:wal:grpc_wal_file", session_id); let task: tokio::task::JoinHandle< diff --git a/src/service/schema.rs b/src/service/schema.rs index effd96d94..ba52d8941 100644 --- a/src/service/schema.rs +++ b/src/service/schema.rs @@ -42,7 +42,10 @@ use crate::{ common::{ infra::config::LOCAL_SCHEMA_LOCKER, meta::{ - authz::Authz, ingestion::StreamSchemaChk, prom::METADATA_LABEL, stream::SchemaEvolution, + authz::Authz, + ingestion::StreamSchemaChk, + prom::METADATA_LABEL, + stream::{SchemaEvolution, StreamPartition}, }, }, service::{db, search::server_internal_error}, @@ -637,7 +640,7 @@ pub async fn add_stream_schema( metadata.insert("created_at".to_string(), min_ts.to_string()); if stream_type == StreamType::Traces { let settings = crate::common::meta::stream::StreamSettings { - partition_keys: vec!["service_name".to_string()], + partition_keys: vec![StreamPartition::new("service_name")], partition_time_level: None, full_text_search_keys: vec![], bloom_filter_fields: vec![], diff --git a/src/service/search/grpc/storage.rs b/src/service/search/grpc/storage.rs index 9bd204e21..0dd770726 100644 --- a/src/service/search/grpc/storage.rs +++ b/src/service/search/grpc/storage.rs @@ -31,7 +31,11 @@ use tokio::{sync::Semaphore, time::Duration}; use tracing::{info_span, Instrument}; use crate::{ - common::meta::{self, search::SearchType, stream::ScanStats}, + common::meta::{ + self, + search::SearchType, + stream::{ScanStats, StreamPartition}, + }, service::{ db, file_list, search::{ @@ -77,7 +81,16 @@ pub async fn search( // get file list let files = match file_list.is_empty() { - true => get_file_list(session_id, &sql, stream_type, partition_time_level).await?, + true => { + get_file_list( + session_id, + &sql, + stream_type, + partition_time_level, + &stream_settings.partition_keys, + ) + .await? + } false => file_list.to_vec(), }; if files.is_empty() { @@ -265,6 +278,7 @@ async fn get_file_list( sql: &Sql, stream_type: StreamType, time_level: PartitionTimeLevel, + partition_keys: &[StreamPartition], ) -> Result, Error> { let (time_min, time_max) = sql.meta.time_range.unwrap(); let file_list = match file_list::query( @@ -289,7 +303,10 @@ async fn get_file_list( let mut files = Vec::with_capacity(file_list.len()); for file in file_list { - if sql.match_source(&file, false, false, stream_type).await { + if sql + .match_source(&file, false, false, stream_type, partition_keys) + .await + { files.push(file.to_owned()); } } diff --git a/src/service/search/grpc/wal.rs b/src/service/search/grpc/wal.rs index 3c6d936b1..a44141b13 100644 --- a/src/service/search/grpc/wal.rs +++ b/src/service/search/grpc/wal.rs @@ -41,7 +41,11 @@ use tracing::{info_span, Instrument}; use crate::{ common::{ infra::wal, - meta::{self, search::SearchType, stream::ScanStats}, + meta::{ + self, + search::SearchType, + stream::{ScanStats, StreamPartition}, + }, }, service::{ db, @@ -70,7 +74,14 @@ pub async fn search_parquet( unwrap_partition_time_level(schema_settings.partition_time_level, stream_type); // get file list - let mut files = get_file_list(session_id, &sql, stream_type, &partition_time_level).await?; + let mut files = get_file_list( + session_id, + &sql, + stream_type, + &partition_time_level, + &schema_settings.partition_keys, + ) + .await?; if files.is_empty() { return Ok((HashMap::new(), ScanStats::new())); } @@ -467,6 +478,7 @@ async fn get_file_list( sql: &Sql, stream_type: StreamType, _partition_time_level: &PartitionTimeLevel, + partition_keys: &[StreamPartition], ) -> Result, Error> { let wal_dir = match Path::new(&CONFIG.common.data_wal_dir).canonicalize() { Ok(path) => { @@ -526,7 +538,10 @@ async fn get_file_list( continue; } } - if sql.match_source(&file_key, false, true, stream_type).await { + if sql + .match_source(&file_key, false, true, stream_type, partition_keys) + .await + { result.push(file_key); } else { wal::release_files(&[file.clone()]).await; diff --git a/src/service/search/mod.rs b/src/service/search/mod.rs index 4cc5844f3..ab1f53e7e 100644 --- a/src/service/search/mod.rs +++ b/src/service/search/mod.rs @@ -46,7 +46,7 @@ use crate::{ meta::{ functions::VRLResultResolver, search, - stream::{ScanStats, StreamParams}, + stream::{ScanStats, StreamParams, StreamPartition}, }, }, handler::grpc::cluster_rpc, @@ -104,7 +104,14 @@ pub async fn search_partition( let stream_settings = stream::stream_settings(&meta.schema).unwrap_or_default(); let partition_time_level = stream::unwrap_partition_time_level(stream_settings.partition_time_level, stream_type); - let files = get_file_list(session_id, &meta, stream_type, partition_time_level).await; + let files = get_file_list( + session_id, + &meta, + stream_type, + partition_time_level, + &stream_settings.partition_keys, + ) + .await; let nodes = cluster::get_cached_online_querier_nodes().unwrap_or_default(); let cpu_cores = nodes.iter().map(|n| n.cpu_num).sum::() as usize; @@ -161,6 +168,7 @@ async fn get_file_list( sql: &sql::Sql, stream_type: StreamType, time_level: PartitionTimeLevel, + partition_keys: &[StreamPartition], ) -> Vec { let is_local = CONFIG.common.meta_store_external || cluster::get_cached_online_querier_nodes() @@ -185,7 +193,10 @@ async fn get_file_list( let mut files = Vec::with_capacity(file_list.len()); for file in file_list { - if sql.match_source(&file, false, false, stream_type).await { + if sql + .match_source(&file, false, false, stream_type, partition_keys) + .await + { files.push(file.to_owned()); } } @@ -232,7 +243,14 @@ async fn search_in_cluster(mut req: cluster_rpc::SearchRequest) -> Result = None; // 1. get work group @@ -768,7 +786,7 @@ fn handle_metrics_response(sources: Vec) -> Vec { pub async fn match_source( stream: StreamParams, time_range: Option<(i64, i64)>, - filters: &[(&str, Vec<&str>)], + filters: &[(&str, Vec)], source: &FileKey, is_wal: bool, match_min_ts_only: bool, @@ -833,7 +851,7 @@ pub async fn match_source( } /// match a source is a needed file or not, return true if needed -fn filter_source_by_partition_key(source: &str, filters: &[(&str, Vec<&str>)]) -> bool { +fn filter_source_by_partition_key(source: &str, filters: &[(&str, Vec)]) -> bool { !filters.iter().any(|(k, v)| { let field = format_partition_key(&format!("{k}=")); find(source, &format!("/{field}")) @@ -871,56 +889,77 @@ mod tests { let path = "files/default/logs/gke-fluentbit/2023/04/14/08/kuberneteshost=gke-dev1/kubernetesnamespacename=ziox-dev/7052558621820981249.parquet"; let filters = vec![ (vec![], true), - (vec![("kuberneteshost", vec!["gke-dev1"])], true), - (vec![("kuberneteshost", vec!["gke-dev2"])], false), - (vec![("kuberneteshost", vec!["gke-dev1", "gke-dev2"])], true), - (vec![("some_other_key", vec!["no-matter"])], true), + (vec![("kuberneteshost", vec!["gke-dev1".to_string()])], true), + ( + vec![("kuberneteshost", vec!["gke-dev2".to_string()])], + false, + ), + ( + vec![( + "kuberneteshost", + vec!["gke-dev1".to_string(), "gke-dev2".to_string()], + )], + true, + ), + ( + vec![("some_other_key", vec!["no-matter".to_string()])], + true, + ), ( vec![ - ("kuberneteshost", vec!["gke-dev1"]), - ("kubernetesnamespacename", vec!["ziox-dev"]), + ("kuberneteshost", vec!["gke-dev1".to_string()]), + ("kubernetesnamespacename", vec!["ziox-dev".to_string()]), ], true, ), ( vec![ - ("kuberneteshost", vec!["gke-dev1"]), - ("kubernetesnamespacename", vec!["abcdefg"]), + ("kuberneteshost", vec!["gke-dev1".to_string()]), + ("kubernetesnamespacename", vec!["abcdefg".to_string()]), ], false, ), ( vec![ - ("kuberneteshost", vec!["gke-dev2"]), - ("kubernetesnamespacename", vec!["ziox-dev"]), + ("kuberneteshost", vec!["gke-dev2".to_string()]), + ("kubernetesnamespacename", vec!["ziox-dev".to_string()]), ], false, ), ( vec![ - ("kuberneteshost", vec!["gke-dev2"]), - ("kubernetesnamespacename", vec!["abcdefg"]), + ("kuberneteshost", vec!["gke-dev2".to_string()]), + ("kubernetesnamespacename", vec!["abcdefg".to_string()]), ], false, ), ( vec![ - ("kuberneteshost", vec!["gke-dev1", "gke-dev2"]), - ("kubernetesnamespacename", vec!["ziox-dev"]), + ( + "kuberneteshost", + vec!["gke-dev1".to_string(), "gke-dev2".to_string()], + ), + ("kubernetesnamespacename", vec!["ziox-dev".to_string()]), ], true, ), ( vec![ - ("kuberneteshost", vec!["gke-dev1", "gke-dev2"]), - ("kubernetesnamespacename", vec!["abcdefg"]), + ( + "kuberneteshost", + vec!["gke-dev1".to_string(), "gke-dev2".to_string()], + ), + ("kubernetesnamespacename", vec!["abcdefg".to_string()]), ], false, ), ( vec![ - ("kuberneteshost", vec!["gke-dev1", "gke-dev2"]), - ("some_other_key", vec!["no-matter"]), + ( + "kuberneteshost", + vec!["gke-dev1".to_string(), "gke-dev2".to_string()], + ), + ("some_other_key", vec!["no-matter".to_string()]), ], true, ), diff --git a/src/service/search/sql.rs b/src/service/search/sql.rs index d253bd389..7bb100325 100644 --- a/src/service/search/sql.rs +++ b/src/service/search/sql.rs @@ -33,7 +33,7 @@ use serde::{Deserialize, Serialize}; use crate::{ common::meta::{ sql::{Sql as MetaSql, SqlOperator}, - stream::StreamParams, + stream::{StreamParams, StreamPartition}, }, handler::grpc::cluster_rpc, service::{db, search::match_source, stream::get_stream_setting_fts_fields}, @@ -626,8 +626,21 @@ impl Sql { match_min_ts_only: bool, is_wal: bool, stream_type: StreamType, + partition_keys: &[StreamPartition], ) -> bool { - let filters = generate_filter_from_quick_text(&self.meta.quick_text); + let mut filters = generate_filter_from_quick_text(&self.meta.quick_text); + // rewrite partition filters + let partition_keys: HashMap<&str, &StreamPartition> = partition_keys + .iter() + .map(|v| (v.field.as_str(), v)) + .collect(); + for entry in filters.iter_mut() { + if let Some(partition_key) = partition_keys.get(entry.0) { + for val in entry.1.iter_mut() { + *val = partition_key.get_partition_value(val); + } + } + } match_source( StreamParams::new(&self.org_id, &self.stream_name, stream_type), self.meta.time_range, @@ -642,7 +655,7 @@ impl Sql { pub fn generate_filter_from_quick_text( data: &[(String, String, SqlOperator)], -) -> Vec<(&str, Vec<&str>)> { +) -> Vec<(&str, Vec)> { let quick_text_len = data.len(); let mut filters = HashMap::with_capacity(quick_text_len); for i in 0..quick_text_len { @@ -651,7 +664,7 @@ pub fn generate_filter_from_quick_text( || (op == &SqlOperator::Or && (i + 1 == quick_text_len || k == &data[i + 1].0)) { let entry = filters.entry(k.as_str()).or_insert_with(Vec::new); - entry.push(v.as_str()); + entry.push(v.to_string()); } else { filters.clear(); break; diff --git a/src/service/stream.rs b/src/service/stream.rs index 03d9c9b3d..5ba8d7463 100644 --- a/src/service/stream.rs +++ b/src/service/stream.rs @@ -160,12 +160,12 @@ pub fn stream_res( } } -#[tracing::instrument(skip(setting))] +#[tracing::instrument(skip(settings))] pub async fn save_stream_settings( org_id: &str, stream_name: &str, stream_type: StreamType, - setting: StreamSettings, + mut settings: StreamSettings, ) -> Result { // check if we are allowed to ingest if db::compact::retention::is_deleting_stream(org_id, stream_name, stream_type, None) { @@ -177,20 +177,43 @@ pub async fn save_stream_settings( ); } - for key in setting.partition_keys.iter() { - if SQL_FULL_TEXT_SEARCH_FIELDS.contains(key) { + for key in settings.partition_keys.iter() { + if SQL_FULL_TEXT_SEARCH_FIELDS.contains(&key.field) { return Ok(HttpResponse::BadRequest().json(MetaHttpResponse::error( http::StatusCode::BAD_REQUEST.into(), - format!("field [{key}] can't be used for partition key"), + format!("field [{}] can't be used for partition key", key.field), ))); } } + // we need to keep the old partition information, because the hash bucket num can't be changed + // get old settings and then update partition_keys let schema = db::schema::get(org_id, stream_name, stream_type) .await .unwrap(); + let mut old_partition_keys = stream_settings(&schema).unwrap_or_default().partition_keys; + // first disable all old partition keys + for v in old_partition_keys.iter_mut() { + v.disabled = true; + } + // then update new partition keys + for v in settings.partition_keys.iter() { + if let Some(old_field) = old_partition_keys.iter_mut().find(|k| k.field == v.field) { + if old_field.types != v.types { + return Ok(HttpResponse::BadRequest().json(MetaHttpResponse::error( + http::StatusCode::BAD_REQUEST.into(), + format!("field [{}] partition types can't be changed", v.field), + ))); + } + old_field.disabled = v.disabled; + } else { + old_partition_keys.push(v.clone()); + } + } + settings.partition_keys = old_partition_keys; + let mut metadata = schema.metadata.clone(); - metadata.insert("settings".to_string(), json::to_string(&setting).unwrap()); + metadata.insert("settings".to_string(), json::to_string(&settings).unwrap()); if !metadata.contains_key("created_at") { metadata.insert( "created_at".to_string(), diff --git a/src/service/traces/mod.rs b/src/service/traces/mod.rs index 48e813cc6..ab18c8b58 100644 --- a/src/service/traces/mod.rs +++ b/src/service/traces/mod.rs @@ -42,7 +42,7 @@ use crate::{ common::meta::{ alerts::Alert, http::HttpResponse as MetaHttpResponse, - stream::SchemaRecords, + stream::{SchemaRecords, StreamPartition}, traces::{Event, Span, SpanRefType}, }, service::{ @@ -117,7 +117,7 @@ pub async fn handle_trace_request( ) .await; - let mut partition_keys: Vec = vec![]; + let mut partition_keys: Vec = vec![]; let mut partition_time_level = PartitionTimeLevel::from(CONFIG.limit.traces_file_retention.as_str()); if stream_schema.has_partition_keys { diff --git a/src/service/traces/otlp_http.rs b/src/service/traces/otlp_http.rs index e02cc3198..a025f606d 100644 --- a/src/service/traces/otlp_http.rs +++ b/src/service/traces/otlp_http.rs @@ -35,7 +35,7 @@ use crate::{ common::meta::{ alerts::Alert, http::HttpResponse as MetaHttpResponse, - stream::SchemaRecords, + stream::{SchemaRecords, StreamPartition}, traces::{Event, ExportTracePartialSuccess, ExportTraceServiceResponse, Span, SpanRefType}, }, service::{ @@ -120,7 +120,7 @@ pub async fn traces_json( ) .await; - let mut partition_keys: Vec = vec![]; + let mut partition_keys: Vec = vec![]; let mut partition_time_level = PartitionTimeLevel::from(CONFIG.limit.traces_file_retention.as_str()); if stream_schema.has_partition_keys { diff --git a/tests/integration_test.rs b/tests/integration_test.rs index e4239fb7a..151c10e44 100644 --- a/tests/integration_test.rs +++ b/tests/integration_test.rs @@ -291,7 +291,8 @@ mod tests { async fn e2e_post_stream_settings() { let auth = setup(); - let body_str = r#"{"partition_keys": ["test_key"], "full_text_search_keys": ["log"]}"#; + let body_str = + r#"{"partition_keys": [{"field":"test_key"}], "full_text_search_keys": ["log"]}"#; // app let thread_id: usize = 0; let app = test::init_service( diff --git a/web/src/components/logstream/schema.vue b/web/src/components/logstream/schema.vue index 59f44d2dc..5981aa2ce 100644 --- a/web/src/components/logstream/schema.vue +++ b/web/src/components/logstream/schema.vue @@ -405,18 +405,17 @@ export default defineComponent({ if ( res.data.settings.partition_keys && - Object.values(res.data.settings.partition_keys).includes( - property.name + Object.values(res.data.settings.partition_keys).some( + (v) => !v.disabled && v.field === property.name ) ) { - let index = Object.values( - res.data.settings.partition_keys - ).indexOf(property.name); property.partitionKey = true; property.level = Object.keys( res.data.settings.partition_keys ).find( - (key) => res.data.settings.partition_keys[key] === property.name + (key) => + res.data.settings.partition_keys[key]["field"] === + property.name ); } else { property.partitionKey = false; @@ -459,9 +458,15 @@ export default defineComponent({ settings.full_text_search_keys.push(property.name); } if (property.level && property.partitionKey) { - settings.partition_keys.push(property.name); + settings.partition_keys.push({ + field: property.name, + types: "value", + }); } else if (property.partitionKey) { - added_part_keys.push(property.name); + added_part_keys.push({ + field: property.name, + types: "value", + }); } if (property.bloomKey) {