redis jobs backend

This supports all the features of our current delayed jobs backend,
including priorities, queues, jobs scheduled for the future, strands,
periodic jobs, etc.

Atomicity is guaranteed by using the new Lua scripting features of Redis
2.6, so this requires 2.6 (which is still only a release candidate as of this
time).

It's not yet production ready (see the TODO at the top of the
redis/job.rb file), so trying to start canvas in prod with redis jobs
configured will raise an error as a safeguard.

The tests have been modified to all pass with redis configured as the
jobs backend. Some selenium specs were removed because the jobs UI lost
some functionality that I didn't think was important enough to port to
redis (listing the most popular tags for future and failed jobs, for
example).

Change-Id: Ie57b15bae1d4ba7b2b2344c872411165551d1ac8
Reviewed-on: https://gerrit.instructure.com/12120
Reviewed-by: Bracken Mosbacker <bracken@instructure.com>
Tested-by: Jenkins <jenkins@instructure.com>
This commit is contained in:
Brian Palmer 2012-08-16 09:21:18 -06:00
parent 4682d65d1b
commit c25465506a
23 changed files with 1117 additions and 88 deletions

View File

@ -1,9 +1,30 @@
config = {
:backend => 'active_record',
}.merge((Setting.from_config('delayed_jobs') || {}).symbolize_keys)
case config[:backend]
when 'active_record'
Delayed::Job = Delayed::Backend::ActiveRecord::Job
when 'redis'
if Rails.env.production?
raise "Redis Jobs are not yet ready for production"
end
Delayed::Job = Delayed::Backend::Redis::Job
Delayed::Backend::Redis::Job.redis = if config[:redis]
Canvas.redis_from_config(config[:redis])
else
Canvas.redis
end
else
raise "Unknown Delayed Jobs backend: `#{config[:backend]}`"
end
# If there is a sub-hash under the 'queue' key for the database config, use that
# as the connection for the job queue. The migration that creates the
# delayed_jobs table is smart enough to use this connection as well.
queue_config = ActiveRecord::Base.configurations[Rails.env]['queue']
if queue_config
Delayed::Backend::ActiveRecord::Job.establish_connection(queue_config)
db_queue_config = ActiveRecord::Base.configurations[Rails.env]['queue']
if db_queue_config
Delayed::Backend::ActiveRecord::Job.establish_connection(db_queue_config)
end
# We don't want to keep around max_attempts failed jobs that failed because the

View File

@ -32,6 +32,13 @@ module Canvas
# create the redis cluster connection using config/redis.yml
redis_settings = Setting.from_config('redis')
raise("Redis is not enabled for this install") if redis_settings.blank?
@redis = redis_from_config(redis_settings)
end
# Builds a redis object using a config hash in the format used by a couple
# different config/*.yml files, like redis.yml, cache_store.yml and
# delayed_jobs.yml
def self.redis_from_config(redis_settings)
Bundler.require 'redis'
if redis_settings.is_a?(Array)
redis_settings = { :servers => redis_settings }
@ -40,11 +47,11 @@ module Canvas
redis_settings[:servers].map! { |s|
::Redis::Factory.convert_to_redis_client_options(s).merge(:marshalling => false)
}
@redis = ::Redis::Factory.create(redis_settings[:servers])
redis = ::Redis::Factory.create(redis_settings[:servers])
if redis_settings[:database].present?
@redis.select(redis_settings[:database])
redis.select(redis_settings[:database])
end
@redis
redis
end
def self.redis_enabled?

View File

@ -131,6 +131,7 @@ Spec::Runner.configure do |config|
Notification.reset_cache!
ActiveRecord::Base.reset_any_instantiation!
Attachment.clear_cached_mime_ids
Delayed::Job.redis.flushdb if Delayed::Job == Delayed::Backend::Redis::Job
Rails::logger.try(:info, "Running #{self.class.description} #{@method_name}")
end

View File

@ -20,6 +20,10 @@ module Delayed
include Delayed::Backend::Base
set_table_name :delayed_jobs
def self.reconnect!
connection.reconnect!
end
# be aware that some strand functionality is controlled by triggers on
# the database. see
# db/migrate/20110831210257_add_delayed_jobs_next_in_strand.rb

View File

@ -0,0 +1,40 @@
local action, id_string, flavor, query, now = unpack(ARGV)
local ids = {}
if string.len(flavor) > 0 then
if flavor == 'current' then
ids = redis.call('ZRANGE', Keys.queue(query), 0, -1)
elseif flavor == 'future' then
ids = redis.call('ZRANGE', Keys.future_queue(query), 0, -1)
elseif flavor == 'strand' then
ids = redis.call('LRANGE', Keys.strand(query), 0, -1)
elseif flavor == 'tag' then
ids = redis.call('SMEMBERS', Keys.tag(query))
end
else
-- can't pass an array to redis/lua, so we split the string here
for id in string.gmatch(id_string, "([%w-]+)") do
if job_exists(id) then
table.insert(ids, id)
end
end
end
for idx, job_id in ipairs(ids) do
if action == 'hold' then
local queue, strand = unpack(redis.call('HMGET', Keys.job(job_id), 'queue', 'strand'))
remove_from_queues(job_id, queue, strand)
redis.call('HMSET', Keys.job(job_id), 'locked_at', now, 'locked_by', 'on hold', 'attempts', 50)
elseif action == 'unhold' then
local queue, locked_by = unpack(redis.call('HMGET', Keys.job(job_id), 'queue', 'locked_by'))
add_to_queues(job_id, queue, now)
redis.call('HDEL', Keys.job(job_id), 'locked_at', 'locked_by')
redis.call('HMSET', Keys.job(job_id), 'attempts', 0)
elseif action == 'destroy' then
destroy_job(job_id, now)
end
end
-- returns the # of jobs matching the query, not necessarily the # whose state was changed
return table.getn(ids)

View File

@ -0,0 +1,2 @@
local job_id, now = unpack(ARGV)
destroy_job(job_id, now)

View File

@ -0,0 +1,29 @@
local job_id, queue, strand, now, for_singleton = unpack(ARGV)
local strand_key = Keys.strand(strand)
-- if this is a singleton job, only queue it up if another doesn't exist on the strand
-- otherwise, delete it and return the other job id
if for_singleton then
local job_ids = redis.call('LRANGE', strand_key, 0, 1)
local job_to_check = 1
if job_exists(job_ids[1]) and redis.call('HGET', Keys.job(job_ids[1]), 'locked_at') then
job_to_check = 2
end
local job_to_check_id = job_ids[job_to_check]
if job_exists(job_to_check_id) then
-- delete the new job, we found a match
redis.call('DEL', Keys.job(job_id))
return job_to_check_id
end
end
-- if this job is in a strand, add it to the strand queue first
-- if it's not at the front of the strand, we won't enqueue it below
if strand_key then
add_to_strand(job_id, strand)
end
add_to_queues(job_id, queue, now)
return job_id

View File

@ -0,0 +1,5 @@
local job_id = unpack(ARGV)
local failed_at, queue, strand = unpack(redis.call('HMGET', Keys.job(job_id), 'failed_at', 'queue', 'strand'))
remove_from_queues(job_id, queue, strand)
redis.call('ZADD', Keys.failed_jobs(), failed_at, job_id)

View File

