stop parallelized reports on error

the code that aborts "in progress" runners currently doesn't
abort runners that haven't started yet, but should

also, the scope used to check whether all runners have completed
should treat failed/aborted runners as unfinished, not just ones
still in "created" or "running" state (in practice this hasn't
mattered in the past because the report itself was put in
"error" state, but it feels like a recipe for incomplete data
to be returned to customers if something changes)

also make sure a report runner that fails remains in "error" state;
currently `update_parallel_progress`, called in the `ensure` clause
in `run_account_report_runner`, changes it to "completed"

test plan:
 - specs
 - smoke test parallelized reports like
   grade export and/or student activity

flag=none

refs FOO-4281
fixes FOO-4289

Change-Id: I8f64743bff6ffd2fe9515338de4c27572d213924
Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/339890
Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com>
Reviewed-by: August Thornton <august@instructure.com>
QA-Review: Jeremy Stanley <jeremy@instructure.com>
Product-Review: Jeremy Stanley <jeremy@instructure.com>
This commit is contained in:
Jeremy Stanley 2024-02-06 16:58:06 -07:00
parent 9555bf4630
commit d4463944f4
5 changed files with 98 additions and 16 deletions

View File

@ -29,6 +29,8 @@ class AccountReport < ActiveRecord::Base
has_many :account_report_runners, inverse_of: :account_report, autosave: false has_many :account_report_runners, inverse_of: :account_report, autosave: false
has_many :account_report_rows, inverse_of: :account_report, autosave: false has_many :account_report_rows, inverse_of: :account_report, autosave: false
after_save :abort_incomplete_runners_if_needed
validates :account_id, :user_id, :workflow_state, presence: true validates :account_id, :user_id, :workflow_state, presence: true
serialize :parameters, type: Hash serialize :parameters, type: Hash
@ -70,12 +72,7 @@ class AccountReport < ActiveRecord::Base
alias_method :destroy_permanently!, :destroy alias_method :destroy_permanently!, :destroy
def destroy def destroy
self.workflow_state = "deleted" self.workflow_state = "deleted"
result = save! save!
if saved_change_to_workflow_state?
abort_incomplete_runners
delay.delete_account_report_rows
end
result
end end
def self.delete_old_rows_and_runners def self.delete_old_rows_and_runners
@ -143,6 +140,13 @@ class AccountReport < ActiveRecord::Base
AccountReports.available_reports AccountReports.available_reports
end end
def abort_incomplete_runners_if_needed
if saved_change_to_workflow_state? && (deleted? || aborted? || error?)
abort_incomplete_runners
delay(priority: Delayed::LOWER_PRIORITY).delete_account_report_rows
end
end
def abort_incomplete_runners def abort_incomplete_runners
account_report_runners.incomplete.in_batches.update_all(workflow_state: "aborted", updated_at: Time.now.utc) account_report_runners.incomplete.in_batches.update_all(workflow_state: "aborted", updated_at: Time.now.utc)
end end

View File

@ -71,9 +71,9 @@ class AccountReportRunner < ActiveRecord::Base
update!(workflow_state: "error", ended_at: Time.now.utc) update!(workflow_state: "error", ended_at: Time.now.utc)
end end
scope :in_progress, -> { where(workflow_state: %w[running]) }
scope :completed, -> { where(workflow_state: %w[completed]) } scope :completed, -> { where(workflow_state: %w[completed]) }
scope :incomplete, -> { where(workflow_state: %w[created running]) } scope :incomplete, -> { where(workflow_state: %w[created running]) }
scope :incomplete_or_failed, -> { where.not(workflow_state: "completed") }
def delete_account_report_rows def delete_account_report_rows
account_report_rows.in_batches(of: 10_000).delete_all account_report_rows.in_batches(of: 10_000).delete_all

View File

