Move queue classes to ActiveSupport

This commit is contained in:
Santiago Pastorino 2012-09-13 15:09:15 -07:00
parent ae00adecf4
commit 8577687fcb
17 changed files with 180 additions and 182 deletions

View File

@ -11,7 +11,7 @@ end
require 'minitest/autorun'
require 'action_mailer'
require 'action_mailer/test_case'
require 'rails/queueing'
require 'active_support/queueing'
silence_warnings do
# These external dependencies have warnings :/
@ -27,7 +27,7 @@ ActionView::Template.register_template_handler :bak, lambda { |template| "Lame b
FIXTURE_LOAD_PATH = File.expand_path('fixtures', File.dirname(__FILE__))
ActionMailer::Base.view_paths = FIXTURE_LOAD_PATH
ActionMailer::Base.queue = Rails::Queueing::SynchronousQueue.new
ActionMailer::Base.queue = ActiveSupport::SynchronousQueue.new
class MockSMTP
def self.deliveries

View File

@ -3,13 +3,13 @@ require 'abstract_unit'
require 'set'
require 'action_dispatch'
require 'active_support/queueing'
require 'active_support/time'
require 'mailers/base_mailer'
require 'mailers/proc_mailer'
require 'mailers/asset_mailer'
require 'mailers/async_mailer'
require 'rails/queueing'
class BaseTest < ActiveSupport::TestCase
def teardown
@ -433,7 +433,7 @@ class BaseTest < ActiveSupport::TestCase
end
test "delivering message asynchronously" do
testing_queue = Rails::Queueing::TestQueue.new
testing_queue = ActiveSupport::TestQueue.new
AsyncMailer.delivery_method = :test
AsyncMailer.deliveries.clear
stub_queue(AsyncMailer, testing_queue).welcome.deliver

View File

@ -0,0 +1,116 @@
require 'delegate'
require 'thread'
module ActiveSupport
# A Queue that simply inherits from STDLIB's Queue. Everytime this
# queue is used, Rails automatically sets up a ThreadedConsumer
# to consume it.
class Queue < ::Queue
end
class SynchronousQueue < ::Queue
def push(job)
result = nil
Thread.new { result = job.run }.join
result
end
alias << push
alias enq push
end
# In test mode, the Rails queue is backed by an Array so that assertions
# can be made about its contents. The test queue provides a +jobs+
# method to make assertions about the queue's contents and a +drain+
# method to drain the queue and run the jobs.
#
# Jobs are run in a separate thread to catch mistakes where code
# assumes that the job is run in the same thread.
class TestQueue < ::Queue
# Get a list of the jobs off this queue. This method may not be
# available on production queues.
def jobs
@que.dup
end
# Marshal and unmarshal job before pushing it onto the queue. This will
# raise an exception on any attempts in tests to push jobs that can't (or
# shouldn't) be marshalled.
def push(job)
super Marshal.load(Marshal.dump(job))
end
# Drain the queue, running all jobs in a different thread. This method
# may not be available on production queues.
def drain
# run the jobs in a separate thread so assumptions of synchronous
# jobs are caught in test mode.
Thread.new { pop.run until empty? }.join
end
end
# A container for multiple queues. This class delegates to a default Queue
# so that <tt>Rails.queue.push</tt> and friends will Just Work. To use this class
# with multiple queues:
#
# # In your configuration:
# Rails.queue[:image_queue] = SomeQueue.new
# Rails.queue[:mail_queue] = SomeQueue.new
#
# # In your app code:
# Rails.queue[:mail_queue].push SomeJob.new
#
class QueueContainer < DelegateClass(::Queue)
def initialize(default_queue)
@queues = { :default => default_queue }
super(default_queue)
end
def [](queue_name)
@queues[queue_name]
end
def []=(queue_name, queue)
@queues[queue_name] = queue
end
end
# The threaded consumer will run jobs in a background thread in
# development mode or in a VM where running jobs on a thread in
# production mode makes sense.
#
# When the process exits, the consumer pushes a nil onto the
# queue and joins the thread, which will ensure that all jobs
# are executed before the process finally dies.
class ThreadedQueueConsumer
def self.start(queue, logger=nil)
new(queue, logger).start
end
def initialize(queue, logger=nil)
@queue = queue
@logger = logger
end
def start
@thread = Thread.new do
while job = @queue.pop
begin
job.run
rescue Exception => e
handle_exception e
end
end
end
self
end
def shutdown
@queue.push nil
@thread.join
end
def handle_exception(e)
@logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}" if @logger
end
end
end

