4410: Implement the experimental log mode cli flag and log level updates at runtime r=dureuill a=irevoire

# Pull Request
This PR fixes two issues at once because they’re highly correlated in the codebase.

## Related issue
Fixes https://github.com/meilisearch/meilisearch/issues/4415
Fixes https://github.com/meilisearch/meilisearch/issues/4413

## What does this PR do?
- It makes the fmt logger configurable to output json or human-readable logs (like we already do today)
- It moves the fmt logger under a `reload` layer so we can update its targets at runtime
- Add the possibility to stream logs in the json mode
- Adds an analytics for the new CLI flag

Co-authored-by: Tamo <tamo@meilisearch.com>
This commit is contained in:
meili-bors[bot] 2024-02-15 10:01:06 +00:00 committed by GitHub
commit 88c6165e20
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 202 additions and 35 deletions

13
Cargo.lock generated
View File

@ -5197,6 +5197,16 @@ dependencies = [
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc6b213177105856957181934e4920de57730fc69bf42c37ee5bb664d406d9e1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.18"
@ -5204,11 +5214,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad0f048c97dbd9faa9b7df56362b8ebcaa52adb06b498c050d2f4e32f90a7a8b"
dependencies = [
"nu-ansi-term",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]

View File

@ -48,7 +48,7 @@ impl RoFeatures {
Ok(())
} else {
Err(FeatureNotEnabledError {
disabled_action: "getting logs through the `/logs/stream` route",
disabled_action: "Modifying logs through the `/logs/*` routes",
feature: "logs route",
issue_link: "https://github.com/orgs/meilisearch/discussions/721",
}

View File

@ -104,7 +104,7 @@ serde_urlencoded = "0.7.1"
termcolor = "1.4.1"
url = { version = "2.5.0", features = ["serde"] }
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
tracing-subscriber = { version = "0.3.18", features = ["json"] }
tracing-trace = { version = "0.1.0", path = "../tracing-trace" }
tracing-actix-web = "0.7.9"

View File

@ -28,7 +28,9 @@ use super::{
config_user_id_path, DocumentDeletionKind, DocumentFetchKind, MEILISEARCH_CONFIG_PATH,
};
use crate::analytics::Analytics;
use crate::option::{default_http_addr, IndexerOpts, MaxMemory, MaxThreads, ScheduleSnapshot};
use crate::option::{
default_http_addr, IndexerOpts, LogMode, MaxMemory, MaxThreads, ScheduleSnapshot,
};
use crate::routes::indexes::documents::UpdateDocumentsQuery;
use crate::routes::indexes::facet_search::FacetSearchQuery;
use crate::routes::tasks::TasksFilterQuery;
@ -250,6 +252,7 @@ impl super::Analytics for SegmentAnalytics {
struct Infos {
env: String,
experimental_enable_metrics: bool,
experimental_logs_mode: LogMode,
experimental_enable_logs_route: bool,
experimental_reduce_indexing_memory_usage: bool,
experimental_max_number_of_batched_tasks: usize,
@ -288,6 +291,7 @@ impl From<Opt> for Infos {
let Opt {
db_path,
experimental_enable_metrics,
experimental_logs_mode,
experimental_enable_logs_route,
experimental_reduce_indexing_memory_usage,
experimental_max_number_of_batched_tasks,
@ -335,6 +339,7 @@ impl From<Opt> for Infos {
Self {
env,
experimental_enable_metrics,
experimental_logs_mode,
experimental_enable_logs_route,
experimental_reduce_indexing_memory_usage,
db_path: db_path != PathBuf::from("./data.ms"),

View File

@ -97,11 +97,25 @@ pub type LogRouteType = tracing_subscriber::filter::Filtered<
tracing_subscriber::Registry,
>;
pub type SubscriberForSecondLayer = tracing_subscriber::layer::Layered<
tracing_subscriber::reload::Layer<LogRouteType, tracing_subscriber::Registry>,
tracing_subscriber::Registry,
>;
pub type LogStderrHandle =
tracing_subscriber::reload::Handle<LogStderrType, SubscriberForSecondLayer>;
pub type LogStderrType = tracing_subscriber::filter::Filtered<
Box<dyn tracing_subscriber::Layer<SubscriberForSecondLayer> + Send + Sync>,
Targets,
SubscriberForSecondLayer,
>;
pub fn create_app(
index_scheduler: Data<IndexScheduler>,
auth_controller: Data<AuthController>,
opt: Opt,
logs: LogRouteHandle,
logs: (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>,
enable_dashboard: bool,
) -> actix_web::App<
@ -444,7 +458,7 @@ pub fn configure_data(
index_scheduler: Data<IndexScheduler>,
auth: Data<AuthController>,
opt: &Opt,
logs: LogRouteHandle,
(logs_route, logs_stderr): (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>,
) {
let http_payload_size_limit = opt.http_payload_size_limit.get_bytes() as usize;
@ -452,7 +466,8 @@ pub fn configure_data(
.app_data(index_scheduler)
.app_data(auth)
.app_data(web::Data::from(analytics))
.app_data(web::Data::new(logs))
.app_data(web::Data::new(logs_route))
.app_data(web::Data::new(logs_stderr))
.app_data(
web::JsonConfig::default()
.limit(http_payload_size_limit)

View File

@ -10,8 +10,10 @@ use actix_web::HttpServer;
use index_scheduler::IndexScheduler;
use is_terminal::IsTerminal;
use meilisearch::analytics::Analytics;
use meilisearch::option::LogMode;
use meilisearch::{
analytics, create_app, prototype_name, setup_meilisearch, LogRouteHandle, LogRouteType, Opt,
analytics, create_app, prototype_name, setup_meilisearch, LogRouteHandle, LogRouteType,
LogStderrHandle, LogStderrType, Opt, SubscriberForSecondLayer,
};
use meilisearch_auth::{generate_master_key, AuthController, MASTER_KEY_MIN_SIZE};
use mimalloc::MiMalloc;
@ -23,28 +25,43 @@ use tracing_subscriber::Layer;
#[global_allocator]
static ALLOC: MiMalloc = MiMalloc;
fn default_layer() -> LogRouteType {
fn default_log_route_layer() -> LogRouteType {
None.with_filter(tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF))
}
fn default_log_stderr_layer(opt: &Opt) -> LogStderrType {
let layer = tracing_subscriber::fmt::layer()
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE);
let layer = match opt.experimental_logs_mode {
LogMode::Human => Box::new(layer)
as Box<dyn tracing_subscriber::Layer<SubscriberForSecondLayer> + Send + Sync>,
LogMode::Json => Box::new(layer.json())
as Box<dyn tracing_subscriber::Layer<SubscriberForSecondLayer> + Send + Sync>,
};
layer.with_filter(
tracing_subscriber::filter::Targets::new()
.with_target("", LevelFilter::from_str(&opt.log_level.to_string()).unwrap()),
)
}
/// does all the setup before meilisearch is launched
fn setup(opt: &Opt) -> anyhow::Result<LogRouteHandle> {
let (route_layer, route_layer_handle) = tracing_subscriber::reload::Layer::new(default_layer());
fn setup(opt: &Opt) -> anyhow::Result<(LogRouteHandle, LogStderrHandle)> {
let (route_layer, route_layer_handle) =
tracing_subscriber::reload::Layer::new(default_log_route_layer());
let route_layer: tracing_subscriber::reload::Layer<_, _> = route_layer;
let subscriber = tracing_subscriber::registry().with(route_layer).with(
tracing_subscriber::fmt::layer()
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE)
.with_filter(
tracing_subscriber::filter::LevelFilter::from_str(&opt.log_level.to_string())
.unwrap(),
),
);
let (stderr_layer, stderr_layer_handle) =
tracing_subscriber::reload::Layer::new(default_log_stderr_layer(opt));
let route_layer: tracing_subscriber::reload::Layer<_, _> = route_layer;
let subscriber = tracing_subscriber::registry().with(route_layer).with(stderr_layer);
// set the subscriber as the default for the application
tracing::subscriber::set_global_default(subscriber).unwrap();
Ok(route_layer_handle)
Ok((route_layer_handle, stderr_layer_handle))
}
fn on_panic(info: &std::panic::PanicInfo) {
@ -110,7 +127,7 @@ async fn run_http(
index_scheduler: Arc<IndexScheduler>,
auth_controller: Arc<AuthController>,
opt: Opt,
logs: LogRouteHandle,
logs: (LogRouteHandle, LogStderrHandle),
analytics: Arc<dyn Analytics>,
) -> anyhow::Result<()> {
let enable_dashboard = &opt.env == "development";

View File

@ -51,6 +51,7 @@ const MEILI_IGNORE_MISSING_DUMP: &str = "MEILI_IGNORE_MISSING_DUMP";
const MEILI_IGNORE_DUMP_IF_DB_EXISTS: &str = "MEILI_IGNORE_DUMP_IF_DB_EXISTS";
const MEILI_DUMP_DIR: &str = "MEILI_DUMP_DIR";
const MEILI_LOG_LEVEL: &str = "MEILI_LOG_LEVEL";
const MEILI_EXPERIMENTAL_LOGS_MODE: &str = "MEILI_EXPERIMENTAL_LOGS_MODE";
const MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE: &str = "MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE";
const MEILI_EXPERIMENTAL_ENABLE_METRICS: &str = "MEILI_EXPERIMENTAL_ENABLE_METRICS";
const MEILI_EXPERIMENTAL_REDUCE_INDEXING_MEMORY_USAGE: &str =
@ -79,6 +80,39 @@ const DEFAULT_LOG_EVERY_N: usize = 100_000;
pub const INDEX_SIZE: u64 = 2 * 1024 * 1024 * 1024 * 1024; // 2 TiB
pub const TASK_DB_SIZE: u64 = 20 * 1024 * 1024 * 1024; // 20 GiB
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum LogMode {
#[default]
Human,
Json,
}
impl Display for LogMode {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
LogMode::Human => Display::fmt("HUMAN", f),
LogMode::Json => Display::fmt("JSON", f),
}
}
}
impl FromStr for LogMode {
type Err = LogModeError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s.trim().to_lowercase().as_str() {
"human" => Ok(LogMode::Human),
"json" => Ok(LogMode::Json),
_ => Err(LogModeError(s.to_owned())),
}
}
}
#[derive(Debug, thiserror::Error)]
#[error("Unsupported log mode level `{0}`. Supported values are `HUMAN` and `JSON`.")]
pub struct LogModeError(String);
#[derive(Debug, Default, Clone, Copy, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
pub enum LogLevel {
@ -310,9 +344,16 @@ pub struct Opt {
#[serde(default)]
pub experimental_enable_metrics: bool,
/// Experimental logs mode feature. For more information, see: <https://github.com/orgs/meilisearch/discussions/723>
///
/// Change the mode of the logs on the console.
#[clap(long, env = MEILI_EXPERIMENTAL_LOGS_MODE, default_value_t)]
#[serde(default)]
pub experimental_logs_mode: LogMode,
/// Experimental logs route feature. For more information, see: <https://github.com/orgs/meilisearch/discussions/721>
///
/// Enables the log route on the `POST /logs/stream` endpoint and the `DELETE /logs/stream` to stop receiving logs.
/// Enables the log routes on the `POST /logs/stream`, `POST /logs/stderr` endpoints, and the `DELETE /logs/stream` to stop receiving logs.
#[clap(long, env = MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE)]
#[serde(default)]
pub experimental_enable_logs_route: bool,
@ -422,6 +463,7 @@ impl Opt {
#[cfg(feature = "analytics")]
no_analytics,
experimental_enable_metrics,
experimental_logs_mode,
experimental_enable_logs_route,
experimental_reduce_indexing_memory_usage,
} = self;
@ -479,6 +521,10 @@ impl Opt {
MEILI_EXPERIMENTAL_ENABLE_METRICS,
experimental_enable_metrics.to_string(),
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_LOGS_MODE,
experimental_logs_mode.to_string(),
);
export_to_env_if_not_present(
MEILI_EXPERIMENTAL_ENABLE_LOGS_ROUTE,
experimental_enable_logs_route.to_string(),

View File

@ -22,14 +22,15 @@ use crate::error::MeilisearchHttpError;
use crate::extractors::authentication::policies::*;
use crate::extractors::authentication::GuardedData;
use crate::extractors::sequential_extractor::SeqHandler;
use crate::LogRouteHandle;
use crate::{LogRouteHandle, LogStderrHandle};
pub fn configure(cfg: &mut web::ServiceConfig) {
cfg.service(
web::resource("stream")
.route(web::post().to(SeqHandler(get_logs)))
.route(web::delete().to(SeqHandler(cancel_logs))),
);
)
.service(web::resource("stderr").route(web::post().to(SeqHandler(update_stderr_target))));
}
#[derive(Debug, Default, Clone, Copy, Deserr, PartialEq, Eq)]
@ -37,6 +38,7 @@ pub fn configure(cfg: &mut web::ServiceConfig) {
pub enum LogMode {
#[default]
Human,
Json,
Profile,
}
@ -165,7 +167,18 @@ fn make_layer<
let fmt_layer = tracing_subscriber::fmt::layer()
.with_writer(move || LogWriter { sender: sender.clone() })
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE);
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE);
let stream = byte_stream(receiver, guard);
(Box::new(fmt_layer) as Box<dyn Layer<S> + Send + Sync>, Box::pin(stream))
}
LogMode::Json => {
let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
let fmt_layer = tracing_subscriber::fmt::layer()
.with_writer(move || LogWriter { sender: sender.clone() })
.json()
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE);
let stream = byte_stream(receiver, guard);
(Box::new(fmt_layer) as Box<dyn Layer<S> + Send + Sync>, Box::pin(stream))
@ -279,3 +292,27 @@ pub async fn cancel_logs(
Ok(HttpResponse::NoContent().finish())
}
#[derive(Debug, Deserr)]
#[deserr(error = DeserrJsonError, rename_all = camelCase, deny_unknown_fields)]
pub struct UpdateStderrLogs {
#[deserr(default = "info".parse().unwrap(), try_from(&String) = MyTargets::from_str -> DeserrJsonError<BadRequest>)]
target: MyTargets,
}
pub async fn update_stderr_target(
index_scheduler: GuardedData<ActionPolicy<{ actions::METRICS_GET }>, Data<IndexScheduler>>,
logs: Data<LogStderrHandle>,
body: AwebJson<UpdateStderrLogs, DeserrJsonError>,
) -> Result<HttpResponse, ResponseError> {
index_scheduler.features().check_logs_route()?;
let opt = body.into_inner();
logs.modify(|layer| {
*layer.filter_mut() = opt.target.0.clone();
})
.unwrap();
Ok(HttpResponse::NoContent().finish())
}

View File

@ -9,7 +9,7 @@ use actix_web::http::StatusCode;
use byte_unit::{Byte, ByteUnit};
use clap::Parser;
use meilisearch::option::{IndexerOpts, MaxMemory, Opt};
use meilisearch::{analytics, create_app, setup_meilisearch};
use meilisearch::{analytics, create_app, setup_meilisearch, SubscriberForSecondLayer};
use once_cell::sync::Lazy;
use tempfile::TempDir;
use tokio::time::sleep;
@ -87,12 +87,20 @@ impl Server {
tracing_subscriber::reload::Layer::new(None.with_filter(
tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF),
));
let (_stderr_layer, stderr_layer_handle) = tracing_subscriber::reload::Layer::new(
(Box::new(
tracing_subscriber::fmt::layer()
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE),
)
as Box<dyn tracing_subscriber::Layer<SubscriberForSecondLayer> + Send + Sync>)
.with_filter(tracing_subscriber::filter::Targets::new()),
);
actix_web::test::init_service(create_app(
self.service.index_scheduler.clone().into(),
self.service.auth.clone().into(),
self.service.options.clone(),
route_layer_handle,
(route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&self.service.options),
true,
))

View File

@ -5,7 +5,7 @@ use actix_web::http::StatusCode;
use actix_web::test;
use actix_web::test::TestRequest;
use index_scheduler::IndexScheduler;
use meilisearch::{analytics, create_app, Opt};
use meilisearch::{analytics, create_app, Opt, SubscriberForSecondLayer};
use meilisearch_auth::AuthController;
use tracing::level_filters::LevelFilter;
use tracing_subscriber::Layer;
@ -111,12 +111,20 @@ impl Service {
tracing_subscriber::reload::Layer::new(None.with_filter(
tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF),
));
let (_stderr_layer, stderr_layer_handle) = tracing_subscriber::reload::Layer::new(
(Box::new(
tracing_subscriber::fmt::layer()
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE),
)
as Box<dyn tracing_subscriber::Layer<SubscriberForSecondLayer> + Send + Sync>)
.with_filter(tracing_subscriber::filter::Targets::new()),
);
let app = test::init_service(create_app(
self.index_scheduler.clone().into(),
self.auth.clone().into(),
self.options.clone(),
route_layer_handle,
(route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&self.options),
true,
))

View File

@ -89,7 +89,7 @@ async fn logs_stream_bad_mode() {
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Unknown value `tamo` at `.mode`: expected one of `human`, `profile`",
"message": "Unknown value `tamo` at `.mode`: expected one of `human`, `json`, `profile`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
@ -146,7 +146,7 @@ async fn logs_stream_bad_profile_memory() {
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Unknown value `fmt` at `.mode`: expected one of `human`, `profile`",
"message": "Unknown value `fmt` at `.mode`: expected one of `human`, `json`, `profile`",
"code": "bad_request",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#bad_request"
@ -162,7 +162,7 @@ async fn logs_stream_without_enabling_the_route() {
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "getting logs through the `/logs/stream` route requires enabling the `logs route` experimental feature. See https://github.com/orgs/meilisearch/discussions/721",
"message": "Modifying logs through the `/logs/*` routes requires enabling the `logs route` experimental feature. See https://github.com/orgs/meilisearch/discussions/721",
"code": "feature_not_enabled",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#feature_not_enabled"
@ -173,7 +173,18 @@ async fn logs_stream_without_enabling_the_route() {
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "getting logs through the `/logs/stream` route requires enabling the `logs route` experimental feature. See https://github.com/orgs/meilisearch/discussions/721",
"message": "Modifying logs through the `/logs/*` routes requires enabling the `logs route` experimental feature. See https://github.com/orgs/meilisearch/discussions/721",
"code": "feature_not_enabled",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#feature_not_enabled"
}
"###);
let (response, code) = server.service.post("/logs/stderr", json!({})).await;
snapshot!(code, @"400 Bad Request");
snapshot!(response, @r###"
{
"message": "Modifying logs through the `/logs/*` routes requires enabling the `logs route` experimental feature. See https://github.com/orgs/meilisearch/discussions/721",
"code": "feature_not_enabled",
"type": "invalid_request",
"link": "https://docs.meilisearch.com/errors#feature_not_enabled"

View File

@ -5,7 +5,7 @@ use std::str::FromStr;
use actix_web::http::header::ContentType;
use meili_snap::snapshot;
use meilisearch::{analytics, create_app, Opt};
use meilisearch::{analytics, create_app, Opt, SubscriberForSecondLayer};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::Layer;
@ -27,18 +27,25 @@ async fn basic_test_log_stream_route() {
tracing_subscriber::reload::Layer::new(None.with_filter(
tracing_subscriber::filter::Targets::new().with_target("", LevelFilter::OFF),
));
let (_stderr_layer, stderr_layer_handle) = tracing_subscriber::reload::Layer::new(
(Box::new(
tracing_subscriber::fmt::layer()
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::CLOSE),
) as Box<dyn tracing_subscriber::Layer<SubscriberForSecondLayer> + Send + Sync>)
.with_filter(tracing_subscriber::filter::Targets::new()),
);
let subscriber = tracing_subscriber::registry().with(route_layer).with(
tracing_subscriber::fmt::layer()
.with_span_events(tracing_subscriber::fmt::format::FmtSpan::ACTIVE)
.with_filter(tracing_subscriber::filter::LevelFilter::from_str("INFO").unwrap()),
.with_filter(tracing_subscriber::filter::LevelFilter::from_str("OFF").unwrap()),
);
let app = actix_web::test::init_service(create_app(
server.service.index_scheduler.clone().into(),
server.service.auth.clone().into(),
server.service.options.clone(),
route_layer_handle,
(route_layer_handle, stderr_layer_handle),
analytics::MockAnalytics::new(&server.service.options),
true,
))