feat: impl hash partition (#2647)

Hengfei Yang 2024-02-04 19:07:03 +08:00 committed by GitHub
parent ead43c9873
commit d887b6eed2
25 changed files with 438 additions and 89 deletions

View File

@ -60,6 +60,12 @@ message MetricsWalFileRequest {
string stream_name = 2;
int64 start_time = 3;
int64 end_time = 4;
repeated MetricsWalFileFilter filters = 5;
}
message MetricsWalFileFilter {
string field = 1;
repeated string value = 2;
}
message MetricsWalFileResponse {

View File

@ -13,12 +13,12 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, fmt::Display, sync::Arc};
use arrow_schema::Field;
use config::{
meta::stream::{PartitionTimeLevel, StreamStats, StreamType},
utils::json,
utils::{hash::fnv, json},
};
use datafusion::arrow::datatypes::Schema;
use serde::{ser::SerializeStruct, Deserialize, Serialize, Serializer};
@ -59,11 +59,11 @@ pub struct StreamSchema {
pub schema: Schema,
}
#[derive(Clone, Debug, Deserialize, ToSchema, Default)]
#[derive(Clone, Debug, Default, Deserialize, ToSchema)]
pub struct StreamSettings {
#[serde(skip_serializing_if = "Vec::is_empty")]
#[serde(default)]
pub partition_keys: Vec<String>,
pub partition_keys: Vec<StreamPartition>,
#[serde(skip_serializing_if = "Option::None")]
pub partition_time_level: Option<PartitionTimeLevel>,
#[serde(skip_serializing_if = "Vec::is_empty")]
@ -83,7 +83,7 @@ impl Serialize for StreamSettings {
let mut state = serializer.serialize_struct("stream_settings", 4)?;
let mut part_keys = HashMap::new();
for (index, key) in self.partition_keys.iter().enumerate() {
part_keys.insert(format!("L{index}"), key.to_string());
part_keys.insert(format!("L{index}"), key);
}
state.serialize_field("partition_keys", &part_keys)?;
state.serialize_field(
@ -105,8 +105,18 @@ impl From<&str> for StreamSettings {
if let Some(value) = settings.get("partition_keys") {
let mut v: Vec<_> = value.as_object().unwrap().into_iter().collect();
v.sort_by(|a, b| a.0.cmp(b.0));
for (_, value) in v {
partition_keys.push(value.as_str().unwrap().to_string());
for (_, value) in v.iter() {
match value {
json::Value::String(v) => {
partition_keys.push(StreamPartition::new(v));
}
json::Value::Object(v) => {
let val: StreamPartition =
json::from_value(json::Value::Object(v.to_owned())).unwrap();
partition_keys.push(val);
}
_ => {}
}
}
}
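For orientation (not part of the diff): the stored settings blob that the serializer above produces, and the parser accepts, would look roughly like this for two partition keys, using the JSON shapes asserted in the test further down; older settings that stored plain strings still deserialize through the String arm, and the "user_id" field name here is hypothetical:
"partition_keys": {
"L0": {"field": "country", "types": "value", "disabled": false},
"L1": {"field": "user_id", "types": {"hash": 32}, "disabled": false}
}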
@ -148,6 +158,65 @@ impl From<&str> for StreamSettings {
}
}
#[derive(Clone, Debug, Default, Hash, PartialEq, Serialize, Deserialize, ToSchema)]
pub struct StreamPartition {
pub field: String,
#[serde(default)]
pub types: StreamPartitionType,
#[serde(default)]
pub disabled: bool,
}
impl StreamPartition {
pub fn new(field: &str) -> Self {
Self {
field: field.to_string(),
types: StreamPartitionType::Value,
disabled: false,
}
}
pub fn new_hash(field: &str, buckets: u64) -> Self {
Self {
field: field.to_string(),
types: StreamPartitionType::Hash(std::cmp::max(16, buckets)),
disabled: false,
}
}
pub fn get_partition_key(&self, value: &str) -> String {
format!("{}={}", self.field, self.get_partition_value(value))
}
pub fn get_partition_value(&self, value: &str) -> String {
match &self.types {
StreamPartitionType::Value => value.to_string(),
StreamPartitionType::Hash(n) => {
let h = fnv::new().sum64(value);
let bucket = h % n;
bucket.to_string()
}
}
}
}
#[derive(Clone, Debug, Default, Hash, PartialEq, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "lowercase")]
pub enum StreamPartitionType {
#[default]
Value, // each value is a partition
Hash(u64), // partition with fixed bucket size by hash
}
impl Display for StreamPartitionType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StreamPartitionType::Value => write!(f, "value"),
StreamPartitionType::Hash(_) => write!(f, "hash"),
}
}
}
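A usage sketch (not part of the diff) of the two partition types; the hash variant caps directory fan-out at the bucket count regardless of value cardinality, while the value variant creates one directory per distinct value:
let by_value = StreamPartition::new("country");
assert_eq!(by_value.get_partition_key("usa"), "country=usa"); // one directory per distinct value
// hypothetical field; every value maps to one of at most 32 buckets, e.g. "user_id=11"
let by_hash = StreamPartition::new_hash("user_id", 32);
let _dir = by_hash.get_partition_key("some-user-id");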
#[derive(Clone, Debug, Serialize, Deserialize, ToSchema)]
pub struct ListStream {
pub list: Vec<Stream>,
@ -212,7 +281,7 @@ impl ScanStats {
#[derive(Clone, Debug, Default, Serialize, Deserialize, ToSchema)]
#[serde(rename_all = "lowercase")]
pub struct PartitioningDetails {
pub partition_keys: Vec<String>,
pub partition_keys: Vec<StreamPartition>,
pub partition_time_level: Option<PartitionTimeLevel>,
}
@ -240,4 +309,22 @@ mod tests {
assert_eq!(params.stream_name, "stream_name");
assert_eq!(params.stream_type, StreamType::Logs);
}
#[test]
fn test_hash_partition() {
let part = StreamPartition::new("field");
assert_eq!(
json::to_string(&part).unwrap(),
r#"{"field":"field","types":"value","disabled":false}"#
);
let part = StreamPartition::new_hash("field", 32);
assert_eq!(
json::to_string(&part).unwrap(),
r#"{"field":"field","types":{"hash":32},"disabled":false}"#
);
assert_eq!(part.get_partition_key("hello"), "field=11");
assert_eq!(part.get_partition_key("world"), "field=19");
assert_eq!(part.get_partition_key("foo"), "field=23");
assert_eq!(part.get_partition_key("bar"), "field=26");
}
}

View File

@ -0,0 +1,56 @@
// Copyright 2023 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
// offset64 FNVa offset basis. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash
const OFFSET64: u64 = 14695981039346656037;
// prime64 FNVa prime value. See https://en.wikipedia.org/wiki/Fowler–Noll–Vo_hash_function#FNV-1a_hash
const PRIME64: u64 = 1099511628211;
/// refer: https://github.com/allegro/bigcache/blob/main/fnv.go
#[derive(Default)]
pub struct Fnv64a {}
pub fn new() -> Fnv64a {
Fnv64a::new()
}
impl Fnv64a {
pub fn new() -> Fnv64a {
Fnv64a {}
}
pub fn sum64(&self, key: &str) -> u64 {
let mut hash: u64 = OFFSET64;
for c in key.chars() {
hash ^= c as u64;
hash = hash.wrapping_mul(PRIME64);
}
hash
}
}
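An aside, not part of the diff: the referenced Go implementation folds in raw bytes, whereas sum64 iterates chars and XORs the Unicode scalar value, so the two only agree for ASCII input; the test vectors below are all ASCII.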
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_fnv64a() {
let fnv = Fnv64a::new();
assert_eq!(fnv.sum64("hello"), 11831194018420276491);
assert_eq!(fnv.sum64("world"), 5717881983045765875);
assert_eq!(fnv.sum64("foo"), 15902901984413996407);
assert_eq!(fnv.sum64("bar"), 16101355973854746);
}
}
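These vectors line up with the hash-partition test earlier in the diff: with 32 buckets, 11831194018420276491 % 32 = 11 for "hello" and 5717881983045765875 % 32 = 19 for "world" (and likewise 23 for "foo", 26 for "bar"), matching the expected "field=11" and "field=19" keys there.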

View File

@ -0,0 +1,16 @@
// Copyright 2023 Zinc Labs Inc.
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
pub mod fnv;

View File

@ -18,6 +18,7 @@ pub mod base64;
pub mod cgroup;
pub mod file;
pub mod flatten;
pub mod hash;
pub mod json;
pub mod parquet;
pub mod rand;

View File

@ -16,7 +16,7 @@
use arrow::ipc::writer::StreamWriter;
use config::{
ider,
meta::stream::StreamType,
meta::stream::{FileKey, FileMeta, StreamType},
metrics,
utils::{
file::{get_file_contents, scan_files},
@ -31,7 +31,7 @@ use tonic::{Request, Response, Status};
use tracing_opentelemetry::OpenTelemetrySpanExt;
use crate::{
common::infra::wal,
common::{infra::wal, meta::stream::StreamParams},
handler::grpc::{
cluster_rpc::{
metrics_server::Metrics, MetricsQueryRequest, MetricsQueryResponse, MetricsWalFile,
@ -39,7 +39,7 @@ use crate::{
},
request::MetadataMap,
},
service::promql::search as SearchService,
service::{promql::search as SearchService, search::match_source},
};
pub struct Querier;
@ -96,6 +96,12 @@ impl Metrics for Querier {
let end_time = req.get_ref().end_time;
let org_id = &req.get_ref().org_id;
let stream_name = &req.get_ref().stream_name;
let filters = req
.get_ref()
.filters
.iter()
.map(|f| (f.field.as_str(), f.value.clone()))
.collect::<Vec<_>>();
let mut resp = MetricsWalFileResponse::default();
// get memtable records
@ -196,6 +202,29 @@ impl Metrics for Querier {
wal::release_files(&[file.clone()]).await;
continue;
}
// filter by partition keys
let file_key = FileKey::new(
file,
FileMeta {
min_ts: file_min_ts,
max_ts: file_max_ts,
..Default::default()
},
false,
);
if !match_source(
StreamParams::new(org_id, stream_name, StreamType::Metrics),
Some((start_time, end_time)),
filters.as_slice(),
&file_key,
true,
false,
)
.await
{
wal::release_files(&[file.clone()]).await;
continue;
}
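With the filters forwarded in the new MetricsWalFileRequest (see the MetricsWalFileFilter message above), the node serving this RPC can now skip WAL files whose partition-key directories cannot match the query's label matchers, instead of returning every file for the stream.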
// check time range by parquet metadata
let source_file = wal_dir.to_string() + "/" + file;
let Ok(body) = get_file_contents(&source_file) else {

View File

@ -479,6 +479,7 @@ mod tests {
#[tokio::test]
async fn test_delete_by_stream() {
infra_file_list::create_table().await.unwrap();
let org_id = "test";
let stream_name = "test";
let stream_type = config::meta::stream::StreamType::Logs;
@ -490,6 +491,7 @@ mod tests {
#[tokio::test]
async fn test_delete_all() {
infra_file_list::create_table().await.unwrap();
let org_id = "test";
let stream_name = "test";
let stream_type = config::meta::stream::StreamType::Logs;

View File

@ -45,7 +45,7 @@ use crate::{
meta::{
alerts::Alert,
functions::{StreamTransform, VRLResultResolver, VRLRuntimeConfig},
stream::{PartitioningDetails, SchemaRecords},
stream::{PartitioningDetails, SchemaRecords, StreamPartition},
},
utils::functions::get_vrl_compiler_config,
},
@ -197,7 +197,7 @@ pub async fn evaluate_trigger(trigger: Option<TriggerAlertData>) {
pub fn get_wal_time_key(
timestamp: i64,
partition_keys: &Vec<String>,
partition_keys: &Vec<StreamPartition>,
time_level: PartitionTimeLevel,
local_val: &Map<String, Value>,
suffix: Option<&str>,
@ -219,13 +219,13 @@ pub fn get_wal_time_key(
time_key.push_str("/default");
}
for key in partition_keys {
match local_val.get(key) {
if key.disabled {
continue;
}
match local_val.get(&key.field) {
Some(v) => {
let val = if v.is_string() {
format!("{}={}", key, v.as_str().unwrap())
} else {
format!("{}={}", key, v)
};
let val = get_string_value(v);
let val = key.get_partition_key(&val);
time_key.push_str(&format!("/{}", format_partition_key(&val)));
}
None => continue,
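In effect, each enabled partition key now contributes a "/{field}={value}" segment to the WAL time key, with hash partitions substituting the bucket number for the raw value and disabled keys skipped entirely; partitions on country and sport, for example, yield a suffix like .../country=usa/sport=basketball under the time prefix.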
@ -459,6 +459,7 @@ mod tests {
use std::collections::HashMap;
use super::*;
use crate::common::meta::stream::StreamPartition;
#[test]
fn test_format_partition_key() {
@ -472,7 +473,10 @@ mod tests {
assert_eq!(
get_wal_time_key(
1620000000,
&vec!["country".to_string(), "sport".to_string()],
&vec![
StreamPartition::new("country"),
StreamPartition::new("sport")
],
PartitionTimeLevel::Hourly,
&local_val,
None
@ -523,7 +527,10 @@ mod tests {
let keys = get_stream_partition_keys("olympics", &stream_schema_map).await;
assert_eq!(
keys.partition_keys,
vec!["country".to_string(), "sport".to_string()]
vec![
StreamPartition::new("country"),
StreamPartition::new("sport")
]
);
}

View File

@ -29,7 +29,11 @@ use datafusion::arrow::datatypes::Schema;
use super::ingestion::{get_string_value, TriggerAlertData};
use crate::{
common::meta::{alerts::Alert, ingestion::RecordStatus, stream::SchemaRecords},
common::meta::{
alerts::Alert,
ingestion::RecordStatus,
stream::{SchemaRecords, StreamPartition},
},
service::{
ingestion::get_wal_time_key, schema::check_for_schema, stream::unwrap_partition_time_level,
},
@ -266,7 +270,7 @@ fn set_parsing_error(parse_error: &mut String, field: &Field) {
struct StreamMeta<'a> {
org_id: String,
stream_name: String,
partition_keys: &'a Vec<String>,
partition_keys: &'a Vec<StreamPartition>,
partition_time_level: &'a Option<PartitionTimeLevel>,
stream_alerts_map: &'a HashMap<String, Vec<Alert>>,
}

View File

@ -205,7 +205,7 @@ mod tests {
#[tokio::test]
async fn test_organization() {
let org_id = "default";
let user_id = "user1@example.com";
let user_id = "user-org-1@example.com";
let init_user = "root@example.com";
let pwd = "Complexpass#123";

View File

@ -411,17 +411,17 @@ impl Engine {
// 1. Group by metrics (sets of label name-value pairs)
let table_name = selector.name.as_ref().unwrap();
let filters = selector
let mut filters = selector
.matchers
.matchers
.iter()
.filter(|mat| mat.op == MatchOp::Equal)
.map(|mat| (mat.name.as_str(), vec![mat.value.as_str()]))
.map(|mat| (mat.name.as_str(), vec![mat.value.to_string()]))
.collect::<Vec<(_, _)>>();
let ctxs = self
.ctx
.table_provider
.create_context(&self.ctx.org_id, table_name, (start, end), &filters)
.create_context(&self.ctx.org_id, table_name, (start, end), &mut filters)
.await?;
let mut tasks = Vec::new();

View File

@ -50,7 +50,7 @@ pub trait TableProvider: Sync + Send + 'static {
org_id: &str,
stream_name: &str,
time_range: (i64, i64),
filters: &[(&str, Vec<&str>)],
filters: &mut [(&str, Vec<String>)],
) -> Result<Vec<(SessionContext, Arc<Schema>, ScanStats)>>;
}

View File

@ -48,7 +48,7 @@ impl TableProvider for StorageProvider {
org_id: &str,
stream_name: &str,
time_range: (i64, i64),
filters: &[(&str, Vec<&str>)],
filters: &mut [(&str, Vec<String>)],
) -> datafusion::error::Result<Vec<(SessionContext, Arc<Schema>, ScanStats)>> {
let mut resp = Vec::new();
// register storage table

View File

@ -26,13 +26,14 @@ use datafusion::{
error::{DataFusionError, Result},
prelude::SessionContext,
};
use hashbrown::HashMap;
use infra::cache::file_data;
use tokio::sync::Semaphore;
use crate::{
common::meta::{
search::{SearchType, Session as SearchSession},
stream::{ScanStats, StreamParams},
stream::{ScanStats, StreamParams, StreamPartition},
},
service::{
db, file_list,
@ -50,7 +51,7 @@ pub(crate) async fn create_context(
org_id: &str,
stream_name: &str,
time_range: (i64, i64),
filters: &[(&str, Vec<&str>)],
filters: &mut [(&str, Vec<String>)],
) -> Result<(SessionContext, Arc<Schema>, ScanStats)> {
// check if we are allowed to search
if db::compact::retention::is_deleting_stream(org_id, stream_name, StreamType::Metrics, None) {
@ -77,6 +78,20 @@ pub(crate) async fn create_context(
let partition_time_level =
stream::unwrap_partition_time_level(stream_settings.partition_time_level, stream_type);
// rewrite partition filters
let partition_keys: HashMap<&str, &StreamPartition> = stream_settings
.partition_keys
.iter()
.map(|v| (v.field.as_str(), v))
.collect();
for entry in filters.iter_mut() {
if let Some(partition_key) = partition_keys.get(entry.0) {
for val in entry.1.iter_mut() {
*val = partition_key.get_partition_value(val);
}
}
}
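This rewrite is what keeps hash partitions transparent at query time: a label filter on a hash-partitioned field is converted to its bucket number before being matched against partition directory names. A minimal sketch of the effect, reusing the bucket number from the unit tests (the "instance" field name is hypothetical):
let part = StreamPartition::new_hash("instance", 32);
let mut filters = vec![("instance", vec!["hello".to_string()])];
for (field, values) in filters.iter_mut() {
    if *field == part.field.as_str() {
        for v in values.iter_mut() {
            *v = part.get_partition_value(v); // "hello" hashes to bucket 11 of 32
        }
    }
}
assert_eq!(filters[0].1, vec!["11".to_string()]);
// the rewritten value now matches paths containing ".../instance=11/..."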
// get file list
let mut files = get_file_list(
session_id,
@ -159,7 +174,7 @@ async fn get_file_list(
stream_name: &str,
time_level: PartitionTimeLevel,
time_range: (i64, i64),
filters: &[(&str, Vec<&str>)],
filters: &[(&str, Vec<String>)],
) -> Result<Vec<FileKey>> {
let (time_min, time_max) = time_range;
let results = match file_list::query(

View File

@ -57,11 +57,11 @@ pub(crate) async fn create_context(
org_id: &str,
stream_name: &str,
time_range: (i64, i64),
_filters: &[(&str, Vec<&str>)],
filters: &mut [(&str, Vec<String>)],
) -> Result<Vec<(SessionContext, Arc<Schema>, ScanStats)>> {
let mut resp = vec![];
// get file list
let files = get_file_list(session_id, org_id, stream_name, time_range).await?;
let files = get_file_list(session_id, org_id, stream_name, time_range, filters).await?;
if files.is_empty() {
return Ok(vec![(
SessionContext::new(),
@ -200,6 +200,7 @@ async fn get_file_list(
org_id: &str,
stream_name: &str,
time_range: (i64, i64),
filters: &[(&str, Vec<String>)],
) -> Result<Vec<cluster_rpc::MetricsWalFile>> {
let nodes = get_cached_online_ingester_nodes();
if nodes.is_none() && nodes.as_deref().unwrap().is_empty() {
@ -207,6 +208,14 @@ async fn get_file_list(
}
let nodes = nodes.unwrap();
let mut req_filters = Vec::with_capacity(filters.len());
for (k, v) in filters {
req_filters.push(cluster_rpc::MetricsWalFileFilter {
field: k.to_string(),
value: v.clone(),
});
}
let mut tasks = Vec::new();
for node in nodes {
let session_id = session_id.to_string();
@ -217,6 +226,7 @@ async fn get_file_list(
stream_name: stream_name.to_string(),
start_time: time_range.0,
end_time: time_range.1,
filters: req_filters.clone(),
};
let grpc_span = info_span!("promql:search:grpc:wal:grpc_wal_file", session_id);
let task: tokio::task::JoinHandle<

View File

@ -42,7 +42,10 @@ use crate::{
common::{
infra::config::LOCAL_SCHEMA_LOCKER,
meta::{
authz::Authz, ingestion::StreamSchemaChk, prom::METADATA_LABEL, stream::SchemaEvolution,
authz::Authz,
ingestion::StreamSchemaChk,
prom::METADATA_LABEL,
stream::{SchemaEvolution, StreamPartition},
},
},
service::{db, search::server_internal_error},
@ -637,7 +640,7 @@ pub async fn add_stream_schema(
metadata.insert("created_at".to_string(), min_ts.to_string());
if stream_type == StreamType::Traces {
let settings = crate::common::meta::stream::StreamSettings {
partition_keys: vec!["service_name".to_string()],
partition_keys: vec![StreamPartition::new("service_name")],
partition_time_level: None,
full_text_search_keys: vec![],
bloom_filter_fields: vec![],

View File

@ -31,7 +31,11 @@ use tokio::{sync::Semaphore, time::Duration};
use tracing::{info_span, Instrument};
use crate::{
common::meta::{self, search::SearchType, stream::ScanStats},
common::meta::{
self,
search::SearchType,
stream::{ScanStats, StreamPartition},
},
service::{
db, file_list,
search::{
@ -77,7 +81,16 @@ pub async fn search(
// get file list
let files = match file_list.is_empty() {
true => get_file_list(session_id, &sql, stream_type, partition_time_level).await?,
true => {
get_file_list(
session_id,
&sql,
stream_type,
partition_time_level,
&stream_settings.partition_keys,
)
.await?
}
false => file_list.to_vec(),
};
if files.is_empty() {
@ -265,6 +278,7 @@ async fn get_file_list(
sql: &Sql,
stream_type: StreamType,
time_level: PartitionTimeLevel,
partition_keys: &[StreamPartition],
) -> Result<Vec<FileKey>, Error> {
let (time_min, time_max) = sql.meta.time_range.unwrap();
let file_list = match file_list::query(
@ -289,7 +303,10 @@ async fn get_file_list(
let mut files = Vec::with_capacity(file_list.len());
for file in file_list {
if sql.match_source(&file, false, false, stream_type).await {
if sql
.match_source(&file, false, false, stream_type, partition_keys)
.await
{
files.push(file.to_owned());
}
}

View File

@ -41,7 +41,11 @@ use tracing::{info_span, Instrument};
use crate::{
common::{
infra::wal,
meta::{self, search::SearchType, stream::ScanStats},
meta::{
self,
search::SearchType,
stream::{ScanStats, StreamPartition},
},
},
service::{
db,
@ -70,7 +74,14 @@ pub async fn search_parquet(
unwrap_partition_time_level(schema_settings.partition_time_level, stream_type);
// get file list
let mut files = get_file_list(session_id, &sql, stream_type, &partition_time_level).await?;
let mut files = get_file_list(
session_id,
&sql,
stream_type,
&partition_time_level,
&schema_settings.partition_keys,
)
.await?;
if files.is_empty() {
return Ok((HashMap::new(), ScanStats::new()));
}
@ -467,6 +478,7 @@ async fn get_file_list(
sql: &Sql,
stream_type: StreamType,
_partition_time_level: &PartitionTimeLevel,
partition_keys: &[StreamPartition],
) -> Result<Vec<FileKey>, Error> {
let wal_dir = match Path::new(&CONFIG.common.data_wal_dir).canonicalize() {
Ok(path) => {
@ -526,7 +538,10 @@ async fn get_file_list(
continue;
}
}
if sql.match_source(&file_key, false, true, stream_type).await {
if sql
.match_source(&file_key, false, true, stream_type, partition_keys)
.await
{
result.push(file_key);
} else {
wal::release_files(&[file.clone()]).await;

View File

@ -46,7 +46,7 @@ use crate::{
meta::{
functions::VRLResultResolver,
search,
stream::{ScanStats, StreamParams},
stream::{ScanStats, StreamParams, StreamPartition},
},
},
handler::grpc::cluster_rpc,
@ -104,7 +104,14 @@ pub async fn search_partition(
let stream_settings = stream::stream_settings(&meta.schema).unwrap_or_default();
let partition_time_level =
stream::unwrap_partition_time_level(stream_settings.partition_time_level, stream_type);
let files = get_file_list(session_id, &meta, stream_type, partition_time_level).await;
let files = get_file_list(
session_id,
&meta,
stream_type,
partition_time_level,
&stream_settings.partition_keys,
)
.await;
let nodes = cluster::get_cached_online_querier_nodes().unwrap_or_default();
let cpu_cores = nodes.iter().map(|n| n.cpu_num).sum::<u64>() as usize;
@ -161,6 +168,7 @@ async fn get_file_list(
sql: &sql::Sql,
stream_type: StreamType,
time_level: PartitionTimeLevel,
partition_keys: &[StreamPartition],
) -> Vec<FileKey> {
let is_local = CONFIG.common.meta_store_external
|| cluster::get_cached_online_querier_nodes()
@ -185,7 +193,10 @@ async fn get_file_list(
let mut files = Vec::with_capacity(file_list.len());
for file in file_list {
if sql.match_source(&file, false, false, stream_type).await {
if sql
.match_source(&file, false, false, stream_type, partition_keys)
.await
{
files.push(file.to_owned());
}
}
@ -232,7 +243,14 @@ async fn search_in_cluster(mut req: cluster_rpc::SearchRequest) -> Result<search
let partition_time_level =
stream::unwrap_partition_time_level(stream_settings.partition_time_level, stream_type);
let file_list = get_file_list(&session_id, &meta, stream_type, partition_time_level).await;
let file_list = get_file_list(
&session_id,
&meta,
stream_type,
partition_time_level,
&stream_settings.partition_keys,
)
.await;
#[cfg(not(feature = "enterprise"))]
let work_group: Option<String> = None;
// 1. get work group
@ -768,7 +786,7 @@ fn handle_metrics_response(sources: Vec<json::Value>) -> Vec<json::Value> {
pub async fn match_source(
stream: StreamParams,
time_range: Option<(i64, i64)>,
filters: &[(&str, Vec<&str>)],
filters: &[(&str, Vec<String>)],
source: &FileKey,
is_wal: bool,
match_min_ts_only: bool,
@ -833,7 +851,7 @@ pub async fn match_source(
}
/// Check whether a source is a needed file; returns true if it is needed.
fn filter_source_by_partition_key(source: &str, filters: &[(&str, Vec<&str>)]) -> bool {
fn filter_source_by_partition_key(source: &str, filters: &[(&str, Vec<String>)]) -> bool {
!filters.iter().any(|(k, v)| {
let field = format_partition_key(&format!("{k}="));
find(source, &format!("/{field}"))
@ -871,56 +889,77 @@ mod tests {
let path = "files/default/logs/gke-fluentbit/2023/04/14/08/kuberneteshost=gke-dev1/kubernetesnamespacename=ziox-dev/7052558621820981249.parquet";
let filters = vec![
(vec![], true),
(vec![("kuberneteshost", vec!["gke-dev1"])], true),
(vec![("kuberneteshost", vec!["gke-dev2"])], false),
(vec![("kuberneteshost", vec!["gke-dev1", "gke-dev2"])], true),
(vec![("some_other_key", vec!["no-matter"])], true),
(vec![("kuberneteshost", vec!["gke-dev1".to_string()])], true),
(
vec![("kuberneteshost", vec!["gke-dev2".to_string()])],
false,
),
(
vec![(
"kuberneteshost",
vec!["gke-dev1".to_string(), "gke-dev2".to_string()],
)],
true,
),
(
vec![("some_other_key", vec!["no-matter".to_string()])],
true,
),
(
vec![
("kuberneteshost", vec!["gke-dev1"]),
("kubernetesnamespacename", vec!["ziox-dev"]),
("kuberneteshost", vec!["gke-dev1".to_string()]),
("kubernetesnamespacename", vec!["ziox-dev".to_string()]),
],
true,
),
(
vec![
("kuberneteshost", vec!["gke-dev1"]),
("kubernetesnamespacename", vec!["abcdefg"]),
("kuberneteshost", vec!["gke-dev1".to_string()]),
("kubernetesnamespacename", vec!["abcdefg".to_string()]),
],
false,
),
(
vec![
("kuberneteshost", vec!["gke-dev2"]),
("kubernetesnamespacename", vec!["ziox-dev"]),
("kuberneteshost", vec!["gke-dev2".to_string()]),
("kubernetesnamespacename", vec!["ziox-dev".to_string()]),
],
false,
),
(
vec![
("kuberneteshost", vec!["gke-dev2"]),
("kubernetesnamespacename", vec!["abcdefg"]),
("kuberneteshost", vec!["gke-dev2".to_string()]),
("kubernetesnamespacename", vec!["abcdefg".to_string()]),
],
false,
),
(
vec![
("kuberneteshost", vec!["gke-dev1", "gke-dev2"]),
("kubernetesnamespacename", vec!["ziox-dev"]),
(
"kuberneteshost",
vec!["gke-dev1".to_string(), "gke-dev2".to_string()],
),
("kubernetesnamespacename", vec!["ziox-dev".to_string()]),
],
true,
),
(
vec![
("kuberneteshost", vec!["gke-dev1", "gke-dev2"]),
("kubernetesnamespacename", vec!["abcdefg"]),
(
"kuberneteshost",
vec!["gke-dev1".to_string(), "gke-dev2".to_string()],
),
("kubernetesnamespacename", vec!["abcdefg".to_string()]),
],
false,
),
(
vec![
("kuberneteshost", vec!["gke-dev1", "gke-dev2"]),
("some_other_key", vec!["no-matter"]),
(
"kuberneteshost",
vec!["gke-dev1".to_string(), "gke-dev2".to_string()],
),
("some_other_key", vec!["no-matter".to_string()]),
],
true,
),

View File

@ -33,7 +33,7 @@ use serde::{Deserialize, Serialize};
use crate::{
common::meta::{
sql::{Sql as MetaSql, SqlOperator},
stream::StreamParams,
stream::{StreamParams, StreamPartition},
},
handler::grpc::cluster_rpc,
service::{db, search::match_source, stream::get_stream_setting_fts_fields},
@ -626,8 +626,21 @@ impl Sql {
match_min_ts_only: bool,
is_wal: bool,
stream_type: StreamType,
partition_keys: &[StreamPartition],
) -> bool {
let filters = generate_filter_from_quick_text(&self.meta.quick_text);
let mut filters = generate_filter_from_quick_text(&self.meta.quick_text);
// rewrite partition filters
let partition_keys: HashMap<&str, &StreamPartition> = partition_keys
.iter()
.map(|v| (v.field.as_str(), v))
.collect();
for entry in filters.iter_mut() {
if let Some(partition_key) = partition_keys.get(entry.0) {
for val in entry.1.iter_mut() {
*val = partition_key.get_partition_value(val);
}
}
}
match_source(
StreamParams::new(&self.org_id, &self.stream_name, stream_type),
self.meta.time_range,
@ -642,7 +655,7 @@ impl Sql {
pub fn generate_filter_from_quick_text(
data: &[(String, String, SqlOperator)],
) -> Vec<(&str, Vec<&str>)> {
) -> Vec<(&str, Vec<String>)> {
let quick_text_len = data.len();
let mut filters = HashMap::with_capacity(quick_text_len);
for i in 0..quick_text_len {
@ -651,7 +664,7 @@ pub fn generate_filter_from_quick_text(
|| (op == &SqlOperator::Or && (i + 1 == quick_text_len || k == &data[i + 1].0))
{
let entry = filters.entry(k.as_str()).or_insert_with(Vec::new);
entry.push(v.as_str());
entry.push(v.to_string());
} else {
filters.clear();
break;

View File

@ -160,12 +160,12 @@ pub fn stream_res(
}
}
#[tracing::instrument(skip(setting))]
#[tracing::instrument(skip(settings))]
pub async fn save_stream_settings(
org_id: &str,
stream_name: &str,
stream_type: StreamType,
setting: StreamSettings,
mut settings: StreamSettings,
) -> Result<HttpResponse, Error> {
// check if we are allowed to ingest
if db::compact::retention::is_deleting_stream(org_id, stream_name, stream_type, None) {
@ -177,20 +177,43 @@ pub async fn save_stream_settings(
);
}
for key in setting.partition_keys.iter() {
if SQL_FULL_TEXT_SEARCH_FIELDS.contains(key) {
for key in settings.partition_keys.iter() {
if SQL_FULL_TEXT_SEARCH_FIELDS.contains(&key.field) {
return Ok(HttpResponse::BadRequest().json(MetaHttpResponse::error(
http::StatusCode::BAD_REQUEST.into(),
format!("field [{key}] can't be used for partition key"),
format!("field [{}] can't be used for partition key", key.field),
)));
}
}
// we need to keep the old partition information, because the hash bucket num can't be changed
// get old settings and then update partition_keys
let schema = db::schema::get(org_id, stream_name, stream_type)
.await
.unwrap();
let mut old_partition_keys = stream_settings(&schema).unwrap_or_default().partition_keys;
// first disable all old partition keys
for v in old_partition_keys.iter_mut() {
v.disabled = true;
}
// then update new partition keys
for v in settings.partition_keys.iter() {
if let Some(old_field) = old_partition_keys.iter_mut().find(|k| k.field == v.field) {
if old_field.types != v.types {
return Ok(HttpResponse::BadRequest().json(MetaHttpResponse::error(
http::StatusCode::BAD_REQUEST.into(),
format!("field [{}] partition types can't be changed", v.field),
)));
}
old_field.disabled = v.disabled;
} else {
old_partition_keys.push(v.clone());
}
}
settings.partition_keys = old_partition_keys;
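The net effect: removing a key from the request only disables it, and an existing key can toggle disabled but never switch between value and hash (or change its bucket count, since Hash(u64) compares the count too), so that files already written under the old layout keep lining up with the query-time filter rewrite.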
let mut metadata = schema.metadata.clone();
metadata.insert("settings".to_string(), json::to_string(&setting).unwrap());
metadata.insert("settings".to_string(), json::to_string(&settings).unwrap());
if !metadata.contains_key("created_at") {
metadata.insert(
"created_at".to_string(),

View File

@ -42,7 +42,7 @@ use crate::{
common::meta::{
alerts::Alert,
http::HttpResponse as MetaHttpResponse,
stream::SchemaRecords,
stream::{SchemaRecords, StreamPartition},
traces::{Event, Span, SpanRefType},
},
service::{
@ -117,7 +117,7 @@ pub async fn handle_trace_request(
)
.await;
let mut partition_keys: Vec<String> = vec![];
let mut partition_keys: Vec<StreamPartition> = vec![];
let mut partition_time_level =
PartitionTimeLevel::from(CONFIG.limit.traces_file_retention.as_str());
if stream_schema.has_partition_keys {

View File

@ -35,7 +35,7 @@ use crate::{
common::meta::{
alerts::Alert,
http::HttpResponse as MetaHttpResponse,
stream::SchemaRecords,
stream::{SchemaRecords, StreamPartition},
traces::{Event, ExportTracePartialSuccess, ExportTraceServiceResponse, Span, SpanRefType},
},
service::{
@ -120,7 +120,7 @@ pub async fn traces_json(
)
.await;
let mut partition_keys: Vec<String> = vec![];
let mut partition_keys: Vec<StreamPartition> = vec![];
let mut partition_time_level =
PartitionTimeLevel::from(CONFIG.limit.traces_file_retention.as_str());
if stream_schema.has_partition_keys {

View File

@ -291,7 +291,8 @@ mod tests {
async fn e2e_post_stream_settings() {
let auth = setup();
let body_str = r#"{"partition_keys": ["test_key"], "full_text_search_keys": ["log"]}"#;
let body_str =
r#"{"partition_keys": [{"field":"test_key"}], "full_text_search_keys": ["log"]}"#;
// app
let thread_id: usize = 0;
let app = test::init_service(

View File

@ -405,18 +405,17 @@ export default defineComponent({
if (
res.data.settings.partition_keys &&
Object.values(res.data.settings.partition_keys).includes(
property.name
Object.values(res.data.settings.partition_keys).some(
(v) => !v.disabled && v.field === property.name
)
) {
let index = Object.values(
res.data.settings.partition_keys
).indexOf(property.name);
property.partitionKey = true;
property.level = Object.keys(
res.data.settings.partition_keys
).find(
(key) => res.data.settings.partition_keys[key] === property.name
(key) =>
res.data.settings.partition_keys[key]["field"] ===
property.name
);
} else {
property.partitionKey = false;
@ -459,9 +458,15 @@ export default defineComponent({
settings.full_text_search_keys.push(property.name);
}
if (property.level && property.partitionKey) {
settings.partition_keys.push(property.name);
settings.partition_keys.push({
field: property.name,
types: "value",
});
} else if (property.partitionKey) {
added_part_keys.push(property.name);
added_part_keys.push({
field: property.name,
types: "value",
});
}
if (property.bloomKey) {