queue imp on a per-region basis

Change-Id: Ia1c3b06f8adffb822415b672912b873dd748197f
Reviewed-on: https://gerrit.instructure.com/158589
Reviewed-by: Steven Burnett <sburnett@instructure.com>
Tested-by: Jenkins
Product-Review: James Williams  <jamesw@instructure.com>
QA-Review: James Williams  <jamesw@instructure.com>
This commit is contained in:
James Williams 2018-07-25 07:25:48 -06:00
parent 08fbba2118
commit 6109d6e72e
2 changed files with 28 additions and 19 deletions

View File

@ -91,25 +91,13 @@ Rails.configuration.after_initialize do
with_each_shard_by_database(StreamItem, :destroy_stream_items_using_setting)
end
if IncomingMailProcessor::IncomingMessageProcessor.run_periodically?
Delayed::Periodic.cron 'IncomingMailProcessor::IncomingMessageProcessor#process', '*/1 * * * *' do
imp = IncomingMailProcessor::IncomingMessageProcessor.new(IncomingMail::MessageHandler.new, ErrorReport::Reporter.new)
IncomingMailProcessor::IncomingMessageProcessor.workers.times do |worker_id|
if IncomingMailProcessor::IncomingMessageProcessor.dedicated_workers_per_mailbox
# Launch one per mailbox
IncomingMailProcessor::IncomingMessageProcessor.mailbox_accounts.each do |account|
imp.send_later_enqueue_args(:process,
{singleton: "IncomingMailProcessor::IncomingMessageProcessor#process:#{worker_id}:#{account.address}", max_attempts: 1},
{worker_id: worker_id, mailbox_account_address: account.address})
end
else
# Just launch the one
imp.send_later_enqueue_args(:process,
{singleton: "IncomingMailProcessor::IncomingMessageProcessor#process:#{worker_id}", max_attempts: 1},
{worker_id: worker_id})
end
end
end
DatabaseServer.send_in_each_region(
IncomingMailProcessor::IncomingMessageProcessor,
:queue_processors,
{ run_current_region_asynchronously: true,
singleton: 'IncomingMailProcessor::IncomingMessageProcessor.queue_processors' }
)
end
Delayed::Periodic.cron 'IncomingMailProcessor::Instrumentation#process', '*/5 * * * *' do

View File

@ -43,6 +43,27 @@ module IncomingMailProcessor
@error_reporter = error_reporter
end
def self.queue_processors
if self.run_periodically?
imp = self.new(IncomingMail::MessageHandler.new, ErrorReport::Reporter.new)
self.workers.times do |worker_id|
if self.dedicated_workers_per_mailbox
# Launch one per mailbox
self.mailbox_accounts.each do |account|
imp.send_later_enqueue_args(:process,
{singleton: "IncomingMailProcessor::IncomingMessageProcessor#process:#{worker_id}:#{account.address}", max_attempts: 1},
{worker_id: worker_id, mailbox_account_address: account.address})
end
else
# Just launch the one
imp.send_later_enqueue_args(:process,
{singleton: "IncomingMailProcessor::IncomingMessageProcessor#process:#{worker_id}", max_attempts: 1},
{worker_id: worker_id})
end
end
end
end
# See config/incoming_mail.yml.example for documentation on how to configure incoming mail
def self.configure(config)
configure_settings(config.except(*mailbox_keys))