Set, serialize, and deserialize Active Job `scheduled_at` as Time; deserialize `enqueued_at` as Time; deprecate setting `scheduled_at=` with numeric/epoch

Co-authored-by: Adam Pahlevi <adam.pahlevi@gmail.com>
This commit is contained in:
Ben Sheldon [he/him] 2023-04-22 11:07:21 -07:00
parent 60002c3af6
commit c919b474a7
No known key found for this signature in database
GPG Key ID: 862102672FFD9973
10 changed files with 92 additions and 16 deletions

View File

@ -1,3 +1,10 @@
* Set `scheduled_at` attribute as a Time object instead of epoch seconds, and serialize and deserialize the value
when enqueued. Assigning a numeric/epoch value to scheduled_at= is deprecated; use a Time object instead.
Deserializes `enqueued_at` as a Time instead of ISO8601 String.
*Ben Sheldon*
* Clarify the backoff strategy for the recommended `:wait` option when retrying jobs
`wait: :exponentially_longer` is waiting polynomially longer, so it is now recommended to use `wait: :polynomially_longer` to keep the same behavior.

View File

@ -12,8 +12,10 @@ module ActiveJob
attr_accessor :arguments
attr_writer :serialized_arguments
# Timestamp when the job should be performed
attr_accessor :scheduled_at
# Time when the job should be performed
attr_reader :scheduled_at
attr_reader :_scheduled_at_time # :nodoc:
# Job Identifier
attr_accessor :job_id
@ -94,6 +96,8 @@ module ActiveJob
@arguments = arguments
@job_id = SecureRandom.uuid
@queue_name = self.class.queue_name
@scheduled_at = nil
@_scheduled_at_time = nil
@priority = self.class.priority
@executions = 0
@exception_executions = {}
@ -115,7 +119,8 @@ module ActiveJob
"exception_executions" => exception_executions,
"locale" => I18n.locale.to_s,
"timezone" => timezone,
"enqueued_at" => Time.now.utc.iso8601(9)
"enqueued_at" => Time.now.utc.iso8601(9),
"scheduled_at" => _scheduled_at_time ? _scheduled_at_time.utc.iso8601(9) : nil,
}
end
@ -155,19 +160,32 @@ module ActiveJob
self.exception_executions = job_data["exception_executions"]
self.locale = job_data["locale"] || I18n.locale.to_s
self.timezone = job_data["timezone"] || Time.zone&.name
self.enqueued_at = job_data["enqueued_at"]
self.enqueued_at = Time.iso8601(job_data["enqueued_at"]) if job_data["enqueued_at"]
self.scheduled_at = Time.iso8601(job_data["scheduled_at"]) if job_data["scheduled_at"]
end
# Configures the job with the given options.
def set(options = {}) # :nodoc:
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.scheduled_at = options[:wait].seconds.from_now if options[:wait]
self.scheduled_at = options[:wait_until] if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
self
end
def scheduled_at=(value)
@_scheduled_at_time = if value&.is_a?(Numeric)
ActiveJob.deprecator.warn(<<~MSG.squish)
Assigning a numeric/epoch value to scheduled_at is deprecated. Use a Time object instead.
MSG
Time.at(value)
else
value
end
@scheduled_at = value
end
private
def serialize_arguments_if_needed(arguments)
if arguments_serialized?

View File

@ -23,7 +23,7 @@ module ActiveJob
adapter_jobs.each do |job|
job.successfully_enqueued = false
if job.scheduled_at
queue_adapter.enqueue_at(job, job.scheduled_at)
queue_adapter.enqueue_at(job, job._scheduled_at_time.to_f)
else
queue_adapter.enqueue(job)
end
@ -92,7 +92,7 @@ module ActiveJob
run_callbacks :enqueue do
if scheduled_at
queue_adapter.enqueue_at self, scheduled_at
queue_adapter.enqueue_at self, _scheduled_at_time.to_f
else
queue_adapter.enqueue self
end

View File

@ -76,7 +76,7 @@ module ActiveJob
def perform_start(event)
info do
job = event.payload[:job]
"Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at}" + args_info(job)
"Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at.utc.iso8601(9)}" + args_info(job)
end
end
subscribe_log_level :perform_start, :info

View File

@ -54,7 +54,7 @@ module ActiveJob
"wrapped" => job_class,
"queue" => queue,
"args" => scheduled_jobs.map { |job| [job.serialize] },
"at" => scheduled_jobs.map { |job| job.scheduled_at }
"at" => scheduled_jobs.map { |job| job.scheduled_at&.to_f }
)
enqueued_count += jids.compact.size
end