@ -0,0 +1,3 @@
local queue, limit, offset, min_priority, max_priority, now = unpack(ARGV)
return find_available(queue, limit, offset, min_priority, max_priority, now)

View File

@ -0,0 +1,115 @@
#
# Copyright (C) 2012 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
# This module handles loading the Lua functions into Redis and running them
module Delayed::Backend::Redis
class Functions
class Script < Struct.new(:text, :sha)
end
def self.script_include
@script_include ||= File.read(File.dirname(__FILE__) + "/include.lua")
end
def logger
ActiveRecord::Base.logger
end
def log_timing(name)
result = nil
ms = Benchmark.ms { result = yield }
if logger && logger.debug?
line = 'Redis Jobs Timing: %s (%.1fms)' % [name, ms]
logger.debug(line)
end
result
end
# The map of <script name> -> <script text>
# If the script isn't already loaded, this will load it and prepend include.lua to it
SCRIPTS = Hash.new do |hash, key|
filename = File.dirname(__FILE__) + "/#{key}.lua"
if File.file?(filename)
hash[key] = Script.new(Delayed::Backend::Redis::Functions.script_include + "\n\n" + File.read(filename))
end
end
# Run the given script, passing in the given keys and args
# If the script isn't already loaded into redis, this will catch the error,
# load it in, then run it again, giving up after a couple tries.
def run_script(redis, script_name, keys, argv)
script = SCRIPTS[script_name]
raise(ArgumentError, "unknown script: #{script_name}") unless script
if !script.sha
script.sha = redis.script(:load, script.text)
end
attempts = 0
log_timing(script_name) do
begin
attempts += 1
redis.evalsha(script.sha, :keys => keys, :argv => argv)
rescue Redis::CommandError => e
raise unless e.message =~ /NOSCRIPT/ && attempts <= 2
script.sha = redis.script(:load, script.text)
retry
end
end
end
def find_available(redis, queue, limit, offset, min_priority, max_priority, now)
run_script(redis, :find_available, [], [queue, limit, offset, min_priority, max_priority, now.utc.to_f])
end
def get_and_lock_next_available(redis, worker_name, queue, min_priority, max_priority, now)
attrs = run_script(redis, :get_and_lock_next_available, [], [queue, min_priority, max_priority, worker_name, now.utc.to_f])
Hash[*attrs]
end
def enqueue(redis, job_id, queue, strand, now)
run_script(redis, :enqueue, [], [job_id, queue, strand, now.utc.to_f])
end
def destroy_job(redis, job_id, now)
run_script(redis, :destroy_job, [], [job_id, now.utc.to_f])
end
def tickle_strand(redis, job_id, strand, now)
run_script(redis, :tickle_strand, [], [job_id, strand, now.utc.to_f])
end
def create_singleton(redis, job_id, queue, strand, now)
run_script(redis, :enqueue, [], [job_id, queue, strand, now.utc.to_f, true])
end
def fail(redis, job_id)
run_script(redis, :fail, [], [job_id])
end
def set_running(redis, job_id)
run_script(redis, :set_running, [], [job_id])
end
def bulk_update(redis, action, ids, flavor, query, now)
ids = (ids || []).join(",")
run_script(redis, :bulk_update, [], [action, ids, flavor, query, now.utc.to_f])
end
end
end

View File

@ -0,0 +1,17 @@
local queue, min_priority, max_priority, worker_name, now = unpack(ARGV)
local job_id = find_available(queue, 1, 0, min_priority, max_priority, now)[1]
if job_exists(job_id) then
-- update the job with locked_by and locked_at
redis.call('HMSET', Keys.job(job_id), 'locked_by', worker_name, 'locked_at', now)
-- add the job to the running_jobs set
redis.call('ZADD', Keys.running_jobs(), now, job_id)
-- remove the job from the pending jobs queue
redis.call('ZREM', Keys.queue(queue), job_id)
-- return the list of job attributes
return redis.call('HGETALL', Keys.job(job_id))
else
return {}
end

View File

@ -0,0 +1,203 @@
-- Keys holds the various functions to map to redis keys
-- These are duplicated from job.rb
local Keys = {}
Keys.job = function(id)
return "job/" .. id
end
Keys.running_jobs = function()
return "running_jobs"
end
Keys.failed_jobs = function()
return "failed_jobs"
end
Keys.queue = function(queue)
return "queue/" .. (queue or '')
end
Keys.future_queue = function(queue)
return Keys.queue(queue) .. "/future"
end
Keys.strand = function(strand_name)
if strand_name and string.len(strand_name) > 0 then
return "strand/" .. strand_name
else
return nil
end
end
Keys.tag_counts = function(flavor)
return "tag_counts/" .. flavor
end
Keys.tag = function(tag)
return "tag/" .. tag
end
Keys.waiting_strand_job_priority = function()
return 2000000
end
-- remove the given job from the various queues
local remove_from_queues = function(job_id, queue, strand)
local tag = unpack(redis.call('HMGET', Keys.job(job_id), 'tag'))
redis.call("SREM", Keys.tag(tag), job_id)
local current_delta = -redis.call('ZREM', Keys.queue(queue), job_id)
redis.call('ZREM', Keys.running_jobs(), job_id)
local future_delta = -redis.call('ZREM', Keys.future_queue(queue), job_id)
if current_delta ~= 0 then
redis.call('ZINCRBY', Keys.tag_counts('current'), current_delta, tag)
end
local total_delta = current_delta + future_delta
if total_delta ~= 0 then
redis.call('ZINCRBY', Keys.tag_counts('all'), total_delta, tag)
end
local strand_key = Keys.strand(strand)
if strand_key then
redis.call('LREM', strand_key, 1, job_id)
end
end
-- returns the id for the first job on the strand, or nil if none
local strand_next_job_id = function(strand)
local strand_key = Keys.strand(strand)
if not strand_key then return nil end
return redis.call('LRANGE', strand_key, 0, 0)[1]
end
-- returns next_in_strand -- whether this added job is at the front of the strand
local add_to_strand = function(job_id, strand)
local strand_key = Keys.strand(strand)
if not strand_key then return end
redis.call('RPUSH', strand_key, job_id) -- add to strand list
local next_id = strand_next_job_id(strand)
return next_id == job_id
end
-- add this given job to the correct queues based on its state and the current time
-- also updates the tag counts and tag job lists
local add_to_queues = function(job_id, queue, now)
local run_at, priority, tag, strand = unpack(redis.call('HMGET', Keys.job(job_id), 'run_at', 'priority', 'tag', 'strand'))
redis.call("SADD", Keys.tag(tag), job_id)
if strand then
local next_job_id = strand_next_job_id(strand)
if next_job_id and next_job_id ~= job_id then
priority = Keys.waiting_strand_job_priority()
end
end
local current_delta = 0
local future_delta = 0
if run_at > now then
future_delta = future_delta + redis.call('ZADD', Keys.future_queue(queue), run_at, job_id)
current_delta = current_delta - redis.call('ZREM', Keys.queue(queue), job_id)
else
-- floor the run_at so we don't have a float in our float
local sort_key = priority .. '.' .. math.floor(run_at)
current_delta = current_delta + redis.call('ZADD', Keys.queue(queue), sort_key, job_id)
future_delta = future_delta - redis.call('ZREM', Keys.future_queue(queue), job_id)
end
if current_delta ~= 0 then
redis.call('ZINCRBY', Keys.tag_counts('current'), current_delta, tag)
end
local total_delta = current_delta + future_delta
if total_delta ~= 0 then
redis.call('ZINCRBY', Keys.tag_counts('all'), total_delta, tag)
end
end
local job_exists = function(job_id)
return job_id and redis.call('HGET', Keys.job(job_id), 'id')
end
-- find jobs available for running
-- checks the future queue too, and moves and now-ready jobs
-- into the current queue
local find_available = function(queue, limit, offset, min_priority, max_priority, now)
local ready_future_jobs = redis.call('ZRANGEBYSCORE', Keys.future_queue(queue), 0, now, 'limit', 0, limit)
for i, job_id in ipairs(ready_future_jobs) do
add_to_queues(job_id, queue, now)
end
if not min_priority or min_priority == '' then
min_priority = '0'
end
if not max_priority or max_priority == '' then
max_priority = "+inf"
else
max_priority = "(" .. (max_priority + 1)
end
local job_ids = redis.call('ZRANGEBYSCORE', Keys.queue(queue), min_priority, max_priority, 'limit', offset, limit)
for idx = table.getn(job_ids), 1, -1 do
local job_id = job_ids[idx]
if not job_exists(job_id) then
table.remove(job_ids, idx)
redis.call('ZREM', Keys.queue(queue), job_id)
end
end
return job_ids
end
-- "tickle" the strand, removing the given job_id and setting the job at the
-- front of the strand as eligible to run, if it's not already
local tickle_strand = function(job_id, strand, now)
local strand_key = Keys.strand(strand)
-- this LREM could be (relatively) slow if the strand is very large and this
-- job isn't near the front. however, in normal usage, we only delete from the
-- front. also the linked list is in memory, so even with thousands of jobs on
-- the strand it'll be quite fast.
--
-- alternatively we could make strands sorted sets, which would avoid a
-- linear search to delete this job. jobs need to be sorted on insertion
-- order though, and we're using GUIDs for keys here rather than an
-- incrementing integer, so we'd have to use an artificial counter as the
-- sort key (through `incrby strand_name` probably).
redis.call('LREM', strand_key, 1, job_id)
-- normally this loop will only run once, but we loop so that if there's any
-- job ids on the strand that don't actually exist anymore, we'll throw them
-- out and keep searching until we find a legit job or the strand is empty
while true do
local next_id = redis.call('LRANGE', strand_key, 0, 0)[1]
if next_id == nil then
break
elseif job_exists(next_id) then
-- technically jobs on the same strand can be in different queues,
-- though that functionality isn't currently used
local queue = redis.call('HGET', Keys.job(next_id), 'queue')
add_to_queues(next_id, queue, now)
break
else
redis.call('LPOP', strand_key)
end
end
end
local destroy_job = function(job_id, now)
local queue, strand = unpack(redis.call('HMGET', Keys.job(job_id), 'queue', 'strand'))
remove_from_queues(job_id, queue, strand)
if Keys.strand(strand) then
tickle_strand(job_id, strand, now)
end
redis.call('ZREM', Keys.failed_jobs(), job_id)
redis.call('DEL', Keys.job(job_id))
end

