canvas-lms/lib/microsoft_sync/syncer_steps.rb

460 lines
19 KiB
Ruby

# frozen_string_literal: true
#
# Copyright (C) 2021 - present Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
#
# Code which syncs course enrollments to Microsoft groups/teams. This is a good
# starting place to understand the syncing process.
# See also MicrosoftSync::Group model.
#
# This ideally shouldn't contain much job plumbing (retry logic, strands, state
# storage, etc.), but focus on the business logic about what to do in each step
# of a sync. For job plumbing, see StateMachineJob. This should normally be
# used by creating a StateMachineJob with this as the steps_object; see
# MicrosoftSync::Group#syncer_job
# group.syncer_job.run_later
# group.syncer_job.run_synchronously # e.g. manually in a console
#
module MicrosoftSync
class SyncerSteps
# Database batch size for users without AAD ids. Should be a whole-number
# multiple of GraphServiceHelpers::USERS_ULUVS_TO_AADS_BATCH_SIZE, because
# #ensure_user_mappings cuts each database batch into slices of that size:
ENROLLMENTS_ULUV_FETCHING_BATCH_SIZE = 750
# Delays for intermittent errors and to allow Microsoft's
# eventually-consistent API time to settle. STANDARD_RETRY_DELAY is a list
# (presumably one entry per successive retry -- confirm against
# StateMachineJob::Retry); the DELAY_BEFORE_* values are single durations
# passed to StateMachineJob::DelayedNextStep:
STANDARD_RETRY_DELAY = [15, 60, 300].freeze
DELAY_BEFORE_UPDATE_GROUP = 8.seconds
DELAY_BEFORE_CREATE_TEAM = 24.seconds
# Upper bound on the number of PartialSyncChange rows handled by a partial
# sync; above it, #step_partial_sync switches to a full sync.
# The more changes, the more likely it is that some of the changes will
# be duplicative and cause retries when adding/removing, which makes
# partial sync less efficient, and it makes sense to just do a full sync.
# This number is fairly arbitrary but it should be small enough that
# we can get all the enrollments from the DB in one fetch. Also, given a
# max course size of 25k, this means that in the worst case, triggering a
# full sync would be (25k/20) extra requests to get the current user list,
# or about 3 extra requests per user added... not too bad.
MAX_PARTIAL_SYNC_CHANGES = 500
# Base statsd metric name; the SKIPPED_* names get ".add"/".remove" appended
# in #log_batch_skipped.
STATSD_NAME = 'microsoft_sync.syncer_steps'
STATSD_NAME_SKIPPED_BATCHES = "#{STATSD_NAME}.skipped_batches"
STATSD_NAME_SKIPPED_TOTAL = "#{STATSD_NAME}.skipped_total"
# Raised by #tenant when the root account settings lack a tenant or have sync
# turned off. Can happen when a user disables sync at the account level while
# jobs are running.
class TenantMissingOrSyncDisabled < Errors::GracefulCancelError
  class << self
    # Translated description of this error.
    def public_message
      I18n.t 'Tenant missing or sync disabled. ' \
             'Check the Microsoft sync integration settings for the course and account.'
    end
  end
end
# Raised by #step_ensure_class_group_exists when Microsoft reports more than
# one education class for the course.
class MultipleEducationClasses < Errors::GracefulCancelError
  class << self
    # Translated description of this error.
    def public_message
      I18n.t 'Multiple Microsoft education classes already exist for the course.'
    end
  end
end
# Raised (via #raise_and_disable_group) when the course has more member
# enrollments than a team may hold.
class MaxMemberEnrollmentsReached < Errors::GracefulCancelError
  class << self
    # Translated message template; %{max} is supplied by
    # #public_interpolated_values.
    def public_message
      I18n.t 'Microsoft 365 allows a maximum of %{max} members in a team.'
    end
  end

  # Values for the placeholders in .public_message.
  def public_interpolated_values
    { max: MicrosoftSync::MembershipDiff::MAX_ENROLLMENT_MEMBERS }
  end
end
# Raised (via #raise_and_disable_group) when the course has more owner
# enrollments than a team may hold.
class MaxOwnerEnrollmentsReached < Errors::GracefulCancelError
  class << self
    # Translated message template; %{max} is supplied by
    # #public_interpolated_values.
    def public_message
      I18n.t 'Microsoft 365 allows a maximum of %{max} owners in a team.'
    end
  end

  # Values for the placeholders in .public_message.
  def public_interpolated_values
    { max: MicrosoftSync::MembershipDiff::MAX_ENROLLMENT_OWNERS }
  end
end
# The group record being synced (see MicrosoftSync::Group).
attr_reader :group
# `course` is forwarded to the group.
delegate :course, to: :group
# @param group the group whose course enrollments this object syncs
def initialize(group)
@group = group
end
# Retry limit for this steps object.
# NOTE(review): presumably read by StateMachineJob -- confirm.
def max_retries
3
end
# Longest delay allowed for this steps object.
# NOTE(review): presumably enforced by StateMachineJob -- confirm.
def max_delay
6.hours
end
# Failure hook. Intentionally a no-op for now.
def after_failure
# We can clean up here e.g. (MicrosoftSync::GroupMember.delete_all)
# when we have retry in getting owners & executing diff
end
# Completion hook: stamps the group with the current time as its last sync.
def after_complete
group.update!(last_synced_at: Time.zone.now)
end
# Builds the StateMachineJob::Retry returned by steps when they hit a
# retriable error.
#
# @param e the error that triggered the retry
# @param extra_args additional keyword arguments forwarded to Retry.new
# @return [StateMachineJob::Retry] uses the error's retry_after_seconds when
#   it is a Throttled error that carries one, otherwise STANDARD_RETRY_DELAY
def retry_object_for_error(e, **extra_args)
  throttle_delay = e.is_a?(Errors::Throttled) ? e.retry_after_seconds : nil
  StateMachineJob::Retry.new(
    error: e,
    delay_amount: throttle_delay || STANDARD_RETRY_DELAY,
    **extra_args
  )
end
# Entry step: routes to the partial sync when job_type is 'partial' (string
# or symbol), and to the full sync otherwise.
def step_initial(job_type, _job_state_data)
  first_step =
    if job_type.to_s == 'partial'
      :step_partial_sync
    else
      :step_full_sync_prerequisites
    end
  StateMachineJob::NextStep.new(first_step)
end
# First step of a full sync. Checks if there are too many enrollments to
# sync and, if so, aborts (disabling the group).
# Also deletes PartialSyncChanges that are unnecessary now that we will
# be processing all enrollments. (Deletes only records that have been
# replicated to the secondary. We read Enrollments from the secondary so we
# don't want to delete PartialSyncChanges corresponding to enrollments not
# yet replicated.)
def step_full_sync_prerequisites(_mem_data, _job_state_data)
  # Members are checked first; owners only when the member limit is fine.
  limit_error =
    if CanvasModelsHelpers.max_enrollment_members_reached?(course)
      MaxMemberEnrollmentsReached
    elsif CanvasModelsHelpers.max_enrollment_owners_reached?(course)
      MaxOwnerEnrollmentsReached
    end
  raise_and_disable_group(limit_error) if limit_error

  PartialSyncChange.delete_all_replicated_to_secondary_for_course(course.id)
  StateMachineJob::NextStep.new(:step_ensure_class_group_exists)
end
# Second step of a full sync. Makes sure exactly one education class (group)
# exists on the Microsoft side for the course.
#
# Raises MultipleEducationClasses when Microsoft lists more than one class.
# Intermittent errors are turned into a retry.
def step_ensure_class_group_exists(_mem_data, _job_state_data)
  existing_ids =
    graph_service_helpers.list_education_classes_for_course(course).map { |klass| klass['id'] }
  known_id = group.ms_group_id

  # If we've created the group previously, we're good to go
  if known_id && existing_ids == [known_id]
    return StateMachineJob::NextStep.new(:step_ensure_enrollments_user_mappings_filled)
  end

  raise MultipleEducationClasses if existing_ids.size > 1

  # Create a group if needed. If there is already a group but we do not
  # have it in the Group record, use it but first update it with course
  # data in case it was never done.
  target_id = existing_ids.first || graph_service_helpers.create_education_class(course)['id']

  StateMachineJob::DelayedNextStep.new(
    :step_update_group_with_course_data, DELAY_BEFORE_UPDATE_GROUP, target_id
  )
