Track last seen id for when timeouts occur

This is a variant of 31b372eeab where the old migration has been deleted
and split into two migrations. The first migration creates the table and
the second migration runs the data fixup.

When this job times out it will now resume from the last
id seen instead of from the very beginning.

This also makes the read and write batch sizes configurable via Setting.

closes: GRADE-207

test plan:
- migrations work

Optional hard mode test plan:
- manual checking of records in CQLSH

Change-Id: I60b4b2b9a5fff3101eb307134b93d4b463090cbc
Reviewed-on: https://gerrit.instructure.com/123317
Tested-by: Jenkins
Reviewed-by: Keith T. Garner <kgarner@instructure.com>
QA-Review: Keith T. Garner <kgarner@instructure.com>
Reviewed-by: Jeremy Neander <jneander@instructure.com>
Product-Review: Keith T. Garner <kgarner@instructure.com>
Reviewed-by: Shahbaz Javeed <sjaveed@instructure.com>
This commit is contained in:
Derek Bender 2017-08-18 16:06:17 -05:00
parent 695dac5cbd
commit 0da39062b9
3 changed files with 100 additions and 10 deletions

View File

@ -0,0 +1,67 @@
# Copyright (C) 2017 - present Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Predeploy migration that creates the Cassandra checkpoint table used by
# DataFixup::InitNewGradeHistoryAuditLogIndexes, so a timed-out fixup job
# can resume from the last processed id instead of starting over.
class CreateNewGradeHistoryBatchTable < ActiveRecord::Migration[4.2]
  tag :predeploy
  include Canvas::Cassandra::Migration

  # Shared with the data fixup so both sides agree on the table name.
  LAST_BATCH_TABLE = DataFixup::InitNewGradeHistoryAuditLogIndexes::LAST_BATCH_TABLE

  # Cassandra cluster this migration targets.
  def self.cassandra_cluster
    'auditors'
  end

  def self.up
    Rails.logger.debug("InitNewGradeHistoryAuditLogIndexes: #{LAST_BATCH_TABLE} exists? => #{table_exists?(cassandra, LAST_BATCH_TABLE)}")
    return if table_exists?(cassandra, LAST_BATCH_TABLE)

    # Compression syntax differs between CQL3 and the legacy CQL dialect.
    compression =
      if cassandra.db.use_cql3?
        "WITH compression = { 'sstable_compression' : 'DeflateCompressor' }"
      else
        "WITH compression_parameters:sstable_compression='DeflateCompressor'"
      end
    create_cql = %{
      CREATE TABLE #{LAST_BATCH_TABLE} (
        id int,
        last_id text,
        PRIMARY KEY (id)
      ) #{compression}
    }
    cassandra.execute(create_cql)
  end

  def self.down
    Rails.logger.debug("InitNewGradeHistoryAuditLogIndexes: #{LAST_BATCH_TABLE} exists? => #{table_exists?(cassandra, LAST_BATCH_TABLE)}")
    return unless table_exists?(cassandra, LAST_BATCH_TABLE)

    cassandra.execute("DROP TABLE #{LAST_BATCH_TABLE}")
  end

  # Probes the table with a one-row SELECT; Cassandra raises
  # InvalidRequestException when the table does not exist.
  def self.table_exists?(cassandra, table)
    probe_cql = %{
      SELECT *
      FROM #{table}
      LIMIT 1
    }
    cassandra.execute(probe_cql)
    true
  rescue CassandraCQL::Error::InvalidRequestException
    false
  end
  private_class_method :table_exists?
end

View File

