redis jobs for rails 3

since we're not inheriting from AR::Base, we have to implement some more
stuff ourself. still better than getting all of AR::Base, though.

Change-Id: Ifb21966310b90864394e135d97f1bef1ed6650a4
Reviewed-on: https://gerrit.instructure.com/30254
Tested-by: Jenkins <jenkins@instructure.com>
Reviewed-by: Brian Palmer <brianp@instructure.com>
Product-Review: Cody Cutrer <cody@instructure.com>
QA-Review: Cody Cutrer <cody@instructure.com>
This commit is contained in:
Cody Cutrer 2014-02-18 14:30:43 -07:00
parent da22ab5d76
commit 21e967dd9f
4 changed files with 243 additions and 126 deletions

View File

@ -36,6 +36,7 @@ class Job
unless CANVAS_RAILS2
extend ActiveModel::Callbacks
define_model_callbacks :create, :save
include ActiveModel::Dirty
end
include Delayed::Backend::Base
# This redis instance needs to be set by the application during jobs configuration
@ -107,34 +108,142 @@ class Job
raise("Delayed::MAX_PRIORITY must be less than #{WAITING_STRAND_JOB_PRIORITY}")
end
def self.reconnect!
self.redis.reconnect
end
def self.columns
@@columns ||= []
end
def self.functions
@@functions ||= Delayed::Backend::Redis::Functions.new(redis)
end
def self.column(name, sql_type = nil, default = nil, null = true)
columns << ActiveRecord::ConnectionAdapters::Column.new(name.to_s, default,
sql_type.to_s, null)
sql_type.to_s, null)
end
attr_protected if CANVAS_RAILS2
column(:id, :string)
column(:priority, :integer, 0)
column(:attempts, :integer, 0)
column(:handler, :text)
column(:last_error, :text)
column(:queue, :string)
column(:run_at, :timestamp)
column(:locked_at, :timestamp)
column(:failed_at, :timestamp)
column(:locked_by, :string)
column(:created_at, :timestamp)
column(:updated_at, :timestamp)
column(:tag, :string)
column(:max_attempts, :integer)
column(:strand, :string)
def self.tableless?
true
if CANVAS_RAILS2
attr_protected
def self.tableless?
true
end
def self.table_exists?
# mostly just override this so .inspect doesn't explode
true
end
def self.table_name
raise "Job has no table"
end
def self.scoped(*a)
raise ArgumentError, "Can't scope delayed jobs"
end
else
def attributes
@attributes
end
def self.members
if !@members
@members = columns.map { |c| c.name.to_sym }
@members.each do |m|
class_eval <<-RUBY
def #{m}
attributes[#{m.inspect}]
end
def #{m}=(v)
#{m}_will_change!
attributes[#{m.inspect}] = v
end
RUBY
end
define_attribute_methods(@members)
end
@members
end
def initialize(attrs = {})
@attributes = {}.with_indifferent_access
self.class.members # make sure accessors are defined
attrs.each { |k, v| self.send("#{k}=", v) }
self.priority ||= 0
self.attempts ||= 0
@new_record = true
end
def self.instantiate(attrs)
result = new(attrs)
result.instance_variable_set(:@new_record, false)
result
end
def self.create(attrs = {})
result = new(attrs)
result.save
result
end
def self.create!(attrs = {})
result = new(attrs)
result.save!
result
end
def [](key)
attributes[key]
end
def []=(key, value)
raise NameError unless self.class.members.include?(key.to_sym)
attributes[key] = value
end
def self.find(ids)
if Array === ids
find_some(ids, {})
else
find_one(ids, {})
end
end
def new_record?
!!@new_record
end
def destroyed?
!!@destroyed
end
def ==(other)
id == other.id
end
def hash
id.hash
end
end
def self.table_exists?
# mostly just override this so .inspect doesn't explode
true
def self.reconnect!
self.redis.reconnect
end
def self.table_name
raise "Job has no table"
def self.functions
@@functions ||= Delayed::Backend::Redis::Functions.new(redis)
end
def self.find_one(id, options)
@ -155,9 +264,9 @@ class Job
end
def self.get_and_lock_next_available(worker_name,
queue = Delayed::Worker.queue,
min_priority = Delayed::MIN_PRIORITY,
max_priority = Delayed::MAX_PRIORITY)
queue = Delayed::Worker.queue,
min_priority = Delayed::MIN_PRIORITY,
max_priority = Delayed::MAX_PRIORITY)
check_queue(queue)
check_priorities(min_priority, max_priority)
@ -169,9 +278,9 @@ class Job
end
def self.find_available(limit,
queue = Delayed::Worker.queue,
min_priority = Delayed::MIN_PRIORITY,
max_priority = Delayed::MAX_PRIORITY)
queue = Delayed::Worker.queue,
min_priority = Delayed::MIN_PRIORITY,
max_priority = Delayed::MAX_PRIORITY)
check_queue(queue)
check_priorities(min_priority, max_priority)
@ -187,28 +296,28 @@ class Job
# for :tag it's the tag name
# for :failed it's ignored
def self.list_jobs(flavor,
limit,
offset = 0,
query = nil)
limit,
offset = 0,
query = nil)
case flavor.to_s
when 'current'
query ||= Delayed::Worker.queue
check_queue(query)
self.find(functions.find_available(query, limit, offset, nil, nil, db_time_now))
when 'future'
query ||= Delayed::Worker.queue
check_queue(query)
self.find(redis.zrangebyscore(Keys::FUTURE_QUEUE[query], 0, "+inf", :limit => [offset, limit]))
when 'failed'
Failed.find(redis.zrevrangebyscore(Keys::FAILED_JOBS, "+inf", 0, :limit => [offset, limit]))
when 'strand'
self.find(redis.lrange(Keys::STRAND[query], offset, offset + limit - 1))
when 'tag'
# This is optimized for writing, since list_jobs(:tag) will only ever happen in the admin UI
ids = redis.smembers(Keys::TAG[query])
self.find(ids[offset, limit])
else
raise ArgumentError, "invalid flavor: #{flavor.inspect}"
when 'current'
query ||= Delayed::Worker.queue
check_queue(query)
self.find(functions.find_available(query, limit, offset, nil, nil, db_time_now))
when 'future'
query ||= Delayed::Worker.queue
check_queue(query)
self.find(redis.zrangebyscore(Keys::FUTURE_QUEUE[query], 0, "+inf", :limit => [offset, limit]))
when 'failed'
Failed.find(redis.zrevrangebyscore(Keys::FAILED_JOBS, "+inf", 0, :limit => [offset, limit]))
when 'strand'
self.find(redis.lrange(Keys::STRAND[query], offset, offset + limit - 1))
when 'tag'
# This is optimized for writing, since list_jobs(:tag) will only ever happen in the admin UI
ids = redis.smembers(Keys::TAG[query])
self.find(ids[offset, limit])
else
raise ArgumentError, "invalid flavor: #{flavor.inspect}"
end
end
@ -216,18 +325,18 @@ class Job
# flavor is :current, :future or :failed
# for the :failed flavor, queue is currently ignored
def self.jobs_count(flavor,
queue = Delayed::Worker.queue)
queue = Delayed::Worker.queue)
case flavor.to_s
when 'current'
check_queue(queue)
redis.zcard(Keys::QUEUE[queue])
when 'future'
check_queue(queue)
redis.zcard(Keys::FUTURE_QUEUE[queue])
when 'failed'
redis.zcard(Keys::FAILED_JOBS)
else
raise ArgumentError, "invalid flavor: #{flavor.inspect}"
when 'current'
check_queue(queue)
redis.zcard(Keys::QUEUE[queue])
when 'future'
check_queue(queue)
redis.zcard(Keys::FUTURE_QUEUE[queue])
when 'failed'
redis.zcard(Keys::FAILED_JOBS)
else
raise ArgumentError, "invalid flavor: #{flavor.inspect}"
end
end
@ -250,8 +359,8 @@ class Job
# in descending count order
# flavor is :current or :all
def self.tag_counts(flavor,
limit,
offset = 0)
limit,
offset = 0)
raise(ArgumentError, "invalid flavor: #{flavor.inspect}") unless %w(current all).include?(flavor.to_s)
key = Keys::TAG_COUNTS[flavor]
redis.zrevrangebyscore(key, '+inf', 1, :limit => [offset, limit], :withscores => true).map { |tag, count| { :tag => tag, :count => count } }
@ -275,26 +384,6 @@ class Job
self.create!(options.merge(:singleton => true))
end
def self.scoped(*a)
raise ArgumentError, "Can't scope delayed jobs"
end
column(:id, :string)
column(:priority, :integer, 0)
column(:attempts, :integer, 0)
column(:handler, :text)
column(:last_error, :text)
column(:queue, :string)
column(:run_at, :timestamp)
column(:locked_at, :timestamp)
column(:failed_at, :timestamp)
column(:locked_by, :string)
column(:created_at, :timestamp)
column(:updated_at, :timestamp)
column(:tag, :string)
column(:max_attempts, :integer)
column(:strand, :string)
# not saved, just used as a marker when creating
attr_accessor :singleton
@ -310,17 +399,33 @@ class Job
save!
end
def save(*a)
return false if destroyed?
callback :before_save
result = if new_record?
callback :before_create
create
else
update
if CANVAS_RAILS2
def save(*a)
return false if destroyed?
callback :before_save
result = if new_record?
callback :before_create
create
else
update
end
callback(:after_save) if result
result
end
else
def save(*a)
return false if destroyed?
result = run_callbacks(:save) do
if new_record?
run_callbacks(:create) { create }
else
update
end
end
@previously_changed = changes
@changed_attributes.clear
result
end
callback(:after_save) if result
result
end
def save!(*a)
@ -367,8 +472,8 @@ class Job
# deleted because there was already that other job on the strand. so
# replace this job with the other for returning.
if job_id != self.id
self.id = job_id
self.reload
singleton = self.class.find(job_id)
@attributes = singleton.attributes
end
else
self.class.functions.enqueue(id, queue, strand, self.class.db_time_now)
@ -385,7 +490,7 @@ class Job
self.id
end
def update(attribute_names = @attributes.keys)
def update
self.updated_at = Time.now.utc
save_job_to_redis
update_queues
@ -419,6 +524,7 @@ class Job
@attrs_template ||= columns.inject({}) { |h,c| h[c.name] = nil; h }
attrs = @attrs_template.merge(redis_attrs)
self.time_attribute_names.each { |k| attrs[k] = Time.zone.at(attrs[k].to_f) if attrs[k] }
self.integer_attribute_names.each { |k| attrs[k] = attrs[k].to_i if attrs[k] }
instantiate(attrs)
else
nil
@ -431,6 +537,9 @@ class Job
def self.time_attribute_names
@time_attribute_names ||= columns.find_all { |c| c.type == :timestamp }.map { |c| c.name.to_s }
end
def self.integer_attribute_names
@integer_attribute_names ||= columns.find_all { |c| c.type == :integer }.map { |c| c.name.to_s }
end
def global_id
id

