re-query on missing records

refs CNVS-48876
flag = none

TEST PLAN:
 1) run backfill
 2) re-queries should happen
 3) evidence should appear in logs

Change-Id: Ieb423f527151de08483ac1c330e046e4c886f8d4
Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/238837
Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com>
Reviewed-by: Simon Williams <simon@instructure.com>
QA-Review: Simon Williams <simon@instructure.com>
Product-Review: Ethan Vizitei <evizitei@instructure.com>
This commit is contained in:
Ethan Vizitei 2020-05-29 13:45:45 -05:00
parent 4aba057b26
commit 8e11c17578
1 changed files with 30 additions and 20 deletions

View File

@ -20,6 +20,7 @@ require 'set'
module DataFixup::Auditors
module Migrate
DEFAULT_BATCH_SIZE = 100
DEFAULT_REPAIR_BATCH_SIZE = 1000
module AuditorWorker
def initialize(account_id, date, operation_type: :backfill)
@ -76,6 +77,25 @@ module DataFixup::Auditors
retry
end
end
def fetch_attributes_resiliantly(stream_type, ids)
retries = 0
max_retries = 10
begin
recs = stream_type.fetch(ids, strategy: :serial)
if recs.size != ids.size
found_ids = recs.map(&:id)
missing_ids = ids - found_ids
raise RuntimeError, "NOT FOUND: #{missing_ids.join(',')}"
end
return recs
rescue CassandraCQL::Thrift::TimedOutException, RuntimeError
raise if retries >= max_retries
sleep 1.4 ** retries
retries += 1
retry
end
end
# rubocop:enable Lint/NoSleep
def filter_for_idempotency(ar_attributes_list, auditor_ar_type)
@ -96,11 +116,13 @@ module DataFixup::Auditors
end
end
def migrate_in_pages(collection, auditor_ar_type, batch_size=DEFAULT_BATCH_SIZE)
def migrate_in_pages(ids_collection, stream_type, auditor_ar_type, batch_size=DEFAULT_BATCH_SIZE)
next_page = 1
until next_page.nil?
page_args = { page: next_page, per_page: batch_size}
auditor_recs = get_cassandra_records_resiliantly(collection, page_args)
auditor_id_recs = get_cassandra_records_resiliantly(ids_collection, page_args)
collect_propsed_ids = auditor_id_recs.map{|rec| rec['id']}
auditor_recs = fetch_attributes_resiliantly(stream_type, collect_propsed_ids)
ar_attributes_list = auditor_recs.map do |rec|
auditor_ar_type.ar_attributes_from_event_stream(rec)
end
@ -112,7 +134,7 @@ module DataFixup::Auditors
new_attrs_list = filter_dead_foreign_keys(new_attrs_list)
bulk_insert_auditor_recs(auditor_ar_type, new_attrs_list) if new_attrs_list.size > 0
end
next_page = auditor_recs.next_page
next_page = auditor_id_recs.next_page
end
end
@ -125,7 +147,7 @@ module DataFixup::Auditors
# that subset to insert. This makes it much faster for traversing a large dataset
# when some or most of the records are filled in already. Obviously it would be somewhat
# slower than the migrate pass if there were NO records migrated.
def repair_in_pages(ids_collection, stream_type, auditor_ar_type, batch_size=DEFAULT_BATCH_SIZE)
def repair_in_pages(ids_collection, stream_type, auditor_ar_type, batch_size=DEFAULT_REPAIR_BATCH_SIZE)
next_page = 1
until next_page.nil?
page_args = { page: next_page, per_page: batch_size}
@ -134,7 +156,7 @@ module DataFixup::Auditors
existing_ids = auditor_ar_type.where(uuid: collect_propsed_ids).pluck(:uuid)
insertable_ids = collect_propsed_ids - existing_ids
if insertable_ids.size > 0
auditor_recs = stream_type.fetch(insertable_ids, strategy: :serial)
auditor_recs = fetch_attributes_resiliantly(stream_type, insertable_ids)
ar_attributes_list = auditor_recs.map do |rec|
auditor_ar_type.ar_attributes_from_event_stream(rec)
end
@ -382,16 +404,12 @@ module DataFixup::Auditors
:authentication
end
def cassandra_collection
Auditors::Authentication.for_account(account, cassandra_query_options)
end
def cassandra_id_collection
Auditors::Authentication::Stream.ids_for_account(account, cassandra_query_options)
end
def perform_migration
migrate_in_pages(cassandra_collection, Auditors::ActiveRecord::AuthenticationRecord)
migrate_in_pages(cassandra_id_collection, Auditors::Authentication::Stream, Auditors::ActiveRecord::AuthenticationRecord)
end
def perform_repair
@ -421,16 +439,12 @@ module DataFixup::Auditors
:course
end
def cassandra_collection
Auditors::Course.for_account(account, cassandra_query_options)
end
def cassandra_id_collection
Auditors::Course::Stream.ids_for_account(account, cassandra_query_options)
end
def perform_migration
migrate_in_pages(cassandra_collection, Auditors::ActiveRecord::CourseRecord)
migrate_in_pages(cassandra_id_collection, Auditors::Course::Stream, Auditors::ActiveRecord::CourseRecord)
end
def perform_repair
@ -456,10 +470,6 @@ module DataFixup::Auditors
:grade_change
end
def cassandra_collection_for(course)
Auditors::GradeChange.for_course(course, cassandra_query_options)
end
def cassandra_id_collection_for(course)
Auditors::GradeChange::Stream.ids_for_course(course, cassandra_query_options)
end
@ -475,7 +485,7 @@ module DataFixup::Auditors
all_course_ids = migrateable_course_ids.to_a
all_course_ids.in_groups_of(1000) do |course_ids|
Course.where(id: course_ids).each do |course|
migrate_in_pages(cassandra_collection_for(course), Auditors::ActiveRecord::GradeChangeRecord)
migrate_in_pages(cassandra_id_collection_for(course), Auditors::GradeChange::Stream, Auditors::ActiveRecord::GradeChangeRecord)
end
end
end