fix: remove RwLock from in-memory USAGE_QUEUER

This commit is contained in:
Taiming Liu 2024-10-30 14:52:43 -07:00
parent f81a7657e3
commit f307c9476e
1 changed files with 36 additions and 22 deletions

View File

@ -36,14 +36,13 @@ use once_cell::sync::Lazy;
use proto::cluster_rpc;
use reqwest::Client;
use tokio::{
sync::{mpsc, oneshot, RwLock},
sync::{mpsc, oneshot},
time,
};
pub mod ingestion_service;
static USAGE_QUEUER: Lazy<Arc<RwLock<UsageQueuer>>> =
Lazy::new(|| Arc::new(RwLock::new(initialize_usage_queuer())));
static USAGE_QUEUER: Lazy<Arc<UsageQueuer>> = Lazy::new(|| Arc::new(initialize_usage_queuer()));
fn initialize_usage_queuer() -> UsageQueuer {
let cfg = get_config();
@ -62,7 +61,24 @@ pub async fn run() {
if !cfg.common.usage_enabled {
return;
}
_ = USAGE_QUEUER.read().await;
// Force initialization and wait for the background task to be ready
let (ping_sender, ping_receiver) = oneshot::channel();
if let Err(e) = USAGE_QUEUER
.msg_sender
.send(UsageMessage::Ping(ping_sender))
.await
{
log::error!("Failed to initialize usage queuer: {e}");
return;
}
if let Err(e) = ping_receiver.await {
log::error!("Usage queuer initialization failed: {e}");
return;
}
log::debug!("Usage queuer initialized successfully");
}
pub async fn report_request_usage_stats(
@ -176,8 +192,7 @@ async fn publish_usage(usage: Vec<UsageData>) {
return;
}
let usage_queuer = USAGE_QUEUER.read().await;
match usage_queuer
match USAGE_QUEUER
.enqueue(usage.into_iter().map(UsageBuffer::Usage).collect())
.await
{
@ -196,8 +211,7 @@ pub async fn publish_triggers_usage(trigger: TriggerData) {
return;
}
let usage_queuer = USAGE_QUEUER.read().await;
match usage_queuer
match USAGE_QUEUER
.enqueue(vec![UsageBuffer::Trigger(trigger)])
.await
{
@ -216,9 +230,8 @@ pub async fn flush() {
flush_audit().await;
// shutdown usage_queuer
let (res_sender, res_receiver) = oneshot::channel::<()>();
let usage_queuer = USAGE_QUEUER.read().await;
if let Err(e) = usage_queuer.shutdown(res_sender).await {
let (res_sender, res_receiver) = oneshot::channel();
if let Err(e) = USAGE_QUEUER.shutdown(res_sender).await {
log::error!("Error shutting down USAGE_QUEUER: {e}");
}
// wait for flush ingestion job
@ -307,8 +320,7 @@ async fn ingest_usages(curr_usages: Vec<UsageData>) {
if &cfg.common.usage_reporting_mode != "both" {
// on error in ingesting usage data, push back the data
let curr_usages = curr_usages.clone();
let usage_queuer = USAGE_QUEUER.read().await;
if let Err(e) = usage_queuer
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Usage).collect())
.await
{
@ -324,8 +336,7 @@ async fn ingest_usages(curr_usages: Vec<UsageData>) {
if &cfg.common.usage_reporting_mode != "both" {
// on error in ingesting usage data, push back the data
let curr_usages = curr_usages.clone();
let usage_queuer = USAGE_QUEUER.read().await;
if let Err(e) = usage_queuer
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Usage).collect())
.await
{
@ -351,9 +362,7 @@ async fn ingest_usages(curr_usages: Vec<UsageData>) {
if let Err(e) = ingestion_service::ingest(&cfg.common.usage_org, req).await {
log::error!("Error in ingesting usage data {:?}", e);
// on error in ingesting usage data, push back the data
let curr_usages = curr_usages.clone();
let usage_queuer = USAGE_QUEUER.read().await;
if let Err(e) = usage_queuer
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Usage).collect())
.await
{
@ -381,8 +390,7 @@ async fn ingest_trigger_usages(curr_usages: Vec<TriggerData>) {
};
if let Err(e) = ingestion_service::ingest(&get_config().common.usage_org, req).await {
log::error!("Error in ingesting triggers usage data {:?}", e);
let usage_queuer = USAGE_QUEUER.read().await;
if let Err(e) = usage_queuer
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Trigger).collect())
.await
{
@ -422,6 +430,7 @@ impl UsageQueuer {
enum UsageMessage {
Data(Vec<UsageBuffer>),
Shutdown(oneshot::Sender<()>),
Ping(oneshot::Sender<()>),
}
#[derive(Debug)]
@ -448,8 +457,8 @@ impl UsageReportRunner {
}
}
fn push(&mut self, mut data: Vec<UsageBuffer>) {
self.pending.append(&mut data);
fn push(&mut self, data: Vec<UsageBuffer>) {
self.pending.extend(data);
}
fn should_process(&self) -> bool {
@ -486,6 +495,7 @@ async fn ingest_usage_job(
}
}
Some(UsageMessage::Shutdown(res_sender)) => {
log::debug!("Received shutdown signal");
// process any remaining data before shutting down
if !usage_report_runner.pending.is_empty() {
let buffered = usage_report_runner.take_batch();
@ -494,6 +504,10 @@ async fn ingest_usage_job(
res_sender.send(()).ok();
break;
}
Some(UsageMessage::Ping(ping_sender)) => {
log::debug!("Received initialization ping");
ping_sender.send(()).ok();
}
None => break, // channel closed
}
}