Clean up more bulk update warnings

refs AE-158

Change-Id: Ibede015c86ec7f04bab5235ed57acc42a4659b8f
Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/317117
Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com>
Reviewed-by: Cody Cutrer <cody@instructure.com>
QA-Review: Jacob Burroughs <jburroughs@instructure.com>
Product-Review: Jacob Burroughs <jburroughs@instructure.com>
This commit is contained in:
Jacob Burroughs 2023-04-27 07:41:48 -05:00
parent d45fcb49df
commit 7109a6cbae
10 changed files with 49 additions and 45 deletions

View File

@ -1214,8 +1214,8 @@ class Attachment < ActiveRecord::Base
file_batches.each do |count, attachment_id, last_updated_at, display_name, context_id, context_type|
# clear the need_notify flag for this batch
Attachment.where("need_notify AND updated_at <= ? AND context_id = ? AND context_type = ?", last_updated_at, context_id, context_type)
.update_all(need_notify: nil)
Attachment.where(need_notify: true, context_id: context_id, context_type: context_type).where("updated_at <= ?", last_updated_at)
.in_batches(of: 10_000).update_all(need_notify: nil)
# skip the notification if this batch is too old to be timely
next if last_updated_at.to_time < discard_older_than

View File

@ -91,7 +91,7 @@ class ContextModule < ActiveRecord::Base
progression_scope = context_module_progressions.where(current: true).where.not(workflow_state: "locked")
progression_scope = progression_scope.where(user_id: student_ids) if student_ids
if progression_scope.update_all(["workflow_state = 'locked', lock_version = lock_version + 1, current = ?", false]) > 0
if progression_scope.in_batches(of: 10_000).update_all(["workflow_state = 'locked', lock_version = lock_version + 1, current = ?", false]) > 0
delay_if_production(n_strand: ["evaluate_module_progressions", global_context_id],
singleton: "evaluate_module_progressions:#{global_id}")
.evaluate_all_progressions
@ -105,7 +105,7 @@ class ContextModule < ActiveRecord::Base
def invalidate_progressions
self.class.connection.after_transaction_commit do
if context_module_progressions.where(current: true).update_all(current: false) > 0
if context_module_progressions.where(current: true).in_batches(of: 10_000).update_all(current: false) > 0
# don't queue a job unless necessary
delay_if_production(n_strand: ["evaluate_module_progressions", global_context_id],
singleton: "evaluate_module_progressions:#{global_id}")

View File

@ -834,7 +834,7 @@ class Course < ActiveRecord::Base
to_delete += current_associations.map { |_k, v| v.map { |_k2, v2| v2[0] } }.flatten
unless to_delete.empty?
CourseAccountAssociation.where(id: to_delete).delete_all
CourseAccountAssociation.where(id: to_delete).in_batches(of: 10_000).delete_all
end
end
Course.clear_cache_keys(course_ids_to_update_user_account_associations, :account_associations)
@ -1310,7 +1310,7 @@ class Course < ActiveRecord::Base
end
self.class.connection.after_transaction_commit do
Enrollment.where(course_id: self).touch_all
Enrollment.where(course_id: self).in_batches(of: 10_000).touch_all
user_ids = Enrollment.where(course_id: self).distinct.pluck(:user_id).sort
# We might get lots of database locks when lots of courses with the same users are being updated,
# so we can skip touching those users' updated_at stamp since another process will do it
@ -3757,14 +3757,14 @@ class Course < ActiveRecord::Base
course_sections.update_all(course_id: new_course.id)
# we also want to bring along prior enrollments, so don't use the enrollments
# association
Enrollment.where(course_id: self).update_all(course_id: new_course.id, updated_at: Time.now.utc)
Enrollment.where(course_id: self).in_batches(of: 10_000).update_all(course_id: new_course.id, updated_at: Time.now.utc)
user_ids = new_course.all_enrollments.pluck(:user_id)
self.class.connection.after_transaction_commit do
User.touch_and_clear_cache_keys(user_ids, :enrollments)
end
Shard.partition_by_shard(user_ids) do |sharded_user_ids|
Favorite.where(user_id: sharded_user_ids, context_type: "Course", context_id: id)
.update_all(context_id: new_course.id, updated_at: Time.now.utc)
.in_batches(of: 10_000).update_all(context_id: new_course.id, updated_at: Time.now.utc)
end
self.replacement_course_id = new_course.id

