update canvas-jobs

includes n-strand improvements to perform load balancing
at runtime instead of queue time

(e.g. should prevent one strand from getting blocked when
 queueing many imports at a time)

test plan:
* regression test delayed jobs

Change-Id: I2f616d380c841d7231af5159c09c24cbcc8d235d
Reviewed-on: https://gerrit.instructure.com/65976
Reviewed-by: Cody Cutrer <cody@instructure.com>
Tested-by: Jenkins
QA-Review: August Thornton <august@instructure.com>
Product-Review: James Williams  <jamesw@instructure.com>
This commit is contained in:
James Williams 2015-10-28 10:27:51 -06:00
parent 59183acae8
commit b4ce2b49e1
2 changed files with 77 additions and 1 deletions

View File

@ -34,7 +34,7 @@ gem 'bcrypt-ruby', '3.0.1'
gem 'canvas_connect', '0.3.8'
gem 'adobe_connect', '1.0.3', require: false
gem 'canvas_webex', '0.15'
gem 'canvas-jobs', '0.9.16'
gem 'canvas-jobs', '0.10.0'
gem 'rufus-scheduler', '3.1.2', require: false
gem 'ffi', '1.1.5', require: false
gem 'hairtrigger', '0.2.15'

View File

@ -0,0 +1,76 @@
class AddMaxConcurrentToJobs < ActiveRecord::Migration
tag :predeploy
def connection
Delayed::Backend::ActiveRecord::Job.connection
end
def up
add_column :delayed_jobs, :max_concurrent, :integer, :default => 1, :null => false
if connection.adapter_name == 'PostgreSQL'
search_path = Shard.current.name
execute(<<-CODE)
CREATE OR REPLACE FUNCTION #{connection.quote_table_name('delayed_jobs_before_insert_row_tr_fn')} () RETURNS trigger AS $$
BEGIN
IF NEW.strand IS NOT NULL THEN
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(NEW.strand));
IF (SELECT COUNT(*) FROM delayed_jobs WHERE strand = NEW.strand) >= NEW.max_concurrent THEN
NEW.next_in_strand := 'f';
END IF;
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql SET search_path TO #{search_path};
CODE
execute(<<-CODE)
CREATE OR REPLACE FUNCTION #{connection.quote_table_name('delayed_jobs_after_delete_row_tr_fn')} () RETURNS trigger AS $$
BEGIN
IF OLD.strand IS NOT NULL THEN
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(OLD.strand));
IF (SELECT COUNT(*) FROM delayed_jobs WHERE strand = OLD.strand AND next_in_strand = 't') < OLD.max_concurrent THEN
UPDATE delayed_jobs SET next_in_strand = 't' WHERE id = (
SELECT id FROM delayed_jobs j2 WHERE next_in_strand = 'f' AND
j2.strand = OLD.strand ORDER BY j2.id ASC LIMIT 1 FOR UPDATE
);
END IF;
END IF;
RETURN OLD;
END;
$$ LANGUAGE plpgsql SET search_path TO #{search_path};
CODE
end
end
def down
remove_column :delayed_jobs, :max_concurrent
if connection.adapter_name == 'PostgreSQL'
search_path = Shard.current.name
execute(<<-CODE)
CREATE OR REPLACE FUNCTION #{connection.quote_table_name('delayed_jobs_before_insert_row_tr_fn')} () RETURNS trigger AS $$
BEGIN
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(NEW.strand));
IF (SELECT 1 FROM delayed_jobs WHERE strand = NEW.strand LIMIT 1) = 1 THEN
NEW.next_in_strand := 'f';
END IF;
RETURN NEW;
END;
$$ LANGUAGE plpgsql SET search_path TO #{search_path};
CODE
execute(<<-CODE)
CREATE OR REPLACE FUNCTION #{connection.quote_table_name('delayed_jobs_after_delete_row_tr_fn')} () RETURNS trigger AS $$
BEGIN
PERFORM pg_advisory_xact_lock(half_md5_as_bigint(OLD.strand));
UPDATE delayed_jobs SET next_in_strand = 't' WHERE id = (SELECT id FROM delayed_jobs j2 WHERE j2.strand = OLD.strand ORDER BY j2.strand, j2.id ASC LIMIT 1 FOR UPDATE);
RETURN OLD;
END;
$$ LANGUAGE plpgsql SET search_path TO #{search_path};
CODE
end
end
end