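# Remember which shard a job was enqueued from, defaulting to the birth shard
# when none was recorded (see default_job_options below).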
Delayed::Backend::Base.class_eval do
  attr_writer :current_shard

  def current_shard
    @current_shard || Shard.birth
  end
end

Delayed::Settings.max_attempts = 15
Delayed::Settings.queue = "canvas_queue"
Delayed::Settings.sleep_delay = ->{ Setting.get('delayed_jobs_sleep_delay', '2.0').to_f }
Delayed::Settings.sleep_delay_stagger = ->{ Setting.get('delayed_jobs_sleep_delay_stagger', '2.0').to_f }
Delayed::Settings.fetch_batch_size = ->{ Setting.get('jobs_get_next_batch_size', '5').to_i }
Delayed::Settings.select_random_from_batch = ->{ Setting.get('jobs_select_random', 'false') == 'true' }
Delayed::Settings.num_strands = ->(strand_name){ Setting.get("#{strand_name}_num_strands", nil) }
Delayed::Settings.worker_procname_prefix = ->{ "#{Shard.current(:delayed_jobs).id}~" }
Delayed::Settings.pool_procname_suffix = " (#{Canvas.revision})" if Canvas.revision

Delayed::Settings.default_job_options = ->{
  {
    current_shard: Shard.current,
  }
}
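# The Setting-backed values above are wrapped in lambdas, so they are re-evaluated
# each time a worker reads them; a changed Setting can take effect without
# restarting the pool (subject to any caching inside Setting.get). Illustrative
# only, assuming the usual Canvas Setting.set helper:
#   Setting.set('delayed_jobs_sleep_delay', '5.0')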
# load our periodic_jobs.yml (a config file of cron schedule overrides)
Delayed::Periodic.add_overrides(ConfigFile.load('periodic_jobs') || {})

if ActiveRecord::Base.configurations[Rails.env]['queue']
  ActiveSupport::Deprecation.warn("A queue section in database.yml is no longer supported. Please run migrations, then remove it.")
end
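# Returning true from on_max_failures discards a job that has exhausted its
# attempts; returning false keeps the failure around for inspection.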
Delayed::Worker.on_max_failures = proc do |job, err|
  # We don't want to keep around max_attempts failed jobs that failed because the
  # underlying AR object was destroyed.
  # All other failures are kept for inspection.
  err.is_a?(Delayed::Backend::RecordNotFound)
end

### lifecycle callbacks
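# Redis connections can't safely be shared across a fork, so re-establish them
# whenever the pool forks new workers.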
Delayed::Pool.on_fork = ->{
  Canvas.reconnect_redis
}
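# Wrap every job run: tag the logging context with the job and worker, publish a
# LiveEvents context, and report queue lag, runtime, CPU, and memory use to statsd
# and the Rails log.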
Delayed::Worker.lifecycle.around(:perform) do |worker, job, &block|
  # context for our custom logger
  Thread.current[:context] = {
    # these 2 keys aren't terribly well named for this, since they were intended for http requests
    :request_id => job.id,
    :session_id => worker.name,
  }

  live_events_ctx = {
    :root_account_id => job.respond_to?(:global_account_id) ? job.global_account_id : nil,
    :job_id => job.global_id,
    :job_tag => job.tag,
    :producer => 'canvas'
  }
  StringifyIds.recursively_stringify_ids(live_events_ctx)
  LiveEvents.set_context(live_events_ctx)

  starting_mem = Canvas.sample_memory()
  starting_cpu = Process.times()
  lag = ((Time.now - job.run_at) * 1000).round
  tag = CanvasStatsd::Statsd.escape(job.tag)
  shard_id = job.current_shard.try(:id).to_i
  stats = ["delayedjob.queue", "delayedjob.queue.tag.#{tag}", "delayedjob.queue.shard.#{shard_id}"]
  stats << "delayedjob.queue.jobshard.#{job.shard.id}" if job.respond_to?(:shard)
  CanvasStatsd::Statsd.timing(stats, lag)

  begin
    stats = ["delayedjob.perform", "delayedjob.perform.tag.#{tag}", "delayedjob.perform.shard.#{shard_id}"]
    stats << "delayedjob.perform.jobshard.#{job.shard.id}" if job.respond_to?(:shard)
    CanvasStatsd::Statsd.time(stats) do
      block.call(worker, job)
    end
  ensure
    ending_cpu = Process.times()
    ending_mem = Canvas.sample_memory()
    user_cpu = ending_cpu.utime - starting_cpu.utime
    system_cpu = ending_cpu.stime - starting_cpu.stime

    LiveEvents.clear_context!

    Rails.logger.info "[STAT] #{starting_mem} #{ending_mem} #{ending_mem - starting_mem} #{user_cpu} #{system_cpu}"

    Thread.current[:context] = nil
  end
end
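# Measure how long it takes a worker to pop its next job off the queue.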
Delayed::Worker.lifecycle.around(:pop) do |worker, &block|
  CanvasStatsd::Statsd.time(["delayedjob.pop", "delayedjob.pop.jobshard.#{Shard.current(:delayed_jobs).id}"]) do
    block.call(worker)
  end
end
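# The same measurement for pops that go through the separate work queue process.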
Delayed::Worker.lifecycle.around(:work_queue_pop) do |worker, config, &block|
  CanvasStatsd::Statsd.time(["delayedjob.workqueuepop", "delayedjob.workqueuepop.jobshard.#{Shard.current(:delayed_jobs).id}"]) do
    block.call(worker, config)
  end
end
Delayed::Worker.lifecycle.before(:perform) do |_job|
  # Since AdheresToPolicy::Cache uses a class-level instance variable as its cache,
  # clear it so each job starts with a clean slate.
  AdheresToPolicy::Cache.clear
  LoadAccount.clear_shard_cache
end
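# Collect statsd calls made during a single job into one batch.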
Delayed::Worker.lifecycle.around(:perform) do |job, &block|
  CanvasStatsd::Statsd.batch do
    block.call(job)
  end
end
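# Report unexpected worker exits to the Canvas error capture system.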
Delayed::Worker.lifecycle.before(:exceptional_exit) do |worker, exception|
  info = Canvas::Errors::WorkerInfo.new(worker)
  Canvas::Errors.capture(exception, info.to_h)
end
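# Report job errors with job and worker context; activate the job's shard when
# possible, and fall back to a plain capture if shard activation fails.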
Delayed::Worker.lifecycle.before(:error) do |worker, job, exception|
  info = Canvas::Errors::JobInfo.new(job, worker)
  begin
    (job.current_shard || Shard.default).activate do
      Canvas::Errors.capture(exception, info.to_h)
    end
  rescue
    Canvas::Errors.capture(exception, info.to_h)
  end
end