rewrite the main analytics module and the information sent in the tick

This commit is contained in:
Tamo 2021-10-12 11:22:44 +02:00 committed by marin postma
parent b227666271
commit e226b1a87f
No known key found for this signature in database
GPG Key ID: 6088B7721C3E39F9
4 changed files with 163 additions and 114 deletions

15
Cargo.lock generated
View File

@ -1658,6 +1658,7 @@ dependencies = [
"regex",
"reqwest",
"rustls",
"segment",
"serde",
"serde_json",
"serde_url_params",
@ -1677,7 +1678,6 @@ dependencies = [
"uuid",
"vergen",
"walkdir",
"whoami",
"zip",
]
@ -2540,6 +2540,19 @@ dependencies = [
"untrusted",
]
[[package]]
name = "segment"
version = "0.1.1"
source = "git+https://github.com/meilisearch/segment#656b91e1f7a2c6443e2a8ed59f8942400e9a811e"
dependencies = [
"async-trait",
"chrono",
"reqwest",
"serde",
"serde_json",
"thiserror",
]
[[package]]
name = "semver"
version = "0.9.0"

View File

@ -55,6 +55,7 @@ rand = "0.8.4"
rayon = "1.5.1"
regex = "1.5.4"
rustls = "0.19.1"
segment = { git = "https://github.com/meilisearch/segment", optional = true }
serde = { version = "1.0.130", features = ["derive"] }
serde_json = { version = "1.0.67", features = ["preserve_order"] }
sha2 = "0.9.6"
@ -69,8 +70,6 @@ uuid = { version = "0.8.2", features = ["serde"] }
walkdir = "2.3.2"
obkv = "0.2.0"
pin-project = "1.0.8"
whoami = { version = "1.1.3", optional = true }
reqwest = { version = "0.11.4", features = ["json", "rustls-tls"], default-features = false, optional = true }
sysinfo = "0.20.2"
tokio-stream = "0.1.7"
@ -91,7 +90,7 @@ mini-dashboard = [
"tempfile",
"zip",
]
analytics = ["whoami", "reqwest"]
analytics = ["segment"]
default = ["analytics", "mini-dashboard"]
[target.'cfg(target_os = "linux")'.dependencies]

View File

@ -1,126 +1,164 @@
use std::hash::{Hash, Hasher};
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
use log::debug;
use meilisearch_lib::index_controller::Stats;
use meilisearch_lib::MeiliSearch;
use serde::Serialize;
use siphasher::sip::SipHasher;
use once_cell::sync::Lazy;
use segment::message::{Identify, Track, User};
use segment::{AutoBatcher, Batcher, HttpClient};
use serde_json::{json, Value};
use std::fmt::Display;
use std::time::{Duration, Instant};
use sysinfo::DiskExt;
use sysinfo::ProcessorExt;
use sysinfo::System;
use sysinfo::SystemExt;
use tokio::sync::Mutex;
use uuid::Uuid;
use crate::Opt;
const AMPLITUDE_API_KEY: &str = "f7fba398780e06d8fe6666a9be7e3d47";
const SEGMENT_API_KEY: &str = "vHi89WrNDckHSQssyUJqLvIyp2QFITSC";
#[derive(Debug, Serialize)]
struct EventProperties {
database_size: u64,
last_update_timestamp: Option<i64>, //timestamp
number_of_documents: Vec<u64>,
pub struct Analytics {
user: User,
opt: Opt,
batcher: Mutex<AutoBatcher>,
}
impl EventProperties {
async fn from(data: MeiliSearch) -> anyhow::Result<EventProperties> {
let stats = data.get_all_stats().await?;
impl Analytics {
pub fn publish(&'static self, event_name: String, send: Value) {
tokio::spawn(async move {
let _ = self
.batcher
.lock()
.await
.push(Track {
user: self.user.clone(),
event: event_name.clone(),
properties: send,
..Default::default()
})
.await;
println!("ANALYTICS: {} added to batch", event_name)
});
}
let database_size = stats.database_size;
let last_update_timestamp = stats.last_update.map(|u| u.timestamp());
pub fn tick(&'static self, meilisearch: MeiliSearch) {
tokio::spawn(async move {
loop {
tokio::time::sleep(Duration::from_secs(60)).await; // 1 minutes
println!("ANALYTICS: should do things");
if let Ok(stats) = meilisearch.get_all_stats().await {
let traits = Self::compute_traits(&self.opt, stats);
let user = self.user.clone();
println!("ANALYTICS: Pushing our identify tick");
let _ = self
.batcher
.lock()
.await
.push(Identify {
user,
traits,
..Default::default()
})
.await;
}
println!("ANALYTICS: Pushing our batch");
let _ = self.batcher.lock().await.flush().await;
}
});
}
}
impl Analytics {
pub async fn new(opt: &Opt, meilisearch: &MeiliSearch) -> &'static Self {
let user_id = std::fs::read_to_string(opt.db_path.join("user-id"));
let first_time_run = user_id.is_err();
let user_id = user_id.unwrap_or(Uuid::new_v4().to_string());
let _ = std::fs::write(opt.db_path.join("user-id"), user_id.as_bytes());
let client = HttpClient::default();
let user = User::UserId {
user_id: user_id.clone(),
};
let batcher = Batcher::new(None);
let batcher = Mutex::new(AutoBatcher::new(
client,
batcher,
SEGMENT_API_KEY.to_string(),
));
let segment = Box::new(Self {
user,
opt: opt.clone(),
batcher,
});
let segment = Box::leak(segment);
// send an identify event
let _ = segment
.batcher
.lock()
.await
.push(Identify {
user: segment.user.clone(),
// TODO: TAMO: what should we do when meilisearch is broken at start
traits: Self::compute_traits(
&segment.opt,
meilisearch.get_all_stats().await.unwrap(),
),
..Default::default()
})
.await;
println!("ANALYTICS: pushed the identify event");
// send the associated track event
if first_time_run {
segment.publish("Launched for the first time".to_string(), json!({}));
}
// start the runtime tick
segment.tick(meilisearch.clone());
segment
}
fn compute_traits(opt: &Opt, stats: Stats) -> Value {
static FIRST_START_TIMESTAMP: Lazy<Instant> = Lazy::new(|| Instant::now());
static SYSTEM: Lazy<Value> = Lazy::new(|| {
let mut sys = System::new_all();
sys.refresh_all();
json!({
"distribution": sys.name().zip(sys.kernel_version()).map(|(name, version)| format!("{}: {}", name, version)),
"core_number": sys.processors().len(),
"ram_size": sys.total_memory(),
"frequency": sys.processors().iter().map(|cpu| cpu.frequency()).sum::<u64>() / sys.processors().len() as u64,
"disk_size": sys.disks().iter().map(|disk| disk.available_space()).max(),
"server_provider": std::env::var("MEILI_SERVER_PROVIDER").ok(),
})
});
let number_of_documents = stats
.indexes
.values()
.map(|index| index.number_of_documents)
.collect();
.collect::<Vec<u64>>();
Ok(EventProperties {
database_size,
last_update_timestamp,
number_of_documents,
json!({
"system": *SYSTEM,
"stats": {
"database_size": stats.database_size,
"indexes_number": stats.indexes.len(),
"documents_number": number_of_documents,
},
"infos": {
"version": env!("CARGO_PKG_VERSION").to_string(),
"env": opt.env.clone(),
"snapshot": opt.schedule_snapshot,
"start_since_days": FIRST_START_TIMESTAMP.elapsed().as_secs() / 60 * 60 * 24, // one day
},
})
}
}
#[derive(Debug, Serialize)]
struct UserProperties<'a> {
env: &'a str,
start_since_days: u64,
user_email: Option<String>,
server_provider: Option<String>,
}
#[derive(Debug, Serialize)]
struct Event<'a> {
user_id: &'a str,
event_type: &'a str,
device_id: &'a str,
time: u64,
app_version: &'a str,
user_properties: UserProperties<'a>,
event_properties: Option<EventProperties>,
}
#[derive(Debug, Serialize)]
struct AmplitudeRequest<'a> {
api_key: &'a str,
events: Vec<Event<'a>>,
}
pub async fn analytics_sender(data: MeiliSearch, opt: Opt) {
let username = whoami::username();
let hostname = whoami::hostname();
let platform = whoami::platform();
let uid = username + &hostname + &platform.to_string();
let mut hasher = SipHasher::new();
uid.hash(&mut hasher);
let hash = hasher.finish();
let uid = format!("{:X}", hash);
let platform = platform.to_string();
let first_start = Instant::now();
loop {
let n = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
let user_id = &uid;
let device_id = &platform;
let time = n.as_secs();
let event_type = "runtime_tick";
let elapsed_since_start = first_start.elapsed().as_secs() / 86_400; // One day
let event_properties = EventProperties::from(data.clone()).await.ok();
let app_version = env!("CARGO_PKG_VERSION").to_string();
let app_version = app_version.as_str();
let user_email = std::env::var("MEILI_USER_EMAIL").ok();
let server_provider = std::env::var("MEILI_SERVER_PROVIDER").ok();
let user_properties = UserProperties {
env: &opt.env,
start_since_days: elapsed_since_start,
user_email,
server_provider,
};
let event = Event {
user_id,
event_type,
device_id,
time,
app_version,
user_properties,
event_properties,
};
let request = AmplitudeRequest {
api_key: AMPLITUDE_API_KEY,
events: vec![event],
};
let response = reqwest::Client::new()
.post("https://api2.amplitude.com/2/httpapi")
.timeout(Duration::from_secs(60)) // 1 minute max
.json(&request)
.send()
.await;
if let Err(e) = response {
debug!("Unsuccessful call to Amplitude: {}", e);
}
tokio::time::sleep(Duration::from_secs(3600)).await;
impl Display for Analytics {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self.user)
}
}

View File

@ -48,9 +48,8 @@ async fn main() -> anyhow::Result<()> {
#[cfg(all(not(debug_assertions), feature = "analytics"))]
if !opt.no_analytics {
let analytics_data = meilisearch.clone();
let analytics_opt = opt.clone();
tokio::task::spawn(analytics::analytics_sender(analytics_data, analytics_opt));
let analytics = analytics::Analytics::new(&opt, &meilisearch).await;
println!("go my analytics back");
}
print_launch_resume(&opt);