rescue *Errors::INTERMITTENT => error
  retry_object_for_error(error)
end
# Pushes course data to the Microsoft group identified by group_id and then
# records that id on the local group record.
#
# On an intermittent or not-found error the step is retried, carrying
# group_id along as the job state data.
def step_update_group_with_course_data(_mem_state, group_id)
  graph_service_helpers.update_group_with_course_data(group_id, course)
  group.update!(ms_group_id: group_id)

  StateMachineJob::NextStep.new(:step_ensure_enrollments_user_mappings_filled)
rescue *Errors::INTERMITTENT_AND_NOTFOUND => error
  retry_object_for_error(error, job_state_data: group_id)
end
# Gets users enrolled in the course, gets ULUVs (user lookup values, e.g.
# addresses, username) for them, looks up the AADs (Azure Active Directory
# object IDs -- Microsoft's internal ID for the user) from Microsoft, and
# writes the User->AAD mapping into the UserMapping table. If a user
# doesn't have whatever we use to build the ULUV (e.g. email or SIS id, as
# specified by the microsoft_sync_login_attribute Account setting), or
# Microsoft doesn't have a user for the calculated ULUV, that user is
# skipped.
#
# Users are processed in batches of ENROLLMENTS_ULUV_FETCHING_BATCH_SIZE;
# intermittent and not-found errors turn into a retry of this step.
def step_ensure_enrollments_user_mappings_filled(_mem_data, _job_state_data)
  MicrosoftSync::UserMapping.find_enrolled_user_ids_without_mappings(
    course: course,
    batch_size: ENROLLMENTS_ULUV_FETCHING_BATCH_SIZE
  ) { |unmapped_user_ids| ensure_user_mappings(unmapped_user_ids) }

  StateMachineJob::NextStep.new(:step_generate_diff)
rescue *Errors::INTERMITTENT_AND_NOTFOUND => error
  retry_object_for_error(error)
end
# Looks up AAD ids for the given users and stores the user -> AAD mappings.
#
# @param user_ids ids of the users that need a mapping
def ensure_user_mappings(user_ids)
  user_uluv_pairs = MicrosoftSync::UsersUluvsFinder.new(user_ids, group.root_account).call
  remote_attribute = account_settings[:microsoft_sync_remote_attribute]

  # If some users in different slices have the same ULUVs, this could end up
  # looking up the same ULUV multiple times; but this should be very rare
  user_uluv_pairs.each_slice(GraphServiceHelpers::USERS_ULUVS_TO_AADS_BATCH_SIZE) do |pairs|
    aads_by_uluv = graph_service_helpers.users_uluvs_to_aads(remote_attribute, pairs.map(&:last))
    # Users whose ULUV has no AAD end up with a nil value and are dropped.
    aads_by_user_id = pairs.map { |user_id, uluv| [user_id, aads_by_uluv[uluv]] }.to_h.compact

    # NOTE: root_account here must be the same (values loaded into memory at the same time)
    # as passed into UsersUluvsFinder AND as used in #tenant, for the "have settings changed?"
    # check to work. For example, using course.root_account here would NOT be correct.
    UserMapping.bulk_insert_for_root_account(group.root_account, aads_by_user_id)
  end
end
# Gets the group's members and owners from the API, combines them with the
# local enrollments and calculates what needs to be done. The resulting
# MembershipDiff is handed to :step_execute_diff.
#
# Intermittent and not-found errors turn into a retry of this step.
def step_generate_diff(_mem_data, _job_state_data)
  remote_members = graph_service_helpers.get_group_users_aad_ids(group.ms_group_id)
  remote_owners = graph_service_helpers.get_group_users_aad_ids(group.ms_group_id, owners: true)
  membership_diff = MembershipDiff.new(remote_members, remote_owners)

  UserMapping.enrollments_and_aads(course).find_each do |row|
    membership_diff.set_local_member(row.aad_id, row.type)
  end

  StateMachineJob::NextStep.new(:step_execute_diff, membership_diff)
rescue *Errors::INTERMITTENT_AND_NOTFOUND => error
  retry_object_for_error(error)
