From 3065f0e3ba809b59bdafe4d6ed07af7054cd372a Mon Sep 17 00:00:00 2001 From: Brian Palmer Date: Tue, 14 Jun 2011 17:32:18 -0600 Subject: [PATCH] remove periodic job double-indirection Now that that the block passed as a periodic jobs is already running as a delayed job, it's pointless to just turn around and schedule another delayed job inside the block. Also make periodic jobs default to low priority and force max_attempts = 1. Change-Id: Ib453df3cdecca2fcaddf70ee8d2ef731e7f1d65b Reviewed-on: https://gerrit.instructure.com/4210 Tested-by: Hudson Reviewed-by: Zach Wily --- config/periodic_jobs.rb | 41 ++++++++----------- .../delayed_job/lib/delayed/periodic.rb | 12 +++--- 2 files changed, 25 insertions(+), 28 deletions(-) diff --git a/config/periodic_jobs.rb b/config/periodic_jobs.rb index c60f20bca78..70d770f4373 100644 --- a/config/periodic_jobs.rb +++ b/config/periodic_jobs.rb @@ -1,17 +1,13 @@ # Defines periodic jobs run by DelayedJobs. -# +# # Scheduling is done by rufus-scheduler -# -# Note that periodic jobs must currently only run on a single machine. -# TODO: Create some sort of coordination so that these jobs can be -# triggered from any of the jobs processing nodes. - -# Nearly all of these should just create new DelayedJobs to be run immediately. -# The only exception should be with something that runs quickly and doesn't -# create any AR objects. - +# # You should only use "cron" type jobs, not "every". Cron jobs have deterministic -# times to run which help across daemon restarts. +# times to run which help across daemon restarts, and distributing periodic +# jobs across multiple job servers. +# +# Periodic jobs default to low priority. You can override this in the arguments +# passed to Delayed::Periodic.cron if ActionController::Base.session_store == ActiveRecord::SessionStore expire_after = (Setting.from_config("session_store") || {})[:expire_after] @@ -23,57 +19,56 @@ if ActionController::Base.session_store == ActiveRecord::SessionStore end Delayed::Periodic.cron 'ExternalFeedAggregator.process', '*/30 * * * *' do - ExternalFeedAggregator.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }) + ExternalFeedAggregator.process end Delayed::Periodic.cron 'SummaryMessageConsolidator.process', '*/15 * * * *' do - SummaryMessageConsolidator.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }) + SummaryMessageConsolidator.process end Delayed::Periodic.cron 'Attachment.process_scribd_conversion_statuses', '*/5 * * * *' do - Attachment.send_later_enqueue_args(:process_scribd_conversion_statuses, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }) + Attachment.process_scribd_conversion_statuses end Delayed::Periodic.cron 'Twitter processing', '*/15 * * * *' do - TwitterSearcher.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }) - TwitterUserPoller.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }) + TwitterSearcher.process + TwitterUserPoller.process end Delayed::Periodic.cron 'Reporting::CountsReport.process', '0 11 * * *' do - Reporting::CountsReport.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }) + Reporting::CountsReport.process end Delayed::Periodic.cron 'StreamItem.destroy_stream_items', '45 11 * * *' do # we pass false for the touch_users argument, on the assumption that these # stream items that we delete aren't visible on the user's dashboard anymore # anyway, so there's no need to invalidate all the caches. - StreamItem.send_later_enqueue_args(:destroy_stream_items, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }, 4.weeks.ago, false) + StreamItem.destroy_stream_items(4.weeks.ago, false) end if Mailman.config.poll_interval == 0 && Mailman.config.ignore_stdin == true Delayed::Periodic.cron 'IncomingMessageProcessor.process', '*/1 * * * *' do - IncomingMessageProcessor.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }) + IncomingMessageProcessor.process end end if PageView.page_view_method == :cache # periodically pull new page views off the cache and insert them into the db Delayed::Periodic.cron 'PageView.process_cache_queue', '*/5 * * * *' do - PageView.send_later_enqueue_args(:process_cache_queue, - { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }) + PageView.process_cache_queue end end Delayed::Periodic.cron 'ErrorReport.destroy_error_reports', '35 */1 * * *' do cutoff = Setting.get('error_reports_retain_for', 3.months.to_s).to_i if cutoff > 0 - ErrorReport.send_later_enqueue_args(:destroy_error_reports, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }, cutoff.ago) + ErrorReport.destroy_error_reports(cutoff.ago) end end if Delayed::Stats.enabled? Delayed::Periodic.cron 'Delayed::Stats.cleanup', '0 11 * * *' do - Delayed::Stats.send_later_enqueue_args(:cleanup, :priority => Delayed::LOW_PRIORITY, :max_attempts => 1) + Delayed::Stats.cleanup end end diff --git a/vendor/plugins/delayed_job/lib/delayed/periodic.rb b/vendor/plugins/delayed_job/lib/delayed/periodic.rb index b649561f6a1..0daaad5da93 100644 --- a/vendor/plugins/delayed_job/lib/delayed/periodic.rb +++ b/vendor/plugins/delayed_job/lib/delayed/periodic.rb @@ -18,15 +18,15 @@ class Periodic # schedule the built-in unlocking job self.cron('Unlock Expired Jobs', '*/5 * * * *') do - Delayed::Job.send_later_enqueue_args(:unlock_expired_jobs, :max_attempts => 1) + Delayed::Job.unlock_expired_jobs end end STRAND = 'periodic scheduling' - def self.cron(job_name, cron_line, &block) + def self.cron(job_name, cron_line, job_args = {}, &block) raise ArgumentError, "job #{job_name} already scheduled!" if self.scheduled[job_name] - self.scheduled[job_name] = self.new(job_name, cron_line, block) + self.scheduled[job_name] = self.new(job_name, cron_line, job_args, block) end def self.audit_queue @@ -48,14 +48,15 @@ class Periodic end end - def initialize(name, cron_line, block) + def initialize(name, cron_line, job_args, block) @name = name @cron = Rufus::CronLine.new(cron_line) + @job_args = { :priority => Delayed::LOW_PRIORITY }.merge(job_args.symbolize_keys) @block = block end def enqueue - Delayed::Job.enqueue(self, :max_attempts => 1, :run_at => @cron.next_time) + Delayed::Job.enqueue(self, @job_args.merge(:max_attempts => 1, :run_at => @cron.next_time)) end def perform @@ -72,5 +73,6 @@ class Periodic def tag "periodic: #{@name}" end + alias_method :display_name, :tag end end