View File

@ -0,0 +1,451 @@
#
# Copyright (C) 2012 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
# This can't currently be made compatible with redis cluster, because the Lua functions
# access keys that aren't in their keys argument list (since they pop jobs off
# a queue and then update the job with that id).
# still TODO:
# * a consequence of our ignore-redis-failures code is that if redis is unavailable, creating delayed jobs silently fails, which is probably not what we want
# * need a way to migrate between jobs backends
# * we need some auditors:
# * fail jobs in running_jobs if they've timed out
# * have pools audit their workers and immediately fail jobs locked by dead workers (make sure this handles the restart case where two pools are running)
# * have a master auditor that fails jobs if a whole pool dies
# * audit strands ocasionally, look for any stuck strands where the strand queue isn't empty but there's no strand job running or queued
module Delayed::Backend::Redis
class Job < ActiveRecord::Base
include Delayed::Backend::Base
# This redis instance needs to be set by the application during jobs configuration
cattr_accessor :redis
# An overview of where and when things are stored in redis:
#
# Jobs are given a UUID for an id, rather than an incrementing integer id.
# The job attributes are then stored in a redis hash at job/<id>. Attribute
# values are generally stored as their json representation, except for
# timestamps, which as stored as floating point utc-time-since-unix-epoch
# values, so that we can compare timestamps in Lua without a date parser.
#
# Jobs that are schedule to run immediately (in the present/past) are
# inserted into the queue named queue/<queue_name>. The queue is a sorted
# set, with the value being the job id and the weight being a floating point
# value, <priority>.<run_at>. This formatting is key to efficient
# querying of the next job to run.
#
# Jobs that are scheduled to run in the future are not inserted into the
# queue, but rather a future queue named queue/<queue_name>/future. This
# queue is also a sorted set, with the value being the job id, but the weight
# is just the <run_at> value.
#
# If the job is on a strand, the flow is different. First, it's inserted into
# a list named strand/<strand>. When strand jobs are inserted into the
# current jobs queue, we check if they're next to run in the strand. If not,
# we give them a special priority that is greater than MAX_PRIORITY, so that
# they won't run. When a strand job is finished, failed or deleted,
# "tickle_strand" is called, which removes that job from the list and if that
# job was at the front of the list, changes the priority on the next job so
# that it's eligible to run.
#
# For singletons, the flow is the same as for other strand jobs, except that
# the job is thrown out if there are already any non-running jobs in the
# strand list.
#
# If a job fails, it's removed from the normal queues and inserted into the
# failed_jobs sorted set, with job id as the value and failure time as the
# key. The hash of job attributes is also renamed from job/<id> to
# failed_job/<id> -- use Delayed::Job::Failed to query those jobs, same as
# with AR jobs.
#
# We also insert into some other data structures for admin functionality.
# tag_counts/current and tag_counts/all are sorted sets storing the count of
# jobs for each tag. tag/<tag> is a set of existing job ids that have that tag.
#
# Most all of this happens in Lua functions, for atomicity. See the other
# files in this directory -- functions.rb is a wrapper to call the lua
# functions, and the individual functions are defined in .lua files in this
# directory.
# these key mappings are duplicated in the redis lua code, in include.lua
module Keys
RUNNING_JOBS = "running_jobs"
FAILED_JOBS = "failed_jobs"
JOB = proc { |id| "job/#{id}" }
FAILED_JOB = proc { |id| "failed_job/#{id}" }
QUEUE = proc { |name| "queue/#{name}" }
FUTURE_QUEUE = proc { |name| "#{QUEUE[name]}/future" }
STRAND = proc { |strand| strand ? "strand/#{strand}" : nil }
TAG_COUNTS = proc { |flavor| "tag_counts/#{flavor}" }
TAG = proc { |tag| "tag/#{tag}" }
end
WAITING_STRAND_JOB_PRIORITY = 2000000
if WAITING_STRAND_JOB_PRIORITY <= Delayed::MAX_PRIORITY
# if you change this, note that the value is duplicated in include.lua
raise("Delayed::MAX_PRIORITY must be less than #{WAITING_STRAND_JOB_PRIORITY}")
end
def self.reconnect!
self.redis.reconnect
end
def self.columns
@@columns ||= []
end
def self.functions
@@functions ||= Delayed::Backend::Redis::Functions.new
end
def self.column(name, sql_type = nil, default = nil, null = true)
columns << ActiveRecord::ConnectionAdapters::Column.new(name.to_s, default,
sql_type.to_s, null)
end
attr_protected
def self.tableless?
true
end
def self.table_exists?
# mostly just override this so .inspect doesn't explode
true
end
def self.table_name
raise "Job has no table"
end
def self.find_one(id, options)
job = self.get_with_ids([id]).first
job || raise(ActiveRecord::RecordNotFound, "Couldn't find Job with ID=#{id}")
end
def self.find_some(ids, options)
self.get_with_ids(ids).compact
end
def self.get_with_ids(ids)
redis.pipelined {
ids.each { |id| redis.hgetall(key_for_job_id(id)) }
}.map { |attrs| self.instantiate_from_attrs(attrs) }
end
def self.key_for_job_id(job_id)
Keys::JOB[job_id]
end
def self.get_and_lock_next_available(worker_name,
queue = Delayed::Worker.queue,
min_priority = Delayed::MIN_PRIORITY,
max_priority = Delayed::MAX_PRIORITY)
check_queue(queue)
check_priorities(min_priority, max_priority)
# as an optimization this lua function returns the hash of job attributes,
# rather than just a job id, saving a round trip
job_attrs = functions.get_and_lock_next_available(redis, worker_name, queue, min_priority, max_priority, db_time_now)
instantiate_from_attrs(job_attrs) # will return nil if the attrs are blank
end
def self.find_available(limit,
queue = Delayed::Worker.queue,
min_priority = Delayed::MIN_PRIORITY,
max_priority = Delayed::MAX_PRIORITY)
check_queue(queue)
check_priorities(min_priority, max_priority)
self.find(functions.find_available(redis, queue, limit, 0, min_priority, max_priority, db_time_now))
end
# get a list of jobs of the given flavor in the given queue
# flavor is :current, :future, :failed, :strand or :tag
# depending on the flavor, query has a different meaning:
# for :current and :future, it's the queue name (defaults to Delayed::Worker.queue)
# for :strand it's the strand name
# for :tag it's the tag name
# for :failed it's ignored
def self.list_jobs(flavor,
limit,
offset = 0,
query = nil)
case flavor.to_s
when 'current'
query ||= Delayed::Worker.queue
check_queue(query)
self.find(functions.find_available(redis, query, limit, offset, nil, nil, db_time_now))
when 'future'
query ||= Delayed::Worker.queue
check_queue(query)
self.find(redis.zrangebyscore(Keys::FUTURE_QUEUE[query], 0, "+inf", :limit => [offset, limit]))
when 'failed'
Failed.find(redis.zrevrangebyscore(Keys::FAILED_JOBS, "+inf", 0, :limit => [offset, limit]))
when 'strand'
self.find(redis.lrange(Keys::STRAND[query], offset, offset + limit - 1))
when 'tag'
# This is optimized for writing, since list_jobs(:tag) will only ever happen in the admin UI
ids = redis.smembers(Keys::TAG[query])
self.find(ids[offset, limit])
else
raise ArgumentError, "invalid flavor: #{flavor.inspect}"
end
end
# get the total job count for the given flavor
# flavor is :current, :future or :failed
# for the :failed flavor, queue is currently ignored
def self.jobs_count(flavor,
queue = Delayed::Worker.queue)
case flavor.to_s
when 'current'
check_queue(queue)
redis.zcard(Keys::QUEUE[queue])
when 'future'
check_queue(queue)
redis.zcard(Keys::FUTURE_QUEUE[queue])
when 'failed'
redis.zcard(Keys::FAILED_JOBS)
else
raise ArgumentError, "invalid flavor: #{flavor.inspect}"
end
end
def self.strand_size(strand)
redis.llen(Keys::STRAND[strand])
end
def self.running_jobs()
self.find(redis.zrangebyscore(Keys::RUNNING_JOBS, 0, "+inf"))
end
def self.clear_locks!(worker_name)
self.running_jobs.each do |job|
# TODO: mark the job as failed one attempt
job.unlock! if job.locked_by == worker_name
end
end
def self.unlock_expired_jobs(max_run_time = Delayed::Worker.max_run_time)
cutoff = db_time_now - max_run_time
self.find(redis.zrangebyscore(Keys::RUNNING_JOBS, 0, cutoff.utc.to_i)).each do |job|
# TODO: mark the job as failed one attempt
job.unlock!
end
end
# returns a list of hashes { :tag => tag_name, :count => current_count }
# in descending count order
# flavor is :current or :all
def self.tag_counts(flavor,
limit,
offset = 0)
raise(ArgumentError, "invalid flavor: #{flavor.inspect}") unless %w(current all).include?(flavor.to_s)
key = Keys::TAG_COUNTS[flavor]
redis.zrevrangebyscore(key, '+inf', 1, :limit => [offset, limit], :withscores => true).map { |tag, count| { :tag => tag, :count => count } }
end
# perform a bulk update of a set of jobs
# action is :hold, :unhold, or :destroy
# to specify the jobs to act on, either pass opts[:ids] = [list of job ids]
# or opts[:flavor] = <some flavor> to perform on all jobs of that flavor
#
# see the list_jobs action for the list of available flavors and the meaning
# of opts[:query] for each
def self.bulk_update(action, opts)
if %w(current future).include?(opts[:flavor].to_s)
opts[:query] ||= Delayed::Worker.queue
end
functions.bulk_update(redis, action, opts[:ids], opts[:flavor], opts[:query], db_time_now)
end
def self.create_singleton(options)
self.create!(options.merge(:singleton => true))
end
def self.scoped(*a)
raise ArgumentError, "Can't scope delayed jobs"
end
column(:id, :string)
column(:priority, :integer, 0)
column(:attempts, :integer, 0)
column(:handler, :text)
column(:last_error, :text)
column(:queue, :string)
column(:run_at, :timestamp)
column(:locked_at, :timestamp)
column(:failed_at, :timestamp)
column(:locked_by, :string)
column(:created_at, :timestamp)
column(:updated_at, :timestamp)
column(:tag, :string)
column(:max_attempts, :integer)
column(:strand, :string)
# not saved, just used as a marker when creating
attr_accessor :singleton
def lock_in_redis!(worker_name)
self.locked_at = self.class.db_time_now
self.locked_by = worker_name
save
end
def unlock!
self.locked_at = nil
self.locked_by = nil
save!
end
def save(*a)
return false if destroyed?
callback :before_save
result = if new_record?
callback :before_create
create
else
update
end
callback(:after_save) if result
result
end
def save!(*a)
save(*a) || raise(RecordNotSaved)
end
def destroy
self.class.functions.destroy_job(redis, id, self.class.db_time_now)
@destroyed = true
freeze
end
# take this job off the strand, and queue up the next strand job if this job
# was at the front
def tickle_strand
if strand.present?
self.class.functions.tickle_strand(redis, id, strand, self.class.db_time_now)
end
end
def create_and_lock!(worker_name)
raise "job already exists" unless new_record?
lock_in_redis!(worker_name)
end
def fail!
self.failed_at = self.class.db_time_now
save!
redis.rename Keys::JOB[id], Keys::FAILED_JOB[id]
tickle_strand
end
protected
def update_queues
if failed_at
self.class.functions.fail(redis, id)
elsif locked_at
self.class.functions.set_running(redis, id)
elsif singleton
job_id = self.class.functions.create_singleton(redis, id, queue, strand, self.class.db_time_now)
# if create_singleton returns a different job id, that means this job got
# deleted because there was already that other job on the strand. so
# replace this job with the other for returning.
if job_id != self.id
self.id = job_id
self.reload
end
else
self.class.functions.enqueue(redis, id, queue, strand, self.class.db_time_now)
end
end
def create
self.id ||= UUIDSingleton.instance.generate
self.created_at = self.updated_at = Time.now.utc
save_job_to_redis
update_queues
@new_record = false
self.id
end
def update(attribute_names = @attributes.keys)
self.updated_at = Time.now.utc
save_job_to_redis
update_queues
true
end
def queue_score
"#{priority}.#{run_at.to_i}".to_f
end
def save_job_to_redis
to_delete = []
attrs = {}
attributes.each do |k,v|
if v.nil?
to_delete << k if !new_record? && changed.include?(k)
elsif v.is_a?(ActiveSupport::TimeWithZone)
attrs[k] = v.utc.to_f
else
attrs[k] = v.as_json
end
end
key = Keys::JOB[id]
redis.mapped_hmset(key, attrs)
redis.hdel(key, to_delete) unless to_delete.empty?
end
def self.instantiate_from_attrs(redis_attrs)
if redis_attrs['id'].present?
# nil attributes don't come back at all
@attrs_template ||= columns.inject({}) { |h,c| h[c.name] = nil; h }
attrs = @attrs_template.dup.merge!(redis_attrs)
self.time_attribute_names.each { |k| attrs[k] = Time.zone.at(attrs[k].to_f) if attrs[k] }
instantiate(attrs)
else
nil
end
end
# we store time attributes in redis as floats so we don't have to do
# timestamp parsing in lua, so this builds the list of time attributes that
# need special serialization/deserialization
def self.time_attribute_names
@time_attribute_names ||= columns.find_all { |c| c.type == :timestamp }.map { |c| c.name.to_s }
end
def global_id
id
end
class Failed < Job
include Delayed::Backend::Base
def self.key_for_job_id(job_id)
Keys::FAILED_JOB[job_id]
end
def original_id
id
end
end
end
end

View File

@ -0,0 +1,5 @@
local job_id = unpack(ARGV)
local locked_at, queue, strand = unpack(redis.call('HMGET', Keys.job(job_id), 'locked_at', 'queue', 'strand'))
remove_from_queues(job_id, queue, strand)
redis.call('ZADD', Keys.running_jobs(), locked_at, job_id)

View File

@ -0,0 +1,2 @@
local job_id, strand, now = unpack(ARGV)
tickle_strand(job_id, strand, now)

View File

@ -122,7 +122,7 @@ class Pool
end
pid = fork do
Delayed::Job.connection.reconnect!
Delayed::Job.reconnect!
Delayed::Periodic.load_periodic_jobs_config
worker.start
end

View File

@ -14,21 +14,21 @@ module Stats
# redis.keys("job:id:*")
# but that requires walking the entire keyspace
# (this is an index sorted by completion time)
redis.lpush("job:id", job.id)
redis.lpush("job_stats:id", job.id)
# set of tags ever seen
# sorted set, but all weights are 0 so that we get a sorted-by-name set
redis.zadd('job:tag', 0, job.tag)
redis.zadd('job_stats:tag', 0, job.tag)
# second sorted set, with weights as number of jobs of that type completed
redis.zincrby("job:tag:counts", 1, job.tag)
redis.zincrby("job_stats:tag:counts", 1, job.tag)
# job details, stored in a redis hash
data = job.attributes
data['finished_at'] = job.class.db_time_now
data['worker'] = worker.name
data['full_command'] = job.full_name
data['run_time'] = data['finished_at'] - data['locked_at']
redis.hmset("job:id:#{job.id}", *data.to_a.flatten)
redis.hmset("job_stats:id:#{job.id}", *data.to_a.flatten)
ttl = Setting.get_cached('delayed_jobs_stats_ttl', 1.month.to_s).to_i.from_now
redis.expireat("job:id:#{job.id}", ttl.to_i)
redis.expireat("job_stats:id:#{job.id}", ttl.to_i)
end
rescue
Rails.logger.warn "Failed saving job stats: #{$!.inspect}"
@ -45,9 +45,9 @@ module Stats
# sort job:id, looking up the id in the individual job hash, and storing
# back to the same key. the end result is that any job that's expired will
# get its id in this list reset to the empty string
redis.sort 'job:id', :by => 'nosort', :get => 'job:id:*->id', :store => 'job:id'
redis.sort 'job_stats:id', :by => 'nosort', :get => 'job_stats:id:*->id', :store => 'job_stats:id'
# now, remove all elements that match the empty string
redis.lrem 'job:id', 0, ''
redis.lrem 'job_stats:id', 0, ''
# we don't remove anything from job:tag or job:tag:counts -- we don't have
# enough information stored to do so. in theory those sets could grow
# without bound, which would be an issue, but really they are limited in

