feat: rum/maxmind integration (#1520)

This PR implements the first draft of Real User Monitoring (RUM).

The endpoints at the moment are:

- `/rum/v1/{org_id}/rum`
- `/rum/v1/{org_id}/replay`
- `/rum/v1/{org_id}/logs`

For now, authentication is done with the `oo-api-key` token, which the
`browser-sdk` passes as a query parameter.

As of now, the data is being collected in the following three streams:

- `_rumdata` for rum data
- `_rumlog` for log data
- `_sessionreplay` for session-replay data

The data is ingested using the multi-json implementation, with a small
change: a middleware extractor collects some extra data from the request
(headers, query-params, etc.), stores it in a `HashMap<String, String>`,
and that data is then ingested into the JSON together with the multi-json
payload.

A file containing regexes for common user-agents is also committed. It is
read at initialization time and used to parse incoming user-agents; the
parsed browser, OS, etc. are inserted into the incoming row of data.

Added the endpoints for `get`, `put`, `post` for rum-tokens.

- `/api/{org_id}/organizations/rumtoken`

#### maxmind integration 
Integrated the maxmind-db data in the source. Currently the data
is injected via the middleware and the file gets updated every
24 hours.
The file pointer gets mutated and updated when there is a change
in the mmdb file. To do this efficiently, we also compute the sha256
of our file and compare it against the .sha256 file available on the
public bucket.
This commit is contained in:
Ankur Srivastava 2023-10-18 15:09:24 +02:00 committed by GitHub
parent d49fad487d
commit 5b01947491
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
27 changed files with 7459 additions and 89 deletions

181
Cargo.lock generated
View File

@ -80,7 +80,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e01ed3140b2f8d422c68afa1ed2e85d996ea619c988ac834d255db32138655cb"
dependencies = [
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -118,7 +118,7 @@ dependencies = [
"parse-size",
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -251,7 +251,7 @@ dependencies = [
"actix-router",
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -269,6 +269,54 @@ dependencies = [
"pin-project-lite",
]
[[package]]
name = "actix-web-lab"
version = "0.19.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e9f49571dfcf49ed79c6e7a645e9554ae01925eb55fa6e3b2501ceeed24d7e7"
dependencies = [
"actix-http",
"actix-router",
"actix-service",
"actix-utils",
"actix-web",
"actix-web-lab-derive",
"ahash 0.8.3",
"arc-swap",
"async-trait",
"bytes",
"bytestring",
"csv",
"derive_more",
"futures-core",
"futures-util",
"http",
"impl-more",
"itertools 0.10.5",
"local-channel",
"mediatype",
"mime",
"once_cell",
"pin-project-lite",
"regex",
"serde",
"serde_html_form",
"serde_json",
"tokio",
"tracing",
]
[[package]]
name = "actix-web-lab-derive"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "16294584c7794939b1e5711f28e7cae84ef30e62a520db3f9af425f85269bcd2"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "actix-web-opentelemetry"
version = "0.15.0"
@ -842,7 +890,7 @@ checksum = "5fd55a5ba1179988837d24ab4c7cc8ed6efdeff578ede0416b4225a5fca35bd0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -864,7 +912,7 @@ checksum = "16e62a023e7c117e27523144c5d2459f4397fcc3cab0085af8e2224f643a0193"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -875,7 +923,7 @@ checksum = "bc00ceb34980c03614e35a3a4e218276a0a824e911d07651cd0d858a51e8c0f0"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -1592,9 +1640,9 @@ dependencies = [
[[package]]
name = "byteorder"
version = "1.4.3"
version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
@ -1885,7 +1933,7 @@ dependencies = [
"heck 0.4.1",
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -2230,7 +2278,7 @@ dependencies = [
"proc-macro2",
"quote",
"strsim 0.10.0",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -2241,7 +2289,7 @@ checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5"
dependencies = [
"darling_core",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -2489,7 +2537,7 @@ checksum = "53e0efad4403bfc52dc201159c4b842a246a14b98c64b55dfd0f2d89729dfeb8"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -2672,7 +2720,7 @@ checksum = "eecf8589574ce9b895052fa12d69af7a233f99e6107f5cb8dd1044f2a17bfdcb"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -2999,7 +3047,7 @@ checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -3524,6 +3572,15 @@ version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "28b29a3cd74f0f4598934efe3aeba42bae0eb4680554128851ebbecb02af14e6"
[[package]]
name = "ipnetwork"
version = "0.18.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4088d739b183546b239688ddbc79891831df421773df95e236daf7867866d355"
dependencies = [
"serde",
]
[[package]]
name = "ipnetwork"
version = "0.20.0"
@ -3540,7 +3597,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb0889898416213fab133e1d33a0e5858a48177452750691bde3666d0fdbaf8b"
dependencies = [
"hermit-abi 0.3.3",
"rustix 0.38.15",
"rustix 0.38.17",
"windows-sys 0.48.0",
]
@ -3977,6 +4034,18 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94"
[[package]]
name = "maxminddb"
version = "0.23.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fe2ba61113f9f7a9f0e87c519682d39c43a6f3f79c2cc42c3ba3dda83b1fa334"
dependencies = [
"ipnetwork 0.18.0",
"log",
"memchr",
"serde",
]
[[package]]
name = "md-5"
version = "0.10.6"
@ -3987,6 +4056,12 @@ dependencies = [
"digest",
]
[[package]]
name = "mediatype"
version = "0.19.15"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c408dc227d302f1496c84d9dc68c00fec6f56f9228a18f3023f976f3ca7c945"
[[package]]
name = "memchr"
version = "2.6.4"
@ -4344,6 +4419,7 @@ dependencies = [
"actix-multipart",
"actix-web",
"actix-web-httpauth",
"actix-web-lab",
"actix-web-opentelemetry",
"actix-web-prometheus",
"actix-web-rust-embed-responder",
@ -4384,11 +4460,12 @@ dependencies = [
"hex",
"http-auth-basic",
"indexmap 2.0.2",
"ipnetwork",
"ipnetwork 0.20.0",
"itertools 0.11.0",
"lazy_static",
"log",
"lru",
"maxminddb",
"memchr",
"mimalloc",
"object_store",
@ -4414,6 +4491,7 @@ dependencies = [
"segment",
"serde",
"serde_json",
"sha256",
"simd-json",
"sled",
"snap",
@ -4433,6 +4511,7 @@ dependencies = [
"tracing",
"tracing-opentelemetry",
"tracing-subscriber",
"uaparser",
"url",
"utoipa",
"utoipa-swagger-ui",
@ -4898,7 +4977,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -4986,7 +5065,7 @@ checksum = "4359fd9c9171ec6e8c62926d6faaf553a8dc3f64e1507e76da7911b4f6a04405"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -5148,9 +5227,9 @@ checksum = "dc375e1527247fe1a97d8b7156678dfe7c1af2fc075c9a4db3690ecd2a148068"
[[package]]
name = "proc-macro2"
version = "1.0.67"
version = "1.0.68"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d433d9f1a3e8c1263d9456598b16fec66f4acc9a74dacffd35c7bb09b3a1328"
checksum = "5b1106fec09662ec6dd98ccac0f81cef56984d0b49f75c92d8cbad76e20c005c"
dependencies = [
"unicode-ident",
]
@ -5708,7 +5787,7 @@ dependencies = [
"quote",
"rust-embed-utils",
"shellexpand",
"syn 2.0.37",
"syn 2.0.38",
"walkdir",
]
@ -5769,9 +5848,9 @@ dependencies = [
[[package]]
name = "rustix"
version = "0.38.15"
version = "0.38.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2f9da0cbd88f9f09e7814e388301c8414c51c62aa6ce1e4b5c551d49d96e531"
checksum = "f25469e9ae0f3d0047ca8b93fc56843f38e6774f0914a107ff8b41be8be8e0b7"
dependencies = [
"bitflags 2.4.0",
"errno",
@ -5970,7 +6049,20 @@ checksum = "4eca7ac642d82aa35b60049a6eccb4be6be75e599bd2e9adb5f875a737654af2"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
name = "serde_html_form"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cde65b75f2603066b78d6fa239b2c07b43e06ead09435f60554d3912962b4a3c"
dependencies = [
"form_urlencoded",
"indexmap 2.0.2",
"itoa",
"ryu",
"serde",
]
[[package]]
@ -6050,6 +6142,19 @@ dependencies = [
"digest",
]
[[package]]
name = "sha256"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7895c8ae88588ccead14ff438b939b0c569cd619116f14b4d13fdff7b8333386"
dependencies = [
"async-trait",
"bytes",
"hex",
"sha2",
"tokio",
]
[[package]]
name = "sha3"
version = "0.10.8"
@ -6062,9 +6167,9 @@ dependencies = [
[[package]]
name = "sharded-slab"
version = "0.1.6"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c1b21f559e07218024e7e9f90f96f601825397de0e25420135f7f952453fed0b"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
@ -6624,7 +6729,7 @@ dependencies = [
"proc-macro2",
"quote",
"rustversion",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -6669,9 +6774,9 @@ dependencies = [
[[package]]
name = "syn"
version = "2.0.37"
version = "2.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7303ef2c05cd654186cb250d29049a24840ca25d2747c25c0381c8d9e2f582e8"
checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b"
dependencies = [
"proc-macro2",
"quote",
@ -6755,7 +6860,7 @@ dependencies = [
"cfg-if 1.0.0",
"fastrand 2.0.1",
"redox_syscall 0.3.5",
"rustix 0.38.15",
"rustix 0.38.17",
"windows-sys 0.48.0",
]
@ -6811,7 +6916,7 @@ checksum = "10712f02019e9288794769fba95cd6847df9874d49d871d062172f9dd41bc4cc"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -6944,7 +7049,7 @@ checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -7134,7 +7239,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -7373,7 +7478,7 @@ dependencies = [
"proc-macro2",
"quote",
"regex",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -7412,7 +7517,7 @@ checksum = "f7e1ba1f333bd65ce3c9f27de592fcbc256dafe3af2717f56d7c87761fbaccf4"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
]
[[package]]
@ -7643,7 +7748,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
"wasm-bindgen-shared",
]
@ -7677,7 +7782,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.37",
"syn 2.0.38",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -7752,7 +7857,7 @@ dependencies = [
"either",
"home",
"once_cell",
"rustix 0.38.15",
"rustix 0.38.17",
]
[[package]]

View File

@ -42,9 +42,10 @@ codegen-units = 4
[dependencies]
actix-cors = "0.6"
actix-multipart = "0.6"
actix-multipart = { version = "0.6", features = ["derive"] }
actix-web = "4.4"
actix-web-httpauth = "0.8"
actix-web-lab = "0.19.1"
actix-web-opentelemetry = { version = "0.15", features = ["metrics"] }
actix-web-prometheus = { version = "0.1", features = ["process"] }
actix-web-rust-embed-responder = { version = "2.2", default-features = false, features = [
@ -96,6 +97,7 @@ itertools = "0.11"
lazy_static = "1.4"
log = "0.4"
lru = "0.10"
maxminddb = "0.23.0"
memchr = "2.5"
mimalloc = { version = "0.1", default-features = false, optional = true }
object_store = { version = "0.7", features = ["aws", "azure", "gcp"] }
@ -123,9 +125,7 @@ rand = "0.8"
rayon = "1.7.0"
regex = "1.7"
regex-syntax = "0.6"
reqwest = { version = "0.11", default-features = false, features = [
"rustls-tls",
] }
reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "stream"] }
rs-snowflake = "0.6"
rust-embed-for-web = "11.1"
segment = "0.2"
@ -154,6 +154,7 @@ tonic = { version = "0.8", features = ["prost", "gzip"] }
tracing = { version = "0.1.37", features = ["attributes"] }
tracing-opentelemetry = "0.18"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uaparser = "0.6.1"
url = "2.2"
utoipa = { version = "3", features = ["actix_extras", "openapi_extensions"] }
utoipa-swagger-ui = { version = "3", features = ["actix-web"] }
@ -163,6 +164,8 @@ vrl = { version = "0.6.0", features = ["value", "compiler"] }
walkdir = "2"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
zstd = "0.12"
sha256 = "1.4.0"
[build-dependencies]
chrono = { version = "0.4", default-features = false, features = ["clock"] }

View File

@ -23,12 +23,14 @@ use parking_lot::RwLock;
use reqwest::Client;
use std::{path::Path, sync::Arc, time::Duration};
use sysinfo::{DiskExt, SystemExt};
use tokio::sync::RwLock as TRwLock;
use vector_enrichment::TableRegistry;
use crate::common::{
meta::{
alert::{AlertDestination, AlertList, DestinationTemplate, Trigger, TriggerTimer},
functions::{StreamFunctionsList, Transform},
maxmind::MaxmindClient,
prom::ClusterLeader,
syslog::SyslogRoute,
user::User,
@ -100,6 +102,8 @@ pub static STREAM_FUNCTIONS: Lazy<RwHashMap<String, StreamFunctionsList>> =
Lazy::new(DashMap::default);
pub static QUERY_FUNCTIONS: Lazy<RwHashMap<String, Transform>> = Lazy::new(DashMap::default);
pub static USERS: Lazy<RwHashMap<String, User>> = Lazy::new(DashMap::default);
/// Lazily-initialised, shared concurrent map from `String` keys to [`User`] values.
// NOTE(review): presumably caches users looked up by RUM token — confirm the key
// format against the code that populates this map.
pub static USERS_RUM_TOKEN: Lazy<Arc<RwHashMap<String, User>>> =
Lazy::new(|| Arc::new(DashMap::default()));
pub static ROOT_USER: Lazy<RwHashMap<String, User>> = Lazy::new(DashMap::default);
pub static PASSWORD_HASH: Lazy<RwHashMap<String, String>> = Lazy::new(DashMap::default);
pub static METRIC_CLUSTER_MAP: Lazy<Arc<RwAHashMap<String, Vec<String>>>> =
@ -121,6 +125,9 @@ pub static ENRICHMENT_REGISTRY: Lazy<Arc<TableRegistry>> =
pub static LOCAL_SCHEMA_LOCKER: Lazy<Arc<RwAHashMap<String, tokio::sync::RwLock<bool>>>> =
Lazy::new(|| Arc::new(Default::default)());
/// Shared, optional MaxMind client guarded by a tokio `RwLock`.
///
/// Starts out as `None`; readers must handle the case where no database
/// has been loaded.
pub static MAXMIND_DB_CLIENT: Lazy<Arc<TRwLock<Option<MaxmindClient>>>> =
Lazy::new(|| Arc::new(TRwLock::new(None)));
#[derive(EnvConfig)]
pub struct Config {
pub auth: Auth,
@ -286,6 +293,24 @@ pub struct Common {
pub usage_org: String,
#[env_config(name = "ZO_USAGE_BATCH_SIZE", default = 2000)]
pub usage_batch_size: usize,
#[env_config(name = "ZO_MMDB_DATA_DIR")] // ./data/openobserve/mmdb/
pub mmdb_data_dir: String,
#[env_config(name = "ZO_MMDB_DISABLE_DOWNLOAD", default = "false")]
pub mmdb_disable_download: bool,
#[env_config(name = "ZO_MMDB_UPDATE_DURATION", default = "86400")] // Everyday to test
pub mmdb_update_duration: u64,
#[env_config(
name = "ZO_MMDB_GEOLITE_CITYDB_URL",
default = "https://dha4druvz9fbr.cloudfront.net/GeoLite2-City.mmdb"
)]
pub mmdb_geolite_citydb_url: String,
#[env_config(
name = "ZO_MMDB_GEOLITE_CITYDB_SHA256_URL",
default = "https://dha4druvz9fbr.cloudfront.net/GeoLite2-City.sha256"
)]
pub mmdb_geolite_citydb_sha256_url: String,
}
#[derive(EnvConfig)]
@ -680,6 +705,12 @@ fn check_path_config(cfg: &mut Config) -> Result<(), anyhow::Error> {
if !cfg.sled.data_dir.ends_with('/') {
cfg.sled.data_dir = format!("{}/", cfg.sled.data_dir);
}
if cfg.common.mmdb_data_dir.is_empty() {
cfg.common.mmdb_data_dir = format!("{}mmdb/", cfg.common.data_dir);
}
if !cfg.common.mmdb_data_dir.ends_with('/') {
cfg.common.mmdb_data_dir = format!("{}/", cfg.common.mmdb_data_dir);
}
Ok(())
}

View File

@ -0,0 +1,44 @@
// Copyright 2023 Zinc Labs Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use anyhow::{Context, Result};
use maxminddb::Reader;
use std::path::Path;
use std::sync::Arc;
/// Handle to a MaxMind database reader.
///
/// Cloning is cheap: the reader is kept behind an `Arc`, so clones share
/// the same in-memory database buffer.
#[derive(Clone)]
pub struct MaxmindClient {
/// Reader over the city database, backed by an owned byte buffer.
pub city_reader: Arc<Reader<Vec<u8>>>,
}
impl MaxmindClient {
    /// Wraps an already opened city-database reader in a new client.
    pub fn new_with_reader(city_reader: Reader<Vec<u8>>) -> Self {
        let city_reader = Arc::new(city_reader);
        Self { city_reader }
    }

    /// Opens the city database at `city_database` and builds a client from it.
    ///
    /// # Errors
    ///
    /// Returns an error, annotated with the offending path, when the database
    /// file cannot be opened or read.
    pub fn new_with_path<T: AsRef<Path>>(city_database: T) -> Result<MaxmindClient> {
        let db_path = city_database.as_ref();
        Reader::open_readfile(db_path)
            .with_context(|| format!("Failed to find city-database from path {:?}", db_path))
            .map(MaxmindClient::new_with_reader)
    }
}

View File

@ -0,0 +1,16 @@
// Copyright 2023 Zinc Labs Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
mod client;
pub use client::MaxmindClient;

View File

@ -0,0 +1,143 @@
// Copyright 2023 Zinc Labs Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::common::infra::config::MAXMIND_DB_CLIENT;
use actix_web::{
body::MessageBody,
dev::{ServiceRequest, ServiceResponse},
web, Error as ActixErr, FromRequest, HttpMessage,
};
use actix_web_lab::middleware::Next;
use ahash::AHashMap;
use maxminddb::geoip2::city::Location;
use serde::{Deserialize, Serialize};
use std::net::IpAddr;
use uaparser::{Parser, UserAgentParser};
/// Geo-location details attached to an incoming RUM request.
///
/// All fields borrow from the MaxMind lookup result and default to `None`
/// when no lookup result is available.
#[derive(Serialize, Deserialize, Clone, Debug, Default)]
pub struct GeoInfoData<'a> {
/// City name (the extractor in this file fills it with the `"en"` name).
pub city: Option<&'a str>,
/// Country name (the extractor in this file fills it with the `"en"` name).
pub country: Option<&'a str>,
/// Location record as returned by the MaxMind city lookup.
pub location: Option<Location<'a>>,
}
/// This is the custom data which is provided by `browser-sdk`
/// in form of query-parameters.
/// NOTE: the only condition is that the prefix of such params is `oo`.
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct RumExtraData {
/// Key/value pairs gathered by the extractor middleware: the retained
/// query parameters plus the derived `ip`, `geo_info` and `user_agent`
/// entries.
pub data: AHashMap<String, serde_json::Value>,
}
impl RumExtraData {
    /// Middleware that enriches a RUM request with extra data and stores it
    /// in the request extensions as a [`RumExtraData`] before calling `next`.
    ///
    /// The collected data consists of:
    /// - query parameters whose name starts with `oo` or `batch_time`
    ///   (except `oo-api-key`),
    /// - the `key:value` pairs listed in the comma-separated `ootags` parameter,
    /// - `ip`: the client address, when one is available,
    /// - `geo_info`: the MaxMind lookup result for that address (all fields
    ///   empty when no database is loaded or the address cannot be resolved),
    /// - `user_agent`: the parsed `User-Agent` header.
    ///
    /// # Errors
    ///
    /// Returns an error instead of panicking when the query string cannot be
    /// parsed or when the `UserAgentParser` application data is not available.
    pub async fn extractor(
        req: ServiceRequest,
        next: Next<impl MessageBody>,
    ) -> Result<ServiceResponse<impl MessageBody>, ActixErr> {
        // The query string is client-controlled: surface a parse failure as
        // an error response rather than unwrapping.
        let mut data = web::Query::<AHashMap<String, String>>::from_query(req.query_string())?;
        data.retain(|k, _| {
            (k.starts_with("oo") || k.starts_with("batch_time")) && !k.eq("oo-api-key")
        });

        // These are the tags which come in `ootags`. Entries without a `:`
        // separator are skipped instead of indexing out of bounds.
        let tags: AHashMap<String, serde_json::Value> = data
            .get("ootags")
            .map(|raw_tags| {
                raw_tags
                    .split(',')
                    .filter_map(|tag| {
                        let mut parts = tag.split(':');
                        match (parts.next(), parts.next()) {
                            (Some(key), Some(val)) => {
                                Some((key.to_string(), serde_json::Value::from(val)))
                            }
                            _ => None,
                        }
                    })
                    .collect()
            })
            .unwrap_or_default();

        let mut user_agent_hashmap: AHashMap<String, serde_json::Value> = data
            .into_inner()
            .into_iter()
            .map(|(key, val)| (key, serde_json::Value::from(val)))
            .collect();

        // Now extend the existing hashmap with tags.
        user_agent_hashmap.extend(tags);

        // Resolve the client address into an owned string so that no borrow of
        // the connection info is kept alive across the `.await` points below.
        let ip_address: Option<String> = {
            let headers = req.headers();
            let conn_info = req.connection_info();
            let is_forwarded =
                headers.contains_key("X-Forwarded-For") || headers.contains_key("Forwarded");
            let addr = if is_forwarded {
                conn_info.realip_remote_addr()
            } else {
                conn_info.peer_addr()
            };
            addr.map(str::to_owned)
        };

        // Forwarding headers are client-controlled, so the value may not be a
        // plain IP address. Accept `ip` or `ip:port`; anything else simply
        // yields no geo lookup.
        let parsed_ip: Option<IpAddr> = ip_address.as_deref().and_then(|addr| {
            addr.parse::<IpAddr>().ok().or_else(|| {
                addr.parse::<std::net::SocketAddr>()
                    .ok()
                    .map(|sock| sock.ip())
            })
        });

        if let Some(addr) = ip_address {
            user_agent_hashmap.insert("ip".into(), serde_json::Value::from(addr));
        }

        // Geo lookup. The read guard is confined to this block so the lock is
        // released before the rest of the request is processed.
        let geo_info = {
            let maxminddb_client = MAXMIND_DB_CLIENT.read().await;
            let geo = match (&*maxminddb_client, parsed_ip) {
                (Some(client), Some(ip)) => client
                    .city_reader
                    .lookup::<maxminddb::geoip2::City>(ip)
                    .map(|city_info| {
                        let country = city_info
                            .country
                            .and_then(|c| c.names.and_then(|map| map.get("en").copied()));
                        let city = city_info
                            .city
                            .and_then(|c| c.names.and_then(|map| map.get("en").copied()));
                        GeoInfoData {
                            city,
                            country,
                            location: city_info.location,
                        }
                    })
                    .unwrap_or_default(),
                _ => GeoInfoData::default(),
            };
            serde_json::to_value(geo).unwrap_or_default()
        };
        user_agent_hashmap.insert("geo_info".into(), geo_info);

        // User-agent parsing
        {
            let user_agent = req
                .headers()
                .get("User-Agent")
                .and_then(|v| v.to_str().ok())
                .unwrap_or_default();

            // Missing application data is reported as an error, not a panic.
            let ua_parser = web::Data::<UserAgentParser>::extract(req.request()).await?;
            let parsed_user_agent = ua_parser.parse(user_agent);

            user_agent_hashmap.insert(
                "user_agent".into(),
                serde_json::to_value(parsed_user_agent).unwrap_or_default(),
            );
        }

        req.extensions_mut().insert(RumExtraData {
            data: user_agent_hashmap,
        });
        next.call(req).await
    }
}

View File

@ -21,7 +21,9 @@ pub mod dashboards;
pub mod functions;
pub mod http;
pub mod ingestion;
pub mod maxmind;
pub mod meta_store;
pub mod middleware_data;
pub mod organization;
pub mod prom;
pub mod search;

View File

@ -60,6 +60,13 @@ pub struct OrgSummary {
pub alerts: Vec<Alert>,
}
/// A container for passcodes and rumtokens
// Plain comments are used below on purpose: this type derives `ToSchema`,
// and `///` text may end up in the generated schema.
#[derive(Serialize, ToSchema)]
pub enum IngestionTokensContainer {
// Wraps a regular ingestion passcode.
Passcode(IngestionPasscode),
// Wraps a RUM ingestion token.
RumToken(RumIngestionToken),
}
#[derive(Serialize, ToSchema)]
pub struct IngestionPasscode {
pub passcode: String,
@ -70,3 +77,14 @@ pub struct IngestionPasscode {
pub struct PasscodeResponse {
pub data: IngestionPasscode,
}
// RUM ingestion token together with the user it belongs to.
// Plain comments are used on purpose: this type derives `ToSchema`,
// and `///` text may end up in the generated schema.
#[derive(Serialize, ToSchema)]
pub struct RumIngestionToken {
// NOTE(review): presumably the user's identifier (e.g. email) — confirm with the producer.
pub user: String,
// Optional token value; may be absent.
pub rum_token: Option<String>,
}
// Response envelope for the rum-token endpoints: the token is serialized
// under the `data` key.
#[derive(Serialize, ToSchema)]
pub struct RumIngestionResponse {
pub data: RumIngestionToken,
}

View File

@ -35,6 +35,7 @@ impl UserRequest {
salt: String,
org: String,
token: String,
rum_token: String,
) -> DBUser {
DBUser {
email: self.email.clone(),
@ -45,6 +46,7 @@ impl UserRequest {
organizations: vec![UserOrg {
name: org,
token,
rum_token: Some(rum_token),
role: self.role.clone(),
}],
}
@ -85,6 +87,7 @@ impl DBUser {
role: org.role.clone(),
org: org.name.clone(),
token: org.token.clone(),
rum_token: org.rum_token.clone(),
salt: local.salt,
})
}
@ -103,6 +106,7 @@ impl DBUser {
role: org.role,
org: org.name,
token: org.token,
rum_token: org.rum_token,
salt: self.salt.clone(),
})
}
@ -122,6 +126,8 @@ pub struct User {
pub salt: String,
#[serde(default)]
pub token: String,
#[serde(default)]
pub rum_token: Option<String>,
pub role: UserRole,
pub org: String,
}
@ -132,6 +138,8 @@ pub struct UserOrg {
#[serde(default)]
pub token: String,
#[serde(default)]
pub rum_token: Option<String>,
#[serde(default)]
pub role: UserRole,
}

View File

@ -103,6 +103,7 @@ mod tests {
first_name: "root".to_owned(),
last_name: "".to_owned(),
token: "token".to_string(),
rum_token: Some("rum_token".to_string()),
org: "dummy".to_owned(),
},
);
@ -130,6 +131,7 @@ mod tests {
first_name: "root".to_owned(),
last_name: "".to_owned(),
token: "token".to_string(),
rum_token: Some("rum_token".to_string()),
org: "dummy".to_owned(),
},
);
@ -155,6 +157,7 @@ mod tests {
first_name: "root".to_owned(),
last_name: "".to_owned(),
token: "token".to_string(),
rum_token: Some("rum_token".to_string()),
org: "dummy".to_owned(),
},
);

View File

@ -12,15 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use actix_web::{
dev::ServiceRequest,
error::{ErrorForbidden, ErrorUnauthorized},
http::header,
http::Method,
web, Error,
};
use actix_web_httpauth::extractors::basic::BasicAuth;
use crate::common::infra::config::CONFIG;
use crate::common::meta::ingestion::INGESTION_EP;
use crate::common::meta::user::UserRole;
@ -29,6 +20,14 @@ use crate::common::utils::{
base64,
};
use crate::service::{db, users};
use actix_web::{
dev::ServiceRequest,
error::{ErrorForbidden, ErrorUnauthorized},
http::header,
http::Method,
web, Error,
};
use actix_web_httpauth::extractors::basic::BasicAuth;
pub async fn validator(
req: ServiceRequest,
@ -68,6 +67,19 @@ pub async fn validator(
}
}
/// Validates a request for endpoints that are authenticated by token only
/// (for example the `rum` endpoints).
///
/// ### Args:
/// - token: The token to validate
/// - org_id: The organization the token must belong to
///
/// Returns `Ok(true)` when a user is found for the `org_id`/`token` pair and
/// a "Forbidden" error otherwise.
pub async fn validate_token(token: &str, org_id: &str) -> Result<bool, Error> {
    users::get_user_by_token(org_id, token)
        .await
        .map(|_user| true)
        .ok_or_else(|| ErrorForbidden("Not allowed"))
}
pub async fn validate_credentials(
user_id: &str,
user_password: &str,
@ -224,6 +236,45 @@ pub async fn validator_gcp(
}
}
pub async fn validator_rum(
req: ServiceRequest,
_credentials: Option<BasicAuth>,
) -> Result<ServiceRequest, (Error, ServiceRequest)> {
let path = req
.request()
.path()
.strip_prefix(format!("{}/rum/v1/", CONFIG.common.base_uri).as_str())
.unwrap_or(req.request().path());
// After this previous path clean we should get only the
// remaining `org_id/rum` or `org_id/replay` or `org_id/logs`
let org_id_end_point: Vec<&str> = path.split('/').collect();
if org_id_end_point.len() != 2 {
return Err((
ErrorUnauthorized("Unauthorized Access. Please pass a valid org_id."),
req,
));
}
let query =
web::Query::<std::collections::HashMap<String, String>>::from_query(req.query_string())
.unwrap();
match query.get("oo-api-key") {
Some(token) => match validate_token(token, org_id_end_point[0]).await {
Ok(res) => {
if res {
Ok(req)
} else {
Err((ErrorUnauthorized("Unauthorized Access"), req))
}
}
Err(err) => Err((err, req)),
},
None => Err((ErrorUnauthorized("Unauthorized Access"), req)),
}
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -21,6 +21,7 @@ pub mod logs;
pub mod metrics;
pub mod organization;
pub mod prom;
pub mod rum;
pub mod search;
pub mod status;
pub mod stream;

View File

@ -12,18 +12,19 @@
// See the License for the specific language governing permissions and
// limitations under the License.
use actix_web::{get, put, web, HttpResponse, Result};
use actix_web::{get, post, put, web, HttpResponse, Result};
use actix_web_httpauth::extractors::basic::BasicAuth;
use std::collections::HashSet;
use std::io::Error;
use crate::common::infra::config::{STREAM_SCHEMAS, USERS};
use crate::common::meta::organization::{
OrgDetails, OrgUser, OrganizationResponse, PasscodeResponse, CUSTOM, DEFAULT_ORG, THRESHOLD,
OrgDetails, OrgUser, OrganizationResponse, PasscodeResponse, RumIngestionResponse, CUSTOM,
DEFAULT_ORG, THRESHOLD,
};
use crate::common::utils::auth::is_root_user;
use crate::service::organization::get_passcode;
use crate::service::organization::{self, update_passcode};
use crate::service::organization::{get_passcode, get_rum_token, update_rum_token};
pub mod es;
@ -203,3 +204,93 @@ async fn update_user_passcode(
let passcode = update_passcode(org_id, user_id).await;
Ok(HttpResponse::Ok().json(PasscodeResponse { data: passcode }))
}
/** GetRumIngestToken */
#[utoipa::path(
    context_path = "/api",
    tag = "Organizations",
    operation_id = "GetOrganizationUserRumIngestToken",
    security(
        ("Authorization"= [])
    ),
    params(
        ("org_id" = String, Path, description = "Organization name"),
    ),
    responses(
        (status = 200, description="Success", content_type = "application/json", body = RumIngestionResponse),
    )
)]
#[get("/{org_id}/organizations/rumtoken")]
async fn get_user_rumtoken(
    credentials: BasicAuth,
    org_id: web::Path<String>,
) -> Result<HttpResponse, Error> {
    let org_name = org_id.into_inner();
    let user_id = credentials.user_id();
    // Root users are looked up without an organization scope.
    let org_scope = if is_root_user(user_id) {
        None
    } else {
        Some(org_name.as_str())
    };
    let data = get_rum_token(org_scope, user_id).await;
    Ok(HttpResponse::Ok().json(RumIngestionResponse { data }))
}
/** UpdateRumIngestToken */
#[utoipa::path(
    context_path = "/api",
    tag = "Organizations",
    operation_id = "UpdateOrganizationUserRumIngestToken",
    security(
        ("Authorization"= [])
    ),
    params(
        ("org_id" = String, Path, description = "Organization name"),
    ),
    responses(
        (status = 200, description="Success", content_type = "application/json", body = RumIngestionResponse),
    )
)]
#[put("/{org_id}/organizations/rumtoken")]
async fn update_user_rumtoken(
    credentials: BasicAuth,
    org_id: web::Path<String>,
) -> Result<HttpResponse, Error> {
    let org_name = org_id.into_inner();
    let user_id = credentials.user_id();
    // Root users are handled without an organization scope.
    let org_scope = if is_root_user(user_id) {
        None
    } else {
        Some(org_name.as_str())
    };
    let data = update_rum_token(org_scope, user_id).await;
    Ok(HttpResponse::Ok().json(RumIngestionResponse { data }))
}
/** CreateRumIngestToken */
#[utoipa::path(
    context_path = "/api",
    tag = "Organizations",
    operation_id = "CreateOrganizationUserRumIngestToken",
    security(
        ("Authorization"= [])
    ),
    params(
        ("org_id" = String, Path, description = "Organization name"),
    ),
    responses(
        (status = 200, description="Success", content_type = "application/json", body = RumIngestionResponse),
    )
)]
#[post("/{org_id}/organizations/rumtoken")]
async fn create_user_rumtoken(
    credentials: BasicAuth,
    org_id: web::Path<String>,
) -> Result<HttpResponse, Error> {
    let org_name = org_id.into_inner();
    let user_id = credentials.user_id();
    // Root users are handled without an organization scope.
    let org_scope = if is_root_user(user_id) {
        None
    } else {
        Some(org_name.as_str())
    };
    // Creation goes through the same service call as the update endpoint.
    let data = update_rum_token(org_scope, user_id).await;
    Ok(HttpResponse::Ok().json(RumIngestionResponse { data }))
}

View File

@ -0,0 +1,220 @@
// Copyright 2023 Zinc Labs Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::common::meta::middleware_data::RumExtraData;
use crate::common::{meta::http::HttpResponse as MetaHttpResponse, utils::json};
use crate::service::logs;
use actix_multipart::form::{bytes::Bytes, MultipartForm};
use actix_web::{http, post, web, HttpResponse};
use ahash::AHashMap;
use std::io::Error;
use flate2::read::ZlibDecoder;
use serde::{Deserialize, Serialize};
use std::io::prelude::*;
/// Name of the stream that receives RUM log data.
pub const RUM_LOG_STREAM: &str = "_rumlog";
/// Name of the stream that receives session-replay data.
pub const RUM_SESSION_REPLAY_STREAM: &str = "_sessionreplay";
/// Name of the stream that receives RUM data.
pub const RUM_DATA_STREAM: &str = "_rumdata";
/// Multipart form accepted by the session-replay endpoint.
///
/// Both parts are received as raw bytes and interpreted by `sessionreplay`.
#[derive(MultipartForm)]
pub struct SegmentEvent {
/// Replay segment; `sessionreplay` inflates it with zlib into a UTF-8 string.
pub segment: Bytes,
/// Segment metadata; `sessionreplay` parses it as JSON into [`Event`].
pub event: Bytes,
}
/// Row that `sessionreplay` serializes and hands to the ingestion pipeline:
/// the decompressed segment plus the event metadata.
#[derive(Serialize, Deserialize)]
pub struct SegmentEventSerde {
/// Decompressed replay segment text.
pub segment: String,
/// Event metadata; `flatten` puts its fields at the same JSON level as
/// `segment` instead of nesting them under an `event` key.
#[serde(flatten)]
pub event: Event,
}
/// Metadata describing one session-replay segment, parsed from the `event`
/// part of the multipart form.
///
/// The container is declared `camelCase`, but every multi-word field carries an
/// explicit snake_case `rename`, so all keys are read and written exactly as
/// the field names are spelled here.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Event {
#[serde(rename = "raw_segment_size")]
pub raw_segment_size: i64,
#[serde(rename = "compressed_segment_size")]
pub compressed_segment_size: i64,
pub start: i64,
pub end: i64,
#[serde(rename = "creation_reason")]
pub creation_reason: String,
#[serde(rename = "records_count")]
pub records_count: i64,
#[serde(rename = "has_full_snapshot")]
pub has_full_snapshot: bool,
#[serde(rename = "index_in_view")]
pub index_in_view: i64,
pub source: String,
// Nested objects, each carrying only an `id`.
pub application: Application,
pub session: Session,
pub view: View,
}
/// Value of [`Event::application`]: an object holding only an `id`.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Application {
pub id: String,
}
/// Value of [`Event::session`]: an object holding only an `id`.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct Session {
pub id: String,
}
/// Value of [`Event::view`]: an object holding only an `id`.
#[derive(Default, Debug, Clone, PartialEq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct View {
pub id: String,
}
/** Rum data ingestion API */
#[utoipa::path(
    context_path = "/rum",
    tag = "Rum",
    operation_id = "RumIngestionMulti",
    security(
        ("Authorization"= [])
    ),
    params(
        ("org_id" = String, Path, description = "Organization name"),
    ),
    request_body(content = String, description = "Ingest data (multiple line json)", content_type = "application/json"),
    responses(
        (status = 200, description="Success", content_type = "application/json", body = IngestionResponse, example = json!({"code": 200,"status": [{"name": "olympics","successful": 3,"failed": 0}]})),
        (status = 500, description="Failure", content_type = "application/json", body = HttpResponse),
    )
)]
#[post("/v1/{org_id}/rum")]
pub async fn data(
    path: web::Path<String>,
    body: web::Bytes,
    thread_id: web::Data<usize>,
    rum_query_data: web::ReqData<RumExtraData>,
) -> Result<HttpResponse, Error> {
    // Ingest the body into the rum-data stream, extending every row with the
    // extra key/value pairs attached to the request.
    let worker = **thread_id;
    let organization: String = path.into_inner();
    ingest_multi_json(
        &organization,
        RUM_DATA_STREAM,
        body,
        &rum_query_data.data,
        worker,
    )
    .await
}
/** Rum log ingestion API */
#[utoipa::path(
    context_path = "/rum",
    tag = "Rum",
    operation_id = "LogIngestionJson",
    security(
        ("Authorization"= [])
    ),
    params(
        ("org_id" = String, Path, description = "Organization name"),
    ),
    request_body(content = String, description = "Ingest data (json array)", content_type = "application/json", example = json!([{"Year": 1896, "City": "Athens", "Sport": "Aquatics", "Discipline": "Swimming", "Athlete": "Alfred", "Country": "HUN"},{"Year": 1896, "City": "Athens", "Sport": "Aquatics", "Discipline": "Swimming", "Athlete": "HERSCHMANN", "Country":"CHN"}])),
    responses(
        (status = 200, description="Success", content_type = "application/json", body = IngestionResponse, example = json!({"code": 200,"status": [{"name": "olympics","successful": 3,"failed": 0}]})),
        (status = 500, description="Failure", content_type = "application/json", body = HttpResponse),
    )
)]
#[post("/v1/{org_id}/logs")]
pub async fn log(
    path: web::Path<String>,
    body: web::Bytes,
    thread_id: web::Data<usize>,
    rum_query_data: web::ReqData<RumExtraData>,
) -> Result<HttpResponse, Error> {
    // Ingest the body into the rum-log stream, extending every row with the
    // extra key/value pairs attached to the request.
    let worker = **thread_id;
    let organization = path.into_inner();
    ingest_multi_json(
        &organization,
        RUM_LOG_STREAM,
        body,
        &rum_query_data.data,
        worker,
    )
    .await
}
/** Rum session-replay ingestion API */
#[utoipa::path(
context_path = "/rum",
tag = "Rum",
operation_id = "ReplayIngestionJson",
security(
("Authorization"= [])
),
params(
("org_id" = String, Path, description = "Organization name"),
),
request_body(content = String, description = "Ingest data (json array)", content_type = "application/json", example = json!([{"Year": 1896, "City": "Athens", "Sport": "Aquatics", "Discipline": "Swimming", "Athlete": "Alfred", "Country": "HUN"},{"Year": 1896, "City": "Athens", "Sport": "Aquatics", "Discipline": "Swimming", "Athlete": "HERSCHMANN", "Country":"CHN"}])),
responses(
(status = 200, description="Success", content_type = "application/json", body = IngestionResponse, example = json!({"code": 200,"status": [{"name": "olympics","successful": 3,"failed": 0}]})),
(status = 500, description="Failure", content_type = "application/json", body = HttpResponse),
)
)]
#[post("/v1/{org_id}/replay")]
pub async fn sessionreplay(
path: web::Path<String>,
payload: MultipartForm<SegmentEvent>,
thread_id: web::Data<usize>,
rum_query_data: web::ReqData<RumExtraData>,
) -> Result<HttpResponse, Error> {
let org_id = path.into_inner();
let mut segment_payload = String::new();
if let Err(_e) =
ZlibDecoder::new(&payload.segment.data[..]).read_to_string(&mut segment_payload)
{
return Ok(bad_request("Failed to decompress the incoming payload"));
}
let event: Event = json::from_slice(&payload.event.data[..]).unwrap();
let ingestion_payload = SegmentEventSerde {
segment: segment_payload,
event,
};
let extend_json = &rum_query_data.data;
let body = json::to_vec(&ingestion_payload).unwrap();
ingest_multi_json(
&org_id,
RUM_SESSION_REPLAY_STREAM,
body.into(),
extend_json,
**thread_id,
)
.await
}
/// Runs the multi-line JSON ingestion for `stream_name` and converts the
/// outcome into an HTTP response.
///
/// A successful ingestion becomes a `200` response carrying the ingestion
/// result as JSON; an ingestion error becomes the `bad_request` response built
/// from the error's text. The function itself never returns `Err`.
async fn ingest_multi_json(
    org_id: &str,
    stream_name: &str,
    body: web::Bytes,
    extend_json: &AHashMap<String, serde_json::Value>,
    thread_id: usize,
) -> Result<HttpResponse, Error> {
    let outcome =
        logs::multi::ingest_with_keys(org_id, stream_name, body, extend_json, thread_id).await;
    let response = match outcome {
        Err(e) => bad_request(e.to_string()),
        Ok(v) => HttpResponse::Ok().json(v),
    };
    Ok(response)
}
fn bad_request<T>(reason: T) -> HttpResponse
where
T: Into<String>,
{
HttpResponse::BadRequest().json(MetaHttpResponse::error(
http::StatusCode::BAD_REQUEST.into(),
reason.into(),
))
}