View File

@ -86,7 +86,9 @@ class Enrollment::BatchStateUpdater
EnrollmentState.where(enrollment_id: batch).update_all_locked_in_order(state: "deleted", state_valid_until: nil, updated_at: Time.now.utc)
# we need the order to match the queries in GradeCalculator's save_course_and_grading_period_scores and save_assignment_group_scores,
# _and_ the fact that the former runs first
Score.where(enrollment_id: batch).order(Arel.sql("assignment_group_id NULLS FIRST, enrollment_id")).update_all_locked_in_order(workflow_state: "deleted", updated_at: Time.zone.now)
Score.transaction do
Score.where(enrollment_id: batch).order(Arel.sql("assignment_group_id NULLS FIRST, enrollment_id")).in_batches(of: 10_000).update_all_locked_in_order(workflow_state: "deleted", updated_at: Time.zone.now)
end
data
end

View File

@ -254,18 +254,18 @@ class EnrollmentState < ActiveRecord::Base
INVALIDATEABLE_STATES = %w[pending_invited pending_active invited active completed inactive].freeze # don't worry about creation_pending or rejected, etc
def self.invalidate_states(enrollment_scope)
EnrollmentState.where(enrollment_id: enrollment_scope, state: INVALIDATEABLE_STATES)
EnrollmentState.where(enrollment_id: enrollment_scope, state: INVALIDATEABLE_STATES).in_batches(of: 10_000)
.update_all(["lock_version = COALESCE(lock_version, 0) + 1, state_is_current = ?", false])
end
def self.invalidate_states_and_access(enrollment_scope)
EnrollmentState.where(enrollment_id: enrollment_scope, state: INVALIDATEABLE_STATES)
EnrollmentState.where(enrollment_id: enrollment_scope, state: INVALIDATEABLE_STATES).in_batches(of: 10_000)
.update_all(["lock_version = COALESCE(lock_version, 0) + 1, state_is_current = ?, access_is_current = ?", false, false])
end
def self.force_recalculation(enrollment_ids, strand: nil)
if enrollment_ids.any?
EnrollmentState.where(enrollment_id: enrollment_ids)
EnrollmentState.where(enrollment_id: enrollment_ids).in_batches(of: 10_000)
.update_all(["lock_version = COALESCE(lock_version, 0) + 1, state_is_current = ?", false])
args = strand ? { n_strand: strand } : {}
EnrollmentState.delay_if_production(**args).process_states_for_ids(enrollment_ids)
@ -273,7 +273,7 @@ class EnrollmentState < ActiveRecord::Base
end
def self.invalidate_access(enrollment_scope, states_to_update)
EnrollmentState.where(enrollment_id: enrollment_scope, state: states_to_update)
EnrollmentState.where(enrollment_id: enrollment_scope, state: states_to_update).in_batches(of: 10_000)
.update_all(["lock_version = COALESCE(lock_version, 0) + 1, access_is_current = ?", false])
end

View File

@ -53,7 +53,7 @@ class ObserverAlert < ActiveRecord::Base
end
def self.clean_up_old_alerts
ObserverAlert.where("created_at < ?", 6.months.ago).delete_all
ObserverAlert.where("created_at < ?", 6.months.ago).in_batches(of: 10_000).delete_all
end
def self.create_assignment_missing_alerts

View File

@ -46,7 +46,7 @@ class StreamItemInstance < ActiveRecord::Base
# is an array of [context_type, context_id])
def update_all_with_invalidation(contexts, updates)
contexts.each { |context| StreamItemCache.invalidate_context_stream_item_key(context.first, context.last) }
original_update_all(updates)
in_batches(of: 10_000).update_all(updates)
end
end