View File

@ -14,7 +14,5 @@ require File.dirname(__FILE__) + '/delayed/backend/active_record'
require File.dirname(__FILE__) + '/delayed/worker'
require File.dirname(__FILE__) + '/delayed/yaml_extensions'
Delayed::Job = Delayed::Backend::ActiveRecord::Job
Object.send(:include, Delayed::MessageSending)
Module.send(:include, Delayed::MessageSending::ClassMethods)

View File

@ -33,7 +33,7 @@ shared_examples_for 'Delayed::Batch' do
"string".send_later(:size).should be_true
"string".send_later(:gsub, /./, "!").should be_true
}
Delayed::Job.count.should == 1
Delayed::Job.jobs_count(:current).should == 1
job = Delayed::Job.find_available(1).first
expect{ job.invoke_job }.to raise_error
end
@ -43,7 +43,7 @@ shared_examples_for 'Delayed::Batch' do
"string".send_later(:size).should be_true
"string".send_later(:gsub, /./, "!").should be_true
}
Delayed::Job.count.should == 1
Delayed::Job.jobs_count(:current).should == 1
batch_job = Delayed::Job.find_available(1).first
batch_job.batch?.should == true
@ -67,7 +67,7 @@ shared_examples_for 'Delayed::Batch' do
"string".send_later_enqueue_args(:size, :priority => Delayed::LOW_PRIORITY).should be_true
"string".send_later(:gsub, /./, "!").should be_true
}
Delayed::Job.count.should == 2
Delayed::Job.jobs_count(:current).should == 2
end
end

View File

