mirror of https://github.com/rails/rails
Fix ActiveJob logging when callback chain is halted:
- ### Problem ActiveJob will always log "Enqueued MyJob (Job ID) ..." even if the job doesn't get enqueued through the adapter. Same problem happens when performing a Job, "Performed MyJob (Job ID) ..." will be logged even when job wasn't performed at all. This situation can happen either if the callback chain is terminated (before_enqueue throwing an `abort`) or if an exception is raised. ### Solution Check if the callback chain is aborted/exception is raised, and log accordingly.
This commit is contained in:
parent
737fb59eb3
commit
0d3aec4969
|
@ -1,3 +1,20 @@
|
||||||
|
* Fix enqueuing and performing incorrect logging message.
|
||||||
|
|
||||||
|
Jobs will no longer always log "Enqueued MyJob" or "Performed MyJob" when they actually didn't get enqueued/performed.
|
||||||
|
|
||||||
|
```ruby
|
||||||
|
class MyJob < ApplicationJob
|
||||||
|
before_enqueue { throw(:abort) }
|
||||||
|
end
|
||||||
|
|
||||||
|
MyJob.perform_later # Will no longer log "Enqueud MyJob" since job wasn't even enqueued through adapter.
|
||||||
|
```
|
||||||
|
|
||||||
|
A new message will be logged in case a job couldn't be enqueued, either because the callback chain was halted or
|
||||||
|
because an exception happened during enqueing. (i.e. Redis is down when you try to enqueue your job)
|
||||||
|
|
||||||
|
*Edouard Chin*
|
||||||
|
|
||||||
* Add an option to disable logging of the job arguments when enqueuing and executing the job.
|
* Add an option to disable logging of the job arguments when enqueuing and executing the job.
|
||||||
|
|
||||||
class SensitiveJob < ApplicationJob
|
class SensitiveJob < ApplicationJob
|
||||||
|
|
|
@ -17,8 +17,13 @@ module ActiveJob
|
||||||
|
|
||||||
private
|
private
|
||||||
def instrument(operation, payload = {}, &block)
|
def instrument(operation, payload = {}, &block)
|
||||||
|
enhanced_block = ->(event_payload) do
|
||||||
|
aborted = !block.call if block
|
||||||
|
event_payload[:aborted] = true if aborted
|
||||||
|
end
|
||||||
|
|
||||||
ActiveSupport::Notifications.instrument \
|
ActiveSupport::Notifications.instrument \
|
||||||
"#{operation}.active_job", payload.merge(adapter: queue_adapter, job: self), &block
|
"#{operation}.active_job", payload.merge(adapter: queue_adapter, job: self), &enhanced_block
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -5,16 +5,40 @@ require "active_support/log_subscriber"
|
||||||
module ActiveJob
|
module ActiveJob
|
||||||
class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
|
class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
|
||||||
def enqueue(event)
|
def enqueue(event)
|
||||||
info do
|
job = event.payload[:job]
|
||||||
job = event.payload[:job]
|
ex = event.payload[:exception_object]
|
||||||
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
|
|
||||||
|
if ex
|
||||||
|
error do
|
||||||
|
"Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
|
||||||
|
end
|
||||||
|
elsif event.payload[:aborted]
|
||||||
|
info do
|
||||||
|
"Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution."
|
||||||
|
end
|
||||||
|
else
|
||||||
|
info do
|
||||||
|
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def enqueue_at(event)
|
def enqueue_at(event)
|
||||||
info do
|
job = event.payload[:job]
|
||||||
job = event.payload[:job]
|
ex = event.payload[:exception_object]
|
||||||
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
|
|
||||||
|
if ex
|
||||||
|
error do
|
||||||
|
"Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
|
||||||
|
end
|
||||||
|
elsif event.payload[:aborted]
|
||||||
|
info do
|
||||||
|
"Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution."
|
||||||
|
end
|
||||||
|
else
|
||||||
|
info do
|
||||||
|
"Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -32,6 +56,10 @@ module ActiveJob
|
||||||
error do
|
error do
|
||||||
"Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
|
"Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
|
||||||
end
|
end
|
||||||
|
elsif event.payload[:aborted]
|
||||||
|
error do
|
||||||
|
"Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: a before_perform callback halted the job execution"
|
||||||
|
end
|
||||||
else
|
else
|
||||||
info do
|
info do
|
||||||
"Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms"
|
"Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms"
|
||||||
|
|
|
@ -10,6 +10,7 @@ require "jobs/nested_job"
|
||||||
require "jobs/rescue_job"
|
require "jobs/rescue_job"
|
||||||
require "jobs/retry_job"
|
require "jobs/retry_job"
|
||||||
require "jobs/disable_log_job"
|
require "jobs/disable_log_job"
|
||||||
|
require "jobs/abort_before_enqueue_job"
|
||||||
require "models/person"
|
require "models/person"
|
||||||
|
|
||||||
class LoggingTest < ActiveSupport::TestCase
|
class LoggingTest < ActiveSupport::TestCase
|
||||||
|
@ -112,6 +113,27 @@ class LoggingTest < ActiveSupport::TestCase
|
||||||
assert_equal(key, "enqueue.active_job")
|
assert_equal(key, "enqueue.active_job")
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_enqueue_job_log_error_when_callback_chain_is_halted
|
||||||
|
events = subscribed { AbortBeforeEnqueueJob.perform_later }
|
||||||
|
assert_match(/Failed enqueuing AbortBeforeEnqueueJob.* a before_enqueue callback halted/, @logger.messages)
|
||||||
|
assert_equal(events.count, 1)
|
||||||
|
key, * = events.first
|
||||||
|
assert_equal(key, "enqueue.active_job")
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_enqueue_job_log_error_when_error_is_raised_during_callback_chain
|
||||||
|
events = subscribed do
|
||||||
|
assert_raises(AbortBeforeEnqueueJob::MyError) do
|
||||||
|
AbortBeforeEnqueueJob.perform_later(:raise)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_match(/Failed enqueuing AbortBeforeEnqueueJob/, @logger.messages)
|
||||||
|
assert_equal(events.count, 1)
|
||||||
|
key, * = events.first
|
||||||
|
assert_equal(key, "enqueue.active_job")
|
||||||
|
end
|
||||||
|
|
||||||
def test_perform_job_logging
|
def test_perform_job_logging
|
||||||
perform_enqueued_jobs do
|
perform_enqueued_jobs do
|
||||||
LoggingJob.perform_later "Dummy"
|
LoggingJob.perform_later "Dummy"
|
||||||
|
@ -123,6 +145,11 @@ class LoggingTest < ActiveSupport::TestCase
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_perform_job_log_error_when_callback_chain_is_halted
|
||||||
|
subscribed { AbortBeforeEnqueueJob.perform_now }
|
||||||
|
assert_match(/Error performing AbortBeforeEnqueueJob.* a before_perform callback halted/, @logger.messages)
|
||||||
|
end
|
||||||
|
|
||||||
def test_perform_disabled_job_logging
|
def test_perform_disabled_job_logging
|
||||||
perform_enqueued_jobs do
|
perform_enqueued_jobs do
|
||||||
DisableLogJob.perform_later "Dummy"
|
DisableLogJob.perform_later "Dummy"
|
||||||
|
@ -159,6 +186,27 @@ class LoggingTest < ActiveSupport::TestCase
|
||||||
skip
|
skip
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def test_enqueue_at_job_log_error_when_callback_chain_is_halted
|
||||||
|
events = subscribed { AbortBeforeEnqueueJob.set(wait: 1.second).perform_later }
|
||||||
|
assert_match(/Failed enqueuing AbortBeforeEnqueueJob.* a before_enqueue callback halted/, @logger.messages)
|
||||||
|
assert_equal(events.count, 1)
|
||||||
|
key, * = events.first
|
||||||
|
assert_equal(key, "enqueue_at.active_job")
|
||||||
|
end
|
||||||
|
|
||||||
|
def test_enqueue_at_job_log_error_when_error_is_raised_during_callback_chain
|
||||||
|
events = subscribed do
|
||||||
|
assert_raises(AbortBeforeEnqueueJob::MyError) do
|
||||||
|
AbortBeforeEnqueueJob.set(wait: 1.second).perform_later(:raise)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
assert_match(/Failed enqueuing AbortBeforeEnqueueJob/, @logger.messages)
|
||||||
|
assert_equal(events.count, 1)
|
||||||
|
key, * = events.first
|
||||||
|
assert_equal(key, "enqueue_at.active_job")
|
||||||
|
end
|
||||||
|
|
||||||
def test_enqueue_in_job_logging
|
def test_enqueue_in_job_logging
|
||||||
events = subscribed { HelloJob.set(wait: 2.seconds).perform_later "Cristian" }
|
events = subscribed { HelloJob.set(wait: 2.seconds).perform_later "Cristian" }
|
||||||
assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages)
|
assert_match(/Enqueued HelloJob \(Job ID: .*\) to .*? at.*Cristian/, @logger.messages)
|
||||||
|
|
|
@ -1,9 +1,20 @@
|
||||||
# frozen_string_literal: true
|
# frozen_string_literal: true
|
||||||
|
|
||||||
class AbortBeforeEnqueueJob < ActiveJob::Base
|
class AbortBeforeEnqueueJob < ActiveJob::Base
|
||||||
before_enqueue { throw(:abort) }
|
MyError = Class.new(StandardError)
|
||||||
|
|
||||||
|
before_enqueue :throw_or_raise
|
||||||
|
before_perform { throw(:abort) }
|
||||||
|
|
||||||
def perform
|
def perform
|
||||||
raise "This should never be called"
|
raise "This should never be called"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def throw_or_raise
|
||||||
|
if (arguments.first || :abort) == :abort
|
||||||
|
throw(:abort)
|
||||||
|
else
|
||||||
|
raise(MyError)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue