diff --git a/db/migrate/20170902211600_create_new_grade_history_batch_table.rb b/db/migrate/20170902211600_create_new_grade_history_batch_table.rb
new file mode 100644
index 00000000000..42a937ac4b8
--- /dev/null
+++ b/db/migrate/20170902211600_create_new_grade_history_batch_table.rb
@@ -0,0 +1,76 @@
+# Copyright (C) 2017 - present Instructure, Inc.
+#
+# This file is part of Canvas.
+#
+# Canvas is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, version 3 of the License.
+#
+# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+# Creates (and on rollback drops) the table named by LAST_BATCH_TABLE, which
+# DataFixup::InitNewGradeHistoryAuditLogIndexes reads and writes to remember
+# the last grade_changes id it processed.
+class CreateNewGradeHistoryBatchTable < ActiveRecord::Migration[4.2]
+  tag :predeploy
+
+  include Canvas::Cassandra::Migration
+
+  LAST_BATCH_TABLE = DataFixup::InitNewGradeHistoryAuditLogIndexes::LAST_BATCH_TABLE
+
+  def self.cassandra_cluster
+    'auditors'
+  end
+
+  # Creates LAST_BATCH_TABLE unless it is already present.
+  def self.up
+    # Probe once so the logged value is the same one the decision is based on.
+    exists = table_exists?(cassandra, LAST_BATCH_TABLE)
+    Rails.logger.debug("InitNewGradeHistoryAuditLogIndexes: #{LAST_BATCH_TABLE} exists? => #{exists}")
+    return if exists
+
+    compression_params = if cassandra.db.use_cql3?
+      "WITH compression = { 'sstable_compression' : 'DeflateCompressor' }"
+    else
+      "WITH compression_parameters:sstable_compression='DeflateCompressor'"
+    end
+
+    create_table_command = %{
+      CREATE TABLE #{LAST_BATCH_TABLE} (
+        id int,
+        last_id text,
+        PRIMARY KEY (id)
+      ) #{compression_params}
+    }
+    cassandra.execute(create_table_command)
+  end
+
+  # Drops LAST_BATCH_TABLE if it is present.
+  def self.down
+    exists = table_exists?(cassandra, LAST_BATCH_TABLE)
+    Rails.logger.debug("InitNewGradeHistoryAuditLogIndexes: #{LAST_BATCH_TABLE} exists? => #{exists}")
+    cassandra.execute("DROP TABLE #{LAST_BATCH_TABLE}") if exists
+  end
+
+  # Probes for +table+ by selecting a single row from it.
+  # Returns true when the query succeeds and false when it raises
+  # CassandraCQL::Error::InvalidRequestException.
+  def self.table_exists?(cassandra, table)
+    cql = %{
+      SELECT *
+      FROM #{table}
+      LIMIT 1
+    }
+    cassandra.execute(cql)
+    true
+  rescue CassandraCQL::Error::InvalidRequestException
+    false
+  end
+  private_class_method :table_exists?
+end
diff --git a/db/migrate/20170606220650_init_new_grade_history_audit_log_indexes.rb b/db/migrate/20170902211611_init_new_grade_history_audit_log_indexes.rb
similarity index 100%
rename from db/migrate/20170606220650_init_new_grade_history_audit_log_indexes.rb
rename to db/migrate/20170902211611_init_new_grade_history_audit_log_indexes.rb
diff --git a/lib/data_fixup/init_new_grade_history_audit_log_indexes.rb b/lib/data_fixup/init_new_grade_history_audit_log_indexes.rb
index e2aae0cced7..cf676267064 100644
--- a/lib/data_fixup/init_new_grade_history_audit_log_indexes.rb
+++ b/lib/data_fixup/init_new_grade_history_audit_log_indexes.rb
@@ -23,13 +23,16 @@ module DataFixup
       new.build_indexes
     end
 
+    # Table holding the id of the last grade_changes row processed, so that
+    # build_indexes can start from it instead of from the beginning.
+    LAST_BATCH_TABLE = 'grade_changes_index_last_batch'.freeze
     SEARCH_CQL = %{
       SELECT id, created_at, context_id, assignment_id, grader_id, student_id
       FROM grade_changes
       WHERE token(id) > token(?)
       LIMIT ?
     }.freeze
 
     INDEX_METHODS = [
       :add_course_assignment_index,
       :add_course_assignment_grader_index,
@@ -40,24 +43,37 @@ module DataFixup
       :add_course_student_index
     ].freeze
 
