MSFT Sync: count retries per-step, and backoff
refs INTEROP-6638 flag=microsoft_group_enrollments_syncing Test plan: - Run a new sync (make sure a group doesn't exist for your course on the MS side, and use mygroup.syncer_job.run_later. NOTE: after deleting a group it takes a couple seconds for this change to settle so you may want to wait a few seconds after deleting so the new sync job doesn't find the old group) - While the job is sleeping/retrying, check the state and make sure retries_by_step is being set for whichever step is is retrying on - Edit GraphService#team_exists? and add "raise MicrosoftSync::Errors::GroupHasNoOwners" - Run another sync and make sure it is scheduling a retry for 30 seconds then 90, then 270. (You should be able to watch the job logs and see the run_at time for the job it schedules -- look for a blue INSERT block with 'INSERT INTO "public"."delayed_jobs"', 'object: !ruby/object:MicrosoftSync::StateMachineJob', and timestamps at the end) Change-Id: Ic7d7f69de9e24a140daf41c54a6409b0a58f5334 Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/262954 Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com> Reviewed-by: Wagner Goncalves <wagner.goncalves@instructure.com> QA-Review: Wagner Goncalves <wagner.goncalves@instructure.com> Product-Review: Evan Battaglia <ebattaglia@instructure.com>
This commit is contained in:
parent
a0624c197c
commit
8e2eb69e49
|
@ -186,9 +186,9 @@ module MicrosoftSync
|
||||||
memory_state = nil
|
memory_state = nil
|
||||||
|
|
||||||
loop do
|
loop do
|
||||||
|
# TODO: consider checking if group object is deleted before every step (INTEROP-6621)
|
||||||
|
log { "running step #{current_step}" }
|
||||||
begin
|
begin
|
||||||
# TODO: consider checking if group object is deleted before every step (INTEROP-6621)
|
|
||||||
log { "running step #{current_step}" }
|
|
||||||
result = steps_object.send(current_step.to_sym, memory_state, job_state_data)
|
result = steps_object.send(current_step.to_sym, memory_state, job_state_data)
|
||||||
rescue => e
|
rescue => e
|
||||||
update_state_record_to_errored_and_cleanup(e)
|
update_state_record_to_errored_and_cleanup(e)
|
||||||
|
@ -211,9 +211,7 @@ module MicrosoftSync
|
||||||
current_step, memory_state = result.next_step, result.memory_state
|
current_step, memory_state = result.next_step, result.memory_state
|
||||||
job_state_data = nil
|
job_state_data = nil
|
||||||
when Retry
|
when Retry
|
||||||
if update_state_record_to_retrying(result, current_step)
|
handle_retry(result, current_step, synchronous)
|
||||||
run_with_delay(current_step, result.delay_amount, synchronous: synchronous)
|
|
||||||
end
|
|
||||||
return
|
return
|
||||||
else
|
else
|
||||||
raise InternalError, "Step returned #{result.inspect}, expected COMPLETE/NextStep/Retry"
|
raise InternalError, "Step returned #{result.inspect}, expected COMPLETE/NextStep/Retry"
|
||||||
|
@ -236,7 +234,7 @@ module MicrosoftSync
|
||||||
return
|
return
|
||||||
end
|
end
|
||||||
|
|
||||||
self.delay(strand: strand, run_at: delay_amount&.from_now).run(step)
|
self.delay(strand: strand, run_at: delay_amount&.seconds&.from_now).run(step)
|
||||||
end
|
end
|
||||||
|
|
||||||
def update_state_record_to_errored_and_cleanup(error)
|
def update_state_record_to_errored_and_cleanup(error)
|
||||||
|
@ -247,13 +245,16 @@ module MicrosoftSync
|
||||||
steps_object.after_failure
|
steps_object.after_failure
|
||||||
end
|
end
|
||||||
|
|
||||||
# Returns false if workflow_state has since been set to deleted (so we
|
|
||||||
# should stop)
|
|
||||||
# Raises the error if we have passed the retry limit
|
# Raises the error if we have passed the retry limit
|
||||||
# Otherwise sets the job_state to keep track of (step, data, retries)
|
# Does nothing if workflow_state has since been set to deleted
|
||||||
def update_state_record_to_retrying(retry_object, current_step)
|
# Otherwise sets the job_state to keep track of (step, data, retries) and
|
||||||
|
# kicks off a retry
|
||||||
|
def handle_retry(retry_object, current_step, synchronous)
|
||||||
job_state = job_state_record.reload.job_state
|
job_state = job_state_record.reload.job_state
|
||||||
retries = job_state&.dig(:retries) || 0
|
|
||||||
|
retries_by_step = job_state&.dig(:retries_by_step) || {}
|
||||||
|
retries = retries_by_step[current_step.to_s] || 0
|
||||||
|
|
||||||
if retries >= steps_object.max_retries
|
if retries >= steps_object.max_retries
|
||||||
update_state_record_to_errored_and_cleanup(retry_object.error)
|
update_state_record_to_errored_and_cleanup(retry_object.error)
|
||||||
raise retry_object.error
|
raise retry_object.error
|
||||||
|
@ -265,14 +266,20 @@ module MicrosoftSync
|
||||||
step: current_step,
|
step: current_step,
|
||||||
updated_at: Time.zone.now,
|
updated_at: Time.zone.now,
|
||||||
data: retry_object.job_state_data,
|
data: retry_object.job_state_data,
|
||||||
retries: retries + 1,
|
retries_by_step: retries_by_step.merge(current_step.to_s => retries + 1),
|
||||||
# for debugging only:
|
# for debugging only:
|
||||||
retried_on_error: "#{retry_object.error.class}: #{retry_object.error.message}",
|
retried_on_error: "#{retry_object.error.class}: #{retry_object.error.message}",
|
||||||
}
|
}
|
||||||
|
|
||||||
job_state_record&.update_unless_deleted(
|
return unless job_state_record&.update_unless_deleted(
|
||||||
workflow_state: :retrying, job_state: new_job_state
|
workflow_state: :retrying, job_state: new_job_state
|
||||||
)
|
)
|
||||||
|
|
||||||
|
delay_amount = retry_object.delay_amount
|
||||||
|
delay_amount = delay_amount[retries] || delay_amount.last if delay_amount.is_a?(Array)
|
||||||
|
log { "handle_retry #{current_step} - #{delay_amount}" }
|
||||||
|
|
||||||
|
run_with_delay(current_step, delay_amount, synchronous: synchronous)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
@ -47,7 +47,7 @@ module MicrosoftSync
|
||||||
end
|
end
|
||||||
|
|
||||||
def max_retries
|
def max_retries
|
||||||
4
|
3
|
||||||
end
|
end
|
||||||
|
|
||||||
def restart_job_after_inactivity
|
def restart_job_after_inactivity
|
||||||
|
@ -106,7 +106,7 @@ module MicrosoftSync
|
||||||
group.update! ms_group_id: group_id
|
group.update! ms_group_id: group_id
|
||||||
StateMachineJob::NextStep.new(:step_ensure_enrollments_user_mappings_filled)
|
StateMachineJob::NextStep.new(:step_ensure_enrollments_user_mappings_filled)
|
||||||
rescue Errors::HTTPNotFound => e
|
rescue Errors::HTTPNotFound => e
|
||||||
StateMachineJob::Retry.new(error: e, delay_amount: 45.seconds, job_state_data: group_id)
|
StateMachineJob::Retry.new(error: e, delay_amount: [5, 20, 100], job_state_data: group_id)
|
||||||
end
|
end
|
||||||
|
|
||||||
# Gets users enrolled in course, get UPNs (e.g. email addresses) for them,
|
# Gets users enrolled in course, get UPNs (e.g. email addresses) for them,
|
||||||
|
@ -128,11 +128,12 @@ module MicrosoftSync
|
||||||
end
|
end
|
||||||
|
|
||||||
StateMachineJob::NextStep.new(:step_generate_diff)
|
StateMachineJob::NextStep.new(:step_generate_diff)
|
||||||
|
rescue Errors::HTTPNotFound => e
|
||||||
|
StateMachineJob::Retry.new(error: e, delay_amount: [5, 20, 100])
|
||||||
end
|
end
|
||||||
|
|
||||||
# Get group members/owners from the API and local enrollments and calculate
|
# Get group members/owners from the API and local enrollments and calculate
|
||||||
# what needs to be done. This could also be combined with execute_diff()
|
# what needs to be done.
|
||||||
# but is conceptually different and makes testing easier.
|
|
||||||
def step_generate_diff(_mem_data, _job_state_data)
|
def step_generate_diff(_mem_data, _job_state_data)
|
||||||
members = graph_service_helpers.get_group_users_aad_ids(group.ms_group_id)
|
members = graph_service_helpers.get_group_users_aad_ids(group.ms_group_id)
|
||||||
owners = graph_service_helpers.get_group_users_aad_ids(group.ms_group_id, owners: true)
|
owners = graph_service_helpers.get_group_users_aad_ids(group.ms_group_id, owners: true)
|
||||||
|
@ -143,6 +144,8 @@ module MicrosoftSync
|
||||||
end
|
end
|
||||||
|
|
||||||
StateMachineJob::NextStep.new(:step_execute_diff, diff)
|
StateMachineJob::NextStep.new(:step_execute_diff, diff)
|
||||||
|
rescue Errors::HTTPNotFound => e
|
||||||
|
StateMachineJob::Retry.new(error: e, delay_amount: [5, 20, 100])
|
||||||
end
|
end
|
||||||
|
|
||||||
# Run the API calls to add/remove users.
|
# Run the API calls to add/remove users.
|
||||||
|
@ -183,7 +186,7 @@ module MicrosoftSync
|
||||||
# enrollments are in the DB) since we last calculated the diff added them
|
# enrollments are in the DB) since we last calculated the diff added them
|
||||||
# in the generate_diff step. This is rare, but we can also sleep in that
|
# in the generate_diff step. This is rare, but we can also sleep in that
|
||||||
# case.
|
# case.
|
||||||
StateMachineJob::Retry.new(error: e, delay_amount: 1.minute)
|
StateMachineJob::Retry.new(error: e, delay_amount: [30, 90, 270])
|
||||||
end
|
end
|
||||||
|
|
||||||
def tenant
|
def tenant
|
||||||
|
|
|
@ -34,7 +34,7 @@ module MicrosoftSync
|
||||||
end
|
end
|
||||||
|
|
||||||
def max_retries
|
def max_retries
|
||||||
3
|
4
|
||||||
end
|
end
|
||||||
|
|
||||||
def restart_job_after_inactivity
|
def restart_job_after_inactivity
|
||||||
|
@ -87,8 +87,20 @@ module MicrosoftSync
|
||||||
end
|
end
|
||||||
|
|
||||||
class StateMachineJobTestSteps2 < StateMachineJobTestStepsBase
|
class StateMachineJobTestSteps2 < StateMachineJobTestStepsBase
|
||||||
|
def initialize(step_first_retries)
|
||||||
|
@step_first_retries = step_first_retries
|
||||||
|
end
|
||||||
|
|
||||||
def step_first(_mem_data, _job_state_data)
|
def step_first(_mem_data, _job_state_data)
|
||||||
StateMachineJob::Retry.new(error: Errors::PublicError.new('foo')) { steps_run << :stash }
|
if (@step_first_retries -= 1) >= 0
|
||||||
|
StateMachineJob::Retry.new(error: Errors::PublicError.new('foo')) { steps_run << :stash }
|
||||||
|
else
|
||||||
|
StateMachineJob::NextStep.new(:step_second)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def step_second(_mem_data, _job_state_data)
|
||||||
|
StateMachineJob::Retry.new(error: Errors::PublicError.new('foo'), delay_amount: [1,2,3])
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -188,22 +200,21 @@ module MicrosoftSync
|
||||||
end
|
end
|
||||||
|
|
||||||
describe 'retry counting' do
|
describe 'retry counting' do
|
||||||
let(:steps_object) { StateMachineJobTestSteps2.new }
|
let(:steps_object) { StateMachineJobTestSteps2.new(5) }
|
||||||
|
|
||||||
it 'counts retries and stores in job_state' do
|
it 'counts retries for each step and stores in job_state' do
|
||||||
subject.send(:run, nil)
|
subject.send(:run, nil)
|
||||||
expect(state_record.reload.job_state[:retries]).to eq(1)
|
expect(state_record.reload.job_state[:retries_by_step]['step_first']).to eq(1)
|
||||||
subject.send(:run, :step_first)
|
subject.send(:run, :step_first)
|
||||||
expect(state_record.reload.job_state[:retries]).to eq(2)
|
expect(state_record.reload.job_state[:retries_by_step]['step_first']).to eq(2)
|
||||||
subject.send(:run, :step_first)
|
subject.send(:run, :step_first)
|
||||||
expect(state_record.reload.job_state[:retries]).to eq(3)
|
expect(state_record.reload.job_state[:retries_by_step]['step_first']).to eq(3)
|
||||||
end
|
end
|
||||||
|
|
||||||
context 'when the number of retries is exceeded' do
|
context 'when the number of retries for a step is exceeded' do
|
||||||
before do
|
before do
|
||||||
subject.send(:run, nil)
|
subject.send(:run, nil)
|
||||||
subject.send(:run, :step_first)
|
3.times { subject.send(:run, :step_first) }
|
||||||
subject.send(:run, :step_first)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
it 're-raises the error and sets the record state to errored' do
|
it 're-raises the error and sets the record state to errored' do
|
||||||
|
@ -216,11 +227,44 @@ module MicrosoftSync
|
||||||
|
|
||||||
it "doesn't run the stash block on the last failure" do
|
it "doesn't run the stash block on the last failure" do
|
||||||
expect { subject.send(:run, :step_first) }.to raise_error(Errors::PublicError, 'foo')
|
expect { subject.send(:run, :step_first) }.to raise_error(Errors::PublicError, 'foo')
|
||||||
expect(steps_object.steps_run.count(:stash)).to eq(3)
|
expect(steps_object.steps_run.count(:stash)).to eq(4)
|
||||||
steps_object.steps_run.clear
|
steps_object.steps_run.clear
|
||||||
expect(steps_object.steps_run).to be_empty
|
expect(steps_object.steps_run).to be_empty
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context 'when multiple steps fail' do
|
||||||
|
let(:steps_object) { StateMachineJobTestSteps2.new(2) }
|
||||||
|
|
||||||
|
before do
|
||||||
|
subject.send(:run, nil)
|
||||||
|
2.times { subject.send(:run, :step_first) }
|
||||||
|
3.times { subject.send(:run, :step_second) }
|
||||||
|
end
|
||||||
|
|
||||||
|
it 'counts retries per-step' do
|
||||||
|
expect { subject.send(:run, :step_second) }.to raise_error(Errors::PublicError, 'foo')
|
||||||
|
expect(state_record.reload.job_state).to eq(nil)
|
||||||
|
expect(state_record.workflow_state).to eq('errored')
|
||||||
|
expect(state_record.last_error).to \
|
||||||
|
eq(Errors.user_facing_message(Errors::PublicError.new('foo')))
|
||||||
|
end
|
||||||
|
|
||||||
|
context 'when delay is an array of integers' do
|
||||||
|
it 'uses delays based on the per-step retry count' do
|
||||||
|
delays = steps_object.steps_run.select{|step| step.is_a?(Array)}
|
||||||
|
expect(delays).to eq([
|
||||||
|
[:delay_run, [{run_at: nil, strand: strand}], [:step_first]],
|
||||||
|
[:delay_run, [{run_at: nil, strand: strand}], [:step_first]],
|
||||||
|
[:delay_run, [{run_at: 1.second.from_now, strand: strand}], [:step_second]],
|
||||||
|
[:delay_run, [{run_at: 2.seconds.from_now, strand: strand}], [:step_second]],
|
||||||
|
[:delay_run, [{run_at: 3.seconds.from_now, strand: strand}], [:step_second]],
|
||||||
|
# Uses last value once past end of array:
|
||||||
|
[:delay_run, [{run_at: 3.seconds.from_now, strand: strand}], [:step_second]],
|
||||||
|
])
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context 'when an unhandled error occurs' do
|
context 'when an unhandled error occurs' do
|
||||||
|
@ -333,14 +377,14 @@ module MicrosoftSync
|
||||||
it 'restarts the job (in-progress job has stalled)' do
|
it 'restarts the job (in-progress job has stalled)' do
|
||||||
Timecop.travel((6.hours + 1.second).from_now)
|
Timecop.travel((6.hours + 1.second).from_now)
|
||||||
expect(state_record.job_state[:step]).to eq(:step_first)
|
expect(state_record.job_state[:step]).to eq(:step_first)
|
||||||
expect(state_record.job_state[:retries]).to eq(2)
|
expect(state_record.job_state[:retries_by_step]['step_first']).to eq(2)
|
||||||
expect(steps_object).to receive(:step_first) do
|
expect(steps_object).to receive(:step_first) do
|
||||||
expect(state_record.job_state).to eq(nil)
|
expect(state_record.job_state).to eq(nil)
|
||||||
expect(state_record.workflow_state).to eq('running')
|
expect(state_record.workflow_state).to eq('running')
|
||||||
described_class::Retry.new(error: StandardError.new)
|
described_class::Retry.new(error: StandardError.new)
|
||||||
end
|
end
|
||||||
expect { subject.send(:run, nil) }.to change{ state_record.job_state[:updated_at] }
|
expect { subject.send(:run, nil) }.to change{ state_record.job_state[:updated_at] }
|
||||||
expect(state_record.job_state[:retries]).to eq(1)
|
expect(state_record.job_state[:retries_by_step]['step_first']).to eq(1)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
@ -41,7 +41,10 @@ describe MicrosoftSync::SyncerSteps do
|
||||||
expect(result).to be_a(MicrosoftSync::StateMachineJob::Retry)
|
expect(result).to be_a(MicrosoftSync::StateMachineJob::Retry)
|
||||||
expect(result.error.class).to eq(error_class)
|
expect(result.error.class).to eq(error_class)
|
||||||
expect(result.delay_amount).to eq(delay_amount)
|
expect(result.delay_amount).to eq(delay_amount)
|
||||||
expect(result.delay_amount.to_i).to be < syncer_steps.restart_job_after_inactivity.to_i
|
# Check that we haven't specified any delays to big for our restart_job_after_inactivity
|
||||||
|
[result.delay_amount].flatten.each do |delay|
|
||||||
|
expect(delay.to_i).to be < syncer_steps.restart_job_after_inactivity.to_i
|
||||||
|
end
|
||||||
expect(result.job_state_data).to eq(job_state_data)
|
expect(result.job_state_data).to eq(job_state_data)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
@ -75,7 +78,7 @@ describe MicrosoftSync::SyncerSteps do
|
||||||
describe '#max_retries' do
|
describe '#max_retries' do
|
||||||
subject { syncer_steps.max_retries }
|
subject { syncer_steps.max_retries }
|
||||||
|
|
||||||
it { is_expected.to eq(4) }
|
it { is_expected.to eq(3) }
|
||||||
end
|
end
|
||||||
|
|
||||||
describe '#after_failure' do
|
describe '#after_failure' do
|
||||||
|
@ -206,7 +209,7 @@ describe MicrosoftSync::SyncerSteps do
|
||||||
expect { subject }.to_not change { group.reload.ms_group_id }
|
expect { subject }.to_not change { group.reload.ms_group_id }
|
||||||
expect_retry(
|
expect_retry(
|
||||||
subject, error_class: MicrosoftSync::Errors::HTTPNotFound,
|
subject, error_class: MicrosoftSync::Errors::HTTPNotFound,
|
||||||
delay_amount: 45.seconds, job_state_data: 'newid'
|
delay_amount: [5, 20, 100], job_state_data: 'newid'
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
@ -314,6 +317,16 @@ describe MicrosoftSync::SyncerSteps do
|
||||||
expect(upns_looked_up).to include("student1@example.com")
|
expect(upns_looked_up).to include("student1@example.com")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context 'on 404' do
|
||||||
|
it 'retries with a delay' do
|
||||||
|
expect(graph_service_helpers).to receive(:users_upns_to_aads).and_raise(new_http_error(404))
|
||||||
|
expect_retry(
|
||||||
|
subject, error_class: MicrosoftSync::Errors::HTTPNotFound,
|
||||||
|
delay_amount: [5, 20, 100]
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe '#step_generate_diff' do
|
describe '#step_generate_diff' do
|
||||||
|
@ -344,6 +357,17 @@ describe MicrosoftSync::SyncerSteps do
|
||||||
expect_next_step(subject, :step_execute_diff, diff)
|
expect_next_step(subject, :step_execute_diff, diff)
|
||||||
expect(members_and_enrollment_types).to eq([['0', 'TeacherEnrollment']])
|
expect(members_and_enrollment_types).to eq([['0', 'TeacherEnrollment']])
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context 'on 404' do
|
||||||
|
it 'retries with a delay' do
|
||||||
|
expect(graph_service_helpers).to receive(:get_group_users_aad_ids)
|
||||||
|
.and_raise(new_http_error(404))
|
||||||
|
expect_retry(
|
||||||
|
subject, error_class: MicrosoftSync::Errors::HTTPNotFound,
|
||||||
|
delay_amount: [5, 20, 100]
|
||||||
|
)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
describe '#execute_diff' do
|
describe '#execute_diff' do
|
||||||
|
@ -409,25 +433,25 @@ describe MicrosoftSync::SyncerSteps do
|
||||||
end
|
end
|
||||||
|
|
||||||
context 'when the Microsoft API errors with "group has no owners"' do
|
context 'when the Microsoft API errors with "group has no owners"' do
|
||||||
it "retries in 1 minute" do
|
it "retries in (30, 90, 270 seconds)" do
|
||||||
expect(graph_service).to receive(:create_education_class_team).with('mygroupid').
|
expect(graph_service).to receive(:create_education_class_team).with('mygroupid').
|
||||||
and_raise(MicrosoftSync::Errors::GroupHasNoOwners)
|
and_raise(MicrosoftSync::Errors::GroupHasNoOwners)
|
||||||
expect_retry(
|
expect_retry(
|
||||||
subject,
|
subject,
|
||||||
error_class: MicrosoftSync::Errors::GroupHasNoOwners,
|
error_class: MicrosoftSync::Errors::GroupHasNoOwners,
|
||||||
delay_amount: 1.minute
|
delay_amount: [30, 90, 270]
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
context "when the Microsoft API errors with a 404 (e.g., group doesn't exist)" do
|
context "when the Microsoft API errors with a 404 (e.g., group doesn't exist)" do
|
||||||
it "retries in 1 minute" do
|
it "retries in (30, 90, 270 seconds)" do
|
||||||
expect(graph_service).to \
|
expect(graph_service).to \
|
||||||
receive(:create_education_class_team).with('mygroupid').and_raise(new_http_error(404))
|
receive(:create_education_class_team).with('mygroupid').and_raise(new_http_error(404))
|
||||||
expect_retry(
|
expect_retry(
|
||||||
subject,
|
subject,
|
||||||
error_class: MicrosoftSync::Errors::HTTPNotFound,
|
error_class: MicrosoftSync::Errors::HTTPNotFound,
|
||||||
delay_amount: 1.minute
|
delay_amount: [30, 90, 270]
|
||||||
)
|
)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
Loading…
Reference in New Issue