View File

@ -1036,7 +1036,7 @@ module UsefulFindInBatches
base_class = klass.base_class
base_class.unscoped do
batch_relation = base_class.from(table).select("*").limit(of).preload(includes_values + preload_values)
batch_relation = base_class.from("#{connection.quote_column_name(table)} as #{connection.quote_column_name(base_class.table_name)}").limit(of).preload(includes_values + preload_values)
batch_relation = batch_relation.order(Arel.sql(connection.quote_column_name(index))) if index
yielded_relation = batch_relation
loop do

View File

@ -577,33 +577,35 @@ class GradeCalculator
end
# Update existing course and grading period Scores or create them if needed.
Score.connection.execute(<<~SQL.squish)
INSERT INTO #{Score.quoted_table_name}
(
enrollment_id, grading_period_id,
#{columns_to_insert_or_update[:columns].join(", ")},
course_score, root_account_id, created_at, updated_at
)
SELECT
enrollments.id as enrollment_id,
#{@grading_period.try(:id) || "NULL"} as grading_period_id,
#{columns_to_insert_or_update[:insert_values].join(", ")},
#{@grading_period ? "FALSE" : "TRUE"} AS course_score,
#{@course.root_account_id} AS root_account_id,
#{updated_at} as created_at,
#{updated_at} as updated_at
FROM #{Enrollment.quoted_table_name} enrollments
WHERE
enrollments.id IN (#{joined_enrollment_ids})
ORDER BY enrollment_id
ON CONFLICT #{conflict_target}
DO UPDATE SET
#{columns_to_insert_or_update[:update_values].join(", ")},
updated_at = excluded.updated_at,
root_account_id = #{@course.root_account_id},
/* if workflow_state was previously deleted for some reason, update it to active */
workflow_state = COALESCE(NULLIF(excluded.workflow_state, 'deleted'), 'active')
SQL
Score.connection.with_max_update_limit(enrollments.length) do
Score.connection.execute(<<~SQL.squish)
INSERT INTO #{Score.quoted_table_name}
(
enrollment_id, grading_period_id,
#{columns_to_insert_or_update[:columns].join(", ")},
course_score, root_account_id, created_at, updated_at
)
SELECT
enrollments.id as enrollment_id,
#{@grading_period.try(:id) || "NULL"} as grading_period_id,
#{columns_to_insert_or_update[:insert_values].join(", ")},
#{@grading_period ? "FALSE" : "TRUE"} AS course_score,
#{@course.root_account_id} AS root_account_id,
#{updated_at} as created_at,
#{updated_at} as updated_at
FROM #{Enrollment.quoted_table_name} enrollments
WHERE
enrollments.id IN (#{joined_enrollment_ids})
ORDER BY enrollment_id
ON CONFLICT #{conflict_target}
DO UPDATE SET
#{columns_to_insert_or_update[:update_values].join(", ")},
updated_at = excluded.updated_at,
root_account_id = #{@course.root_account_id},
/* if workflow_state was previously deleted for some reason, update it to active */
workflow_state = COALESCE(NULLIF(excluded.workflow_state, 'deleted'), 'active')
SQL
end
rescue ActiveRecord::Deadlocked => e
Canvas::Errors.capture_exception(:grade_calcuator, e, :warn)
raise Delayed::RetriableError, "Deadlock in upserting course or grading period scores"

View File

@ -26,10 +26,10 @@ describe StreamItemInstance do
end
describe ".update_all_with_invalidation" do
it "invalidates stream item cache keys and runs update_all (the original)" do
it "invalidates stream item cache keys and runs batched updates" do
# expect
expect(StreamItemCache).to receive(:invalidate_context_stream_item_key).twice
expect(StreamItemInstance).to receive(:original_update_all).with("updates")
expect(StreamItemInstance).to receive(:in_batches).and_call_original
# when
StreamItemInstance.update_all_with_invalidation(["code_1", "code_2"],
"updates")