From b4ce2b49e14452837fa9cc7518c6f8aea88741b4 Mon Sep 17 00:00:00 2001 From: James Williams Date: Wed, 28 Oct 2015 10:27:51 -0600 Subject: [PATCH] update canvas-jobs includes n-strand improvements to perform load balancing at runtime instead of queue time (e.g. should prevent one strand from getting blocked when queueing many imports at a time) test plan: * regression test delayed jobs Change-Id: I2f616d380c841d7231af5159c09c24cbcc8d235d Reviewed-on: https://gerrit.instructure.com/65976 Reviewed-by: Cody Cutrer Tested-by: Jenkins QA-Review: August Thornton Product-Review: James Williams --- Gemfile.d/app.rb | 2 +- ...150807133223_add_max_concurrent_to_jobs.rb | 76 +++++++++++++++++++ 2 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 db/migrate/20150807133223_add_max_concurrent_to_jobs.rb diff --git a/Gemfile.d/app.rb b/Gemfile.d/app.rb index 514fc69ccb8..9548aac8a8d 100644 --- a/Gemfile.d/app.rb +++ b/Gemfile.d/app.rb @@ -34,7 +34,7 @@ gem 'bcrypt-ruby', '3.0.1' gem 'canvas_connect', '0.3.8' gem 'adobe_connect', '1.0.3', require: false gem 'canvas_webex', '0.15' -gem 'canvas-jobs', '0.9.16' +gem 'canvas-jobs', '0.10.0' gem 'rufus-scheduler', '3.1.2', require: false gem 'ffi', '1.1.5', require: false gem 'hairtrigger', '0.2.15' diff --git a/db/migrate/20150807133223_add_max_concurrent_to_jobs.rb b/db/migrate/20150807133223_add_max_concurrent_to_jobs.rb new file mode 100644 index 00000000000..7588c0f7c0b --- /dev/null +++ b/db/migrate/20150807133223_add_max_concurrent_to_jobs.rb @@ -0,0 +1,76 @@ +class AddMaxConcurrentToJobs < ActiveRecord::Migration + tag :predeploy + + def connection + Delayed::Backend::ActiveRecord::Job.connection + end + + def up + add_column :delayed_jobs, :max_concurrent, :integer, :default => 1, :null => false + + if connection.adapter_name == 'PostgreSQL' + search_path = Shard.current.name + + execute(<<-CODE) + CREATE OR REPLACE FUNCTION #{connection.quote_table_name('delayed_jobs_before_insert_row_tr_fn')} () RETURNS trigger AS $$ + BEGIN + IF NEW.strand IS NOT NULL THEN + PERFORM pg_advisory_xact_lock(half_md5_as_bigint(NEW.strand)); + IF (SELECT COUNT(*) FROM delayed_jobs WHERE strand = NEW.strand) >= NEW.max_concurrent THEN + NEW.next_in_strand := 'f'; + END IF; + END IF; + RETURN NEW; + END; + $$ LANGUAGE plpgsql SET search_path TO #{search_path}; + CODE + + execute(<<-CODE) + CREATE OR REPLACE FUNCTION #{connection.quote_table_name('delayed_jobs_after_delete_row_tr_fn')} () RETURNS trigger AS $$ + BEGIN + IF OLD.strand IS NOT NULL THEN + PERFORM pg_advisory_xact_lock(half_md5_as_bigint(OLD.strand)); + IF (SELECT COUNT(*) FROM delayed_jobs WHERE strand = OLD.strand AND next_in_strand = 't') < OLD.max_concurrent THEN + UPDATE delayed_jobs SET next_in_strand = 't' WHERE id = ( + SELECT id FROM delayed_jobs j2 WHERE next_in_strand = 'f' AND + j2.strand = OLD.strand ORDER BY j2.id ASC LIMIT 1 FOR UPDATE + ); + END IF; + END IF; + RETURN OLD; + END; + $$ LANGUAGE plpgsql SET search_path TO #{search_path}; + CODE + end + end + + def down + remove_column :delayed_jobs, :max_concurrent + + if connection.adapter_name == 'PostgreSQL' + search_path = Shard.current.name + + execute(<<-CODE) + CREATE OR REPLACE FUNCTION #{connection.quote_table_name('delayed_jobs_before_insert_row_tr_fn')} () RETURNS trigger AS $$ + BEGIN + PERFORM pg_advisory_xact_lock(half_md5_as_bigint(NEW.strand)); + IF (SELECT 1 FROM delayed_jobs WHERE strand = NEW.strand LIMIT 1) = 1 THEN + NEW.next_in_strand := 'f'; + END IF; + RETURN NEW; + END; + $$ LANGUAGE plpgsql SET search_path TO #{search_path}; + CODE + + execute(<<-CODE) + CREATE OR REPLACE FUNCTION #{connection.quote_table_name('delayed_jobs_after_delete_row_tr_fn')} () RETURNS trigger AS $$ + BEGIN + PERFORM pg_advisory_xact_lock(half_md5_as_bigint(OLD.strand)); + UPDATE delayed_jobs SET next_in_strand = 't' WHERE id = (SELECT id FROM delayed_jobs j2 WHERE j2.strand = OLD.strand ORDER BY j2.strand, j2.id ASC LIMIT 1 FOR UPDATE); + RETURN OLD; + END; + $$ LANGUAGE plpgsql SET search_path TO #{search_path}; + CODE + end + end +end