View File

@ -73,7 +73,7 @@ describe 'Delayed::Backend::Redis::Job' do
job = "string".send_later :reverse
job.should be_nil
Delayed::Job.jobs_count(:current).should == before_count
Delayed::Job.connection.run_transaction_commit_callbacks
ActiveRecord::Base.connection.run_transaction_commit_callbacks
Delayed::Job.jobs_count(:current) == before_count + 1
end
end

View File

@ -204,7 +204,8 @@ shared_examples_for 'a backend' do
Delayed::Job.get_and_lock_next_available('w2').should == nil
job1.destroy
# update time since the failed lock pushed it forward
job2.update_attribute(:run_at, 1.minute.ago)
job2.run_at = 1.minute.ago
job2.save!
Delayed::Job.get_and_lock_next_available('w3').should == job2
Delayed::Job.get_and_lock_next_available('w4').should == nil
end
@ -224,7 +225,8 @@ shared_examples_for 'a backend' do
# now job1 is done
job1.destroy
# update time since the failed lock pushed it forward
job2.update_attribute(:run_at, 1.minute.ago)
job2.run_at = 1.minute.ago
job2.save!
Delayed::Job.get_and_lock_next_available('w2').should == job2
end
@ -232,13 +234,15 @@ shared_examples_for 'a backend' do
job1 = create_job(:strand => 'myjobs')
job2 = create_job(:strand => 'myjobs')
job3 = create_job(:strand => 'myjobs')
Delayed::Job.get_and_lock_next_available('w1').should == job1.reload
Delayed::Job.get_and_lock_next_available('w1').should == job1
Delayed::Job.find_available(1).should == []
job1.destroy
Delayed::Job.find_available(1).should == [job2]
# move job2's time forward
job2.update_attribute(:run_at, 1.second.ago)
job3.update_attribute(:run_at, 5.seconds.ago)
job2.run_at = 1.second.ago
job2.save!
job3.run_at = 5.seconds.ago
job3.save!
# we should still get job2, not job3
Delayed::Job.get_and_lock_next_available('w1').should == job2
end
@ -491,7 +495,7 @@ shared_examples_for 'a backend' do
end
it "should return the queued jobs" do
Set.new(Delayed::Job.list_jobs(:current, 100)).should == Set.new(@jobs)
Delayed::Job.list_jobs(:current, 100).map(&:id).sort.should == @jobs.map(&:id).sort
end
it "should paginate the returned jobs" do
@ -617,16 +621,16 @@ shared_examples_for 'a backend' do
@ignored_jobs.any? { |j| j.on_hold? }.should be_false
Delayed::Job.bulk_update('hold', :flavor => @flavor, :query => @query).should == @affected_jobs.size
@affected_jobs.all? { |j| j.reload.on_hold? }.should be_true
@ignored_jobs.any? { |j| j.reload.on_hold? }.should be_false
@affected_jobs.all? { |j| Delayed::Job.find(j.id).on_hold? }.should be_true
@ignored_jobs.any? { |j| Delayed::Job.find(j.id).on_hold? }.should be_false
end
it "should un-hold a scope of jobs" do
pending "fragile on mysql for unknown reasons" if %w{MySQL Mysql2}.include?(Delayed::Job.connection.adapter_name)
pending "fragile on mysql for unknown reasons" if Delayed::Job == Delayed::Backend::ActiveRecord::Job && %w{MySQL Mysql2}.include?(Delayed::Job.connection.adapter_name)
Delayed::Job.bulk_update('unhold', :flavor => @flavor, :query => @query).should == @affected_jobs.size
@affected_jobs.any? { |j| j.reload.on_hold? }.should be_false
@ignored_jobs.any? { |j| j.reload.on_hold? }.should be_false
@affected_jobs.any? { |j| Delayed::Job.find(j.id).on_hold? }.should be_false
@ignored_jobs.any? { |j| Delayed::Job.find(j.id).on_hold? }.should be_false
end
it "should delete a scope of jobs" do
@ -695,14 +699,14 @@ shared_examples_for 'a backend' do
j2 = create_job(:run_at => 2.hours.from_now)
j3 = "test".send_later_enqueue_args(:to_i, :strand => 's1', :no_delay => true)
Delayed::Job.bulk_update('hold', :ids => [j1.id, j2.id]).should == 2
j1.reload.on_hold?.should be_true
j2.reload.on_hold?.should be_true
j3.reload.on_hold?.should be_false
Delayed::Job.find(j1.id).on_hold?.should be_true
Delayed::Job.find(j2.id).on_hold?.should be_true
Delayed::Job.find(j3.id).on_hold?.should be_false
Delayed::Job.bulk_update('unhold', :ids => [j2.id]).should == 1
j1.reload.on_hold?.should be_true
j2.reload.on_hold?.should be_false
j3.reload.on_hold?.should be_false
Delayed::Job.find(j1.id).on_hold?.should be_true
Delayed::Job.find(j2.id).on_hold?.should be_false
Delayed::Job.find(j3.id).on_hold?.should be_false
end
it "should delete given job ids" do
@ -732,8 +736,9 @@ shared_examples_for 'a backend' do
{ :tag => "String#upcase", :count => 2 },
{ :tag => "String#downcase", :count => 1 }]
@cur[0,4].each { |j| j.destroy }
@future[0].update_attribute(:run_at, 1.hour.ago)
@future[1].update_attribute(:run_at, 1.hour.ago)
@future[0].run_at = @future[1].run_at = 1.hour.ago
@future[0].save!
@future[1].save!
Delayed::Job.tag_counts(:current, 5).should == [{ :tag => "String#to_i", :count => 4 },
{ :tag => "String#downcase", :count => 3 },
@ -773,10 +778,10 @@ shared_examples_for 'a backend' do
Delayed::Job.unlock_orphaned_jobs(nil, "Jobworker").should == 1
job1.reload.locked_by.should_not be_nil
job2.reload.locked_by.should be_nil
job3.reload.locked_by.should_not be_nil
job4.reload.locked_by.should_not be_nil
Delayed::Job.find(job1.id).locked_by.should_not be_nil
Delayed::Job.find(job2.id).locked_by.should be_nil
Delayed::Job.find(job3.id).locked_by.should_not be_nil
Delayed::Job.find(job4.id).locked_by.should_not be_nil
Delayed::Job.unlock_orphaned_jobs(nil, "Jobworker").should == 0
end
@ -798,10 +803,10 @@ shared_examples_for 'a backend' do
Delayed::Job.unlock_orphaned_jobs(child_pid2, "Jobworker").should == 0
Delayed::Job.unlock_orphaned_jobs(child_pid, "Jobworker").should == 1
job1.reload.locked_by.should_not be_nil
job2.reload.locked_by.should be_nil
job3.reload.locked_by.should_not be_nil
job4.reload.locked_by.should_not be_nil
Delayed::Job.find(job1.id).locked_by.should_not be_nil
Delayed::Job.find(job2.id).locked_by.should be_nil
Delayed::Job.find(job3.id).locked_by.should_not be_nil
Delayed::Job.find(job4.id).locked_by.should_not be_nil
Delayed::Job.unlock_orphaned_jobs(child_pid, "Jobworker").should == 0
end

View File

@ -16,7 +16,7 @@ shared_examples_for 'Delayed::Worker' do
describe "running a job" do
it "should not fail when running a job with a % in the name" do
@job = User.send_later_enqueue_args(:name_parts, { no_delay: true }, "Some % Name")
@worker.perform(@job.reload)
@worker.perform(@job)
end
end
@ -122,9 +122,10 @@ shared_examples_for 'Delayed::Worker' do
it "should record last_error when destroy_failed_jobs = false, max_attempts = 1" do
Delayed::Worker.on_max_failures = proc { false }
@job.update_attribute(:max_attempts, 1)
Delayed::Job.get_and_lock_next_available('w1').should == @job.reload
@worker.perform(@job)
@job.max_attempts = 1
@job.save!
(job = Delayed::Job.get_and_lock_next_available('w1')).should == @job
@worker.perform(job)
old_id = @job.id
@job = Delayed::Job.list_jobs(:failed, 1).first
(@job.respond_to?(:original_id) ? @job.original_id : @job.id).should == old_id
@ -142,7 +143,7 @@ shared_examples_for 'Delayed::Worker' do
it "should re-schedule jobs after failing" do
@worker.perform(@job)
@job.reload
@job = Delayed::Job.find(@job.id)
@job.last_error.should =~ /did not work/
@job.last_error.should =~ /sample_jobs.rb:8:in `perform'/
@job.attempts.should == 1
@ -183,7 +184,8 @@ shared_examples_for 'Delayed::Worker' do
it "should be destroyed if failed more than Job#max_attempts times" do
Delayed::Worker.max_attempts = 25
@job.expects(:destroy)
@job.update_attribute(:max_attempts, 2)
@job.max_attempts = 2
@job.save!
2.times { @job.reschedule }
end
end
@ -198,14 +200,15 @@ shared_examples_for 'Delayed::Worker' do
end
it "should be failed if it failed more than Worker.max_attempts times" do
@job.reload.failed_at.should == nil
@job.failed_at.should == nil
Delayed::Worker.max_attempts.times { @job.reschedule }
Delayed::Job.list_jobs(:failed, 100).size.should == 1
end
it "should not be failed if it failed fewer than Worker.max_attempts times" do
(Delayed::Worker.max_attempts - 1).times { @job.reschedule }
@job.reload.failed_at.should == nil
@job = Delayed::Job.find(@job.id)
@job.failed_at.should == nil
end
end