@ -428,7 +428,7 @@ module AccountReports::ReportHelper
@account_report = report_runner.account_report @account_report = report_runner.account_report
begin begin
if @account_report.aborted? || @account_report.deleted? if @account_report.aborted? || @account_report.deleted? || @account_report.error?
report_runner.abort report_runner.abort
return return
end end
@ -442,11 +442,11 @@ module AccountReports::ReportHelper
fail_with_error(e) fail_with_error(e)
ensure ensure
update_parallel_progress(account_report: @account_report, report_runner:) update_parallel_progress(account_report: @account_report, report_runner:)
compile_parallel_report(headers, files:) if last_account_report_runner?(@account_report) compile_parallel_report(report_runner, headers, files:) if last_account_report_runner?(@account_report)
end end
end end
def compile_parallel_report(headers, files: nil) def compile_parallel_report(report_runner, headers, files: nil)
GuardRail.activate(:primary) { @account_report.update(total_lines: @account_report.account_report_rows.count + 1) } GuardRail.activate(:primary) { @account_report.update(total_lines: @account_report.account_report_rows.count + 1) }
xlog_location = AccountReport.current_xlog_location xlog_location = AccountReport.current_xlog_location
# wait 2 minutes for report db to catch up, if it does not catch up, use the # wait 2 minutes for report db to catch up, if it does not catch up, use the
@ -499,9 +499,6 @@ module AccountReports::ReportHelper
def fail_with_error(error) def fail_with_error(error)
GuardRail.activate(:primary) do GuardRail.activate(:primary) do
# this should leave the runner that caused a failure to be in running or error state.
@account_report.account_report_runners.in_progress.update_all(workflow_state: "aborted")
@account_report.delete_account_report_rows
Canvas::Errors.capture_exception(:account_report, error) Canvas::Errors.capture_exception(:account_report, error)
@account_report.workflow_state = "error" @account_report.workflow_state = "error"
@account_report.save! @account_report.save!
@ -519,7 +516,7 @@ module AccountReports::ReportHelper
end end
def update_parallel_progress(account_report: @account_report, report_runner:) def update_parallel_progress(account_report: @account_report, report_runner:)
return if runner_aborted?(report_runner) return if runner_aborted?(report_runner) || report_runner.error?
report_runner.complete report_runner.complete
# let the regular report process update progress to 100 percent, cap at 99. # let the regular report process update progress to 100 percent, cap at 99.
@ -538,7 +535,7 @@ module AccountReports::ReportHelper
end end
def last_account_report_runner?(account_report) def last_account_report_runner?(account_report)
return false if account_report.account_report_runners.incomplete.exists? return false if account_report.account_report_runners.incomplete_or_failed.exists?
AccountReport.transaction do AccountReport.transaction do
@account_report.reload(lock: true) @account_report.reload(lock: true)

View File

@ -18,12 +18,38 @@
# with this program. If not, see <http://www.gnu.org/licenses/>. # with this program. If not, see <http://www.gnu.org/licenses/>.
# #
require_relative "report_spec_helper"
module AccountReports module AccountReports
class TestReport class TestReport
include ReportHelper include ReportHelper
def initialize(account_report) def initialize(account_report, items = [])
@account_report = account_report @account_report = account_report
@items = items
end
def test_report
create_report_runners(@items, @items.size, min: 1)
write_report_in_batches(["item"])
end
def test_report_runner(runner)
runner.batch_items.each_with_index do |item, i|
raise "fail" if item == "fail"
add_report_row(row: [item], row_number: i, report_runner: runner)
end
end
end
module Default
def self.test_report(account_report)
TestReport.new(account_report, account_report.parameters[:items]).test_report
end
def self.parallel_test_report(account_report, runner)
TestReport.new(account_report).test_report_runner(runner)
end end
end end
end end
@ -71,6 +97,31 @@ describe "report helper" do
expect(account_report.parameters["extra_text"]).to eq "Failed, the report failed to generate a file. Please try again." expect(account_report.parameters["extra_text"]).to eq "Failed, the report failed to generate a file. Please try again."
end end
describe "parallel run" do
include ReportSpecHelper
before :once do
AccountReports.configure_account_report "Default", {
"test_report" => {
title: -> { "Test Report" },
}
}
@account = Account.default
end
it "assembles rows from each runner" do
result = read_report("test_report", params: { items: (1..6).to_a }, order: "skip")
expect(result).to match_array([["1"], ["2"], ["3"], ["4"], ["5"], ["6"]])
end
it "handles errors appropriately" do
ar = run_report("test_report", params: { items: [1, 2, "fail", 4, 5, 6] })
expect(ar).to be_error
expect(ar.account_report_runners.group(:workflow_state).count).to eq("completed" => 2, "error" => 1, "aborted" => 3)
expect(ErrorReport.last.message).to eq "fail"
end
end
describe "load pseudonyms" do describe "load pseudonyms" do
before(:once) do before(:once) do
@user = user_with_pseudonym(active_all: true, account:, user:) @user = user_with_pseudonym(active_all: true, account:, user:)

View File

@ -43,4 +43,34 @@ describe AccountReport do
expect(AccountReportRow.where(id: a_row_2.id).count).to eq(0) expect(AccountReportRow.where(id: a_row_2.id).count).to eq(0)
end end
end end
describe "stopping parallelized reports" do
before :once do
@account = account_model
@report = AccountReport.create!(account: @account, user: account_admin_user, workflow_state: "running")
@runners = [@report.account_report_runners.create!(workflow_state: "completed"),
@report.account_report_runners.create!(workflow_state: "running"),
@report.account_report_runners.create!(workflow_state: "created")]
end
it "aborts runners when errored" do
@report.mark_as_errored
expect(@runners.map { |r| r.reload.workflow_state }).to eq %w[completed aborted aborted]
end
it "aborts runners when deleted" do
@report.destroy
expect(@runners.map { |r| r.reload.workflow_state }).to eq %w[completed aborted aborted]
end
it "aborts runners when aborted" do
@report.update! workflow_state: "aborted"
expect(@runners.map { |r| r.reload.workflow_state }).to eq %w[completed aborted aborted]
end
it "leaves runners alone when completed" do
@report.update! workflow_state: "completed"
expect(@runners.map { |r| r.reload.workflow_state }).to eq %w[completed running created]
end
end
end end