View File

@ -0,0 +1,28 @@
require 'abstract_unit'
require 'active_support/queueing'
module ActiveSupport
class ContainerTest < ActiveSupport::TestCase
def test_delegates_to_default
q = Queue.new
container = QueueContainer.new q
job = Object.new
container.push job
assert_equal job, q.pop
end
def test_access_default
q = Queue.new
container = QueueContainer.new q
assert_equal q, container[:default]
end
def test_assign_queue
container = QueueContainer.new Object.new
q = Object.new
container[:foo] = q
assert_equal q, container[:foo]
end
end
end

View File

@ -1,9 +1,9 @@
require 'abstract_unit'
require 'rails/queueing'
require 'active_support/queueing'
class TestQueueTest < ActiveSupport::TestCase
def setup
@queue = Rails::Queueing::TestQueue.new
@queue = ActiveSupport::TestQueue.new
end
class ExceptionRaisingJob

View File

@ -1,5 +1,6 @@
require 'abstract_unit'
require 'rails/queueing'
require 'active_support/queueing'
require "active_support/log_subscriber/test_helper"
class TestThreadConsumer < ActiveSupport::TestCase
class Job
@ -15,8 +16,9 @@ class TestThreadConsumer < ActiveSupport::TestCase
end
def setup
@queue = Rails::Queueing::Queue.new
@consumer = Rails::Queueing::ThreadedConsumer.start(@queue)
@queue = ActiveSupport::Queue.new
@logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new
@consumer = ActiveSupport::ThreadedQueueConsumer.start(@queue, @logger)
end
def teardown
@ -64,10 +66,6 @@ class TestThreadConsumer < ActiveSupport::TestCase
end
test "log job that raises an exception" do
require "active_support/log_subscriber/test_helper"
logger = ActiveSupport::LogSubscriber::TestHelper::MockLogger.new
Rails.logger = logger
job = Job.new(1) do
raise "RuntimeError: Error!"
end
@ -75,13 +73,13 @@ class TestThreadConsumer < ActiveSupport::TestCase
@queue.push job
sleep 0.1
assert_equal 1, logger.logged(:error).size
assert_match(/Job Error: RuntimeError: Error!/, logger.logged(:error).last)
assert_equal 1, @logger.logged(:error).size
assert_match(/Job Error: RuntimeError: Error!/, @logger.logged(:error).last)
end
test "test overriding exception handling" do
@consumer.shutdown
@consumer = Class.new(Rails::Queueing::ThreadedConsumer) do
@consumer = Class.new(ActiveSupport::ThreadedQueueConsumer) do
attr_reader :last_error
def handle_exception(e)
@last_error = e.message

View File

@ -111,9 +111,9 @@ end
* +config.middleware+ allows you to configure the application's middleware. This is covered in depth in the "Configuring Middleware":#configuring-middleware section below.
* +config.queue+ configures a different queue implementation for the application. Defaults to +Rails::Queueing::Queue+. Note that, if the default queue is changed, the default +queue_consumer+ is not going to be initialized, it is up to the new queue implementation to handle starting and shutting down its own consumer(s).
* +config.queue+ configures a different queue implementation for the application. Defaults to +ActiveSupport::SynchronousQueue+. Note that, if the default queue is changed, the default +queue_consumer+ is not going to be initialized, it is up to the new queue implementation to handle starting and shutting down its own consumer(s).
* +config.queue_consumer+ configures a different consumer implementation for the default queue. Defaults to +Rails::Queueing::ThreadedConsumer+.
* +config.queue_consumer+ configures a different consumer implementation for the default queue. Defaults to +ActiveSupport::ThreadedQueueConsumer+.
* +config.reload_classes_only_on_change+ enables or disables reloading of classes only when tracked files change. By default tracks everything on autoload paths and is set to true. If +config.cache_classes+ is true, this option is ignored.

View File