end
# Logs and counts users that a batch add/remove skipped.
#
# @param type [Symbol] the kind of operation (:add or :remove), used in the
#   log line and as the metric name suffix
# @param users [Hash, nil] skipped users grouped into lists, or nil when
#   nothing was skipped
def log_batch_skipped(type, users)
  return unless users # GraphService batch functions return nil if all successful

  n_total = users.values.sum(&:length)
  Rails.logger.warn(
    "#{self.class.name} (#{group.global_id}): Skipping redundant #{type} for #{n_total}: #{users.to_json}"
  )
  InstStatsd::Statsd.increment(
    "#{STATSD_NAME_SKIPPED_BATCHES}.#{type}", tags: { sync_type: sync_type }
  )
  InstStatsd::Statsd.count(
    "#{STATSD_NAME_SKIPPED_TOTAL}.#{type}", n_total, tags: { sync_type: sync_type }
  )
end
# Runs the API calls to add/remove users described by the diff.
#
# Raises Errors::MissingOwners when the diff has no local owners, and
# disables the group when the diff exceeds the member or owner limits.
# Intermittent and not-found errors restart from :step_generate_diff.
def step_execute_diff(diff, _job_state_data)
  # TODO: If there are no instructor enrollments, we actually want to
  # remove the group on the Microsoft side (INTEROP-6672)
  raise Errors::MissingOwners if diff.local_owners.empty?

  # Members are checked first; owners only when the member limit is fine.
  limit_error =
    if diff.max_enrollment_members_reached?
      MaxMemberEnrollmentsReached
    elsif diff.max_enrollment_owners_reached?
      MaxOwnerEnrollmentsReached
    end
  raise_and_disable_group(limit_error) if limit_error

  execute_diff(diff)
  StateMachineJob::NextStep.new(:step_check_team_exists)
rescue *Errors::INTERMITTENT_AND_NOTFOUND => error
  retry_object_for_error(error, step: :step_generate_diff)
end
# Records the error on the group, destroys the group and then raises the
# error.
#
# @param error_class error class to instantiate (with no arguments) and raise
def raise_and_disable_group(error_class)
  failure = error_class.new
  # Need to manually update last_error; StateMachineJob won't do it since the group
  # will be in a 'deleted' state
  group.update(last_error: MicrosoftSync::Errors.serialize(failure))
  group.destroy
  raise failure
end
# Execute a MembershipDiff or PartialMembershipDiff -- add and remove
# users in batches. Removals run before additions; if Errors::MissingOwners
# is raised along the way, the additions are (re)run and the error is then
# re-raised to the caller.
def execute_diff(diff)
execute_diff_remove_users(diff)
execute_diff_add_users(diff)
rescue Errors::MissingOwners
# If the group is close to the max number of users, we might need to
# remove users first to make room for new users.
# e.g.: group has 25000 users, course has 100 removed but 1 added. Need
# to remove at least 1 user before we can add 1.
#
# However, Microsoft will not let you remove the last owner in a group.
# So if a course has 1 owner and it is swapped out for a different owner,
# we should add the new one first. This is a rare scenario and because
# the Microsoft API is eventually consistent, we'd have to wait a bit to
# remove the old owner. So just add the new owners, raise the error and
# have them manually re-sync.
execute_diff_add_users(diff)
# Bare raise: re-raises the MissingOwners error being handled.
raise
end
# Adds the users the diff says are missing from the group, one batch of
# GraphService::GROUP_USERS_BATCH_SIZE at a time, logging any users the API
# skipped. Quota errors disable the group with the matching
# "max enrollments reached" error.
def execute_diff_add_users(diff)
  diff.additions_in_slices_of(GraphService::GROUP_USERS_BATCH_SIZE) do |batch|
    log_batch_skipped(
      :add,
      graph_service.add_users_to_group_ignore_duplicates(group.ms_group_id, **batch)
    )
  end
rescue Errors::MembersQuotaExceeded
  raise_and_disable_group(MaxMemberEnrollmentsReached)
rescue Errors::OwnersQuotaExceeded
  raise_and_disable_group(MaxOwnerEnrollmentsReached)
end
# Removes the users the diff says should no longer be in the group, one
# batch of GraphService::GROUP_USERS_BATCH_SIZE at a time, logging any users
# the API skipped.
def execute_diff_remove_users(diff)
  diff.removals_in_slices_of(GraphService::GROUP_USERS_BATCH_SIZE) do |batch|
    log_batch_skipped(
      :remove,
      graph_service.remove_group_users_ignore_missing(group.ms_group_id, **batch)
    )
  end
