mirror of https://github.com/rails/rails
[ActiveJob] Add logging for `enqueue_all`
This commit is contained in:
parent
a1a026fb08
commit
5ab2034730
|
@ -31,6 +31,7 @@ module ActiveJob
|
|||
rescue EnqueueError => e
|
||||
job.enqueue_error = e
|
||||
end
|
||||
adapter_jobs.count(&:successfully_enqueued?)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -47,6 +47,32 @@ module ActiveJob
|
|||
end
|
||||
subscribe_log_level :enqueue_at, :info
|
||||
|
||||
def enqueue_all(event)
|
||||
info do
|
||||
jobs = event.payload[:jobs]
|
||||
adapter = event.payload[:adapter]
|
||||
enqueued_count = event.payload[:enqueued_count]
|
||||
|
||||
if enqueued_count == jobs.size
|
||||
enqueued_jobs_message(adapter, jobs)
|
||||
elsif jobs.any?(&:successfully_enqueued?)
|
||||
enqueued_jobs = jobs.select(&:successfully_enqueued?)
|
||||
|
||||
failed_enqueue_count = jobs.size - enqueued_count
|
||||
if failed_enqueue_count == 0
|
||||
enqueued_jobs_message(adapter, enqueued_jobs)
|
||||
else
|
||||
"#{enqueued_jobs_message(adapter, enqueued_jobs)}. "\
|
||||
"Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)}"
|
||||
end
|
||||
else
|
||||
failed_enqueue_count = jobs.size - enqueued_count
|
||||
"Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)} to #{adapter_name(adapter)}"
|
||||
end
|
||||
end
|
||||
end
|
||||
subscribe_log_level :enqueue_all, :info
|
||||
|
||||
def perform_start(event)
|
||||
info do
|
||||
job = event.payload[:job]
|
||||
|
@ -111,7 +137,11 @@ module ActiveJob
|
|||
|
||||
private
|
||||
def queue_name(event)
|
||||
event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})"
|
||||
adapter_name(event.payload[:adapter]) + "(#{event.payload[:job].queue_name})"
|
||||
end
|
||||
|
||||
def adapter_name(adapter)
|
||||
adapter.class.name.demodulize.delete_suffix("Adapter")
|
||||
end
|
||||
|
||||
def args_info(job)
|
||||
|
@ -171,6 +201,13 @@ module ActiveJob
|
|||
def extract_enqueue_source_location(locations)
|
||||
backtrace_cleaner.clean(locations.lazy).first
|
||||
end
|
||||
|
||||
def enqueued_jobs_message(adapter, enqueued_jobs)
|
||||
enqueued_count = enqueued_jobs.size
|
||||
job_classes_counts = enqueued_jobs.map(&:class).tally.sort_by { |_k, v| -v }
|
||||
"Enqueued #{enqueued_count} #{'job'.pluralize(enqueued_count)} to #{adapter_name(adapter)}"\
|
||||
" (#{job_classes_counts.map { |klass, count| "#{count} #{klass}" }.join(', ')})"
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -33,30 +33,34 @@ module ActiveJob
|
|||
end
|
||||
|
||||
def enqueue_all(jobs) # :nodoc:
|
||||
enqueued_count = 0
|
||||
jobs.group_by(&:class).each do |job_class, same_class_jobs|
|
||||
same_class_jobs.group_by(&:queue_name).each do |queue, same_class_and_queue_jobs|
|
||||
immediate_jobs, scheduled_jobs = same_class_and_queue_jobs.partition { |job| job.scheduled_at.nil? }
|
||||
|
||||
if immediate_jobs.any?
|
||||
Sidekiq::Client.push_bulk(
|
||||
jids = Sidekiq::Client.push_bulk(
|
||||
"class" => JobWrapper,
|
||||
"wrapped" => job_class,
|
||||
"queue" => queue,
|
||||
"args" => immediate_jobs.map { |job| [job.serialize] },
|
||||
)
|
||||
enqueued_count += jids.compact.size
|
||||
end
|
||||
|
||||
if scheduled_jobs.any?
|
||||
Sidekiq::Client.push_bulk(
|
||||
jids = Sidekiq::Client.push_bulk(
|
||||
"class" => JobWrapper,
|
||||
"wrapped" => job_class,
|
||||
"queue" => queue,
|
||||
"args" => scheduled_jobs.map { |job| [job.serialize] },
|
||||
"at" => scheduled_jobs.map { |job| job.scheduled_at }
|
||||
)
|
||||
enqueued_count += jids.compact.size
|
||||
end
|
||||
end
|
||||
end
|
||||
enqueued_count
|
||||
end
|
||||
|
||||
class JobWrapper # :nodoc:
|
||||
|
|
|
@ -11,6 +11,7 @@ require "jobs/rescue_job"
|
|||
require "jobs/retry_job"
|
||||
require "jobs/disable_log_job"
|
||||
require "jobs/abort_before_enqueue_job"
|
||||
require "jobs/enqueue_error_job"
|
||||
require "models/person"
|
||||
|
||||
class LoggingTest < ActiveSupport::TestCase
|
||||
|
@ -298,6 +299,28 @@ class LoggingTest < ActiveSupport::TestCase
|
|||
end
|
||||
end
|
||||
|
||||
def test_enqueue_all_job_logging_some_jobs_failed_enqueuing
|
||||
EnqueueErrorJob.disable_test_adapter
|
||||
|
||||
EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = [false, true]
|
||||
|
||||
ActiveJob.perform_all_later(EnqueueErrorJob.new, EnqueueErrorJob.new)
|
||||
assert_match(/Enqueued 1 job to .+ \(1 EnqueueErrorJob\)\. Failed enqueuing 1 job/, @logger.messages)
|
||||
ensure
|
||||
EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = []
|
||||
end
|
||||
|
||||
def test_enqueue_all_job_logging_all_jobs_failed_enqueuing
|
||||
EnqueueErrorJob.disable_test_adapter
|
||||
|
||||
EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = [true, true]
|
||||
|
||||
ActiveJob.perform_all_later(EnqueueErrorJob.new, EnqueueErrorJob.new)
|
||||
assert_match(/Failed enqueuing 2 jobs to .+/, @logger.messages)
|
||||
ensure
|
||||
EnqueueErrorJob::EnqueueErrorAdapter.should_raise_sequence = []
|
||||
end
|
||||
|
||||
def test_verbose_enqueue_logs
|
||||
ActiveJob.verbose_enqueue_logs = true
|
||||
|
||||
|
@ -311,4 +334,9 @@ class LoggingTest < ActiveSupport::TestCase
|
|||
LoggingJob.perform_later "Dummy"
|
||||
assert_no_match("↳", @logger.messages)
|
||||
end
|
||||
|
||||
def test_enqueue_all_job_logging
|
||||
ActiveJob.perform_all_later(LoggingJob.new("Dummy"), HelloJob.new("Jamie"), HelloJob.new("John"))
|
||||
assert_match(/Enqueued 3 jobs to .+ \(2 HelloJob, 1 LoggingJob\)/, @logger.messages)
|
||||
end
|
||||
end
|
||||
|
|
|
@ -70,4 +70,24 @@ class QueuingTest < ActiveSupport::TestCase
|
|||
ActiveJob.perform_all_later([HelloJob.new("Jamie"), MultipleKwargsJob.new(argument1: "John", argument2: 42)])
|
||||
assert_equal ["Jamie says hello", "Job with argument1: John, argument2: 42"], JobBuffer.values.sort
|
||||
end
|
||||
|
||||
test "perform_all_later instrumentation" do
|
||||
jobs = HelloJob.new("Jamie"), HelloJob.new("John")
|
||||
called = false
|
||||
|
||||
subscriber = lambda do |*args|
|
||||
called = true
|
||||
event = ActiveSupport::Notifications::Event.new(*args)
|
||||
payload = event.payload
|
||||
assert payload[:adapter]
|
||||
assert_equal jobs, payload[:jobs]
|
||||
assert_equal 2, payload[:enqueued_count]
|
||||
end
|
||||
|
||||
ActiveSupport::Notifications.subscribed(subscriber, "enqueue_all.active_job") do
|
||||
ActiveJob.perform_all_later(jobs)
|
||||
end
|
||||
|
||||
assert called
|
||||
end
|
||||
end
|
||||
|
|
|
@ -3,17 +3,25 @@
|
|||
class EnqueueErrorJob < ActiveJob::Base
|
||||
class EnqueueErrorAdapter
|
||||
class << self
|
||||
def enqueue(*)
|
||||
raise ActiveJob::EnqueueError, "There was an error enqueuing the job"
|
||||
end
|
||||
|
||||
def enqueue_at(*)
|
||||
raise ActiveJob::EnqueueError, "There was an error enqueuing the job"
|
||||
end
|
||||
attr_accessor :should_raise_sequence
|
||||
end
|
||||
self.should_raise_sequence = []
|
||||
|
||||
def enqueue(*)
|
||||
raise ActiveJob::EnqueueError, "There was an error enqueuing the job" if should_raise?
|
||||
end
|
||||
|
||||
def enqueue_at(*)
|
||||
raise ActiveJob::EnqueueError, "There was an error enqueuing the job" if should_raise?
|
||||
end
|
||||
|
||||
private
|
||||
def should_raise?
|
||||
self.class.should_raise_sequence.empty? || self.class.should_raise_sequence.shift
|
||||
end
|
||||
end
|
||||
|
||||
self.queue_adapter = EnqueueErrorAdapter
|
||||
self.queue_adapter = EnqueueErrorAdapter.new
|
||||
|
||||
def perform
|
||||
raise "This should never be called"
|
||||
|
|
Loading…
Reference in New Issue