View File

@ -0,0 +1,15 @@
// Copyright 2023 Zinc Labs Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
pub mod ingest;

View File

@ -25,13 +25,13 @@ use std::rc::Rc;
use utoipa::OpenApi;
use utoipa_swagger_ui::SwaggerUi;
use super::auth::{validator, validator_aws, validator_gcp};
use super::auth::{validator, validator_aws, validator_gcp, validator_rum};
use super::request::{
alerts::*, dashboards::folders::*, dashboards::*, enrichment_table, functions, kv, logs,
metrics, organization, prom, search, status, stream, syslog, traces, users,
metrics, organization, prom, rum, search, status, stream, syslog, traces, users,
};
use crate::common::infra::config::CONFIG;
use crate::common::{infra::config::CONFIG, meta::middleware_data::RumExtraData};
use actix_web_lab::middleware::from_fn;
pub mod openapi;
pub mod ui;
@ -172,6 +172,9 @@ pub fn get_service_routes(cfg: &mut web::ServiceConfig) {
.service(organization::org_summary)
.service(organization::get_user_passcode)
.service(organization::update_user_passcode)
.service(organization::create_user_rumtoken)
.service(organization::get_user_rumtoken)
.service(organization::update_user_rumtoken)
.service(organization::es::org_index)
.service(organization::es::org_license)
.service(organization::es::org_xpack)
@ -227,8 +230,22 @@ pub fn get_other_service_routes(cfg: &mut web::ServiceConfig) {
let gcp_auth = HttpAuthentication::with_fn(validator_gcp);
cfg.service(
web::scope("/gcp")
.wrap(cors)
.wrap(cors.clone())
.wrap(gcp_auth)
.service(logs::ingest::handle_gcp_request),
);
//NOTE: Here the order of middlewares matters. Once we consume the api-token in `rum_auth`,
//we drop it in the RumExtraData data.
//https://docs.rs/actix-web/latest/actix_web/middleware/index.html#ordering
let rum_auth = HttpAuthentication::with_fn(validator_rum);
cfg.service(
web::scope("/rum")
.wrap(cors)
.wrap(from_fn(RumExtraData::extractor))
.wrap(rum_auth)
.service(rum::ingest::log)
.service(rum::ingest::sessionreplay)
.service(rum::ingest::data),
);
}

View File

@ -29,6 +29,9 @@ use crate::handler::http::request;
request::logs::ingest::handle_kinesis_request,
request::logs::ingest::multi,
request::logs::ingest::json,
request::rum::ingest::log,
request::rum::ingest::data,
request::rum::ingest::sessionreplay,
request::metrics::ingest::json,
request::dashboards::create_dashboard,
request::dashboards::update_dashboard,
@ -75,6 +78,9 @@ use crate::handler::http::request;
request::organization::org_summary,
request::organization::get_user_passcode,
request::organization::update_user_passcode,
request::organization::get_user_rumtoken,
request::organization::update_user_rumtoken,
request::organization::create_user_rumtoken,
request::kv::get,
request::kv::set,
request::kv::delete,

123
src/job/mmdb_downloader.rs Normal file
View File

@ -0,0 +1,123 @@
// Copyright 2023 Zinc Labs Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::common::infra::cluster::is_ingester;
use crate::common::infra::config::{CONFIG, MAXMIND_DB_CLIENT};
use crate::common::meta::maxmind::MaxmindClient;
use futures::stream::StreamExt;
use reqwest::Client;
use sha256::try_digest;
use std::cmp::min;
use std::path::Path;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use tokio::time;
/// Reports whether the local file's SHA-256 differs from the published one.
///
/// Fetches the text at `remote_sha256sum_path`, hashes the file at
/// `local_file_path`, and compares both after trimming surrounding whitespace.
/// A local file that cannot be hashed (for example because it does not exist
/// yet) yields an empty digest and therefore counts as different.
///
/// # Errors
/// Returns an error when the request fails, when the server answers with a
/// non-success status, or when the body cannot be read. Checking the status
/// keeps an error page from being mistaken for a digest.
pub async fn is_digest_different(
    local_file_path: &str,
    remote_sha256sum_path: &str,
) -> Result<bool, anyhow::Error> {
    let remote_file_sha = reqwest::get(remote_sha256sum_path)
        .await?
        .error_for_status()?
        .text()
        .await?;
    let local_file_sha = try_digest(Path::new(local_file_path)).unwrap_or_default();
    Ok(remote_file_sha.trim() != local_file_sha.trim())
}
pub async fn download_file(client: &Client, url: &str, path: &str) -> Result<(), String> {
// Reqwest setup
let res = client
.get(url)
.send()
.await
.or(Err(format!("Failed to GET from '{}'", &url)))?;
let total_size = res
.content_length()
.ok_or(format!("Failed to get content length from '{}'", &url))?;
// download chunks
let mut file = File::create(path)
.await
.or(Err(format!("Failed to create file '{}'", path)))?;
let mut downloaded: u64 = 0;
let mut stream = res.bytes_stream();
while let Some(item) = stream.next().await {
let chunk = item.or(Err(format!("Error while downloading file")))?;
file.write_all(&chunk)
.await
.or(Err(format!("Error while writing to file")))?;
let new = min(downloaded + (chunk.len() as u64), total_size);
downloaded = new;
}
Ok(())
}
pub async fn run() -> Result<(), anyhow::Error> {
log::info!("spawned");
if !is_ingester(&super::cluster::LOCAL_NODE_ROLE) {
return Ok(());
}
std::fs::create_dir_all(&CONFIG.common.mmdb_data_dir)?;
// should run it every 24 hours
let mut interval = time::interval(time::Duration::from_secs(
CONFIG.common.mmdb_update_duration,
));
loop {
// send request and await response
let client = reqwest::ClientBuilder::default().build().unwrap();
let fname = format!("{}/GeoLite2-City.mmdb", &CONFIG.common.mmdb_data_dir);
let download_files = match is_digest_different(
&fname,
&CONFIG.common.mmdb_geolite_citydb_sha256_url,
)
.await
{
Ok(is_different) => is_different,
Err(e) => {
log::error!("Well something broke. {e}");
false
}
};
if download_files {
match download_file(&client, &CONFIG.common.mmdb_geolite_citydb_url, &fname).await {
Ok(()) => {
let maxminddb_client = MaxmindClient::new_with_path(fname);
let mut client = MAXMIND_DB_CLIENT.write().await;
*client = maxminddb_client.ok();
log::info!("Updated geo-json data")
}
Err(e) => log::error!("failed to download the files {}", e),
}
} else {
log::info!("No change in geo-json data")
}
interval.tick().await;
}
}
// #[cfg(test)]
// mod tests {
// use super::*;
// #[tokio::test]
// async fn test_run() {
// run().await.unwrap();
// assert!(true);
// }
// }

View File

@ -30,6 +30,7 @@ mod compact;
pub(crate) mod file_list;
pub(crate) mod files;
mod metrics;
mod mmdb_downloader;
mod prom;
mod stats;
pub(crate) mod syslog_server;
@ -62,6 +63,10 @@ pub async fn init() -> Result<(), anyhow::Error> {
.await;
}
if !CONFIG.common.mmdb_disable_download {
// Try to download the mmdb files, if its not disabled.
tokio::task::spawn(async move { mmdb_downloader::run().await });
}
// cache users
tokio::task::spawn(async move { db::user::watch().await });
db::user::cache().await.expect("user cache failed");

View File

@ -36,6 +36,7 @@ use std::{
};
use tonic::codec::CompressionEncoding;
use tracing_subscriber::{prelude::*, Registry};
use uaparser::UserAgentParser;
use openobserve::{
common::{
@ -86,6 +87,11 @@ static GLOBAL: mimalloc::MiMalloc = mimalloc::MiMalloc;
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
/// Contents of `ua_regex/regexes.yaml` (resolved against the crate's manifest
/// directory), embedded into the binary at compile time. `main` feeds these
/// bytes to `UserAgentParser::builder().build_from_bytes`.
static USER_AGENT_REGEX_FILE: &[u8] = include_bytes!(concat!(
env!("CARGO_MANIFEST_DIR"),
"/ua_regex/regexes.yaml"
));
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
#[cfg(feature = "profiling")]
@ -184,11 +190,17 @@ async fn main() -> Result<(), anyhow::Error> {
.event("OpenObserve - Starting server", None, false)
.await;
let ua_parser = web::Data::new(
UserAgentParser::builder()
.build_from_bytes(USER_AGENT_REGEX_FILE)
.expect("User Agent Parser creation failed"),
);
let server = HttpServer::new(move || {
let local_id = thread_id.load(Ordering::SeqCst) as usize;
if CONFIG.common.feature_per_thread_lock {
thread_id.fetch_add(1, Ordering::SeqCst);
}
log::info!(
"starting HTTP server at: {}, thread_id: {}",
haddr,
@ -203,6 +215,7 @@ async fn main() -> Result<(), anyhow::Error> {
.service(router::api)
.service(router::aws)
.service(router::gcp)
.service(router::rum)
.configure(get_basic_routes),
)
} else {
@ -217,6 +230,7 @@ async fn main() -> Result<(), anyhow::Error> {
app.app_data(web::JsonConfig::default().limit(CONFIG.limit.req_json_limit))
.app_data(web::PayloadConfig::new(CONFIG.limit.req_payload_limit)) // size is in bytes
.app_data(web::Data::new(local_id))
.app_data(ua_parser.clone())
.wrap(middleware::Compress::default())
.wrap(middleware::Logger::new(
r#"%a "%r" %s %b "%{Content-Length}i" "%{Referer}i" "%{User-Agent}i" %T"#,

View File

@ -14,12 +14,14 @@
use std::sync::Arc;
use anyhow::bail;
use crate::common::{
infra::{
config::{ROOT_USER, USERS},
config::{ROOT_USER, USERS, USERS_RUM_TOKEN},
db as infra_db,
},
meta::user::{DBUser, User, UserRole},
meta::user::{DBUser, User, UserOrg, UserRole},
utils::json,
};
@ -33,7 +35,7 @@ pub async fn get(org_id: Option<&str>, name: &str) -> Result<Option<User>, anyho
return Ok(Some(user.clone()));
}
let org_id = org_id.expect("BUG");
let org_id = org_id.expect("Missing org_id");
let db = &infra_db::DEFAULT;
let key = format!("/user/{name}");
@ -42,6 +44,46 @@ pub async fn get(org_id: Option<&str>, name: &str) -> Result<Option<User>, anyho
Ok(db_user.get_user(org_id.to_string()))
}
/// Retrieve the user object given token and the requested org
///
/// Lookup order:
/// 1. the in-memory caches: `ROOT_USER` when `org_id` is `None`, otherwise
///    `USERS_RUM_TOKEN` under the key `"{org_id}/{token}"`;
/// 2. on a cache miss, a scan of every value stored under the `/user/` prefix,
///    keeping the users that belong to `org_id` with a matching rum token.
///
/// # Errors
/// Returns an error instead of panicking when `org_id` is `None` and no root
/// user is cached, when the database listing fails, when a stored user cannot
/// be decoded, or when the scan does not find exactly one matching user.
pub async fn get_by_token(
    org_id: Option<&str>,
    token: &str,
) -> Result<Option<User>, anyhow::Error> {
    let cached = match org_id {
        None => ROOT_USER.get("root"),
        Some(org_id) => USERS_RUM_TOKEN.get(&format!("{org_id}/{token}")),
    };
    if let Some(user) = cached {
        return Ok(Some(user.clone()));
    }

    let org_id = match org_id {
        Some(org_id) => org_id,
        None => bail!("Missing org_id"),
    };

    let db = &infra_db::DEFAULT;
    let stored = db.list_values("/user/").await?;

    let mut matching: Vec<DBUser> = Vec::new();
    for item in stored.iter() {
        let user: DBUser = json::from_slice(item)?;
        let owns_token = user
            .organizations
            .iter()
            .any(|org| org.name == org_id && org.rum_token.as_deref() == Some(token));
        if owns_token {
            matching.push(user);
        }
    }

    // The token must identify exactly one user of the organization.
    match matching.as_slice() {
        [user] => Ok(user.get_user(org_id.to_string())),
        _ => bail!("Found invalid token for the given org"),
    }
}
pub async fn get_db_user(name: &str) -> Result<DBUser, anyhow::Error> {
let db = &infra_db::DEFAULT;
let key = format!("/user/{name}");
@ -58,21 +100,30 @@ pub async fn set(user: DBUser) -> Result<(), anyhow::Error> {
infra_db::NEED_WATCH,
)
.await?;
// cache user
for org in user.organizations {
let user = User {
email: user.email.clone(),
first_name: user.first_name.clone(),
last_name: user.last_name.clone(),
password: user.password.clone(),
role: org.role,
org: org.name.clone(),
token: org.token,
rum_token: org.rum_token.clone(),
salt: user.salt.clone(),
};
USERS.insert(
format!("{}/{}", org.name, user.email),
User {
email: user.email.clone(),
first_name: user.first_name.clone(),
last_name: user.last_name.clone(),
password: user.password.clone(),
role: org.role,
org: org.name,
token: org.token,
salt: user.salt.clone(),
},
format!("{}/{}", org.name.clone(), user.email.clone()),
user.clone(),
);
if let Some(rum_token) = org.rum_token {
USERS_RUM_TOKEN
.clone()
.insert(format!("{}/{}", org.name.clone(), rum_token), user);
}
}
Ok(())
}
@ -143,7 +194,12 @@ pub async fn cache() -> Result<(), anyhow::Error> {
if user.role.eq(&UserRole::Root) {
ROOT_USER.insert("root".to_string(), user.clone());
}
USERS.insert(format!("{}/{}", user.org, user.email), user);
USERS.insert(format!("{}/{}", user.org, user.email), user.clone());
if let Some(rum_token) = &user.rum_token {
USERS_RUM_TOKEN
.clone()
.insert(format!("{}/{}", user.org, rum_token), user);
}
}
}
log::info!("Users Cached");
@ -192,6 +248,7 @@ mod tests {
role: crate::common::meta::user::UserRole::Admin,
name: org_id.clone(),
token: "Abcd".to_string(),
rum_token: Some("rumAbcd".to_string()),
}],
})
.await;

View File

@ -33,11 +33,55 @@ use crate::service::{
usage::report_request_usage_stats,
};
/// Ingest a multiline json body but add extra keys to each json row
///
/// ### Args
/// - org_id: org id to ingest data in
/// - in_stream_name: stream to write data in
/// - body: incoming payload
/// - extend_json: a map of key -> JSON value; each entry is written into every
///   parsed row (under that key) before the row is flattened
/// - thread_id: a unique thread-id associated with this process
///
pub async fn ingest_with_keys(
org_id: &str,
in_stream_name: &str,
body: web::Bytes,
extend_json: &AHashMap<String, serde_json::Value>,
thread_id: usize,
) -> Result<IngestionResponse, anyhow::Error> {
// Thin public wrapper: all the work happens in `ingest_inner`.
ingest_inner(org_id, in_stream_name, body, extend_json, thread_id).await
}
/// Ingest a multiline json body without adding any extra keys to the rows
///
/// ### Args
/// - org_id: org id to ingest data in
/// - in_stream_name: stream to write data in
/// - body: incoming payload
/// - thread_id: a unique thread-id associated with this process
///
pub async fn ingest(
    org_id: &str,
    in_stream_name: &str,
    body: web::Bytes,
    thread_id: usize,
) -> Result<IngestionResponse, anyhow::Error> {
    // An empty map means there is nothing to extend the rows with.
    let no_extra_keys = AHashMap::default();
    ingest_inner(org_id, in_stream_name, body, &no_extra_keys, thread_id).await
}
async fn ingest_inner(
org_id: &str,
in_stream_name: &str,
body: web::Bytes,
extend_json: &AHashMap<String, serde_json::Value>,
thread_id: usize,
) -> Result<IngestionResponse, anyhow::Error> {
let start = std::time::Instant::now();
@ -89,6 +133,10 @@ pub async fn ingest(
let mut value: json::Value = json::from_slice(line.as_bytes())?;
for (key, val) in extend_json.iter() {
value[key] = val.clone();
}
// JSON Flattening
value = flatten::flatten(&value)?;
// Start row based transform

View File

@ -15,7 +15,9 @@
use rand::distributions::{Alphanumeric, DistString};
use super::stream::get_streams;
use crate::common::meta::organization::{IngestionPasscode, OrgSummary};
use crate::common::meta::organization::{
IngestionPasscode, IngestionTokensContainer, OrgSummary, RumIngestionToken,
};
use crate::common::meta::user::UserOrg;
use crate::common::utils::auth::is_root_user;
use crate::service::db;
@ -41,8 +43,39 @@ pub async fn get_passcode(org_id: Option<&str>, user_id: &str) -> IngestionPassc
}
}
/// Returns the rum ingestion token of `user_id`, looked up via `db::user::get`.
///
/// # Panics
/// Panics when the lookup fails or when it finds no user.
#[tracing::instrument]
pub async fn get_rum_token(org_id: Option<&str>, user_id: &str) -> RumIngestionToken {
    let lookup = db::user::get(org_id, user_id).await.unwrap();
    let found = lookup.unwrap();
    RumIngestionToken {
        rum_token: found.rum_token,
        user: found.email,
    }
}
/// Rotates the rum ingestion token of `user_id` and returns the new token.
///
/// # Panics
/// Panics when `update_passcode_inner` answers with anything other than the
/// rum-token variant.
#[tracing::instrument]
pub async fn update_rum_token(org_id: Option<&str>, user_id: &str) -> RumIngestionToken {
    // `true` asks `update_passcode_inner` for a rum-token update.
    let outcome = update_passcode_inner(org_id, user_id, true).await;
    if let IngestionTokensContainer::RumToken(response) = outcome {
        response
    } else {
        panic!("This shouldn't have happened, we were expecting rum token updates")
    }
}
/// Rotates the ingestion passcode of `user_id` and returns the new passcode.
///
/// # Panics
/// Panics when `update_passcode_inner` answers with anything other than the
/// passcode variant.
#[tracing::instrument]
pub async fn update_passcode(org_id: Option<&str>, user_id: &str) -> IngestionPasscode {
    // `false` asks `update_passcode_inner` for a regular passcode update.
    let outcome = update_passcode_inner(org_id, user_id, false).await;
    if let IngestionTokensContainer::Passcode(response) = outcome {
        response
    } else {
        panic!("This shouldn't have happened, we were expecting ingestion token updates")
    }
}
#[tracing::instrument]
async fn update_passcode_inner(
org_id: Option<&str>,
user_id: &str,
is_rum_update: bool,
) -> IngestionTokensContainer {
let mut local_org_id = "dummy";
let mut db_user = db::user::get_db_user(user_id).await.unwrap();
@ -50,29 +83,57 @@ pub async fn update_passcode(org_id: Option<&str>, user_id: &str) -> IngestionPa
local_org_id = org_id.unwrap();
}
let token = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
let rum_token = format!(
"rum{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
);
let updated_org = |existing_org: &UserOrg| {
if is_rum_update {
UserOrg {
rum_token: Some(rum_token.clone()),
..existing_org.clone()
}
} else {
UserOrg {
token: token.clone(),
..existing_org.clone()
}
}
};
let mut orgs = db_user.clone().organizations;
let new_orgs = if !is_root_user(user_id) {
let mut existing_org = orgs.clone();
// Find the org which we need to update
existing_org.retain(|org| org.name.eq(&local_org_id));
// Filter out the org which needs to be updated, so that we can modify and insert it back.
orgs.retain(|org| !org.name.eq(&local_org_id));
orgs.push(UserOrg {
name: local_org_id.to_string(),
token: token.clone(),
role: existing_org.first().unwrap().role.clone(),
});
let updated_org = updated_org(&existing_org[0]);
orgs.push(updated_org);
orgs
} else {
let mut existing_org = orgs.first().unwrap().clone();
existing_org.token = token.clone();
vec![existing_org]
// This is a root-user, so pick up the first/default org.
let existing_org = orgs.first().unwrap().clone();
let updated_org = updated_org(&existing_org);
vec![updated_org]
};
db_user.organizations = new_orgs;
let _ = db::user::set(db_user.clone()).await;
IngestionPasscode {
user: db_user.email,
passcode: token,
if is_rum_update {
IngestionTokensContainer::RumToken(RumIngestionToken {
user: db_user.email,
rum_token: Some(rum_token),
})
} else {
IngestionTokensContainer::Passcode(IngestionPasscode {
user: db_user.email,
passcode: token,
})
}
}

View File

@ -97,6 +97,18 @@ pub async fn gcp(
dispatch(req, payload).await
}
/// Router entry point for `POST` requests to any path below `/rum/`.
///
/// Hands the request and its payload to `dispatch` unchanged and returns
/// whatever `dispatch` produces.
#[route(
"/rum/{path:.*}",
// method = "GET",
method = "POST",
)]
pub async fn rum(
req: HttpRequest,
payload: web::Payload,
) -> actix_web::Result<HttpResponse, Error> {
dispatch(req, payload).await
}
async fn dispatch(
req: HttpRequest,
payload: web::Payload,

View File

@ -21,8 +21,11 @@ use std::io::Error;
use uuid::Uuid;
use super::db;
use crate::common::meta::user::{User, UserList, UserResponse, UserRole};
use crate::common::{infra::config::USERS, meta::user::UpdateUser};
use crate::common::{
infra::config::USERS_RUM_TOKEN,
meta::user::{User, UserList, UserResponse, UserRole},
};
use crate::{
common::infra::config::ROOT_USER,
common::meta::{
@ -43,7 +46,12 @@ pub async fn post_user(org_id: &str, usr_req: UserRequest) -> Result<HttpRespons
let salt = Uuid::new_v4().to_string();
let password = get_hash(&usr_req.password, &salt);
let token = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
let user = usr_req.to_new_dbuser(password, salt, org_id.replace(' ', "_"), token);
let rum_token = format!(
"rum{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
);
let user =
usr_req.to_new_dbuser(password, salt, org_id.replace(' ', "_"), token, rum_token);
db::user::set(user).await.unwrap();
Ok(HttpResponse::Ok().json(MetaHttpResponse::message(
http::StatusCode::OK.into(),
@ -152,6 +160,7 @@ pub async fn update_user(
vec![UserOrg {
name: org_id.to_string(),
token: new_user.token,
rum_token: new_user.rum_token,
role: new_user.role,
}]
} else {
@ -159,6 +168,7 @@ pub async fn update_user(
orgs.push(UserOrg {
name: org_id.to_string(),
token: new_user.token,
rum_token: new_user.rum_token,
role: new_user.role,
});
orgs
@ -223,11 +233,16 @@ pub async fn add_user_to_org(
};
if initiating_user.role.eq(&UserRole::Root) || initiating_user.role.eq(&UserRole::Admin) {
let token = Alphanumeric.sample_string(&mut rand::thread_rng(), 16);
let rum_token = format!(
"rum{}",
Alphanumeric.sample_string(&mut rand::thread_rng(), 16)
);
let mut orgs = db_user.clone().organizations;
let new_orgs = if orgs.is_empty() {
vec![UserOrg {
name: local_org.to_string(),
token,
rum_token: Some(rum_token),
role,
}]
} else {
@ -235,6 +250,7 @@ pub async fn add_user_to_org(
orgs.push(UserOrg {
name: local_org.to_string(),
token,
rum_token: Some(rum_token),
role,
});
orgs
@ -283,6 +299,27 @@ pub async fn get_user(org_id: Option<&str>, name: &str) -> Option<User> {
}
}
/// Resolves the user that owns rum `token`.
///
/// Checks the `USERS_RUM_TOKEN` cache first under `"{DEFAULT_ORG}/{token}"`
/// and then under `"{org_id}/{token}"`; on a miss it falls back to
/// `db::user::get_by_token`. A failing database lookup is reported as `None`.
pub async fn get_user_by_token(org_id: &str, token: &str) -> Option<User> {
    // Each cache guard lives only for its own `if let`, so none of them is
    // still around when the database lookup below is awaited.
    if let Some(user) = USERS_RUM_TOKEN.get(&format!("{DEFAULT_ORG}/{token}")) {
        return Some(user.value().clone());
    }
    if let Some(user) = USERS_RUM_TOKEN.get(&format!("{org_id}/{token}")) {
        return Some(user.value().clone());
    }
    db::user::get_by_token(Some(org_id), token)
        .await
        .ok()
        .flatten()
}
pub async fn list_users(org_id: &str) -> Result<HttpResponse, Error> {
let mut user_list: Vec<UserResponse> = vec![];
for user in USERS.iter() {
@ -385,6 +422,7 @@ mod tests {
role: crate::common::meta::user::UserRole::Admin,
salt: String::new(),
token: "token".to_string(),
rum_token: Some("rum_token".to_string()),
first_name: "admin".to_owned(),
last_name: "".to_owned(),
org: "dummy".to_string(),
@ -450,6 +488,7 @@ mod tests {
role: crate::common::meta::user::UserRole::Admin,
salt: String::new(),
token: "token".to_string(),
rum_token: Some("rum_token".to_string()),
first_name: "admin".to_owned(),
last_name: "".to_owned(),
org: "dummy".to_string(),

File diff suppressed because one or more lines are too long

5863
ua_regex/regexes.yaml Normal file

File diff suppressed because it is too large Load Diff