limit simultaneous sis import due date recache job runs
closes EVAL-2376 flag=none As part of this change, the number of simultaneously running enrollment importer due date recache jobs per shard is capped at 10. In order to change this, run: ``` Setting.set( "DueDateCacher#recompute_for_sis_import_num_strands", "20" ) ``` Test Plan: 1. create an assignment 2. Verify in a rails console that submissions were created by DueDateCacher for the assignment: Submission.where(assignment_id: <id>).count => should be greater than 0 3. Run a SIS import that adds a new enrollment to a course (for a user that has not yet been added to the course), and verify submissions are created for any assignment that is assigned to the newly-added enrollment: Submission.where( course_id: <id>, user_id: <enrollment's user_id> ).count => should be greater than 0 Change-Id: I5f28b5bfdc7f419bc2fc450aa4cc23fe2c28a51f Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/288944 Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com> Reviewed-by: Aaron Shafovaloff <ashafovaloff@instructure.com> Reviewed-by: Kai Bjorkman <kbjorkman@instructure.com> QA-Review: Kai Bjorkman <kbjorkman@instructure.com> Product-Review: Syed Hussain <shussain@instructure.com>
This commit is contained in:
parent
cf2c7e43c8
commit
deb4370dca
|
@ -123,21 +123,38 @@ class DueDateCacher
|
|||
end
|
||||
|
||||
def self.recompute_users_for_course(user_ids, course, assignments = nil, inst_jobs_opts = {})
|
||||
opts = inst_jobs_opts.extract!(:update_grades, :executing_user, :sis_import, :require_singleton).reverse_merge(require_singleton: assignments.nil?)
|
||||
user_ids = Array(user_ids)
|
||||
course = Course.find(course) unless course.is_a?(Course)
|
||||
update_grades = inst_jobs_opts.delete(:update_grades) || false
|
||||
update_grades = opts[:update_grades] || false
|
||||
inst_jobs_opts[:max_attempts] ||= 10
|
||||
inst_jobs_opts[:strand] ||= "cached_due_date:calculator:Course:#{course.global_id}"
|
||||
if assignments.nil?
|
||||
if opts[:require_singleton]
|
||||
inst_jobs_opts[:singleton] ||= "cached_due_date:calculator:Course:#{course.global_id}:Users:#{Digest::SHA256.hexdigest(user_ids.sort.join(":"))}:UpdateGrades:#{update_grades ? 1 : 0}"
|
||||
end
|
||||
assignments ||= Assignment.active.where(context: course).pluck(:id)
|
||||
return if assignments.empty?
|
||||
|
||||
current_caller = caller(1..1).first
|
||||
executing_user = inst_jobs_opts.delete(:executing_user) || current_executing_user
|
||||
due_date_cacher = new(course, assignments, user_ids, update_grades: update_grades, original_caller: current_caller, executing_user: executing_user)
|
||||
executing_user = opts[:executing_user] || current_executing_user
|
||||
|
||||
if opts[:sis_import]
|
||||
running_jobs_count = Delayed::Job.running.where(shard_id: course.shard.id, tag: "DueDateCacher#recompute_for_sis_import").count
|
||||
max_jobs = Setting.get("DueDateCacher#recompute_for_sis_import_num_strands", "10").to_i
|
||||
|
||||
if running_jobs_count >= max_jobs
|
||||
# there are too many sis recompute jobs running concurrently now. let's check again in a bit to see if we can run.
|
||||
return delay_if_production(
|
||||
**inst_jobs_opts,
|
||||
run_at: Setting.get("DueDateCacher#recompute_for_sis_import_requeue_delay", "10").to_i.seconds.from_now
|
||||
).recompute_users_for_course(user_ids, course, assignments, opts)
|
||||
else
|
||||
due_date_cacher = new(course, assignments, user_ids, update_grades: update_grades, original_caller: current_caller, executing_user: executing_user)
|
||||
return due_date_cacher.delay_if_production(**inst_jobs_opts).recompute_for_sis_import
|
||||
end
|
||||
end
|
||||
|
||||
due_date_cacher = new(course, assignments, user_ids, update_grades: update_grades, original_caller: current_caller, executing_user: executing_user)
|
||||
due_date_cacher.delay_if_production(**inst_jobs_opts).recompute
|
||||
end
|
||||
|
||||
|
@ -167,6 +184,12 @@ class DueDateCacher
|
|||
end
|
||||
end
|
||||
|
||||
# exists so that we can identify (and limit) jobs running specifically for sis imports
|
||||
# Delayed::Job.where(tag: "DueDateCacher#recompute_for_sis_import")
|
||||
def recompute_for_sis_import
|
||||
recompute
|
||||
end
|
||||
|
||||
def recompute
|
||||
Rails.logger.debug "DUE DATE CACHER STARTS: #{Time.zone.now.to_i}"
|
||||
Rails.logger.debug "DDC#recompute() - original caller: #{@original_caller}"
|
||||
|
|
|
@ -51,7 +51,7 @@ module SIS
|
|||
end
|
||||
i.courses_to_recache_due_dates.to_a.in_groups_of(1000, false) do |batch|
|
||||
batch.each do |course_id, user_ids|
|
||||
DueDateCacher.recompute_users_for_course(user_ids.uniq, course_id, nil, update_grades: true)
|
||||
DueDateCacher.recompute_users_for_course(user_ids.uniq, course_id, nil, sis_import: true, update_grades: true)
|
||||
end
|
||||
end
|
||||
# We batch these up at the end because normally a user would get several enrollments, and there's no reason
|
||||
|
|
|
@ -215,104 +215,140 @@ describe DueDateCacher do
|
|||
end
|
||||
|
||||
describe ".recompute_users_for_course" do
|
||||
let!(:assignment_1) { @assignment }
|
||||
let(:assignment_2) { assignment_model(course: @course) }
|
||||
let(:assignments) { [assignment_1, assignment_2] }
|
||||
context "when run for a sis import" do
|
||||
specs_require_sharding
|
||||
|
||||
let!(:student_1) { @student }
|
||||
let(:student_2) { student_in_course(course: @course) }
|
||||
let(:student_ids) { [student_1.id, student_2.id] }
|
||||
let(:instance) { instance_double("DueDateCacher", recompute: nil) }
|
||||
before do
|
||||
allow(Rails.env).to receive(:production?).and_return(true)
|
||||
end
|
||||
|
||||
it "delegates to an instance" do
|
||||
expect(DueDateCacher).to receive(:new).and_return(instance)
|
||||
expect(instance).to receive(:recompute)
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course)
|
||||
end
|
||||
it "calls recompute_for_sis_import in a delayed job" do
|
||||
expect do
|
||||
DueDateCacher.recompute_users_for_course(@student.id, @course, nil, sis_import: true)
|
||||
end.to change {
|
||||
Delayed::Job.where(tag: "DueDateCacher#recompute_for_sis_import").count
|
||||
}.from(0).to(1)
|
||||
end
|
||||
|
||||
it "passes along the whole user array" do
|
||||
expect(DueDateCacher).to receive(:new).and_return(instance)
|
||||
.with(@course, Assignment.active.where(context: @course).pluck(:id), student_ids,
|
||||
hash_including(update_grades: false))
|
||||
DueDateCacher.recompute_users_for_course(student_ids, @course)
|
||||
end
|
||||
it "limits the number of sis recompute jobs that can run concurrently" do
|
||||
course_with_student(active_all: true)
|
||||
assignment_model(course: @course)
|
||||
|
||||
it "calls recompute with the value of update_grades if it is set to true" do
|
||||
expect(DueDateCacher).to receive(:new)
|
||||
.with(@course, match_array(assignments.map(&:id)), [student_1.id], hash_including(update_grades: true))
|
||||
.and_return(instance)
|
||||
expect(instance).to receive(:recompute)
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course, assignments.map(&:id), update_grades: true)
|
||||
end
|
||||
|
||||
it "calls recompute with the value of update_grades if it is set to false" do
|
||||
expect(DueDateCacher).to receive(:new)
|
||||
.with(@course, match_array(assignments.map(&:id)), [student_1.id], hash_including(update_grades: false))
|
||||
.and_return(instance)
|
||||
expect(instance).to receive(:recompute)
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course, assignments.map(&:id), update_grades: false)
|
||||
end
|
||||
|
||||
it "passes assignments if it has any specified" do
|
||||
expect(DueDateCacher).to receive(:new).and_return(instance)
|
||||
.with(@course, assignments, student_ids, hash_including(update_grades: false))
|
||||
DueDateCacher.recompute_users_for_course(student_ids, @course, assignments)
|
||||
end
|
||||
|
||||
it "handles being called with a course id" do
|
||||
expect(DueDateCacher).to receive(:new).and_return(instance)
|
||||
.with(@course, Assignment.active.where(context: @course).pluck(:id), student_ids,
|
||||
hash_including(update_grades: false))
|
||||
DueDateCacher.recompute_users_for_course(student_ids, @course.id)
|
||||
end
|
||||
|
||||
it "queues a delayed job in a singleton if given no assignments and no singleton option" do
|
||||
@instance = double
|
||||
expect(DueDateCacher).to receive(:new).and_return(@instance)
|
||||
expect(@instance).to receive(:delay_if_production)
|
||||
.with(
|
||||
singleton: "cached_due_date:calculator:Course:#{@course.global_id}:Users:#{Digest::SHA256.hexdigest(student_1.id.to_s)}:UpdateGrades:0",
|
||||
strand: "cached_due_date:calculator:Course:#{@course.global_id}",
|
||||
max_attempts: 10
|
||||
Setting.set("DueDateCacher#recompute_for_sis_import_num_strands", "1")
|
||||
Delayed::Job.create!(
|
||||
locked_at: Time.zone.now,
|
||||
locked_by: "foo",
|
||||
tag: "DueDateCacher#recompute_for_sis_import",
|
||||
shard_id: @course.shard.id
|
||||
)
|
||||
.and_return(@instance)
|
||||
expect(@instance).to receive(:recompute)
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course)
|
||||
end
|
||||
|
||||
it "queues a delayed job in a singleton if given no assignments and a singleton option" do
|
||||
@instance = double
|
||||
expect(DueDateCacher).to receive(:new).and_return(@instance)
|
||||
expect(@instance).to receive(:delay_if_production)
|
||||
.with(singleton: "what:up:dog", max_attempts: 10, strand: "cached_due_date:calculator:Course:#{@course.global_id}")
|
||||
.and_return(@instance)
|
||||
expect(@instance).to receive(:recompute)
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course, nil, singleton: "what:up:dog", strand: "cached_due_date:calculator:Course:#{@course.global_id}")
|
||||
end
|
||||
|
||||
it "initializes a DueDateCacher with the value of executing_user if set" do
|
||||
expect(DueDateCacher).to receive(:new)
|
||||
.with(@course, match_array(assignments.map(&:id)), [student_1.id], hash_including(executing_user: student_1))
|
||||
.and_return(instance)
|
||||
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course, nil, executing_user: student_1)
|
||||
end
|
||||
|
||||
it "initializes a DueDateCacher with the user set by with_executing_user if executing_user is not passed" do
|
||||
expect(DueDateCacher).to receive(:new)
|
||||
.with(@course, match_array(assignments.map(&:id)), [student_1.id], hash_including(executing_user: student_1))
|
||||
.and_return(instance)
|
||||
|
||||
DueDateCacher.with_executing_user(student_1) do
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course, nil)
|
||||
expect do
|
||||
DueDateCacher.recompute_users_for_course(@student.id, @course, nil, sis_import: true)
|
||||
end.not_to change {
|
||||
Delayed::Job.where(tag: "DueDateCacher#recompute_for_sis_import").count
|
||||
}.from(1)
|
||||
end
|
||||
end
|
||||
|
||||
it "initializes a DueDateCacher with a nil executing_user if no user has been specified" do
|
||||
expect(DueDateCacher).to receive(:new)
|
||||
.with(@course, match_array(assignments.map(&:id)), hash_including(executing_user: nil))
|
||||
.and_return(instance)
|
||||
DueDateCacher.recompute_course(@course, run_immediately: true)
|
||||
context "when not run for a sis import" do
|
||||
let!(:assignment_1) { @assignment }
|
||||
let(:assignment_2) { assignment_model(course: @course) }
|
||||
let(:assignments) { [assignment_1, assignment_2] }
|
||||
|
||||
let!(:student_1) { @student }
|
||||
let(:student_2) { student_in_course(course: @course) }
|
||||
let(:student_ids) { [student_1.id, student_2.id] }
|
||||
let(:instance) { instance_double("DueDateCacher", recompute: nil) }
|
||||
|
||||
it "delegates to an instance" do
|
||||
expect(DueDateCacher).to receive(:new).and_return(instance)
|
||||
expect(instance).to receive(:recompute)
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course)
|
||||
end
|
||||
|
||||
it "passes along the whole user array" do
|
||||
expect(DueDateCacher).to receive(:new).and_return(instance)
|
||||
.with(@course, Assignment.active.where(context: @course).pluck(:id), student_ids,
|
||||
hash_including(update_grades: false))
|
||||
DueDateCacher.recompute_users_for_course(student_ids, @course)
|
||||
end
|
||||
|
||||
it "calls recompute with the value of update_grades if it is set to true" do
|
||||
expect(DueDateCacher).to receive(:new)
|
||||
.with(@course, match_array(assignments.map(&:id)), [student_1.id], hash_including(update_grades: true))
|
||||
.and_return(instance)
|
||||
expect(instance).to receive(:recompute)
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course, assignments.map(&:id), update_grades: true)
|
||||
end
|
||||
|
||||
it "calls recompute with the value of update_grades if it is set to false" do
|
||||
expect(DueDateCacher).to receive(:new)
|
||||
.with(@course, match_array(assignments.map(&:id)), [student_1.id], hash_including(update_grades: false))
|
||||
.and_return(instance)
|
||||
expect(instance).to receive(:recompute)
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course, assignments.map(&:id), update_grades: false)
|
||||
end
|
||||
|
||||
it "passes assignments if it has any specified" do
|
||||
expect(DueDateCacher).to receive(:new).and_return(instance)
|
||||
.with(@course, assignments, student_ids, hash_including(update_grades: false))
|
||||
DueDateCacher.recompute_users_for_course(student_ids, @course, assignments)
|
||||
end
|
||||
|
||||
it "handles being called with a course id" do
|
||||
expect(DueDateCacher).to receive(:new).and_return(instance)
|
||||
.with(@course, Assignment.active.where(context: @course).pluck(:id), student_ids,
|
||||
hash_including(update_grades: false))
|
||||
DueDateCacher.recompute_users_for_course(student_ids, @course.id)
|
||||
end
|
||||
|
||||
it "queues a delayed job in a singleton if given no assignments and no singleton option" do
|
||||
@instance = double
|
||||
expect(DueDateCacher).to receive(:new).and_return(@instance)
|
||||
expect(@instance).to receive(:delay_if_production)
|
||||
.with(
|
||||
singleton: "cached_due_date:calculator:Course:#{@course.global_id}:Users:#{Digest::SHA256.hexdigest(student_1.id.to_s)}:UpdateGrades:0",
|
||||
strand: "cached_due_date:calculator:Course:#{@course.global_id}",
|
||||
max_attempts: 10
|
||||
)
|
||||
.and_return(@instance)
|
||||
expect(@instance).to receive(:recompute)
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course)
|
||||
end
|
||||
|
||||
it "queues a delayed job in a singleton if given no assignments and a singleton option" do
|
||||
@instance = double
|
||||
expect(DueDateCacher).to receive(:new).and_return(@instance)
|
||||
expect(@instance).to receive(:delay_if_production)
|
||||
.with(singleton: "what:up:dog", max_attempts: 10, strand: "cached_due_date:calculator:Course:#{@course.global_id}")
|
||||
.and_return(@instance)
|
||||
expect(@instance).to receive(:recompute)
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course, nil, singleton: "what:up:dog", strand: "cached_due_date:calculator:Course:#{@course.global_id}")
|
||||
end
|
||||
|
||||
it "initializes a DueDateCacher with the value of executing_user if set" do
|
||||
expect(DueDateCacher).to receive(:new)
|
||||
.with(@course, match_array(assignments.map(&:id)), [student_1.id], hash_including(executing_user: student_1))
|
||||
.and_return(instance)
|
||||
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course, nil, executing_user: student_1)
|
||||
end
|
||||
|
||||
it "initializes a DueDateCacher with the user set by with_executing_user if executing_user is not passed" do
|
||||
expect(DueDateCacher).to receive(:new)
|
||||
.with(@course, match_array(assignments.map(&:id)), [student_1.id], hash_including(executing_user: student_1))
|
||||
.and_return(instance)
|
||||
|
||||
DueDateCacher.with_executing_user(student_1) do
|
||||
DueDateCacher.recompute_users_for_course(student_1.id, @course, nil)
|
||||
end
|
||||
end
|
||||
|
||||
it "initializes a DueDateCacher with a nil executing_user if no user has been specified" do
|
||||
expect(DueDateCacher).to receive(:new)
|
||||
.with(@course, match_array(assignments.map(&:id)), hash_including(executing_user: nil))
|
||||
.and_return(instance)
|
||||
DueDateCacher.recompute_course(@course, run_immediately: true)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
@ -612,9 +612,9 @@ describe SIS::CSV::EnrollmentImporter do
|
|||
expect(DueDateCacher).not_to receive(:recompute)
|
||||
# there are no assignments so this will just return, but we just want to see
|
||||
# that it gets called correctly and for the users that wre imported
|
||||
expect(DueDateCacher).to receive(:recompute_users_for_course).with([user1.id], course1.id, nil, update_grades: true)
|
||||
expect(DueDateCacher).to receive(:recompute_users_for_course).with([user1.id], course1.id, nil, sis_import: true, update_grades: true)
|
||||
expect(DueDateCacher).to receive(:recompute_users_for_course)
|
||||
.with([user1.id, user2.id], course2.id, nil, update_grades: true)
|
||||
.with([user1.id, user2.id], course2.id, nil, sis_import: true, update_grades: true)
|
||||
process_csv_data_cleanly(
|
||||
"course_id,user_id,role,status",
|
||||
"C001,U001,student,active",
|
||||
|
|
Loading…
Reference in New Issue