@ -22,7 +22,6 @@ end
module Rails
autoload :Info, 'rails/info'
autoload :InfoController, 'rails/info_controller'
autoload :Queueing, 'rails/queueing'
class << self
def application

View File

@ -1,4 +1,5 @@
require 'fileutils'
require 'active_support/queueing'
require 'rails/engine'
module Rails
@ -188,7 +189,7 @@ module Rails
end
def queue #:nodoc:
@queue ||= Queueing::Container.new(build_queue)
@queue ||= ActiveSupport::QueueContainer.new(build_queue)
end
def build_queue #:nodoc:

View File

@ -1,5 +1,6 @@
require 'active_support/core_ext/kernel/reporting'
require 'active_support/file_update_checker'
require 'active_support/queueing'
require 'rails/engine/configuration'
module Rails
@ -41,8 +42,8 @@ module Rails
@exceptions_app = nil
@autoflush_log = true
@log_formatter = ActiveSupport::Logger::SimpleFormatter.new
@queue = Rails::Queueing::SynchronousQueue
@queue_consumer = Rails::Queueing::ThreadedConsumer
@queue = ActiveSupport::SynchronousQueue
@queue_consumer = ActiveSupport::ThreadedQueueConsumer
@eager_load = nil
@assets = ActiveSupport::OrderedOptions.new

View File

@ -97,8 +97,8 @@ module Rails
end
initializer :activate_queue_consumer do |app|
if config.queue == Rails::Queueing::Queue
app.queue_consumer = config.queue_consumer.start(app.queue)
if config.queue == ActiveSupport::Queue
app.queue_consumer = config.queue_consumer.start(app.queue, Rails.logger)
at_exit { app.queue_consumer.shutdown }
end
end

View File

@ -82,5 +82,5 @@
# Default the production mode queue to an synchronous queue. You will probably
# want to replace this with an out-of-process queueing solution.
# config.queue = Rails::Queueing::SynchronousQueue
# config.queue = ActiveSupport::SynchronousQueue
end

View File

@ -40,5 +40,5 @@
config.active_support.deprecation = :stderr
# Use the testing queue.
config.queue = Rails::Queueing::TestQueue
config.queue = ActiveSupport::TestQueue
end

View File

@ -1,115 +0,0 @@
require "thread"
require 'delegate'
module Rails
module Queueing
# A container for multiple queues. This class delegates to a default Queue
# so that <tt>Rails.queue.push</tt> and friends will Just Work. To use this class
# with multiple queues:
#
# # In your configuration:
# Rails.queue[:image_queue] = SomeQueue.new
# Rails.queue[:mail_queue] = SomeQueue.new
#
# # In your app code:
# Rails.queue[:mail_queue].push SomeJob.new
#
class Container < DelegateClass(::Queue)
def initialize(default_queue)
@queues = { :default => default_queue }
super(default_queue)
end
def [](queue_name)
@queues[queue_name]
end
def []=(queue_name, queue)
@queues[queue_name] = queue
end
end
# A Queue that simply inherits from STDLIB's Queue. Everytime this
# queue is used, Rails automatically sets up a ThreadedConsumer
# to consume it.
class Queue < ::Queue
end
class SynchronousQueue < ::Queue
def push(job)
job.run
end
alias << push
alias enq push
end
# In test mode, the Rails queue is backed by an Array so that assertions
# can be made about its contents. The test queue provides a +jobs+
# method to make assertions about the queue's contents and a +drain+
# method to drain the queue and run the jobs.
#
# Jobs are run in a separate thread to catch mistakes where code
# assumes that the job is run in the same thread.
class TestQueue < ::Queue
# Get a list of the jobs off this queue. This method may not be
# available on production queues.
def jobs
@que.dup
end
# Marshal and unmarshal job before pushing it onto the queue. This will
# raise an exception on any attempts in tests to push jobs that can't (or
# shouldn't) be marshalled.
def push(job)
super Marshal.load(Marshal.dump(job))
end
# Drain the queue, running all jobs in a different thread. This method
# may not be available on production queues.
def drain
# run the jobs in a separate thread so assumptions of synchronous
# jobs are caught in test mode.
Thread.new { pop.run until empty? }.join
end
end
# The threaded consumer will run jobs in a background thread in
# development mode or in a VM where running jobs on a thread in
# production mode makes sense.
#
# When the process exits, the consumer pushes a nil onto the
# queue and joins the thread, which will ensure that all jobs
# are executed before the process finally dies.
class ThreadedConsumer
def self.start(queue)
new(queue).start
end
def initialize(queue)
@queue = queue
end
def start
@thread = Thread.new do
while job = @queue.pop
begin
job.run
rescue Exception => e
handle_exception e
end
end
end
self
end
def shutdown
@queue.push nil
@thread.join
end
def handle_exception(e)
Rails.logger.error "Job Error: #{e.message}\n#{e.backtrace.join("\n")}"
end
end
end
end