@ -15,19 +15,19 @@ shared_examples_for 'random ruby objects' do
end
it "should add a new entry to the job table when send_later is called on it" do
lambda { Object.new.send_later(:to_s) }.should change { Delayed::Job.count }.by(1)
lambda { Object.new.send_later(:to_s) }.should change { Delayed::Job.jobs_count(:current) }.by(1)
end
it "should add a new entry to the job table when send_later_with_queue is called on it" do
lambda { Object.new.send_later_with_queue(:to_s, "testqueue") }.should change { Delayed::Job.count }.by(1)
lambda { Object.new.send_later_with_queue(:to_s, "testqueue") }.should change { Delayed::Job.jobs_count(:current, "testqueue") }.by(1)
end
it "should add a new entry to the job table when send_later is called on the class" do
lambda { Object.send_later(:to_s) }.should change { Delayed::Job.count }.by(1)
lambda { Object.send_later(:to_s) }.should change { Delayed::Job.jobs_count(:current) }.by(1)
end
it "should add a new entry to the job table when send_later_with_queue is called on the class" do
lambda { Object.send_later_with_queue(:to_s, "testqueue") }.should change { Delayed::Job.count }.by(1)
lambda { Object.send_later_with_queue(:to_s, "testqueue") }.should change { Delayed::Job.jobs_count(:current, "testqueue") }.by(1)
end
context "class methods" do
@ -39,10 +39,10 @@ shared_examples_for 'random ruby objects' do
add_send_later_methods :test_method, {}, true
end
obj = TestObject.new
lambda { obj.test_method }.should change { Delayed::Job.count }.by(1)
lambda { obj.test_method_with_send_later }.should change { Delayed::Job.count }.by(1)
lambda { obj.test_method }.should change { Delayed::Job.jobs_count(:current) }.by(1)
lambda { obj.test_method_with_send_later }.should change { Delayed::Job.jobs_count(:current) }.by(1)
obj.ran.should be_false
lambda { obj.test_method_without_send_later }.should_not change { Delayed::Job.count }
lambda { obj.test_method_without_send_later }.should_not change { Delayed::Job.jobs_count(:current) }
obj.ran.should be_true
end
@ -53,13 +53,13 @@ shared_examples_for 'random ruby objects' do
add_send_later_methods :test_method, {}, false
end
obj = TestObject.new
lambda { obj.test_method_with_send_later }.should change { Delayed::Job.count }.by(1)
lambda { obj.test_method_with_send_later }.should change { Delayed::Job.jobs_count(:current) }.by(1)
obj.ran.should be_false
lambda { obj.test_method }.should_not change { Delayed::Job.count }
lambda { obj.test_method }.should_not change { Delayed::Job.jobs_count(:current) }
obj.ran.should be_true
obj.ran = false
obj.ran.should be_false
lambda { obj.test_method_without_send_later }.should_not change { Delayed::Job.count }
lambda { obj.test_method_without_send_later }.should_not change { Delayed::Job.jobs_count(:current) }
obj.ran.should be_true
end
@ -130,10 +130,10 @@ shared_examples_for 'random ruby objects' do
add_send_later_methods :test_method?, {}, true
end
obj = TestObject.new
lambda { obj.test_method? }.should change { Delayed::Job.count }.by(1)
lambda { obj.test_method_with_send_later? }.should change { Delayed::Job.count }.by(1)
lambda { obj.test_method? }.should change { Delayed::Job.jobs_count(:current) }.by(1)
lambda { obj.test_method_with_send_later? }.should change { Delayed::Job.jobs_count(:current) }.by(1)
obj.ran.should be_false
lambda { obj.test_method_without_send_later? }.should_not change { Delayed::Job.count }
lambda { obj.test_method_without_send_later? }.should_not change { Delayed::Job.jobs_count(:current) }
obj.ran.should be_true
end
@ -144,13 +144,13 @@ shared_examples_for 'random ruby objects' do
add_send_later_methods :test_method?, {}, false
end
obj = TestObject.new
lambda { obj.test_method_with_send_later? }.should change { Delayed::Job.count }.by(1)
lambda { obj.test_method_with_send_later? }.should change { Delayed::Job.jobs_count(:current) }.by(1)
obj.ran.should be_false
lambda { obj.test_method? }.should_not change { Delayed::Job.count }
lambda { obj.test_method? }.should_not change { Delayed::Job.jobs_count(:current) }
obj.ran.should be_true
obj.ran = false
obj.ran.should be_false
lambda { obj.test_method_without_send_later? }.should_not change { Delayed::Job.count }
lambda { obj.test_method_without_send_later? }.should_not change { Delayed::Job.jobs_count(:current) }
obj.ran.should be_true
end
@ -161,10 +161,10 @@ shared_examples_for 'random ruby objects' do
add_send_later_methods :test_method=, {}, true
end
obj = TestObject.new
lambda { obj.test_method = 3 }.should change { Delayed::Job.count }.by(1)
lambda { obj.test_method_with_send_later = 4 }.should change { Delayed::Job.count }.by(1)
lambda { obj.test_method = 3 }.should change { Delayed::Job.jobs_count(:current) }.by(1)
lambda { obj.test_method_with_send_later = 4 }.should change { Delayed::Job.jobs_count(:current) }.by(1)
obj.ran.should be_nil
lambda { obj.test_method_without_send_later = 5 }.should_not change { Delayed::Job.count }
lambda { obj.test_method_without_send_later = 5 }.should_not change { Delayed::Job.jobs_count(:current) }
obj.ran.should == 5
end
@ -175,11 +175,11 @@ shared_examples_for 'random ruby objects' do
add_send_later_methods :test_method=, {}, false
end
obj = TestObject.new
lambda { obj.test_method_with_send_later = 1 }.should change { Delayed::Job.count }.by(1)
lambda { obj.test_method_with_send_later = 1 }.should change { Delayed::Job.jobs_count(:current) }.by(1)
obj.ran.should be_nil
lambda { obj.test_method = 2 }.should_not change { Delayed::Job.count }
lambda { obj.test_method = 2 }.should_not change { Delayed::Job.jobs_count(:current) }
obj.ran.should == 2
lambda { obj.test_method_without_send_later = 3 }.should_not change { Delayed::Job.count }
lambda { obj.test_method_without_send_later = 3 }.should_not change { Delayed::Job.jobs_count(:current) }
obj.ran.should == 3
end
@ -283,7 +283,7 @@ shared_examples_for 'random ruby objects' do
story = Story.create :text => 'Once upon...'
job = nil
expect { job = story.whatever(1, 5) }.to change(Delayed::Job, :count).by(1)
expect { job = story.whatever(1, 5) }.to change { Delayed::Job.jobs_count(:current) }.by(1)
job.payload_object.class.should == Delayed::PerformableMethod
job.payload_object.method.should == :whatever_without_send_later
@ -295,7 +295,7 @@ shared_examples_for 'random ruby objects' do
story = Story.create :text => 'Once upon...'
job = nil
expect { job = story.whatever_else(1, 5) }.to change(Delayed::Job, :count).by(1)
expect { job = story.whatever_else(1, 5) }.to change { Delayed::Job.jobs_count(:current, "testqueue") }.by(1)
job.payload_object.class.should == Delayed::PerformableMethod
job.payload_object.method.should == :whatever_else_without_send_later
@ -323,7 +323,7 @@ shared_examples_for 'random ruby objects' do
it "should queue a new job" do
lambda do
"string".send_at(1.hour.from_now, :length)
end.should change { Delayed::Job.count }.by(1)
end.should change { Delayed::Job.jobs_count(:future) }.by(1)
end
it "should schedule the job in the future" do
@ -352,7 +352,7 @@ shared_examples_for 'random ruby objects' do
it "should queue a new job" do
lambda do
"string".send_at_with_queue(1.hour.from_now, :length, "testqueue")
end.should change { Delayed::Job.count }.by(1)
end.should change { Delayed::Job.jobs_count(:future, "testqueue") }.by(1)
end
it "should schedule the job in the future" do