end
# Decides whether a team still has to be created for the group. The sync is
# complete when the course has no owner-type enrollments or the team already
# exists; otherwise :step_create_team is scheduled after
# DELAY_BEFORE_CREATE_TEAM. Intermittent errors turn into a retry.
def step_check_team_exists(_mem_data, _job_state_data)
  owner_enrollments = course.enrollments.where(type: MembershipDiff::OWNER_ENROLLMENT_TYPES)
  return StateMachineJob::COMPLETE unless owner_enrollments.any?
  return StateMachineJob::COMPLETE if graph_service.team_exists?(group.ms_group_id)

  StateMachineJob::DelayedNextStep.new(:step_create_team, DELAY_BEFORE_CREATE_TEAM)
rescue *Errors::INTERMITTENT => error
  retry_object_for_error(error)
end
# Creates the team for the group. Completes when the team is created or
# already exists; retries with delays of 30, 90 and 270 when the group or
# its owners are not visible yet; other intermittent errors use the standard
# retry.
def step_create_team(_mem_data, _job_state_data)
graph_service.create_education_class_team(group.ms_group_id)
StateMachineJob::COMPLETE
rescue MicrosoftSync::Errors::TeamAlreadyExists
# Nothing left to do: the team is already there.
StateMachineJob::COMPLETE
rescue MicrosoftSync::Errors::GroupHasNoOwners, MicrosoftSync::Errors::HTTPNotFound => e
# API is eventually consistent: We often have to wait a couple minutes
# after creating the group and adding owners for the Teams API to see the
# group and owners.
# It's also possible for the course to have added owners (so the
# enrollments are in the DB) since we last calculated the diff and added them
# in the generate_diff step. This is rare, but we can also sleep in that
# case. We'll eventually fail but the team will be created next time we sync.
StateMachineJob::Retry.new(error: e, delay_amount: [30, 90, 270])
rescue *Errors::INTERMITTENT => e
retry_object_for_error(e)
end
# Loads the pending partial-sync changes for the course. Fetches at most one
# row more than MAX_PARTIAL_SYNC_CHANGES so the caller can tell when the
# limit has been exceeded.
#
# Returns array of arrays of [user_id, change_enrollment_type, updated_at]
# [[234, 'member', timestamp1], [456, 'member', timestamp2], [456, 'owner', timestamp3]]
def load_partial_sync_changes
  pending_changes = PartialSyncChange.where(course: course).limit(MAX_PARTIAL_SYNC_CHANGES + 1)
  pending_changes.pluck(:user_id, :enrollment_type, :updated_at)
end
# Partial sync: applies only the enrollment changes recorded as
# PartialSyncChange rows for the course, instead of diffing the whole group.
#
# Returns:
# * NextStep(:step_full_sync_prerequisites) when the group has no
#   ms_group_id yet or there are more than MAX_PARTIAL_SYNC_CHANGES changes
# * COMPLETE when there is nothing to do or the changes were applied
# * DelayedNextStep(:step_full_sync_prerequisites) when throttled
# * a retry object for other intermittent / not-found errors
def step_partial_sync(_mem_state, _job_state)
  # Step 1. Kick off a full sync if we haven't created a group yet, or if
  # there are too many changes to effectively handle here.
  if group.ms_group_id.nil? ||
     (changes = load_partial_sync_changes).length > MAX_PARTIAL_SYNC_CHANGES
    InstStatsd::Statsd.increment("#{STATSD_NAME}.partial_into_full")
    return StateMachineJob::NextStep.new(:step_full_sync_prerequisites)
  end

  return StateMachineJob::COMPLETE if changes.empty?

  # Set sync_type before graph_service used (created) but after we may switch to full sync:
  self.sync_type = 'partial'

  # Step 2. ensure users have aad object ids:
  # changes_by_user_id is a hash from user_id ->
  # array of arrays of [user_id, change_enrollment_type, updated_at]:
  changes_by_user_id = changes.group_by(&:first)
  user_ids = changes_by_user_id.keys
  ensure_user_mappings(UserMapping.user_ids_without_mappings(user_ids, course.root_account_id))

  # Step 3. Generate PartialMembershipDiff by combining three mappings:
  #   user_id -> change type from PartialSyncChanges
  #   user_id <-> aad id from UserMappings
  #   user_id -> enrollment types from Enrollments
  # We need to look at all Enrollments for each user, not just those of the change type
  # because the user could be removed as an owner but not as a member, etc.
  # e.g., { 123 => ['member', 'owner'], 1230000000000099 => ['owner']}
  users_to_msft_role_types = changes_by_user_id.transform_values { |chgs| chgs.map(&:second) }
  diff = PartialMembershipDiff.new(users_to_msft_role_types)

  mappings = UserMapping.where(root_account_id: course.root_account_id, user_id: user_ids)
                        .pluck(:user_id, :aad_id)
  mappings.each { |user_id, aad_id| diff.set_member_mapping(user_id, aad_id) }

  # Only users that ended up with a mapping can be synced.
  users_with_mappings = mappings.map(&:first)
  enrollments = Enrollment.microsoft_sync_relevant
                          .where(course: course, user_id: users_with_mappings)
                          .pluck(:user_id, :type)
  enrollments.each { |user_id, enrollment_type| diff.set_local_member(user_id, enrollment_type) }

  # Step 4. Execute diff and delete changes. Match additionally on
  # updated_at to avoid deleting any changes that have updated since we
  # started (and will get executed on the next run)
  diff.log_all_actions
  execute_diff(diff)
  PartialSyncChange
    .where(course: course)
    .with_values_in(%w[user_id enrollment_type updated_at], changes)
    .delete_all

  StateMachineJob::COMPLETE