View File

@ -52,19 +52,19 @@ module ApplicationTests
test "uses the default queue for ActionMailer" do
require "#{app_path}/config/environment"
assert_kind_of Rails::Queueing::Container, ActionMailer::Base.queue
assert_kind_of ActiveSupport::QueueContainer, ActionMailer::Base.queue
end
test "allows me to configure queue for ActionMailer" do
app_file "config/environments/development.rb", <<-RUBY
AppTemplate::Application.configure do
Rails.queue[:mailer] = Rails::Queueing::TestQueue.new
Rails.queue[:mailer] = ActiveSupport::TestQueue.new
config.action_mailer.queue = Rails.queue[:mailer]
end
RUBY
require "#{app_path}/config/environment"
assert_kind_of Rails::Queueing::TestQueue, ActionMailer::Base.queue
assert_kind_of ActiveSupport::TestQueue, ActionMailer::Base.queue
end
test "does not include url helpers as action methods" do

View File

@ -19,14 +19,14 @@ module ApplicationTests
test "the queue is a TestQueue in test mode" do
app("test")
assert_kind_of Rails::Queueing::TestQueue, Rails.application.queue[:default]
assert_kind_of Rails::Queueing::TestQueue, Rails.queue[:default]
assert_kind_of ActiveSupport::TestQueue, Rails.application.queue[:default]
assert_kind_of ActiveSupport::TestQueue, Rails.queue[:default]
end
test "the queue is a SynchronousQueue in development mode" do
app("development")
assert_kind_of Rails::Queueing::SynchronousQueue, Rails.application.queue[:default]
assert_kind_of Rails::Queueing::SynchronousQueue, Rails.queue[:default]
assert_kind_of ActiveSupport::SynchronousQueue, Rails.application.queue[:default]
assert_kind_of ActiveSupport::SynchronousQueue, Rails.queue[:default]
end
class ThreadTrackingJob
@ -47,7 +47,7 @@ module ApplicationTests
end
end
test "in development mode, an enqueued job will be processed in the same thread" do
test "in development mode, an enqueued job will be processed in a separate thread" do
app("development")
job = ThreadTrackingJob.new
@ -55,7 +55,7 @@ module ApplicationTests
sleep 0.1
assert job.ran?, "Expected job to be run"
refute job.ran_in_different_thread?, "Expected job to run in the same thread"
assert job.ran_in_different_thread?, "Expected job to run in the same thread"
end
test "in test mode, explicitly draining the queue will process it in a separate thread" do
@ -160,12 +160,12 @@ module ApplicationTests
test "a custom consumer implementation can be provided" do
add_to_env_config "production", <<-RUBY
require "my_queue_consumer"
config.queue = Rails::Queueing::Queue
config.queue = ActiveSupport::Queue
config.queue_consumer = MyQueueConsumer
RUBY
app_file "lib/my_queue_consumer.rb", <<-RUBY
class MyQueueConsumer < Rails::Queueing::ThreadedConsumer
class MyQueueConsumer < ActiveSupport::ThreadedQueueConsumer
attr_reader :started
def start

View File

@ -1,30 +0,0 @@
require 'abstract_unit'
require 'rails/queueing'
module Rails
module Queueing
class ContainerTest < ActiveSupport::TestCase
def test_delegates_to_default
q = Queue.new
container = Container.new q
job = Object.new
container.push job
assert_equal job, q.pop
end
def test_access_default
q = Queue.new
container = Container.new q
assert_equal q, container[:default]
end
def test_assign_queue
container = Container.new Object.new
q = Object.new
container[:foo] = q
assert_equal q, container[:foo]
end
end
end
end