diff --git a/config/initializers/periodic_jobs.rb b/config/initializers/periodic_jobs.rb index 1dab7909502..43d7578b16f 100644 --- a/config/initializers/periodic_jobs.rb +++ b/config/initializers/periodic_jobs.rb @@ -91,25 +91,13 @@ Rails.configuration.after_initialize do with_each_shard_by_database(StreamItem, :destroy_stream_items_using_setting) end - if IncomingMailProcessor::IncomingMessageProcessor.run_periodically? - Delayed::Periodic.cron 'IncomingMailProcessor::IncomingMessageProcessor#process', '*/1 * * * *' do - imp = IncomingMailProcessor::IncomingMessageProcessor.new(IncomingMail::MessageHandler.new, ErrorReport::Reporter.new) - IncomingMailProcessor::IncomingMessageProcessor.workers.times do |worker_id| - if IncomingMailProcessor::IncomingMessageProcessor.dedicated_workers_per_mailbox - # Launch one per mailbox - IncomingMailProcessor::IncomingMessageProcessor.mailbox_accounts.each do |account| - imp.send_later_enqueue_args(:process, - {singleton: "IncomingMailProcessor::IncomingMessageProcessor#process:#{worker_id}:#{account.address}", max_attempts: 1}, - {worker_id: worker_id, mailbox_account_address: account.address}) - end - else - # Just launch the one - imp.send_later_enqueue_args(:process, - {singleton: "IncomingMailProcessor::IncomingMessageProcessor#process:#{worker_id}", max_attempts: 1}, - {worker_id: worker_id}) - end - end - end + Delayed::Periodic.cron 'IncomingMailProcessor::IncomingMessageProcessor#process', '*/1 * * * *' do + DatabaseServer.send_in_each_region( + IncomingMailProcessor::IncomingMessageProcessor, + :queue_processors, + { run_current_region_asynchronously: true, + singleton: 'IncomingMailProcessor::IncomingMessageProcessor.queue_processors' } + ) end Delayed::Periodic.cron 'IncomingMailProcessor::Instrumentation#process', '*/5 * * * *' do diff --git a/gems/incoming_mail_processor/lib/incoming_mail_processor/incoming_message_processor.rb b/gems/incoming_mail_processor/lib/incoming_mail_processor/incoming_message_processor.rb index 8da35bcc6c6..2fcb6acf3c6 100644 --- a/gems/incoming_mail_processor/lib/incoming_mail_processor/incoming_message_processor.rb +++ b/gems/incoming_mail_processor/lib/incoming_mail_processor/incoming_message_processor.rb @@ -43,6 +43,27 @@ module IncomingMailProcessor @error_reporter = error_reporter end + def self.queue_processors + if self.run_periodically? + imp = self.new(IncomingMail::MessageHandler.new, ErrorReport::Reporter.new) + self.workers.times do |worker_id| + if self.dedicated_workers_per_mailbox + # Launch one per mailbox + self.mailbox_accounts.each do |account| + imp.send_later_enqueue_args(:process, + {singleton: "IncomingMailProcessor::IncomingMessageProcessor#process:#{worker_id}:#{account.address}", max_attempts: 1}, + {worker_id: worker_id, mailbox_account_address: account.address}) + end + else + # Just launch the one + imp.send_later_enqueue_args(:process, + {singleton: "IncomingMailProcessor::IncomingMessageProcessor#process:#{worker_id}", max_attempts: 1}, + {worker_id: worker_id}) + end + end + end + end + # See config/incoming_mail.yml.example for documentation on how to configure incoming mail def self.configure(config) configure_settings(config.except(*mailbox_keys))