View File

@ -0,0 +1,70 @@
require File.expand_path("../spec_helper", __FILE__)
if Canvas.redis_enabled?
describe 'Delayed::Backend::Redis::Job' do
before :all do
@job_spec_backend = Delayed::Job
Delayed.send(:remove_const, :Job)
Delayed::Job = Delayed::Backend::Redis::Job
Delayed::Job.redis ||= Canvas.redis
end
after :all do
Delayed.send(:remove_const, :Job)
Delayed::Job = @job_spec_backend
end
before do
Delayed::Job.redis.flushdb
end
it_should_behave_like 'a delayed_jobs implementation'
describe "tickle_strand" do
it "should continue trying to tickle until the strand is empty" do
jobs = []
3.times { jobs << "test".send_later_enqueue_args(:to_s, :strand => "s1") }
job = "test".send_later_enqueue_args(:to_s, :strand => "s1")
# manually delete the first jobs, bypassing the strand book-keeping
jobs.each { |j| Delayed::Job.redis.del(Delayed::Job::Keys::JOB[j.id]) }
Delayed::Job.redis.llen(Delayed::Job::Keys::STRAND['s1']).should == 4
job.destroy
Delayed::Job.redis.llen(Delayed::Job::Keys::STRAND['s1']).should == 0
end
it "should tickle until it finds an existing job" do
jobs = []
3.times { jobs << "test".send_later_enqueue_args(:to_s, :strand => "s1") }
job = "test".send_later_enqueue_args(:to_s, :strand => "s1")
# manually delete the first jobs, bypassing the strand book-keeping
jobs[0...-1].each { |j| Delayed::Job.redis.del(Delayed::Job::Keys::JOB[j.id]) }
Delayed::Job.redis.llen(Delayed::Job::Keys::STRAND['s1']).should == 4
jobs[-1].destroy
Delayed::Job.redis.lrange(Delayed::Job::Keys::STRAND['s1'], 0, -1).should == [job.id]
Delayed::Job.get_and_lock_next_available('test worker').should == nil
Delayed::Job.get_and_lock_next_available('test worker').should == job
end
end
describe "missing jobs in queues" do
before do
@job = "test".send_later(:to_s)
@job2 = "test".send_later(:to_s)
# manually delete the job from redis
Delayed::Job.redis.del(Delayed::Job::Keys::JOB[@job.id])
end
it "should discard when trying to lock" do
Delayed::Job.get_and_lock_next_available("test worker").should be_nil
Delayed::Job.get_and_lock_next_available("test worker").should == @job2
end
it "should filter for find_available" do
Delayed::Job.find_available(1).should == []
Delayed::Job.find_available(1).should == [@job2]
end
end
end
end

View File

