From 9b62f88a2fde0d2bf8c4f6e3bcd06ecba7ca9d8d Mon Sep 17 00:00:00 2001 From: Sander Verdonschot Date: Wed, 2 Nov 2022 16:53:10 -0400 Subject: [PATCH] Add `perform_all_later` to enqueue multiple jobs at once Sidekiq has a useful optimisation called `push_bulk` that enqueues many jobs at once, eliminating the repeated Redis roundtrips. However, this feature is not exposed through Active Job, so it only works for `Sidekiq::Worker` jobs. This adds a barrier to Active Job adoption for apps that rely on this feature. It also makes it harder for other queue adapters to implement similar functionality, as they then have to take care of serialization, callbacks, etc. themselves. This commit adds `ActiveJob.perform_all_later(<job1>, <job2>)`, backed by Sidekiq's `push_bulk` and with a fallback to enqueuing serially if the queue adapter does not support bulk enqueue. The performance benefit for 1000 jobs can be more than an order of magnitude: | Enqueue type | Serial time (ms) | Bulk time (ms) | Speedup | | ------------------ | ---------------- | -------------- | ------- | | Raw Sidekiq | 2661 | 119 | 22x | | Active Job Sidekiq | 2853 | 208 | 14x | (Measured in a simple test app in our production environment.) 
Instrumentation for perform_all_later uses a new event `enqueue_all.active_job` --- activejob/CHANGELOG.md | 22 ++++++++++++++ activejob/lib/active_job/configured_job.rb | 4 +++ activejob/lib/active_job/enqueuing.rb | 29 +++++++++++++++++++ activejob/lib/active_job/instrumentation.rb | 12 ++++++++ .../queue_adapters/sidekiq_adapter.rb | 27 +++++++++++++++++ activejob/test/cases/queuing_test.rb | 16 ++++++++++ activejob/test/integration/queuing_test.rb | 10 +++++++ 7 files changed, 120 insertions(+) diff --git a/activejob/CHANGELOG.md b/activejob/CHANGELOG.md index 226284f10dd..68d2da30e6b 100644 --- a/activejob/CHANGELOG.md +++ b/activejob/CHANGELOG.md @@ -1,3 +1,25 @@ +* Add `perform_all_later` to enqueue multiple jobs at once + + This adds the ability to bulk enqueue jobs, without running callbacks, by + passing multiple jobs or an array of jobs. For example: + + ```ruby + ActiveJob.perform_all_later(MyJob.new("hello", 42), MyJob.new("world", 0)) + + user_jobs = User.pluck(:id).map { |id| UserJob.new(user_id: id) } + ActiveJob.perform_all_later(user_jobs) + ``` + + This can greatly reduce the number of round-trips to the queue datastore. + For queue adapters that do not implement the new `enqueue_all` method, we + fall back to enqueuing jobs individually. The Sidekiq adapter implements + `enqueue_all` with `push_bulk`. + + This method does not use the existing `enqueue.active_job` event, but adds a + new event `enqueue_all.active_job`. + + *Sander Verdonschot* + * Don't double log the `job` when using `ActiveRecord::QueryLog` Previously if you set `config.active_record.query_log_tags` to an array that included diff --git a/activejob/lib/active_job/configured_job.rb b/activejob/lib/active_job/configured_job.rb index 3f5c80d241d..7d8aafe3a17 100644 --- a/activejob/lib/active_job/configured_job.rb +++ b/activejob/lib/active_job/configured_job.rb @@ -14,5 +14,9 @@ module ActiveJob def perform_later(...) 
@job_class.new(...).enqueue @options end + + def perform_all_later(multi_args) + @job_class.perform_all_later(multi_args, options: @options) + end end end diff --git a/activejob/lib/active_job/enqueuing.rb b/activejob/lib/active_job/enqueuing.rb index 62fb57ea090..9f011108e2f 100644 --- a/activejob/lib/active_job/enqueuing.rb +++ b/activejob/lib/active_job/enqueuing.rb @@ -9,6 +9,35 @@ module ActiveJob # why the adapter was unexpectedly unable to enqueue a job. class EnqueueError < StandardError; end + class << self + # Push many jobs onto the queue at once without running enqueue callbacks. + # Queue adapters may communicate the enqueue status of each job by setting + # successfully_enqueued and/or enqueue_error on the passed-in job instances. + def perform_all_later(*jobs) + jobs.flatten! + jobs.group_by(&:queue_adapter).each do |queue_adapter, adapter_jobs| + instrument_enqueue_all(queue_adapter, adapter_jobs) do + if queue_adapter.respond_to?(:enqueue_all) + queue_adapter.enqueue_all(adapter_jobs) + else + adapter_jobs.each do |job| + job.successfully_enqueued = false + if job.scheduled_at + queue_adapter.enqueue_at(job, job.scheduled_at) + else + queue_adapter.enqueue(job) + end + job.successfully_enqueued = true + rescue EnqueueError => e + job.enqueue_error = e + end + end + end + end + nil + end + end + module Enqueuing extend ActiveSupport::Concern diff --git a/activejob/lib/active_job/instrumentation.rb b/activejob/lib/active_job/instrumentation.rb index e74b95b64f0..8e84f8f250a 100644 --- a/activejob/lib/active_job/instrumentation.rb +++ b/activejob/lib/active_job/instrumentation.rb @@ -1,6 +1,18 @@ # frozen_string_literal: true module ActiveJob + class << self + private + def instrument_enqueue_all(queue_adapter, jobs) + payload = { adapter: queue_adapter, jobs: jobs } + ActiveSupport::Notifications.instrument("enqueue_all.active_job", payload) do + result = yield payload + payload[:enqueued_count] = result + result + end + end + end + module 
Instrumentation # :nodoc: extend ActiveSupport::Concern diff --git a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb index 4a13eeb0727..5b17f7d6bf5 100644 --- a/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb +++ b/activejob/lib/active_job/queue_adapters/sidekiq_adapter.rb @@ -32,6 +32,33 @@ module ActiveJob ).perform_at(timestamp, job.serialize) end + def enqueue_all(jobs) # :nodoc: + jobs.group_by(&:class).each do |job_class, same_class_jobs| + same_class_jobs.group_by(&:queue_name).each do |queue, same_class_and_queue_jobs| + immediate_jobs, scheduled_jobs = same_class_and_queue_jobs.partition { |job| job.scheduled_at.nil? } + + if immediate_jobs.any? + Sidekiq::Client.push_bulk( + "class" => JobWrapper, + "wrapped" => job_class, + "queue" => queue, + "args" => immediate_jobs.map { |job| [job.serialize] }, + ) + end + + if scheduled_jobs.any? + Sidekiq::Client.push_bulk( + "class" => JobWrapper, + "wrapped" => job_class, + "queue" => queue, + "args" => scheduled_jobs.map { |job| [job.serialize] }, + "at" => scheduled_jobs.map { |job| job.scheduled_at } + ) + end + end + end + end + class JobWrapper # :nodoc: include Sidekiq::Worker diff --git a/activejob/test/cases/queuing_test.rb b/activejob/test/cases/queuing_test.rb index 9e74a307f91..5e7f2cbbe79 100644 --- a/activejob/test/cases/queuing_test.rb +++ b/activejob/test/cases/queuing_test.rb @@ -3,6 +3,7 @@ require "helper" require "jobs/hello_job" require "jobs/enqueue_error_job" +require "jobs/multiple_kwargs_job" require "active_support/core_ext/numeric/time" class QueuingTest < ActiveSupport::TestCase @@ -54,4 +55,19 @@ class QueuingTest < ActiveSupport::TestCase assert_equal ActiveJob::EnqueueError, job.enqueue_error.class end end + + test "run multiple queued jobs" do + ActiveJob.perform_all_later(HelloJob.new("Jamie"), HelloJob.new("John")) + assert_equal ["Jamie says hello", "John says hello"], JobBuffer.values.sort + end + 
+ test "run multiple queued jobs passed as array" do + ActiveJob.perform_all_later([HelloJob.new("Jamie"), HelloJob.new("John")]) + assert_equal ["Jamie says hello", "John says hello"], JobBuffer.values.sort + end + + test "run multiple queued jobs of different classes" do + ActiveJob.perform_all_later([HelloJob.new("Jamie"), MultipleKwargsJob.new(argument1: "John", argument2: 42)]) + assert_equal ["Jamie says hello", "Job with argument1: John, argument2: 42"], JobBuffer.values.sort + end end diff --git a/activejob/test/integration/queuing_test.rb b/activejob/test/integration/queuing_test.rb index b977498b154..d7a245a893f 100644 --- a/activejob/test/integration/queuing_test.rb +++ b/activejob/test/integration/queuing_test.rb @@ -77,6 +77,16 @@ class QueuingTest < ActiveSupport::TestCase skip end + test "should run job bulk enqueued in the future at the specified time" do + ActiveJob.perform_all_later([TestJob.new(@id).set(wait: 5.seconds)]) + wait_for_jobs_to_finish_for(2.seconds) + assert_job_not_executed + wait_for_jobs_to_finish_for(10.seconds) + assert_job_executed + rescue NotImplementedError + skip + end + test "should supply a provider_job_id when available for immediate jobs" do skip unless adapter_is?(:async, :delayed_job, :sidekiq, :queue_classic) test_job = TestJob.perform_later @id