spec and migration cleanup for delayed jobs

Fixes up some abstraction so that we can add other jobs backends and
specs, migrations, etc will work as expected.

Also remove some unused parameters from the Delayed::Job methods for
finding and locking jobs.

test plan: run the database migrations on a new db, and migrations
should work without error. delayed jobs should also be created and
processed by workers without error.

Change-Id: I1fe6ef5464f9780db3010fa002703fc030832f8d
Reviewed-on: https://gerrit.instructure.com/11590
Reviewed-by: Cody Cutrer <cody@instructure.com>
Tested-by: Jenkins <jenkins@instructure.com>
Reviewed-by: Bracken Mosbacker <bracken@instructure.com>
This commit is contained in:
Brian Palmer 2012-06-11 13:17:31 -06:00
parent 73323a4815
commit 8e9bf96d18
23 changed files with 243 additions and 259 deletions

View File

@ -1,6 +1,6 @@
class CleanupDelayedJobsIndexes < ActiveRecord::Migration
def self.connection
Delayed::Job.connection
Delayed::Backend::ActiveRecord::Job.connection
end
def self.up

View File

@ -29,32 +29,12 @@ class OptimizeDelayedJobs < ActiveRecord::Migration
add_index :delayed_jobs, %w(strand id), :name => 'index_delayed_jobs_on_strand'
# move all failed jobs to the new failed table
Delayed::Job.find_each(:conditions => 'failed_at is not null') do |job|
Delayed::Backend::ActiveRecord::Job.find_each(:conditions => 'failed_at is not null') do |job|
job.fail! unless job.on_hold?
end
end
def self.down
remove_index :delayed_jobs, :name => 'index_delayed_jobs_for_get_next'
remove_index :delayed_jobs, :name => 'index_delayed_jobs_on_strand'
add_index :delayed_jobs, [:strand]
# from CleanupDelayedJobsIndexes migration
case connection.adapter_name
when 'PostgreSQL'
# "nulls first" syntax is postgresql specific, and allows for more
# efficient querying for the next job
connection.execute("CREATE INDEX get_delayed_jobs_index ON delayed_jobs (priority, run_at, failed_at nulls first, locked_at nulls first, queue)")
else
add_index :delayed_jobs, %w(priority run_at locked_at failed_at queue), :name => 'get_delayed_jobs_index'
end
Delayed::Job::Failed.find_each do |job|
attrs = job.attributes
attrs.delete('id')
Delayed::Job.create!(attrs)
end
drop_table :failed_jobs
raise ActiveRecord::IrreversibleMigration
end
end

View File

@ -1,6 +1,6 @@
class RemoveInactiveEnrollmentState < ActiveRecord::Migration
def self.up
Delayed::Job.delete_all(:tag => 'EnrollmentDateRestrictions.update_restricted_enrollments')
Delayed::Backend::ActiveRecord::Job.delete_all(:tag => 'EnrollmentDateRestrictions.update_restricted_enrollments')
Enrollment.update_all({:workflow_state => 'active'}, :workflow_state => 'inactive')
end

View File

