clippy: box usage buffer enum

This commit is contained in:
Taiming Liu 2024-10-30 21:57:36 -07:00
parent 0ebaf21f33
commit a27421f442
1 changed files with 35 additions and 10 deletions

View File

@ -193,7 +193,12 @@ async fn publish_usage(usage: Vec<UsageData>) {
}
match USAGE_QUEUER
.enqueue(usage.into_iter().map(UsageBuffer::Usage).collect())
.enqueue(
usage
.into_iter()
.map(|item| UsageBuffer::Usage(Box::new(item)))
.collect(),
)
.await
{
Err(e) => {
@ -212,7 +217,7 @@ pub async fn publish_triggers_usage(trigger: TriggerData) {
}
match USAGE_QUEUER
.enqueue(vec![UsageBuffer::Trigger(trigger)])
.enqueue(vec![UsageBuffer::Trigger(Box::new(trigger))])
.await
{
Err(e) => {
@ -321,7 +326,12 @@ async fn ingest_usages(curr_usages: Vec<UsageData>) {
// on error in ingesting usage data, push back the data
let curr_usages = curr_usages.clone();
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Usage).collect())
.enqueue(
curr_usages
.into_iter()
.map(|item| UsageBuffer::Usage(Box::new(item)))
.collect(),
)
.await
{
log::error!(
@ -337,7 +347,12 @@ async fn ingest_usages(curr_usages: Vec<UsageData>) {
// on error in ingesting usage data, push back the data
let curr_usages = curr_usages.clone();
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Usage).collect())
.enqueue(
curr_usages
.into_iter()
.map(|item| UsageBuffer::Usage(Box::new(item)))
.collect(),
)
.await
{
log::error!(
@ -363,7 +378,12 @@ async fn ingest_usages(curr_usages: Vec<UsageData>) {
log::error!("Error in ingesting usage data {:?}", e);
// on error in ingesting usage data, push back the data
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Usage).collect())
.enqueue(
curr_usages
.into_iter()
.map(|item| UsageBuffer::Usage(Box::new(item)))
.collect(),
)
.await
{
log::error!("Error in pushing back un-ingested Usage data to UsageQueuer: {e}");
@ -391,7 +411,12 @@ async fn ingest_trigger_usages(curr_usages: Vec<TriggerData>) {
if let Err(e) = ingestion_service::ingest(&get_config().common.usage_org, req).await {
log::error!("Error in ingesting triggers usage data {:?}", e);
if let Err(e) = USAGE_QUEUER
.enqueue(curr_usages.into_iter().map(UsageBuffer::Trigger).collect())
.enqueue(
curr_usages
.into_iter()
.map(|item| UsageBuffer::Trigger(Box::new(item)))
.collect(),
)
.await
{
log::error!("Error in pushing back un-ingested Usage data to UsageQueuer: {e}");
@ -435,8 +460,8 @@ enum UsageMessage {
#[derive(Debug)]
enum UsageBuffer {
Usage(UsageData),
Trigger(TriggerData),
Usage(Box<UsageData>),
Trigger(Box<TriggerData>),
}
#[derive(Debug)]
@ -526,8 +551,8 @@ async fn ingest_buffered_usage(usage_buffer: Vec<UsageBuffer>) {
let (mut usage_data, mut trigger_data) = (Vec::new(), Vec::new());
for item in usage_buffer {
match item {
UsageBuffer::Usage(usage) => usage_data.push(usage),
UsageBuffer::Trigger(trigger) => trigger_data.push(trigger),
UsageBuffer::Usage(usage) => usage_data.push(*usage),
UsageBuffer::Trigger(trigger) => trigger_data.push(*trigger),
}
}
if !usage_data.is_empty() {