2017-04-28 03:54:48 +08:00
|
|
|
#
|
|
|
|
# Copyright (C) 2011 - present Instructure, Inc.
|
|
|
|
#
|
|
|
|
# This file is part of Canvas.
|
|
|
|
#
|
|
|
|
# Canvas is free software: you can redistribute it and/or modify it under
|
|
|
|
# the terms of the GNU Affero General Public License as published by the Free
|
|
|
|
# Software Foundation, version 3 of the License.
|
|
|
|
#
|
|
|
|
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
|
|
|
|
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
|
|
|
|
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
|
|
|
|
# details.
|
|
|
|
#
|
|
|
|
# You should have received a copy of the GNU Affero General Public License along
|
|
|
|
# with this program. If not, see <http://www.gnu.org/licenses/>.
|
2020-04-14 03:27:19 +08:00
|
|
|
# Give every Delayed::Job a live-events context (consumed by the
# around(:perform) lifecycle hook below via job.live_events_context).
# NOTE: require_relative resolves relative to this file already; the
# leading './' was redundant.
require_relative 'job_live_events_context'

Delayed::Job.include(JobLiveEventsContext)
|
|
|
|
|
2014-10-01 02:41:19 +08:00
|
|
|
Delayed::Backend::Base.class_eval do
  # Shard the job was queued from; set by Delayed::Settings.default_job_options.
  attr_writer :current_shard

  # Shard the job was created on, defaulting to the birth shard when unset.
  def current_shard
    @current_shard || Shard.birth
  end

  # Root account id for this job's account, or nil when there is no account.
  # A root account's own root_account_id column is nil, so fall back to its id.
  def root_account_id
    return nil if account.nil?

    account.root_account_id.nil? ? account.id : account.root_account_id
  end

  # Serialize this job as a JSON line for the detailed job logger
  # (wired up via Delayed::Settings.job_detailed_log_format).
  def to_log_format
    payload = attributes.with_indifferent_access
                        .slice(:tag, :strand, :priority, :attempts, :created_at, :max_attempts, :source, :account_id)
    payload[:shard_id] = current_shard&.id
    # NOTE(review): the guard checks respond_to?(:delayed_jobs_shard_id) but the
    # value is read from delayed_jobs_shard — confirm both are defined together.
    payload[:jobs_cluster] = if current_shard.respond_to?(:delayed_jobs_shard_id)
                               current_shard&.delayed_jobs_shard&.id
                             else
                               "NONE"
                             end
    payload[:db_cluster] = current_shard&.database_server&.id
    payload[:root_account_id] = Shard.global_id_for(root_account_id)
    payload.with_indifferent_access.to_json
  end
end
|
|
|
|
|
2020-12-24 03:06:38 +08:00
|
|
|
# After a worker process forks, drop any HTTP clients inherited from the
# parent — it's possible to accidentally share an open http socket between
# processes shortly after fork.
Delayed::Pool.on_fork = lambda do
  # Reset every Imperium client singleton first...
  [
    Imperium::Agent,
    Imperium::Catalog,
    Imperium::Client,
    Imperium::Events,
    Imperium::KV,
  ].each(&:reset_default_client)

  # ...and only then let dynamic settings pull a fresh client.
  # It's really important the clients are reset BEFORE this runs;
  # do not change this order.
  Canvas::DynamicSettings.on_fork!
end
|
|
|
|
|
2018-03-09 03:23:33 +08:00
|
|
|
# If an `account` method was already defined by a previously-included module,
# defer to that implementation; otherwise provide a default.
module Delayed::Backend::DefaultJobAccount
  # Account associated with this job; Account.default unless a prior module
  # supplied its own `account`.
  def account
    defined?(super) ? super : Account.default
  end
