Cancel orphaned Progresses
When calling progress.process_job with a singleton and on_conflict: :overwrite, inst-jobs may not queue a new job and instead update an existing queued job with the same singleton. If this happens, the job's handler will now point to the new Progress object and the old Progress will no longer be updated when the job completes. It will be left as "queued" indefinitely, and enough of these queued Progresses can be created on a context to cause performance issues. So, when calling process_job, detect if a job was updated (not inserted), and then cancel any other queued Progress objects with the same job id. Note: if a job is enqueued with a singleton and on_conflict: :use_earliest, inst-jobs may update an existing job if it encounters a conflict (and job.enqueue_result will be :updated), but it will not update the job's handler, so the job will still point at the old Progress. This commit does not address that use case. closes LF-1026 flag = none Test plan: - In a rails console, find a course with a course pace - Stop delayed_job - Run `progresses = []` - Run `progresses << pace.create_publish_progress` three times in a row - Expect the first 2 progresses to be canceled, and the third to be queued Change-Id: I571397e93771fa469aa79f03f8710f66e607b453 Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/333550 Reviewed-by: Robin Kuss <rkuss@instructure.com> Reviewed-by: Jacob Burroughs <jburroughs@instructure.com> QA-Review: Robin Kuss <rkuss@instructure.com> Product-Review: Jackson Howe <jackson.howe@instructure.com> Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com>
This commit is contained in:
parent
62cdc1694d
commit
2813651b57
|
@ -48,6 +48,7 @@ class Progress < ActiveRecord::Base
|
|||
state :queued do
|
||||
event :start, transitions_to: :running
|
||||
event :fail, transitions_to: :failed
|
||||
event :cancel, transitions_to: :canceled
|
||||
end
|
||||
state :running do
|
||||
event(:complete, transitions_to: :completed) { self.completion = 100 }
|
||||
|
@ -55,6 +56,7 @@ class Progress < ActiveRecord::Base
|
|||
end
|
||||
state :completed
|
||||
state :failed
|
||||
state :canceled
|
||||
end
|
||||
|
||||
set_policy do
|
||||
|
@ -116,11 +118,21 @@ class Progress < ActiveRecord::Base
|
|||
ActiveRecord::Base.connection.after_transaction_commit do
|
||||
job = Delayed::Job.enqueue(work, **enqueue_args)
|
||||
update(delayed_job_id: job.id)
|
||||
cancel_orphaned_progresses(job.id) if enqueue_args[:on_conflict] == :overwrite && job.enqueue_result == :updated
|
||||
job
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
# If a job is enqueued with `on_conflict: :overwrite`, and another job already exists with the same
|
||||
# singleton, then the existing job's handler will be updated to point at the new Progress. Thus, cancel any
|
||||
# other queued Progresses that were pointing at the job (since they'll never get updated).
|
||||
def cancel_orphaned_progresses(job_id)
|
||||
Progress.where(delayed_job_id: job_id, workflow_state: "queued").where.not(id:).update_all(workflow_state: "canceled")
|
||||
end
|
||||
|
||||
# (private)
|
||||
class Work < Delayed::PerformableMethod
|
||||
def initialize(progress, *args, **kwargs)
|
||||
|
|
|
@ -61,6 +61,18 @@ describe Progress do
|
|||
expect(progress.reload).to be_failed
|
||||
end
|
||||
|
||||
it "cancels other queued progresses with the same delayed_job_id" do
|
||||
job1 = progress.process_job(Jerbs, :succeed, { singleton: "test_singleton", on_conflict: :overwrite }, :flag)
|
||||
expect(progress.reload).to be_queued
|
||||
expect(job1.enqueue_result).to be :inserted
|
||||
progress2 = Progress.create!(tag: "test", context: @user)
|
||||
job2 = progress2.process_job(Jerbs, :succeed, { singleton: "test_singleton", on_conflict: :overwrite }, :flag)
|
||||
expect(progress2.reload).to be_queued
|
||||
expect(progress.reload).to be_canceled
|
||||
expect(job1.id).to be job2.id
|
||||
expect(job2.enqueue_result).to be :updated
|
||||
end
|
||||
|
||||
it "defaults to low priority" do
|
||||
job = progress.process_job(Jerbs, :succeed, {}, :flag)
|
||||
expect(job.priority).to eq Delayed::LOW_PRIORITY
|
||||
|
|
Loading…
Reference in New Issue