-    READ_BATCH_SIZE = 1000
-    WRITE_BATCH_SIZE = 400
+    # Number of grade_changes rows requested per SEARCH_CQL query, read from
+    # Setting. Values that are not positive fall back to 1000.
+    def read_batch_size
+      @read_batch_size ||=
+        positive_setting('init_new_grade_history_audit_log_indexes_read_batch_size', 1000)
+    end
+
+    # Maximum number of index entries passed to one write_batch call, read
+    # from Setting. Values that are not positive fall back to 200:
+    # Array#shift(0) removes nothing, so a zero size would make
+    # write_in_batches loop forever.
+    def write_batch_size
+      @write_batch_size ||=
+        positive_setting('init_new_grade_history_audit_log_indexes_write_batch_size', 200)
+    end
 
     def build_indexes
       new_index_entries = []
-      last_seen_id = nil
+      # Start from the saved checkpoint; nil when nothing has been saved yet.
+      last_seen_id = fetch_last_id
       done = false
       until done
         done, last_seen_id = read_and_process_batch(last_seen_id, new_index_entries)
       end
-      write_batch(new_index_entries)
     end
 
     private
 
     def read_and_process_batch(starting_key, index_entries)
       last_id = nil
-      result = database.execute(SEARCH_CQL, starting_key, READ_BATCH_SIZE)
+      result = database.execute(SEARCH_CQL, starting_key, read_batch_size)
       return true, nil if result.rows == 0
       result.fetch do |row|
         last_id = row['id']
@@ -67,14 +83,16 @@ module DataFixup
         end
       end
       write_in_batches(index_entries)
+      # Checkpoint only after every entry collected for this page was written.
+      save_last_id(last_id)
       return false, last_id
     end
 
-    ResultStruct = Struct.new(:index, :record, :key)
-
+    # Drains +batch+ destructively, writing it in slices of at most
+    # write_batch_size entries.
     def write_in_batches(batch)
-      while batch.size >= WRITE_BATCH_SIZE
-        write_batch(batch.shift(WRITE_BATCH_SIZE))
+      until batch.empty?
+        write_batch(batch.shift(write_batch_size))
       end
     end
 
@@ -87,6 +105,9 @@ module DataFixup
       @database ||= Canvas::Cassandra::DatabaseBuilder.from_config(:auditors)
     end
 
+    # Bundles an index, a snapshot of the row, and the key built from the row.
+    ResultStruct = Struct.new(:index, :record, :key)
+
     def add_course_assignment_index(row)
       index = Auditors::GradeChange::Stream.course_assignment_index
       key = [row['context_id'], row['assignment_id']]
@@ -136,5 +157,33 @@ module DataFixup
       key = [row['context_id'], row['student_id']]
       ResultStruct.new(index, OpenStruct.new(row.to_hash), key)
     end
+
+    # Returns the last_id of the first row yielded from LAST_BATCH_TABLE, or
+    # nil when no row is yielded.
+    def fetch_last_id
+      database.execute("SELECT last_id FROM #{LAST_BATCH_TABLE}").fetch do |row|
+        return row.to_hash['last_id']
+      end
+
+      nil
+    end
+
+    # Records +last_id+ as the resume point. The row id is always 1, so every
+    # call targets the same primary key.
+    def save_last_id(last_id)
+      database.execute("INSERT INTO #{LAST_BATCH_TABLE} (id, last_id) VALUES (1, ?) ", last_id)
+    end
+
+    # Reads Setting +name+ as an integer, returning +default+ when the
+    # converted value is not positive (e.g. unset, non-numeric, zero).
+    def positive_setting(name, default)
+      value = Setting.get(name, default).to_i
+      value > 0 ? value : default
+    end
+
+    # Debug-logs +message+ prefixed with this fixup's name.
+    def log_message(message)
+      Rails.logger.debug("InitNewGradeHistoryAuditLogIndexes: #{message}")
+    end
   end
 end