View File

@ -59,7 +59,7 @@ module ActiveJob
end
def filtered_time?(job)
job.scheduled_at > at.to_f if at && job.scheduled_at
job.scheduled_at > at if at && job.scheduled_at
end
def filtered_queue?(job)

View File

@ -730,7 +730,7 @@ module ActiveJob
def instantiate_job(payload, skip_deserialize_arguments: false)
job = payload[:job].deserialize(payload)
job.scheduled_at = payload[:at].to_f if payload.key?(:at)
job.scheduled_at = payload[:at] if payload.key?(:at)
job.send(:deserialize_arguments_if_needed) unless skip_deserialize_arguments
job
end

View File

@ -65,13 +65,50 @@ class JobSerializationTest < ActiveSupport::TestCase
end
end
test "serializes enqueued_at with full precision" do
test "serializes and deserializes enqueued_at with full precision" do
freeze_time
serialized = HelloJob.new.serialize
assert_kind_of String, serialized["enqueued_at"]
enqueued_at = HelloJob.deserialize(serialized).enqueued_at
assert_equal Time.now.utc, Time.iso8601(enqueued_at)
assert_kind_of Time, enqueued_at
assert_equal Time.now.utc, enqueued_at
end
test "serializes and deserializes scheduled_at as Time" do
freeze_time
current_time = Time.now
job = HelloJob.new
job.scheduled_at = current_time
serialized_job = job.serialize
assert_kind_of String, serialized_job["enqueued_at"]
assert_equal current_time.utc.iso8601(9), serialized_job["enqueued_at"]
deserialized_job = HelloJob.new
deserialized_job.deserialize(serialized_job)
assert_equal current_time, deserialized_job.scheduled_at
assert_equal job.serialize, deserialized_job.serialize
end
test "deprecates and coerces numerical scheduled_at attribute to Time when serialized and deserialized" do
freeze_time
current_time = Time.now
job = HelloJob.new
assert_deprecated(ActiveJob.deprecator) do
job.scheduled_at = current_time.to_f
end
serialized_job = job.serialize
assert_kind_of String, serialized_job["scheduled_at"]
assert_equal current_time.utc.iso8601(9), serialized_job["scheduled_at"]
deserialized_job = HelloJob.new
deserialized_job.deserialize(serialized_job)
assert_equal current_time, deserialized_job.scheduled_at
assert_equal job.serialize, deserialized_job.serialize
end
end

View File

@ -5,6 +5,7 @@ require "jobs/hello_job"
require "jobs/enqueue_error_job"
require "jobs/multiple_kwargs_job"
require "active_support/core_ext/numeric/time"
require "minitest/mock"
class QueuingTest < ActiveSupport::TestCase
setup do
@ -35,7 +36,7 @@ class QueuingTest < ActiveSupport::TestCase
test "job returned by perform_at has the timestamp available" do
job = HelloJob.set(wait_until: Time.utc(2014, 1, 1)).perform_later
assert_equal Time.utc(2014, 1, 1).to_f, job.scheduled_at
assert_equal Time.utc(2014, 1, 1), job.scheduled_at
rescue NotImplementedError
skip
end
@ -71,6 +72,19 @@ class QueuingTest < ActiveSupport::TestCase
assert_equal ["Jamie says hello", "Job with argument1: John, argument2: 42"], JobBuffer.values.sort
end
test "perform_all_later enqueues jobs with schedules" do
scheduled_job_1 = HelloJob.new("Scheduled 2014")
scheduled_job_1.set(wait_until: Time.utc(2014, 1, 1))
scheduled_job_2 = HelloJob.new("Scheduled 2015")
scheduled_job_2.scheduled_at = Time.utc(2015, 1, 1)
ActiveJob.perform_all_later(scheduled_job_1, scheduled_job_2)
assert_equal ["Scheduled 2014 says hello", "Scheduled 2015 says hello"], JobBuffer.values.sort
rescue NotImplementedError
skip
end
test "perform_all_later instrumentation" do
jobs = HelloJob.new("Jamie"), HelloJob.new("John")
called = false

View File

@ -38,7 +38,7 @@ class RetryJob < ActiveJob::Base
before_enqueue do |job|
if job.arguments.include?(:log_scheduled_at) && job.scheduled_at
JobBuffer.add("Next execution scheduled at #{job.scheduled_at}")
JobBuffer.add("Next execution scheduled at #{job.scheduled_at.to_f}")
end
end