@ -23,13 +23,13 @@ module DataFixup
new.build_indexes
end
LAST_BATCH_TABLE = 'grade_changes_index_last_batch'.freeze
SEARCH_CQL = %{
SELECT id, created_at, context_id, assignment_id, grader_id, student_id
FROM grade_changes
WHERE token(id) > token(?)
LIMIT ?
}.freeze
INDEX_METHODS = [
:add_course_assignment_index,
:add_course_assignment_grader_index,
@ -40,24 +40,30 @@ module DataFixup
:add_course_student_index
].freeze
READ_BATCH_SIZE = 1000
WRITE_BATCH_SIZE = 400
# Rows to pull from grade_changes per read batch; tunable via Setting,
# defaulting to 1000. Memoized for the lifetime of the fixup instance.
def read_batch_size
  return @read_batch_size if @read_batch_size

  configured = Setting.get('init_new_grade_history_audit_log_indexes_read_batch_size', 1000)
  @read_batch_size = configured.to_i
end
# Index entries to write per Cassandra batch; tunable via Setting,
# defaulting to 200. Memoized for the lifetime of the fixup instance.
def write_batch_size
  return @write_batch_size if @write_batch_size

  configured = Setting.get('init_new_grade_history_audit_log_indexes_write_batch_size', 200)
  @write_batch_size = configured.to_i
end
# Walks the grade_changes table batch by batch, accumulating new index
# entries; resumes from the checkpointed id so a timed-out job picks up
# where it left off, then flushes whatever is left from the final batch.
def build_indexes
new_index_entries = []
# NOTE(review): the next two lines look like merged old/new diff lines;
# the nil assignment is immediately overwritten by the checkpoint lookup.
last_seen_id = nil
last_seen_id = fetch_last_id
done = false
until done
done, last_seen_id = read_and_process_batch(last_seen_id, new_index_entries)
end
# Flush entries remaining from the last partial batch.
write_batch(new_index_entries)
end
private
def read_and_process_batch(starting_key, index_entries)
last_id = nil
result = database.execute(SEARCH_CQL, starting_key, READ_BATCH_SIZE)
result = database.execute(SEARCH_CQL, starting_key, read_batch_size)
return true, nil if result.rows == 0
result.fetch do |row|
last_id = row['id']
@ -67,14 +73,13 @@ module DataFixup
end
end
write_in_batches(index_entries)
save_last_id(last_id)
return false, last_id
end
ResultStruct = Struct.new(:index, :record, :key)
# Drains +batch+ by writing it out in chunks of the configured write size.
# NOTE(review): this span appears to be a diff render with the old
# (WRITE_BATCH_SIZE constant) and new (write_batch_size setting) loop
# headers merged, leaving the nesting unbalanced — verify against the
# actual file before relying on this text.
def write_in_batches(batch)
while batch.size >= WRITE_BATCH_SIZE
write_batch(batch.shift(WRITE_BATCH_SIZE))
while batch.size > 0
write_batch(batch.shift(write_batch_size))
end
end
@ -87,6 +92,8 @@ module DataFixup
@database ||= Canvas::Cassandra::DatabaseBuilder.from_config(:auditors)
end
ResultStruct = Struct.new(:index, :record, :key)
def add_course_assignment_index(row)
index = Auditors::GradeChange::Stream.course_assignment_index
key = [row['context_id'], row['assignment_id']]
@ -136,5 +143,21 @@ module DataFixup
key = [row['context_id'], row['student_id']]
ResultStruct.new(index, OpenStruct.new(row.to_hash), key)
end
# Reads the checkpointed id from the batch-tracking table, or returns nil
# when no checkpoint row has been written yet (fresh run).
def fetch_last_id
  result = database.execute("SELECT last_id FROM #{LAST_BATCH_TABLE}")
  result.fetch { |row| return row.to_hash['last_id'] }
  nil
end
# Upserts the single checkpoint row (fixed id 1) with the most recently
# processed grade_changes id.
def save_last_id(last_id)
  upsert_cql = "INSERT INTO #{LAST_BATCH_TABLE} (id, last_id) VALUES (1, ?) "
  database.execute(upsert_cql, last_id)
end
# Writes a namespaced debug line to the Rails log.
def log_message(message)
  prefixed = "InitNewGradeHistoryAuditLogIndexes: #{message}"
  Rails.logger.debug(prefixed)
end
end
end