factorize the code between the two documents batcher

This commit is contained in:
Tamo 2021-10-25 17:08:28 +02:00 committed by marin postma
parent a9523146a3
commit e64ba122e1
No known key found for this signature in database
GPG Key ID: 6088B7721C3E39F9
1 changed files with 56 additions and 62 deletions

View File

@ -34,6 +34,19 @@ mod segment {
const SEGMENT_API_KEY: &str = "vHi89WrNDckHSQssyUJqLvIyp2QFITSC";
pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
request
.headers()
.get(USER_AGENT)
.map(|header| header.to_str().ok())
.flatten()
.unwrap_or("unknown")
.split(";")
.map(str::trim)
.map(ToString::to_string)
.collect()
}
pub struct SegmentAnalytics {
user: User,
opt: Opt,
@ -196,7 +209,7 @@ mod segment {
query: &SearchQuery,
request: &HttpRequest,
) {
let user_agent = SearchBatcher::extract_user_agents(request);
let user_agent = extract_user_agents(request);
let sorted = query.sort.is_some() as usize;
let sort_with_geo_point = query
.sort
@ -275,6 +288,36 @@ mod segment {
search_batcher.max_offset = search_batcher.max_offset.max(max_offset);
});
}
fn batch_documents(
&'static self,
batcher: &'static Mutex<DocumentsBatcher>,
documents_query: &UpdateDocumentsQuery,
index_creation: bool,
request: &HttpRequest,
) {
let user_agents = extract_user_agents(request);
let primary_key = documents_query.primary_key.clone();
let content_type = request
.headers()
.get(CONTENT_TYPE)
.map(|s| s.to_str().unwrap_or("unkown"))
.unwrap()
.to_string();
tokio::spawn(async move {
let mut lock = batcher.lock().await;
for user_agent in user_agents {
lock.user_agents.insert(user_agent);
}
lock.content_types.insert(content_type);
if let Some(primary_key) = primary_key {
lock.primary_keys.insert(primary_key);
}
lock.index_creation |= index_creation;
// drop the lock here
});
}
}
#[async_trait::async_trait]
@ -333,30 +376,12 @@ mod segment {
index_creation: bool,
request: &HttpRequest,
) {
let user_agents = request
.headers()
.get(USER_AGENT)
.map(|header| header.to_str().unwrap_or("unknown").to_string());
let primary_key = documents_query.primary_key.clone();
let content_type = request
.headers()
.get(CONTENT_TYPE)
.map(|s| s.to_str().unwrap_or("unkown"))
.unwrap()
.to_string();
tokio::spawn(async move {
let mut lock = self.documents_added_batcher.lock().await;
for user_agent in user_agents {
lock.user_agents.insert(user_agent);
}
lock.content_types.insert(content_type);
if let Some(primary_key) = primary_key {
lock.primary_keys.insert(primary_key);
}
lock.index_creation |= index_creation;
// drop the lock here
});
self.batch_documents(
&self.documents_added_batcher,
documents_query,
index_creation,
request,
)
}
fn update_documents(
@ -365,30 +390,12 @@ mod segment {
index_creation: bool,
request: &HttpRequest,
) {
let user_agents = request
.headers()
.get(USER_AGENT)
.map(|header| header.to_str().unwrap_or("unknown").to_string());
let primary_key = documents_query.primary_key.clone();
let content_type = request
.headers()
.get(CONTENT_TYPE)
.map(|s| s.to_str().unwrap_or("unkown"))
.unwrap()
.to_string();
tokio::spawn(async move {
let mut lock = self.documents_updated_batcher.lock().await;
for user_agent in user_agents {
lock.user_agents.insert(user_agent);
}
lock.content_types.insert(content_type);
if let Some(primary_key) = primary_key {
lock.primary_keys.insert(primary_key);
}
lock.index_creation |= index_creation;
// drop the lock here
});
self.batch_documents(
&self.documents_updated_batcher,
documents_query,
index_creation,
request,
)
}
}
@ -435,19 +442,6 @@ mod segment {
}
impl SearchBatcher {
pub fn extract_user_agents(request: &HttpRequest) -> Vec<String> {
request
.headers()
.get(USER_AGENT)
.map(|header| header.to_str().ok())
.flatten()
.unwrap_or("unknown")
.split(";")
.map(str::trim)
.map(ToString::to_string)
.collect()
}
pub fn into_event(mut self, user: &User, event_name: &str) -> Option<Track> {
if self.total_received == 0 {
None