end
|
2018-03-09 03:23:33 +08:00
|
|
|
# Install the default-account fallback on both job backends.
Delayed::Backend::ActiveRecord::Job.include(Delayed::Backend::DefaultJobAccount)
Delayed::Backend::Redis::Job.include(Delayed::Backend::DefaultJobAccount)
|
2018-03-03 05:37:50 +08:00
|
|
|
|
2020-07-10 01:44:12 +08:00
|
|
|
# --- Delayed::Settings ------------------------------------------------------
# Tag every queued job with the shard it was created on (see current_shard above).
Delayed::Settings.default_job_options = -> { { current_shard: Shard.current } }
Delayed::Settings.fetch_batch_size = -> { Setting.get('jobs_get_next_batch_size', '5').to_i }
Delayed::Settings.job_detailed_log_format = ->(job) { job.to_log_format }
# A job gets a single attempt by default.
Delayed::Settings.max_attempts = 1
Delayed::Settings.num_strands = ->(strand_name) { Setting.get("#{strand_name}_num_strands", nil) }
Delayed::Settings.pool_procname_suffix = " (#{Canvas.revision})" if Canvas.revision
Delayed::Settings.queue = "canvas_queue"
Delayed::Settings.select_random_from_batch = -> { Setting.get('jobs_select_random', 'false') == 'true' }
Delayed::Settings.sleep_delay = -> { Setting.get('delayed_jobs_sleep_delay', '2.0').to_f }
Delayed::Settings.sleep_delay_stagger = -> { Setting.get('delayed_jobs_sleep_delay_stagger', '2.0').to_f }
# Prefix worker process names with the jobs shard id.
Delayed::Settings.worker_procname_prefix = -> { "#{Shard.current(:delayed_jobs).id}~" }
Delayed::Settings.worker_health_check_type = Delayed::CLI.instance&.config&.dig('health_check', 'type')&.to_sym || :none
Delayed::Settings.worker_health_check_config = Delayed::CLI.instance&.config&.dig('health_check')
|
2014-10-01 02:41:19 +08:00
|
|
|
|
|
|
|
# Load our periodic_jobs.yml (cron overrides config file); when the file is
# absent (ConfigFile.load => nil, nil.dup => nil) fall through to an empty
# override hash.
Delayed::Periodic.add_overrides(ConfigFile.load('periodic_jobs').dup || {})
|
2014-10-01 02:41:19 +08:00
|
|
|
|
2016-02-24 01:42:01 +08:00
|
|
|
# database.yml may still carry a legacy `queue` section; it is no longer
# honored, so nudge operators to remove it.
legacy_queue_config = ActiveRecord::Base.configurations[Rails.env]['queue']
if legacy_queue_config
  ActiveSupport::Deprecation.warn("A queue section in database.yml is no longer supported. Please run migrations, then remove it.")
end
|
|
|
|
|
2019-08-16 00:34:35 +08:00
|
|
|
|
|
|
|
Rails.application.config.after_initialize do
  # Configure the jobs autoscaling plugin when the CLI config enables it.
  scaling_config = Delayed::CLI.instance&.config&.[](:auto_scaling)
  if scaling_config
    require 'jobs_autoscaling'

    # Always log scaling decisions; add the AWS action when an ASG is named.
    scaling_actions = [JobsAutoscaling::LoggerAction.new]
    if scaling_config[:asg_name]
      aws_opts = scaling_config[:aws_config] || {}
      aws_opts[:region] ||= ApplicationController.region
      scaling_actions << JobsAutoscaling::AwsAction.new(
        asg_name: scaling_config[:asg_name],
        aws_config: aws_opts,
        instance_id: ApplicationController.instance_id
      )
    end
    JobsAutoscaling::Monitor.new(action: scaling_actions).activate!
  end
end
|
|
|
|
|
2011-02-01 09:57:29 +08:00
|
|
|
Delayed::Worker.on_max_failures = proc do |_job, err|
  # Failed jobs are normally kept around for inspection, but when the failure
  # was the underlying AR object being destroyed there is nothing left to
  # inspect — don't keep those max_attempts-failed jobs.
  err.is_a?(Delayed::Backend::RecordNotFound)
end
|
2014-02-13 01:29:09 +08:00
|
|
|
|
2018-09-03 13:21:46 +08:00
|
|
|
# Worker-tuning configuration read from the private dynamic-settings tree.
module DelayedJobConfig
  # Parsed delayed_jobs.yml from dynamic settings ({} when absent), memoized.
  # NOTE(review): YAML.load is unsafe on untrusted input; this data comes from
  # our own private settings tree, but consider YAML.safe_load.
  def self.config
    @config ||= YAML.load(Canvas::DynamicSettings.find(tree: :private)['delayed_jobs.yml'] || '{}')
  end

  # Set of strand names whose stats should be sent to statsd, memoized.
  def self.strands_to_send_to_statsd
    @strands_to_send_to_statsd ||= (config['strands_to_send_to_statsd'] || []).to_set
  end

  # Drop memoized state so the next read re-fetches from dynamic settings.
  def self.reload
    @config = @strands_to_send_to_statsd = nil
  end

  Canvas::Reloader.on_reload { DelayedJobConfig.reload }
end
|
|
|
|
|
2014-10-01 02:41:19 +08:00
|
|
|
### lifecycle callbacks