@ -22,7 +22,7 @@ shared_examples_for 'a backend' do
it "should increase count after enqueuing items" do
Delayed::Job.enqueue SimpleJob.new
Delayed::Job.count.should == 1
Delayed::Job.jobs_count(:current).should == 1
end
it "should be able to set priority when enqueuing items" do
@ -367,9 +367,9 @@ shared_examples_for 'a backend' do
end
it "should schedule jobs if they aren't scheduled yet" do
Delayed::Job.count.should == 0
Delayed::Job.jobs_count(:current).should == 0
Delayed::Periodic.perform_audit!
Delayed::Job.count.should == 1
Delayed::Job.jobs_count(:current).should == 1
job = Delayed::Job.get_and_lock_next_available('test1')
job.tag.should == 'periodic: my SimpleJob'
job.payload_object.should == Delayed::Periodic.scheduled['my SimpleJob']
@ -379,28 +379,28 @@ shared_examples_for 'a backend' do
end
it "should schedule jobs if there are only failed jobs on the queue" do
Delayed::Job.count.should == 0
expect { Delayed::Periodic.perform_audit! }.to change(Delayed::Job, :count).by(1)
Delayed::Job.count.should == 1
Delayed::Job.jobs_count(:current).should == 0
expect { Delayed::Periodic.perform_audit! }.to change { Delayed::Job.jobs_count(:current) }.by(1)
Delayed::Job.jobs_count(:current).should == 1
job = Delayed::Job.get_and_lock_next_available('test1')
job.fail!
expect { Delayed::Periodic.perform_audit! }.to change(Delayed::Job, :count).by(1)
expect { Delayed::Periodic.perform_audit! }.to change{ Delayed::Job.jobs_count(:current) }.by(1)
end
it "should not schedule jobs that are already scheduled" do
Delayed::Job.count.should == 0
Delayed::Job.jobs_count(:current).should == 0
Delayed::Periodic.perform_audit!
Delayed::Job.count.should == 1
Delayed::Job.jobs_count(:current).should == 1
job = Delayed::Job.find_available(1).first
Delayed::Periodic.perform_audit!
Delayed::Job.count.should == 1
Delayed::Job.jobs_count(:current).should == 1
# verify that the same job still exists, it wasn't just replaced with a new one
job.should == Delayed::Job.find_available(1).first
end
it "should schedule the next job run after performing" do
Delayed::Periodic.perform_audit!
Delayed::Job.count.should == 1
Delayed::Job.jobs_count(:current).should == 1
job = Delayed::Job.get_and_lock_next_available('test')
run_job(job)
@ -460,8 +460,8 @@ shared_examples_for 'a backend' do
lambda { [story, 1, story, false].send_later(:first) }.should raise_error
end
# the sort order of current_jobs depends on the back-end implementation,
# so sort order isn't tested in these specs
# the sort order of current_jobs and list_jobs depends on the back-end
# implementation, so sort order isn't tested in these specs
describe "current jobs, queue size, strand_size" do
before do
@jobs = []
@ -504,6 +504,40 @@ shared_examples_for 'a backend' do
end
end
it "should return the jobs in a strand" do
strand_jobs = []
3.times { strand_jobs << create_job(:strand => 'test1') }
2.times { create_job(:strand => 'test2') }
strand_jobs << create_job(:strand => 'test1', :run_at => 5.hours.from_now)
create_job
jobs = Delayed::Job.list_jobs(:strand, 3, 0, "test1")
jobs.size.should == 3
jobs += Delayed::Job.list_jobs(:strand, 3, 3, "test1")
jobs.size.should == 4
jobs.sort_by { |j| j.id }.should == strand_jobs.sort_by { |j| j.id }
end
it "should return the jobs for a tag" do
tag_jobs = []
3.times { tag_jobs << "test".send_later(:to_s) }
2.times { "test".send_later(:to_i) }
tag_jobs << "test".send_later_enqueue_args(:to_s, :run_at => 5.hours.from_now)
tag_jobs << "test".send_later_enqueue_args(:to_s, :strand => "test1")
"test".send_later_enqueue_args(:to_i, :strand => "test1")
create_job
jobs = Delayed::Job.list_jobs(:tag, 3, 0, "String#to_s")
jobs.size.should == 3
jobs += Delayed::Job.list_jobs(:tag, 3, 3, "String#to_s")
jobs.size.should == 5
jobs.sort_by { |j| j.id }.should == tag_jobs.sort_by { |j| j.id }
end
describe "running_jobs" do
it "should return the running jobs, ordered by locked_at" do
Delayed::Job.stubs(:db_time_now).returns(10.minutes.ago)
@ -564,13 +598,16 @@ shared_examples_for 'a backend' do
@ignored_jobs = []
end
it "should hold and un-hold a scope of jobs" do
it "should hold a scope of jobs" do
@affected_jobs.all? { |j| j.on_hold? }.should be_false
@ignored_jobs.any? { |j| j.on_hold? }.should be_false
Delayed::Job.bulk_update('hold', :flavor => @flavor, :query => @query).should == @affected_jobs.size
@affected_jobs.all? { |j| j.reload.on_hold? }.should be_true
@ignored_jobs.any? { |j| j.reload.on_hold? }.should be_false
end
it "should un-hold a scope of jobs" do
Delayed::Job.bulk_update('unhold', :flavor => @flavor, :query => @query).should == @affected_jobs.size
@affected_jobs.any? { |j| j.reload.on_hold? }.should be_false
@ -579,8 +616,8 @@ shared_examples_for 'a backend' do
it "should delete a scope of jobs" do
Delayed::Job.bulk_update('destroy', :flavor => @flavor, :query => @query).should == @affected_jobs.size
@affected_jobs.map { |j| Delayed::Job.find_by_id(j.id) }.compact.should be_blank
@ignored_jobs.map { |j| Delayed::Job.find_by_id(j.id) }.compact.size.should == @ignored_jobs.size
@affected_jobs.map { |j| Delayed::Job.find(j.id) rescue nil }.compact.should be_blank
@ignored_jobs.map { |j| Delayed::Job.find(j.id) rescue nil }.compact.size.should == @ignored_jobs.size
end
end
@ -647,37 +684,54 @@ shared_examples_for 'a backend' do
it "should delete given job ids" do
jobs = (0..2).map { create_job }
Delayed::Job.bulk_update('destroy', :ids => jobs[0,2]).should == 2
jobs.map { |j| Delayed::Job.find_by_id(j.id) }.compact.should == jobs[2,1]
Delayed::Job.bulk_update('destroy', :ids => jobs[0,2].map(&:id)).should == 2
jobs.map { |j| Delayed::Job.find(j.id) rescue nil }.compact.should == jobs[2,1]
end
end
describe "tag_counts" do
before do
2.times { "test".send_later :to_s }
4.times { "test".send_later :to_i }
3.times { "test".send_later :upcase }
@cur = []
3.times { @cur << "test".send_later(:to_s) }
5.times { @cur << "test".send_later(:to_i) }
2.times { @cur << "test".send_later(:upcase) }
("test".send_later :downcase).fail!
5.times { "test".send_at 3.hours.from_now, :downcase }
"test".send_later :downcase
@future = []
5.times { @future << "test".send_at(3.hours.from_now, :downcase) }
@cur << "test".send_later(:downcase)
end
it "should return a sorted list of popular current tags" do
Delayed::Job.tag_counts(:current, 1).should == [{ :tag => "String#to_i", :count => 4 }]
Delayed::Job.tag_counts(:current, 1, 1).should == [{ :tag => "String#upcase", :count => 3 }]
Delayed::Job.tag_counts(:current, 5).should == [{ :tag => "String#to_i", :count => 4 },
{ :tag => "String#upcase", :count => 3 },
{ :tag => "String#to_s", :count => 2 },
Delayed::Job.tag_counts(:current, 1).should == [{ :tag => "String#to_i", :count => 5 }]
Delayed::Job.tag_counts(:current, 1, 1).should == [{ :tag => "String#to_s", :count => 3 }]
Delayed::Job.tag_counts(:current, 5).should == [{ :tag => "String#to_i", :count => 5 },
{ :tag => "String#to_s", :count => 3 },
{ :tag => "String#upcase", :count => 2 },
{ :tag => "String#downcase", :count => 1 }]
@cur[0,4].each { |j| j.destroy }
@future[0].update_attribute(:run_at, 1.hour.ago)
Delayed::Job.tag_counts(:current, 5).should == [{ :tag => "String#to_i", :count => 4 },
{ :tag => "String#upcase", :count => 2 },
{ :tag => "String#downcase", :count => 2 }]
end
it "should return a sorted list of all popular tags" do
Delayed::Job.tag_counts(:all, 1).should == [{ :tag => "String#downcase", :count => 6 }]
Delayed::Job.tag_counts(:all, 1, 1).should == [{ :tag => "String#to_i", :count => 4 }]
Delayed::Job.tag_counts(:all, 1, 1).should == [{ :tag => "String#to_i", :count => 5 }]
Delayed::Job.tag_counts(:all, 5).should == [{ :tag => "String#downcase", :count => 6 },
{ :tag => "String#to_i", :count => 4 },
{ :tag => "String#upcase", :count => 3 },
{ :tag => "String#to_s", :count => 2 },]
{ :tag => "String#to_i", :count => 5 },
{ :tag => "String#to_s", :count => 3 },
{ :tag => "String#upcase", :count => 2 },]
@cur[0,4].each { |j| j.destroy }
@future[0].destroy
@future[1].fail!
@future[2].fail!
Delayed::Job.tag_counts(:all, 5).should == [{ :tag => "String#to_i", :count => 4 },
{ :tag => "String#downcase", :count => 3 },
{ :tag => "String#upcase", :count => 2 },]
end
end
end

View File

@ -5,31 +5,33 @@ shared_examples_for 'Delayed::Stats' do
end
it "should store stats for jobs" do
job = "ohai".send_later(:reverse)
job.lock_exclusively!('stub worker').should be_true
"ohai".send_later(:reverse)
job = Delayed::Job.get_and_lock_next_available('stub worker')
job.should be_present
worker = mock('Delayed::Worker')
worker.stubs(:name).returns("stub worker")
Delayed::Stats.job_complete(job, worker)
Canvas.redis.hget("job:id:#{job.id}", "worker").should == 'stub worker'
Canvas.redis.hget("job:id:#{job.id}", "id").should == job.id.to_s
Canvas.redis.hget("job_stats:id:#{job.id}", "worker").should == 'stub worker'
Canvas.redis.hget("job_stats:id:#{job.id}", "id").should == job.id.to_s
end
it "should completely clean up after stats" do
job = "ohai".send_later(:reverse)
job.lock_exclusively!('stub worker').should be_true
"ohai".send_later(:reverse)
job = Delayed::Job.get_and_lock_next_available('stub worker')
job.should be_present
worker = mock('Delayed::Worker')
worker.stubs(:name).returns("stub worker")
Canvas.redis.keys("job:*").should be_empty
Canvas.redis.keys("job_stats:*").should be_empty
Delayed::Stats.job_complete(job, worker)
Canvas.redis.keys("job:*").should_not be_empty
Canvas.redis.type('job:id').should == 'list'
Canvas.redis.keys("job_stats:*").should_not be_empty
Canvas.redis.type('job_stats:id').should == 'list'
# delete the job, ensuring that there's nothing left after cleanup
Canvas.redis.del("job:id:#{job.id}")
Canvas.redis.del("job_stats:id:#{job.id}")
Delayed::Stats.cleanup
# these 2 keys remain forever, they don't grow without bound
Canvas.redis.keys("job:*").sort.should == ['job:tag', 'job:tag:counts']
Canvas.redis.keys("job_stats:*").sort.should == ['job_stats:tag', 'job_stats:tag:counts']
end
end
end