@ -1,6 +1,6 @@
class AddDelayedJobsNextInStrand < ActiveRecord::Migration
def self.connection
Delayed::Job.connection
Delayed::Backend::ActiveRecord::Job.connection
end
def self.up
@ -95,10 +95,10 @@ class AddDelayedJobsNextInStrand < ActiveRecord::Migration
if connection.adapter_name == 'MySQL'
# use temp tables to work around subselect limitations in mysql
execute(%{CREATE TEMPORARY TABLE dj_20110831210257 (strand varchar(255), next_job_id bigint) SELECT strand, min(id) as next_job_id FROM delayed_jobs WHERE strand IS NOT NULL GROUP BY strand})
execute(%{UPDATE delayed_jobs SET next_in_strand = #{Delayed::Job.quote_value(false)} WHERE strand IS NOT NULL AND id <> (SELECT t.next_job_id FROM dj_20110831210257 t WHERE t.strand = delayed_jobs.strand)})
execute(%{UPDATE delayed_jobs SET next_in_strand = #{Delayed::Backend::ActiveRecord::Job.quote_value(false)} WHERE strand IS NOT NULL AND id <> (SELECT t.next_job_id FROM dj_20110831210257 t WHERE t.strand = delayed_jobs.strand)})
execute(%{DROP TABLE dj_20110831210257})
else
execute(%{UPDATE delayed_jobs SET next_in_strand = #{Delayed::Job.quote_value(false)} WHERE strand IS NOT NULL AND id <> (SELECT id FROM delayed_jobs j2 WHERE j2.strand = delayed_jobs.strand ORDER BY j2.strand, j2.id ASC LIMIT 1)})
execute(%{UPDATE delayed_jobs SET next_in_strand = #{Delayed::Backend::ActiveRecord::Job.quote_value(false)} WHERE strand IS NOT NULL AND id <> (SELECT id FROM delayed_jobs j2 WHERE j2.strand = delayed_jobs.strand ORDER BY j2.strand, j2.id ASC LIMIT 1)})
end
end

View File

@ -2,7 +2,7 @@ class DelayedJobsDeleteTriggerLockForUpdate < ActiveRecord::Migration
tag :predeploy
def self.connection
Delayed::Job.connection
Delayed::Backend::ActiveRecord::Job.connection
end
def self.up

View File

@ -2,7 +2,7 @@ class DelayedJobsUseAdvisoryLocks < ActiveRecord::Migration
tag :predeploy
def self.connection
Delayed::Job.connection
Delayed::Backend::ActiveRecord::Job.connection
end
def self.up

View File

@ -4,7 +4,7 @@ class IndexJobsOnLockedBy < ActiveRecord::Migration
self.transactional = false
def self.connection
Delayed::Job.connection
Delayed::Backend::ActiveRecord::Job.connection
end
def self.up

View File

@ -4,7 +4,7 @@ class AddJobsRunAtIndex < ActiveRecord::Migration
self.transactional = false
def self.connection
Delayed::Job.connection
Delayed::Backend::ActiveRecord::Job.connection
end
def self.up

View File

@ -194,7 +194,7 @@ describe "site admin jobs ui" do
context "running jobs" do
it "should display running jobs in the workers grid" do
j = Delayed::Job.first(:order => :id)
j.lock_exclusively!(100, 'my test worker')
j.lock_exclusively!('my test worker')
load_jobs_page
ffj('#running-grid .slick-row').size.should eql 1
first_cell = f('#running-grid .slick-cell.l0.r0')

View File

@ -32,7 +32,7 @@ ALL_MODELS = (ActiveRecord::Base.send(:subclasses) +
model = File.basename(file, ".*").camelize.constantize
next unless model < ActiveRecord::Base
model
}).compact.uniq.reject { |model| model.superclass != ActiveRecord::Base || model == Tableless }
}).compact.uniq.reject { |model| model.superclass != ActiveRecord::Base || (model.respond_to?(:tableless?) && model.tableless?) }
ALL_MODELS << Version
ALL_MODELS << Delayed::Backend::ActiveRecord::Job::Failed
ALL_MODELS << Delayed::Backend::ActiveRecord::Job
@ -759,7 +759,6 @@ Spec::Runner.configure do |config|
def run_jobs
while job = Delayed::Job.get_and_lock_next_available(
'spec run_jobs',
1.hour,
Delayed::Worker.queue,
0,
Delayed::MAX_PRIORITY)

View File

@ -19,11 +19,6 @@ module Delayed
class Job < ::ActiveRecord::Base
include Delayed::Backend::Base
set_table_name :delayed_jobs
attr_writer :current_shard
def current_shard
@current_shard || Shard.default
end
# be aware that some strand functionality is controlled by triggers on
# the database. see
@ -68,9 +63,6 @@ module Delayed
end
end
cattr_accessor :default_priority
self.default_priority = Delayed::NORMAL_PRIORITY
named_scope :current, lambda {
{ :conditions => ["run_at <= ?", db_time_now] }
}
@ -88,7 +80,7 @@ module Delayed
# 500.times { |i| "ohai".send_later_enqueue_args(:reverse, { :run_at => (12.hours.ago + (rand(24.hours.to_i))) }) }
# then fire up your workers
# you can check out strand correctness: diff test1.txt <(sort -n test1.txt)
named_scope :ready_to_run, lambda {|worker_name, max_run_time|
named_scope :ready_to_run, lambda {
{ :conditions => ["run_at <= ? AND locked_at IS NULL AND next_in_strand = ?", db_time_now, true] }
}
named_scope :by_priority, :order => 'priority ASC, run_at ASC'
@ -103,7 +95,6 @@ module Delayed
end
def self.get_and_lock_next_available(worker_name,
max_run_time,
queue = nil,
min_priority = nil,
max_priority = nil)
@ -113,30 +104,26 @@ module Delayed
@batch_size ||= Setting.get_cached('jobs_get_next_batch_size', '5').to_i
loop do
jobs = find_available(worker_name, @batch_size, max_run_time, queue, min_priority, max_priority)
jobs = find_available(@batch_size, queue, min_priority, max_priority)
return nil if jobs.empty?
job = jobs.detect do |job|
job.lock_exclusively!(max_run_time, worker_name)
job.lock_exclusively!(worker_name)
end
return job if job
end
end
def self.find_available(worker_name,
limit,
max_run_time,
def self.find_available(limit,
queue = nil,
min_priority = nil,
max_priority = nil)
all_available(worker_name, max_run_time, queue, min_priority, max_priority).all(:limit => limit)
all_available(queue, min_priority, max_priority).all(:limit => limit)
end
def self.all_available(worker_name,
max_run_time,
queue = nil,
def self.all_available(queue = nil,
min_priority = nil,
max_priority = nil)
scope = self.ready_to_run(worker_name, max_run_time)
scope = self.ready_to_run
scope = scope.scoped(:conditions => ['priority >= ?', min_priority]) if min_priority
scope = scope.scoped(:conditions => ['priority <= ?', max_priority]) if max_priority
scope = scope.scoped(:conditions => ['queue = ?', queue]) if queue
@ -169,7 +156,7 @@ module Delayed
# It's important to note that for performance reasons, this method does
# not re-check the strand constraints -- so you could manually lock a
# job using this method that isn't the next to run on its strand.
def lock_exclusively!(max_run_time, worker)
def lock_exclusively!(worker)
now = self.class.db_time_now
# We don't own this job so we will update the locked_by name and the locked_at
affected_rows = self.class.update_all(["locked_at = ?, locked_by = ?", now, worker], ["id = ? and locked_at is null and run_at <= ?", id, now])
@ -224,13 +211,6 @@ module Delayed
update_all(["locked_by = NULL, locked_at = NULL, attempts = 0, run_at = (CASE WHEN run_at > ? THEN run_at ELSE ? END), failed_at = NULL", now, now])
end
# Get the current time (GMT or local depending on DB)
# Note: This does not ping the DB to get the time, so all your clients
# must have syncronized clocks.
def self.db_time_now
Time.now.in_time_zone
end
class Failed < Job
include Delayed::Backend::Base
set_table_name :failed_jobs

View File

@ -12,10 +12,17 @@ module Delayed
def self.included(base)
base.extend ClassMethods
base.send :attr_writer, :current_shard
base.default_priority = Delayed::NORMAL_PRIORITY
end
def current_shard
@current_shard || Shard.default
end
module ClassMethods
attr_accessor :batches
attr_accessor :default_priority
# Add a job to the queue
# The first argument should be an object that respond_to?(:perform)
@ -61,6 +68,13 @@ module Delayed
def in_delayed_job=(val)
Thread.current[:in_delayed_job] = val
end
# Get the current time (GMT or local depending on DB)
# Note: This does not ping the DB to get the time, so all your clients
# must have syncronized clocks.
def db_time_now
Time.now.in_time_zone
end
end
def failed?

View File

@ -86,7 +86,6 @@ class Worker
job = Delayed::Job.get_and_lock_next_available(
name,
self.class.max_run_time,
queue,
min_priority,
max_priority)

View File

@ -0,0 +1,29 @@
require File.expand_path("../spec_helper", __FILE__)
describe 'Delayed::Backed::ActiveRecord::Job' do
before :all do
@job_spec_backend = Delayed::Job
Delayed.send(:remove_const, :Job)
Delayed::Job = Delayed::Backend::ActiveRecord::Job
end
after :all do
Delayed.send(:remove_const, :Job)
Delayed::Job = @job_spec_backend
end
before do
Delayed::Job.delete_all
end
it_should_behave_like 'a delayed_jobs implementation'
it "should recover as well as possible from a failure failing a job" do
Delayed::Job::Failed.stubs(:create).raises(RuntimeError)
job = "test".send_later :reverse
job_id = job.id
proc { job.fail! }.should raise_error
proc { Delayed::Job.find(job_id) }.should raise_error(ActiveRecord::RecordNotFound)
Delayed::Job.count.should == 0
end
end

View File

@ -1,6 +1,6 @@
require File.expand_path("../../../../../spec/sharding_spec_helper", __FILE__)
describe Delayed::Batch do
shared_examples_for 'Delayed::Batch' do
before :each do
Delayed::Worker.queue = nil
Delayed::Job.delete_all

View File

@ -1,9 +1,9 @@
require File.expand_path("../spec_helper", __FILE__)
describe 'random ruby objects' do
before :each do
Delayed::Worker.queue = nil
Delayed::Job.delete_all
shared_examples_for 'random ruby objects' do
def set_queue(name)
old_name = Delayed::Worker.queue
Delayed::Worker.queue = name
ensure
Delayed::Worker.queue = old_name
end
it "should respond_to :send_later method" do
@ -282,12 +282,9 @@ describe 'random ruby objects' do
it "should call send later on methods which are wrapped with handle_asynchronously" do
story = Story.create :text => 'Once upon...'
Delayed::Job.count.should == 0
job = nil
expect { job = story.whatever(1, 5) }.to change(Delayed::Job, :count).by(1)
story.whatever(1, 5)
Delayed::Job.count.should == 1
job = Delayed::Job.find(:first)
job.payload_object.class.should == Delayed::PerformableMethod
job.payload_object.method.should == :whatever_without_send_later
job.payload_object.args.should == [1, 5]
@ -297,12 +294,9 @@ describe 'random ruby objects' do
it "should call send later on methods which are wrapped with handle_asynchronously_with_queue" do
story = Story.create :text => 'Once upon...'
Delayed::Job.count.should == 0
job = nil
expect { job = story.whatever_else(1, 5) }.to change(Delayed::Job, :count).by(1)
story.whatever_else(1, 5)
Delayed::Job.count.should == 1
job = Delayed::Job.find(:first)
job.payload_object.class.should == Delayed::PerformableMethod
job.payload_object.method.should == :whatever_else_without_send_later
job.payload_object.args.should == [1, 5]
@ -311,16 +305,19 @@ describe 'random ruby objects' do
context "send_later" do
it "should use the default queue if there is one" do
Delayed::Worker.queue = "testqueue"
set_queue("testqueue") do
job = "string".send_later :reverse
job.queue.should == "testqueue"
end
end
it "should have nil queue if there is not a default" do
set_queue(nil) do
job = "string".send_later :reverse
job.queue.should == nil
end
end
end
context "send_at" do
it "should queue a new job" do
@ -344,16 +341,19 @@ describe 'random ruby objects' do
end
it "should use the default queue if there is one" do
Delayed::Worker.queue = "testqueue"
set_queue("testqueue") do
job = "string".send_at 1.hour.from_now, :reverse
job.queue.should == "testqueue"
end
end
it "should have nil queue if there is not a default" do
set_queue(nil) do
job = "string".send_at 1.hour.from_now, :reverse
job.queue.should == nil
end
end
end
context "send_at_with_queue" do
it "should queue a new job" do
@ -369,10 +369,11 @@ describe 'random ruby objects' do
end
it "should override the default queue" do
Delayed::Worker.queue = "default_queue"
set_queue("default_queue") do
job = "string".send_at_with_queue(1.hour.from_now, :length, "testqueue")
job.queue.should == "testqueue"
end
end
it "should store payload as PerformableMethod" do
job = "string".send_at_with_queue(1.hour.from_now, :count, "testqueue", 'r')

View File

@ -1,33 +0,0 @@
require File.expand_path("../spec_helper", __FILE__)
describe Delayed::Job do
before(:all) do
@backend = Delayed::Job
end
before(:each) do
Delayed::Job.delete_all
SimpleJob.runs = 0
end
it_should_behave_like 'a backend'
it "should fail on job creation if an unsaved AR object is used" do
story = Story.new :text => "Once upon..."
lambda { story.send_later(:text) }.should raise_error
reader = StoryReader.new
lambda { reader.send_later(:read, story) }.should raise_error
lambda { [story, 1, story, false].send_later(:first) }.should raise_error
end
it "should recover as well as possible from a failure failing a job" do
Delayed::Job::Failed.stubs(:create).raises(RuntimeError)
job = "test".send_later :reverse
job_id = job.id
proc { job.fail! }.should raise_error
proc { Delayed::Job.find(job_id) }.should raise_error(ActiveRecord::RecordNotFound)
Delayed::Job.count.should == 0
end
end

View File

@ -1,6 +1,4 @@
require File.expand_path("../spec_helper", __FILE__)
describe Delayed::PerformableMethod do
shared_examples_for 'Delayed::PerformableMethod' do
it "should not ignore ActiveRecord::RecordNotFound errors because they are not always permanent" do
story = Story.create :text => 'Once upon...'

View File

@ -1,6 +1,6 @@
shared_examples_for 'a backend' do
def create_job(opts = {})
@backend.enqueue(SimpleJob.new, { :queue => nil }.merge(opts))
Delayed::Job.enqueue(SimpleJob.new, { :queue => nil }.merge(opts))
end
before do
@ -8,72 +8,72 @@ shared_examples_for 'a backend' do
end
it "should set run_at automatically if not set" do
@backend.create(:payload_object => ErrorJob.new ).run_at.should_not be_nil
Delayed::Job.create(:payload_object => ErrorJob.new ).run_at.should_not be_nil
end
it "should not set run_at automatically if already set" do
later = @backend.db_time_now + 5.minutes
@backend.create(:payload_object => ErrorJob.new, :run_at => later).run_at.should be_close(later, 1)
later = Delayed::Job.db_time_now + 5.minutes
Delayed::Job.create(:payload_object => ErrorJob.new, :run_at => later).run_at.should be_close(later, 1)
end
it "should raise ArgumentError when handler doesn't respond_to :perform" do
lambda { @backend.enqueue(Object.new) }.should raise_error(ArgumentError)
lambda { Delayed::Job.enqueue(Object.new) }.should raise_error(ArgumentError)
end
it "should increase count after enqueuing items" do
@backend.enqueue SimpleJob.new
@backend.count.should == 1
Delayed::Job.enqueue SimpleJob.new
Delayed::Job.count.should == 1
end
it "should be able to set priority when enqueuing items" do
@job = @backend.enqueue SimpleJob.new, :priority => 5
@job = Delayed::Job.enqueue SimpleJob.new, :priority => 5
@job.priority.should == 5
end
it "should use the default priority when enqueuing items" do
@backend.default_priority = 0
@job = @backend.enqueue SimpleJob.new
Delayed::Job.default_priority = 0
@job = Delayed::Job.enqueue SimpleJob.new
@job.priority.should == 0
@backend.default_priority = 10
@job = @backend.enqueue SimpleJob.new
Delayed::Job.default_priority = 10
@job = Delayed::Job.enqueue SimpleJob.new
@job.priority.should == 10
@backend.default_priority = 0
Delayed::Job.default_priority = 0
end
it "should be able to set run_at when enqueuing items" do
later = @backend.db_time_now + 5.minutes
@job = @backend.enqueue SimpleJob.new, :priority => 5, :run_at => later
later = Delayed::Job.db_time_now + 5.minutes
@job = Delayed::Job.enqueue SimpleJob.new, :priority => 5, :run_at => later
@job.run_at.should be_close(later, 1)
end
it "should work with jobs in modules" do
M::ModuleJob.runs = 0
job = @backend.enqueue M::ModuleJob.new
job = Delayed::Job.enqueue M::ModuleJob.new
lambda { job.invoke_job }.should change { M::ModuleJob.runs }.from(0).to(1)
end
it "should raise an DeserializationError when the job class is totally unknown" do
job = @backend.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
job = Delayed::Job.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end
it "should try to load the class when it is unknown at the time of the deserialization" do
job = @backend.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
job = Delayed::Job.new :handler => "--- !ruby/object:JobThatDoesNotExist {}"
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end
it "should try include the namespace when loading unknown objects" do
job = @backend.new :handler => "--- !ruby/object:Delayed::JobThatDoesNotExist {}"
job = Delayed::Job.new :handler => "--- !ruby/object:Delayed::JobThatDoesNotExist {}"
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end
it "should also try to load structs when they are unknown (raises TypeError)" do
job = @backend.new :handler => "--- !ruby/struct:JobThatDoesNotExist {}"
job = Delayed::Job.new :handler => "--- !ruby/struct:JobThatDoesNotExist {}"
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end
it "should try include the namespace when loading unknown structs" do
job = @backend.new :handler => "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
job = Delayed::Job.new :handler => "--- !ruby/struct:Delayed::JobThatDoesNotExist {}"
lambda { job.payload_object.perform }.should raise_error(Delayed::Backend::DeserializationError)
end
@ -81,88 +81,86 @@ shared_examples_for 'a backend' do
it "should not find failed jobs" do
@job = create_job :attempts => 50
@job.fail!
@backend.find_available('worker', 5, 1.second).should_not include(@job)
Delayed::Job.find_available(5).should_not include(@job)
end
it "should not find jobs scheduled for the future" do
@job = create_job :run_at => (@backend.db_time_now + 1.minute)
@backend.find_available('worker', 5, 4.hours).should_not include(@job)
@job = create_job :run_at => (Delayed::Job.db_time_now + 1.minute)
Delayed::Job.find_available(5).should_not include(@job)
end
it "should not find jobs locked by another worker" do
@job = create_job(:locked_by => 'other_worker', :locked_at => @backend.db_time_now - 1.minute)
@backend.find_available('worker', 5, 4.hours).should_not include(@job)
@job = create_job
Delayed::Job.get_and_lock_next_available('other_worker').should == @job
Delayed::Job.find_available(5).should_not include(@job)
end
it "should find open jobs" do
@job = create_job
@backend.find_available('worker', 5, 4.hours).should include(@job)
Delayed::Job.find_available(5).should include(@job)
end
it "should find expired jobs" do
@job = create_job(:locked_by => 'worker', :locked_at => @backend.db_time_now - 2.minutes)
@job = create_job
Delayed::Job.get_and_lock_next_available('other_worker').should == @job
@job.update_attribute(:locked_at, Delayed::Job.db_time_now - 2.minutes)
Delayed::Job.unlock_expired_jobs(1.minute)
@backend.find_available('worker', 5, 1.minute).should include(@job)
Delayed::Job.find_available(5).should include(@job)
end
end
context "when another worker is already performing an task, it" do
before :each do
@job = @backend.create :payload_object => SimpleJob.new, :locked_by => 'worker1', :locked_at => @backend.db_time_now - 5.minutes
@job = Delayed::Job.create :payload_object => SimpleJob.new
Delayed::Job.get_and_lock_next_available('worker1').should == @job
end
it "should not allow a second worker to get exclusive access" do
@job.lock_exclusively!(4.hours, 'worker2').should == false
Delayed::Job.get_and_lock_next_available('worker2').should be_nil
end
it "should allow a second worker to get exclusive access if the timeout has passed" do
Delayed::Job.unlock_expired_jobs(1.minute)
@job.lock_exclusively!(1.minute, 'worker2').should == true
end
it "should be able to get access to the task if it was started more then max_age ago" do
@job.locked_at = 5.hours.ago
@job.save
@job.update_attribute(:locked_at, 5.hours.ago)
Delayed::Job.unlock_expired_jobs(4.hours)
@job.lock_exclusively! 4.hours, 'worker2'
Delayed::Job.get_and_lock_next_available('worker2').should == @job
@job.reload
@job.locked_by.should == 'worker2'
@job.locked_at.should > 1.minute.ago
end
it "should not be found by another worker" do
@backend.find_available('worker2', 1, 6.minutes).length.should == 0
Delayed::Job.find_available(1).length.should == 0
end
it "should be found by another worker if the time has expired" do
Delayed::Job.unlock_expired_jobs(4.minutes)
@backend.find_available('worker2', 1, 4.minutes).length.should == 1
@job.update_attribute(:locked_at, 5.hours.ago)
Delayed::Job.unlock_expired_jobs(4.hours)
Delayed::Job.find_available(5).length.should == 1
end
end
context "when another worker has worked on a task since the job was found to be available, it" do
before :each do
@job = @backend.create :payload_object => SimpleJob.new
@job_copy_for_worker_2 = @backend.find(@job.id)
@job = Delayed::Job.create :payload_object => SimpleJob.new
@job_copy_for_worker_2 = Delayed::Job.find(@job.id)
end
it "should not allow a second worker to get exclusive access if already successfully processed by worker1" do
@job.destroy
@job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
@job_copy_for_worker_2.lock_exclusively!('worker2').should == false
end
it "should not allow a second worker to get exclusive access if failed to be processed by worker1 and run_at time is now in future (due to backing off behaviour)" do
@job.update_attributes(:attempts => 1, :run_at => 1.day.from_now)
@job_copy_for_worker_2.lock_exclusively!(4.hours, 'worker2').should == false
@job_copy_for_worker_2.lock_exclusively!('worker2').should == false
end
end
context "#name" do
it "should be the class name of the job that was enqueued" do
@backend.create(:payload_object => ErrorJob.new ).name.should == 'ErrorJob'
Delayed::Job.create(:payload_object => ErrorJob.new ).name.should == 'ErrorJob'
end
it "should be the method that will be called if its a performable method object" do
@ -179,7 +177,7 @@ shared_examples_for 'a backend' do
context "worker prioritization" do
it "should fetch jobs ordered by priority" do
10.times { create_job :priority => rand(10) }
jobs = @backend.find_available('worker', 10, 10)
jobs = Delayed::Job.find_available(10)
jobs.size.should == 10
jobs.each_cons(2) do |a, b|
a.priority.should <= b.priority
@ -189,23 +187,23 @@ shared_examples_for 'a backend' do
context "clear_locks!" do
before do
@job = create_job(:locked_by => 'worker', :locked_at => @backend.db_time_now)
@job = create_job(:locked_by => 'worker', :locked_at => Delayed::Job.db_time_now)
end
it "should clear locks for the given worker" do
@backend.clear_locks!('worker')
@backend.find_available('worker2', 5, 1.minute).should include(@job)
Delayed::Job.clear_locks!('worker')
Delayed::Job.find_available(5).should include(@job)
end
it "should not clear locks for other workers" do
@backend.clear_locks!('worker1')
@backend.find_available('worker1', 5, 1.minute).should_not include(@job)
Delayed::Job.clear_locks!('worker1')
Delayed::Job.find_available(5).should_not include(@job)
end
end
context "unlock" do
before do
@job = create_job(:locked_by => 'worker', :locked_at => @backend.db_time_now)
@job = create_job(:locked_by => 'worker', :locked_at => Delayed::Job.db_time_now)
end
it "should clear locks" do
@ -219,85 +217,85 @@ shared_examples_for 'a backend' do
it "should run strand jobs in strict order" do
job1 = create_job(:strand => 'myjobs')
job2 = create_job(:strand => 'myjobs')
@backend.get_and_lock_next_available('w1', 60).should == job1
@backend.get_and_lock_next_available('w2', 60).should == nil
Delayed::Job.get_and_lock_next_available('w1').should == job1
Delayed::Job.get_and_lock_next_available('w2').should == nil
job1.destroy
# update time since the failed lock pushed it forward
job2.update_attribute(:run_at, 1.minute.ago)
@backend.get_and_lock_next_available('w3', 60).should == job2
@backend.get_and_lock_next_available('w4', 60).should == nil
Delayed::Job.get_and_lock_next_available('w3').should == job2
Delayed::Job.get_and_lock_next_available('w4').should == nil
end
it "should fail to lock if an earlier job gets locked" do
job1 = create_job(:strand => 'myjobs')
job2 = create_job(:strand => 'myjobs')
@backend.find_available('w1', 2, 60).should == [job1]
@backend.find_available('w2', 2, 60).should == [job1]
Delayed::Job.find_available(2).should == [job1]
Delayed::Job.find_available(2).should == [job1]
# job1 gets locked by w1
job1.lock_exclusively!(60, 'w1').should == true
job1.lock_exclusively!('w1').should == true
# w2 tries to lock job1, fails
job1.lock_exclusively!(60, 'w2').should == false
job1.lock_exclusively!('w2').should == false
# normally w2 would now be able to lock job2, but strands prevent it
@backend.get_and_lock_next_available('w2', 60).should be_nil
Delayed::Job.get_and_lock_next_available('w2').should be_nil
# now job1 is done
job1.destroy
# update time since the failed lock pushed it forward
job2.update_attribute(:run_at, 1.minute.ago)
job2.lock_exclusively!(60, 'w2').should == true
job2.lock_exclusively!('w2').should == true
end
it "should keep strand jobs in order as they are rescheduled" do
job1 = create_job(:strand => 'myjobs')
job2 = create_job(:strand => 'myjobs')
job3 = create_job(:strand => 'myjobs')
@backend.get_and_lock_next_available('w1', 60).should == job1
@backend.find_available('w2', 1, 60).should == []
Delayed::Job.get_and_lock_next_available('w1').should == job1
Delayed::Job.find_available(1).should == []
job1.destroy
# move job2's time forward
job2.update_attribute(:run_at, 1.second.ago)
job3.update_attribute(:run_at, 5.seconds.ago)
# we should still get job2, not job3
@backend.get_and_lock_next_available('w1', 60).should == job2
Delayed::Job.get_and_lock_next_available('w1').should == job2
end
it "should allow to run the next job if a failed job is present" do
job1 = create_job(:strand => 'myjobs')
job2 = create_job(:strand => 'myjobs')
job1.fail!
@backend.find_available('w1', 2, 60).should == [job2]
job2.lock_exclusively!(60, 'w1').should == true
Delayed::Job.find_available(2).should == [job2]
job2.lock_exclusively!('w1').should == true
end
it "should not interfere with jobs with no strand" do
job1 = create_job(:strand => nil)
job2 = create_job(:strand => 'myjobs')
@backend.get_and_lock_next_available('w1', 60).should == job1
@backend.get_and_lock_next_available('w2', 60).should == job2
@backend.get_and_lock_next_available('w3', 60).should == nil
Delayed::Job.get_and_lock_next_available('w1').should == job1
Delayed::Job.get_and_lock_next_available('w2').should == job2
Delayed::Job.get_and_lock_next_available('w3').should == nil
end
it "should not interfere with jobs in other strands" do
job1 = create_job(:strand => 'strand1')
job2 = create_job(:strand => 'strand2')
@backend.get_and_lock_next_available('w1', 60).should == job1
@backend.get_and_lock_next_available('w2', 60).should == job2
@backend.get_and_lock_next_available('w3', 60).should == nil
Delayed::Job.get_and_lock_next_available('w1').should == job1
Delayed::Job.get_and_lock_next_available('w2').should == job2
Delayed::Job.get_and_lock_next_available('w3').should == nil
end
context 'singleton' do
it "should create if there's no jobs on the strand" do
@job = create_job(:singleton => 'myjobs')
@job.should be_present
@backend.get_and_lock_next_available('w1', 60).should == @job
Delayed::Job.get_and_lock_next_available('w1').should == @job
end
it "should create if there's another job on the strand, but it's running" do
@job = create_job(:singleton => 'myjobs')
@job.should be_present
@backend.get_and_lock_next_available('w1', 60).should == @job
Delayed::Job.get_and_lock_next_available('w1').should == @job
@job2 = create_job(:singleton => 'myjobs')
@job.should be_present
@ -315,7 +313,7 @@ shared_examples_for 'a backend' do
it "should not create if there's a job running and one waiting on the strand" do
@job = create_job(:singleton => 'myjobs')
@job.should be_present
@backend.get_and_lock_next_available('w1', 60).should == @job
Delayed::Job.get_and_lock_next_available('w1').should == @job
@job2 = create_job(:singleton => 'myjobs')
@job2.should be_present
@ -346,10 +344,10 @@ shared_examples_for 'a backend' do
it "should hold/unhold jobs" do
job1 = create_job()
job1.hold!
@backend.get_and_lock_next_available('w1', 60).should be_nil
Delayed::Job.get_and_lock_next_available('w1').should be_nil
job1.unhold!
@backend.get_and_lock_next_available('w1', 60).should == job1
Delayed::Job.get_and_lock_next_available('w1').should == job1
end
it "should hold a scope of jobs" do
@ -378,66 +376,66 @@ shared_examples_for 'a backend' do
before(:each) do
Delayed::Periodic.scheduled = {}
Delayed::Periodic.cron('my SimpleJob', '*/5 * * * * *') do
@backend.enqueue(SimpleJob.new)
Delayed::Job.enqueue(SimpleJob.new)
end
end
it "should schedule jobs if they aren't scheduled yet" do
@backend.count.should == 0
audit_started = @backend.db_time_now
Delayed::Job.count.should == 0
audit_started = Delayed::Job.db_time_now
Delayed::Periodic.perform_audit!
@backend.count.should == 1
job = @backend.first
Delayed::Job.count.should == 1
job = Delayed::Job.first
job.tag.should == 'periodic: my SimpleJob'
job.payload_object.should == Delayed::Periodic.scheduled['my SimpleJob']
job.run_at.should >= audit_started
job.run_at.should <= @backend.db_time_now + 6.minutes
job.run_at.should <= Delayed::Job.db_time_now + 6.minutes
job.strand.should == job.tag
end
it "should schedule jobs if there are only failed jobs on the queue" do
@backend.count.should == 0
expect { Delayed::Periodic.perform_audit! }.to change(@backend, :count).by(1)
@backend.count.should == 1
job = @backend.first
Delayed::Job.count.should == 0
expect { Delayed::Periodic.perform_audit! }.to change(Delayed::Job, :count).by(1)
Delayed::Job.count.should == 1
job = Delayed::Job.first
job.fail!
expect { Delayed::Periodic.perform_audit! }.to change(@backend, :count).by(1)
expect { Delayed::Periodic.perform_audit! }.to change(Delayed::Job, :count).by(1)
end
it "should not schedule jobs that are already scheduled" do
@backend.count.should == 0
Delayed::Job.count.should == 0
Delayed::Periodic.perform_audit!
@backend.count.should == 1
job = @backend.first
Delayed::Job.count.should == 1
job = Delayed::Job.first
Delayed::Periodic.perform_audit!
@backend.count.should == 1
job.should == @backend.first
Delayed::Job.count.should == 1
job.should == Delayed::Job.first
end
it "should aduit on the auditor strand" do
Delayed::Periodic.audit_queue
@backend.count.should == 1
@backend.first.strand.should == Delayed::Periodic::STRAND
Delayed::Job.count.should == 1
Delayed::Job.first.strand.should == Delayed::Periodic::STRAND
end
it "should only schedule an audit if none is scheduled" do
Delayed::Periodic.audit_queue
@backend.count.should == 1
Delayed::Job.count.should == 1
Delayed::Periodic.audit_queue
@backend.count.should == 1
Delayed::Job.count.should == 1
end
it "should schedule the next job run after performing" do
Delayed::Periodic.perform_audit!
job = @backend.first
job = Delayed::Job.first
run_job(job)
job.destroy
@backend.count.should == 2
job = @backend.first(:order => 'run_at asc')
Delayed::Job.count.should == 2
job = Delayed::Job.first(:order => 'run_at asc')
job.tag.should == 'SimpleJob#perform'
next_scheduled = @backend.last(:order => 'run_at asc')
next_scheduled = Delayed::Job.last(:order => 'run_at asc')
next_scheduled.tag.should == 'periodic: my SimpleJob'
next_scheduled.payload_object.should be_is_a(Delayed::Periodic)
next_scheduled.run_at.utc.to_i.should >= Time.now.utc.to_i
@ -464,7 +462,7 @@ shared_examples_for 'a backend' do
Setting.set_config('periodic_jobs', { 'my ChangedJob' => '*/10 * * * * *' })
Delayed::Periodic.scheduled = {}
Delayed::Periodic.cron('my ChangedJob', '*/5 * * * * *') do
@backend.enqueue(SimpleJob.new)
Delayed::Job.enqueue(SimpleJob.new)
end
Delayed::Periodic.scheduled['my ChangedJob'].cron.original.should == '*/10 * * * * *'
Delayed::Periodic.audit_overrides!
@ -474,7 +472,7 @@ shared_examples_for 'a backend' do
Setting.set_config('periodic_jobs', { 'my ChangedJob' => '*/10 * * * * * *' }) # extra asterisk
Delayed::Periodic.scheduled = {}
expect { Delayed::Periodic.cron('my ChangedJob', '*/5 * * * * *') do
@backend.enqueue(SimpleJob.new)
Delayed::Job.enqueue(SimpleJob.new)
end }.to raise_error
expect { Delayed::Periodic.audit_overrides! }.to raise_error
@ -493,4 +491,14 @@ shared_examples_for 'a backend' do
job.invoke_job
Delayed::Job.in_delayed_job?.should == false
end
it "should fail on job creation if an unsaved AR object is used" do
story = Story.new :text => "Once upon..."
lambda { story.send_later(:text) }.should raise_error
reader = StoryReader.new
lambda { reader.send_later(:read, story) }.should raise_error
lambda { [story, 1, story, false].send_later(:first) }.should raise_error
end
end

View File

@ -0,0 +1,15 @@
require File.expand_path('../shared_backend_spec', __FILE__)
require File.expand_path('../delayed_batch_spec', __FILE__)
require File.expand_path('../delayed_method_spec', __FILE__)
require File.expand_path('../performable_method_spec', __FILE__)
require File.expand_path('../stats_spec', __FILE__)
require File.expand_path('../worker_spec', __FILE__)
shared_examples_for 'a delayed_jobs implementation' do
it_should_behave_like 'a backend'
it_should_behave_like 'Delayed::Batch'
it_should_behave_like 'random ruby objects'
it_should_behave_like 'Delayed::PerformableMethod'
it_should_behave_like 'Delayed::Stats'
it_should_behave_like 'Delayed::Worker'
end

View File

@ -27,4 +27,4 @@ module MyReverser
end
require File.expand_path('../sample_jobs', __FILE__)
require File.expand_path('../shared_backend_spec', __FILE__)
require File.expand_path('../shared_jobs_spec', __FILE__)

View File

@ -1,14 +1,12 @@
require File.expand_path("../spec_helper", __FILE__)
if Canvas.redis_enabled?
describe Delayed::Stats do
shared_examples_for 'Delayed::Stats' do
before do
Setting.set('delayed_jobs_store_stats', 'redis')
end
it "should store stats for jobs" do
job = "ohai".send_later(:reverse)
job.lock_exclusively!(60, 'stub worker').should be_true
job.lock_exclusively!('stub worker').should be_true
worker = mock('Delayed::Worker')
worker.stubs(:name).returns("stub worker")
Delayed::Stats.job_complete(job, worker)
@ -18,7 +16,7 @@ describe Delayed::Stats do
it "should completely clean up after stats" do
job = "ohai".send_later(:reverse)
job.lock_exclusively!(60, 'stub worker').should be_true
job.lock_exclusively!('stub worker').should be_true
worker = mock('Delayed::Worker')
worker.stubs(:name).returns("stub worker")

View File

@ -1,6 +1,4 @@
require File.expand_path("../spec_helper", __FILE__)
describe Delayed::Worker do
shared_examples_for 'Delayed::Worker' do
def job_create(opts = {})
Delayed::Job.create({:payload_object => SimpleJob.new, :queue => Delayed::Worker.queue}.merge(opts))
end
@ -177,7 +175,7 @@ describe Delayed::Worker do
it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do
Delayed::Worker.on_max_failures = proc { false }
@job.update_attribute(:max_attempts, 1)
@job.lock_exclusively!(Delayed::Worker.max_run_time, @worker.name).should == true
@job.lock_exclusively!(@worker.name).should == true
@worker.perform(@job)
old_id = @job.id
@job = Delayed::Job::Failed.first
@ -191,9 +189,7 @@ describe Delayed::Worker do
# job stays locked after failing, for record keeping of time/worker
@job.should be_locked
all_jobs = Delayed::Job.all_available(@worker.name,
Delayed::Worker.max_run_time,
@job.queue)
all_jobs = Delayed::Job.all_available(@job.queue)
all_jobs.find_by_id(@job.id).should be_nil
end