Delayed::Worker.lifecycle.around(:perform) do |worker, job, &block|
  # Apply any pending config reload before starting the job, and drop Redis
  # connections that have gone idle between jobs.
  Canvas::Reloader.reload! if Canvas::Reloader.pending_reload
  Canvas::Redis.clear_idle_connections

  # context for our custom logger
  Thread.current[:context] = {
    # these 2 keys aren't terribly well named for this, since they were intended for http requests
    :request_id => job.id,
    :session_id => worker.name,
  }

  LiveEvents.set_context(job.live_events_context)

  # Reset cached host URLs and point attachment handling at this job's
  # account; the previous value is saved so the ensure block can restore it.
  HostUrl.reset_cache!
  old_root_account = Attachment.current_root_account
  Attachment.current_root_account = job.account

  # Snapshot memory and CPU so the ensure block can log per-job resource usage.
  starting_mem = Canvas.sample_memory()
  starting_cpu = Process.times()

  begin
    RequestCache.enable do
      block.call(worker, job)
    end
  ensure
    # Runs even when the job raises: restore globals, clear per-job context,
    # and emit the resource-usage stat line.
    ending_cpu = Process.times()
    ending_mem = Canvas.sample_memory()
    user_cpu = ending_cpu.utime - starting_cpu.utime
    system_cpu = ending_cpu.stime - starting_cpu.stime

    Attachment.current_root_account = old_root_account

    LiveEvents.clear_context!

    Rails.logger.info "[STAT] #{starting_mem} #{ending_mem} #{ending_mem - starting_mem} #{user_cpu} #{system_cpu}"

    Thread.current[:context] = nil
  end
end
|
2014-05-13 03:03:59 +08:00
|
|
|
|
2017-10-04 00:38:36 +08:00
|
|
|
Delayed::Worker.lifecycle.before(:perform) do |_worker, _job|
  # Start every job with cold per-process caches.
  LoadAccount.clear_shard_cache
  # AdheresToPolicy::Cache memoizes in a class-level instance variable; wipe
  # it so the job begins with a clean slate.
  AdheresToPolicy::Cache.clear
end
|
2014-10-01 02:41:19 +08:00
|
|
|
|
|
|
|
Delayed::Worker.lifecycle.before(:exceptional_exit) do |worker, exception|
  # A worker exited abnormally; report the exception with worker context.
  worker_info = Canvas::Errors::WorkerInfo.new(worker)
  Canvas::Errors.capture(exception, worker_info.to_h)
end
|
|
|
|
|
2020-12-09 03:33:59 +08:00
|
|
|
Delayed::Worker.lifecycle.before(:retry) do |worker, job, exception|
  # Any job that fails with a RetriableError is routed here while it still has
  # retries left. We just want the stats, so capture at :info level.
  job_info = Canvas::Errors::JobInfo.new(job, worker)
  begin
    (job.current_shard || Shard.default).activate do
      Canvas::Errors.capture(exception, job_info.to_h, :info)
    end
  rescue => e
    # Shard activation itself failed: report that failure, then still capture
    # the original exception without shard context.
    Canvas::Errors.capture_exception(:jobs_lifecycle, e)
    Canvas::Errors.capture(exception, job_info.to_h, :info)
  end
end
|
|
|
|
|
|
|
|
# Delayed::Backend::RecordNotFound happens when a job is queued and then the thing that
# it's queued on gets deleted. It happens all the time for stuff
# like test students (we delete their stuff immediately), and
# we don't need detailed exception reports for those.
#
# Delayed::RetriableError is thrown by any job to indicate the thing
# that's failing is "kind of expected". Upstream service backpressure,
# etc.
#
# Exceptions in this list are captured at :warn instead of :error severity
# by the :error lifecycle hook below.
WARNABLE_DELAYED_EXCEPTIONS = [
  Delayed::Backend::RecordNotFound,
  Delayed::RetriableError,
].freeze
|
|
|
|
|
2015-04-05 10:39:49 +08:00
|
|
|
# Capture job failures, downgrading "kind of expected" exception classes
# (see WARNABLE_DELAYED_EXCEPTIONS) to :warn severity.
Delayed::Worker.lifecycle.before(:error) do |worker, job, exception|
  is_warnable = WARNABLE_DELAYED_EXCEPTIONS.any? { |klass| exception.is_a?(klass) }
  error_level = is_warnable ? :warn : :error
  info = Canvas::Errors::JobInfo.new(job, worker)
  begin
    (job.current_shard || Shard.default).activate do
      Canvas::Errors.capture(exception, info.to_h, error_level)
    end
  rescue => e
    # Previously a bare `rescue` silently discarded the shard-activation
    # failure; report it like the :retry hook does, then still capture the
    # original exception without shard context.
    Canvas::Errors.capture_exception(:jobs_lifecycle, e)
    Canvas::Errors.capture(exception, info.to_h, error_level)
  end
end
|
2020-10-23 04:58:48 +08:00
|
|
|
|
|
|
|
# syntactic sugar and compatibility shims
module CanvasDelayedMessageSending
  # Like #delay, but the job runs synchronously outside production so
  # non-production environments see its effects immediately.
  #
  # sender: explicit sender for the delayed call; computed when omitted.
  # kwargs: forwarded to #delay (its :synchronous key is always overridden).
  def delay_if_production(sender: nil, **kwargs)
    sender ||= __calculate_sender_for_delay
    delay(sender: sender, **kwargs.merge(synchronous: !Rails.env.production?))
  end
end

# Module#include has been public API since Ruby 2.1; no need for #send to
# sidestep visibility.
Object.include(CanvasDelayedMessageSending)
|