rescue Errors::Throttled => e
  # If throttled, it's likely due to adds/removes. The fact we didn't
  # delete changes will mean retrying will cause us to fail more and probably
  # get throttled more. It will save quota to do a full sync, even if it means
  # incurring more read quota from getting the list of users in a group
  # (generally, cheaper).
  #
  # STANDARD_RETRY_DELAY is a list of delays, whereas DelayedNextStep is given
  # a single delay everywhere else in this class, so fall back to the first
  # entry rather than passing the whole list.
  full_sync_after = e.retry_after_seconds || STANDARD_RETRY_DELAY.first
  Rails.logger.info 'MicrosoftSync::SyncerSteps: partial sync throttled, ' \
                    "full sync in #{full_sync_after}"
  InstStatsd::Statsd.increment("#{STATSD_NAME}.partial_into_full_throttled")
  StateMachineJob::DelayedNextStep.new(:step_full_sync_prerequisites, full_sync_after)
rescue *Errors::INTERMITTENT_AND_NOTFOUND => e
  retry_object_for_error(e)
end
# Only serialize Group (AR model, so really just Group id) when enqueueing
# a job. The rest of the instance variables should be reloaded when the job
# starts again.
#
# @param coder serialization target; only the 'group' key is written
def encode_with(coder)
coder['group'] = @group
end
private
# Writer used by #step_partial_sync to mark the run as 'partial'.
attr_writer :sync_type
# Type of sync being run; 'full' unless it has been set explicitly. Used for
# statsd tags and passed to GraphServiceHelpers.
def sync_type
@sync_type || 'full'
end
# Tenant configured in the root account settings, memoized in @tenant.
#
# Raises TenantMissingOrSyncDisabled unless the settings have both
# :microsoft_sync_enabled and :microsoft_sync_tenant set.
def tenant
  return @tenant if @tenant

  sync_enabled = account_settings[:microsoft_sync_enabled]
  configured_tenant = account_settings[:microsoft_sync_tenant]
  raise TenantMissingOrSyncDisabled unless sync_enabled && configured_tenant

  @tenant = configured_tenant
end
# Settings of the group's root account, memoized so that every lookup during
# this run sees the same loaded values.
def account_settings
@account_settings ||= group.root_account.settings
end
# Memoized GraphServiceHelpers for the tenant, tagged with the current
# sync_type. Calling #tenant first means TenantMissingOrSyncDisabled is raised
# before any helper is built.
def graph_service_helpers
@graph_service_helpers ||= tenant && GraphServiceHelpers.new(tenant, sync_type: sync_type)
end
# Memoized graph service object obtained from the helpers.
def graph_service
@graph_service ||= graph_service_helpers.graph_service
end
end
end