remove periodic job double-indirection

Now that the block passed as a periodic job already runs inside a
delayed job, it's pointless to turn around and schedule another
delayed job from within the block.

Also make periodic jobs default to low priority and force max_attempts =
1.

Change-Id: Ib453df3cdecca2fcaddf70ee8d2ef731e7f1d65b
Reviewed-on: https://gerrit.instructure.com/4210
Tested-by: Hudson <hudson@instructure.com>
Reviewed-by: Zach Wily <zach@instructure.com>
Author: Brian Palmer
Date: 2011-06-14 17:32:18 -06:00
Commit: 3065f0e3ba (parent 99ad305b3d)
2 changed files with 25 additions and 28 deletions
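The first file below collapses that double indirection at every call site. A minimal before/after sketch of the pattern being removed (ExampleJob is an illustrative name, not part of this commit):

    # Before: the cron block, which already runs inside a delayed job,
    # immediately enqueued a second delayed job to do the real work.
    Delayed::Periodic.cron 'ExampleJob.process', '*/30 * * * *' do
      ExampleJob.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 })
    end

    # After: the block does the work directly; low priority and
    # max_attempts = 1 are applied by Delayed::Periodic itself.
    Delayed::Periodic.cron 'ExampleJob.process', '*/30 * * * *' do
      ExampleJob.process
    end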


@@ -1,17 +1,13 @@
 # Defines periodic jobs run by DelayedJobs.
 #
 # Scheduling is done by rufus-scheduler
 #
-# Note that periodic jobs must currently only run on a single machine.
-# TODO: Create some sort of coordination so that these jobs can be
-# triggered from any of the jobs processing nodes.
-# Nearly all of these should just create new DelayedJobs to be run immediately.
-# The only exception should be with something that runs quickly and doesn't
-# create any AR objects.
-#
 # You should only use "cron" type jobs, not "every". Cron jobs have deterministic
-# times to run which help across daemon restarts.
+# times to run which help across daemon restarts, and distributing periodic
+# jobs across multiple job servers.
 #
+# Periodic jobs default to low priority. You can override this in the arguments
+# passed to Delayed::Periodic.cron
 
 if ActionController::Base.session_store == ActiveRecord::SessionStore
   expire_after = (Setting.from_config("session_store") || {})[:expire_after]
@@ -23,57 +19,56 @@ if ActionController::Base.session_store == ActiveRecord::SessionStore
 end
 
 Delayed::Periodic.cron 'ExternalFeedAggregator.process', '*/30 * * * *' do
-  ExternalFeedAggregator.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 })
+  ExternalFeedAggregator.process
 end
 
 Delayed::Periodic.cron 'SummaryMessageConsolidator.process', '*/15 * * * *' do
-  SummaryMessageConsolidator.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 })
+  SummaryMessageConsolidator.process
 end
 
 Delayed::Periodic.cron 'Attachment.process_scribd_conversion_statuses', '*/5 * * * *' do
-  Attachment.send_later_enqueue_args(:process_scribd_conversion_statuses, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 })
+  Attachment.process_scribd_conversion_statuses
 end
 
 Delayed::Periodic.cron 'Twitter processing', '*/15 * * * *' do
-  TwitterSearcher.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 })
-  TwitterUserPoller.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 })
+  TwitterSearcher.process
+  TwitterUserPoller.process
 end
 
 Delayed::Periodic.cron 'Reporting::CountsReport.process', '0 11 * * *' do
-  Reporting::CountsReport.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 })
+  Reporting::CountsReport.process
 end
 
 Delayed::Periodic.cron 'StreamItem.destroy_stream_items', '45 11 * * *' do
   # we pass false for the touch_users argument, on the assumption that these
   # stream items that we delete aren't visible on the user's dashboard anymore
   # anyway, so there's no need to invalidate all the caches.
-  StreamItem.send_later_enqueue_args(:destroy_stream_items, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }, 4.weeks.ago, false)
+  StreamItem.destroy_stream_items(4.weeks.ago, false)
 end
 
 if Mailman.config.poll_interval == 0 && Mailman.config.ignore_stdin == true
   Delayed::Periodic.cron 'IncomingMessageProcessor.process', '*/1 * * * *' do
-    IncomingMessageProcessor.send_later_enqueue_args(:process, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 })
+    IncomingMessageProcessor.process
   end
 end
 
 if PageView.page_view_method == :cache
   # periodically pull new page views off the cache and insert them into the db
   Delayed::Periodic.cron 'PageView.process_cache_queue', '*/5 * * * *' do
-    PageView.send_later_enqueue_args(:process_cache_queue,
-        { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 })
+    PageView.process_cache_queue
   end
 end
 
 Delayed::Periodic.cron 'ErrorReport.destroy_error_reports', '35 */1 * * *' do
   cutoff = Setting.get('error_reports_retain_for', 3.months.to_s).to_i
   if cutoff > 0
-    ErrorReport.send_later_enqueue_args(:destroy_error_reports, { :priority => Delayed::LOW_PRIORITY, :max_attempts => 1 }, cutoff.ago)
+    ErrorReport.destroy_error_reports(cutoff.ago)
   end
 end
 
 if Delayed::Stats.enabled?
   Delayed::Periodic.cron 'Delayed::Stats.cleanup', '0 11 * * *' do
-    Delayed::Stats.send_later_enqueue_args(:cleanup, :priority => Delayed::LOW_PRIORITY, :max_attempts => 1)
+    Delayed::Stats.cleanup
   end
 end
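Since Delayed::Periodic.cron now accepts an optional job_args hash (the second file in this commit, shown next), a job that needs something other than the default low priority can override it at declaration time. A hedged sketch, again with an illustrative job name and priority value:

    Delayed::Periodic.cron 'ExampleJob.process', '*/5 * * * *', :priority => 10 do
      ExampleJob.process
    end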


@@ -18,15 +18,15 @@ class Periodic
     # schedule the built-in unlocking job
     self.cron('Unlock Expired Jobs', '*/5 * * * *') do
-      Delayed::Job.send_later_enqueue_args(:unlock_expired_jobs, :max_attempts => 1)
+      Delayed::Job.unlock_expired_jobs
     end
   end
 
   STRAND = 'periodic scheduling'
 
-  def self.cron(job_name, cron_line, &block)
+  def self.cron(job_name, cron_line, job_args = {}, &block)
     raise ArgumentError, "job #{job_name} already scheduled!" if self.scheduled[job_name]
-    self.scheduled[job_name] = self.new(job_name, cron_line, block)
+    self.scheduled[job_name] = self.new(job_name, cron_line, job_args, block)
   end
 
   def self.audit_queue
def self.audit_queue
@@ -48,14 +48,15 @@ class Periodic
     end
   end
 
-  def initialize(name, cron_line, block)
+  def initialize(name, cron_line, job_args, block)
     @name = name
     @cron = Rufus::CronLine.new(cron_line)
+    @job_args = { :priority => Delayed::LOW_PRIORITY }.merge(job_args.symbolize_keys)
     @block = block
   end
 
   def enqueue
-    Delayed::Job.enqueue(self, :max_attempts => 1, :run_at => @cron.next_time)
+    Delayed::Job.enqueue(self, @job_args.merge(:max_attempts => 1, :run_at => @cron.next_time))
   end
def perform
@@ -72,5 +73,6 @@ class Periodic
   def tag
     "periodic: #{@name}"
   end
+  alias_method :display_name, :tag
 end
end
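One consequence of the merge order in enqueue: caller-supplied job_args can override the low-priority default, but max_attempts and run_at always win because they are merged last. A quick illustration of the Hash#merge precedence involved (values are illustrative; next_time stands in for @cron.next_time):

    defaults = { :priority => Delayed::LOW_PRIORITY }
    job_args = defaults.merge(:priority => 10)
    # => { :priority => 10 }  -- the caller's priority overrides the default
    job_args.merge(:max_attempts => 1, :run_at => next_time)
    # => max_attempts and run_at cannot be overridden by job_args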