audit log uuid data fixup

refs: CNVS-13987

Creates new records for audit log data that has been overwritten.  Since
we cannot recover all of the actual event data, they are created as
corrupted records.
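
For reference, the reconstruction is a tombstone row in the stream table
plus matching index rows. A sketch of the shape of that data, based on the
fixup code below (timestamp, index_key and bucket stand in for values
recovered from the corrupted index row):

tombstone = {
  'id'         => CanvasUUID.generate,  # fresh id for the lost event
  'created_at' => Time.at(timestamp),   # recovered from the index row
  'event_type' => 'corrupted'
}
index_row = {
  'key'        => "#{index_key}/#{bucket}",
  'ordered_id' => "#{timestamp}/#{tombstone['id'][0, 8]}",
  'id'         => tombstone['id']
}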

Test Plan:
  - Clear audit logs (not mandatory, but it makes the data easier to see)
  - Generate bad audit data for each test case.
    * The supplied script should generate the bad data.
    * Refer to each test case to ensure the data is created correctly.
  - Pull the commit and run migrations.
  - Check Case 1 (the spot-check sketch after the generation code below
    can help here)
    * The original id record should still exist with a record in
      courses_by_course.
    * Before the fix only one course change record will exist.  After
      the fix a second tombstone record with an event type of corrupted
      should exist with related index records in courses_by_course.
    * The key column in each index should match the id/bucket format.
    * The ordered_id in each should match timestamp/first eight of the id.
  - Check Case 2
    * After the fix, pick a tombstone record and ensure that it has an
      index record in each of the index tables: authentications_by_pseudonym,
      authentications_by_user, authentications_by_account.
    * Ensure cross-bucket event indexes create a tombstone and match in
      the other indexes as well.  Ids should be unique in the index tables
      and the stream table.

- Code to generate test cases:

target_course = Course.find(...)
target_user = User.find(...)
target_pseudonym = target_user.pseudonyms.first

# Case 1: overwrite a course change record by inserting a second record
# with the same id but a later created_at.
event = Auditors::Course.record_created(target_course, target_user,
  source: :manual)
record = Auditors::Course::Record.generate(target_course, target_user,
  event.event_type, event.event_data, source: :manual)
record.attributes['id'] = event.id
record.attributes['created_at'] = Time.now + 1.day
Auditors::Course::Stream.insert(record)

# Case 2: overwrite an authentication record the same way, far enough out
# to land in a different bucket.
event = Auditors::Authentication.record(target_pseudonym, 'login')
record = Auditors::Authentication::Record.generate(target_pseudonym,
  event.event_type)
record.attributes['id'] = event.id
record.attributes['created_at'] = Time.now + 100.days
Auditors::Authentication::Stream.insert(record)
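
- Spot-check sketch for the test plan (illustrative only; the table names
  come from the fixup below, everything else is an assumption about your
  local data):

database = Canvas::Cassandra::DatabaseBuilder.from_config(:auditors)

# Case 1: the courses stream should hold the original record plus a
# tombstone with event_type 'corrupted'.
database.execute("SELECT id, event_type, created_at FROM courses").fetch do |row|
  p row.to_hash
end

# Each courses_by_course row should have key "<id>/<bucket>" and
# ordered_id "<timestamp>/<first eight chars of id>".
database.execute("SELECT key, ordered_id, id FROM courses_by_course").fetch do |row|
  p row.to_hash
end

# Case 2: a tombstone id should appear once in each authentication index.
%w(authentications_by_pseudonym
   authentications_by_user
   authentications_by_account).each do |table|
  database.execute("SELECT key, ordered_id, id FROM #{table}").fetch do |row|
    p row.to_hash
  end
end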

Change-Id: I888da1758282d0e3f2d19a4438c1a866a6f54125
Reviewed-on: https://gerrit.instructure.com/37605
Reviewed-by: Jacob Fugal <jacob@instructure.com>
Tested-by: Jenkins <jenkins@instructure.com>
QA-Review: Jeremy Putnam <jeremyp@instructure.com>
Product-Review: Nick Cloward <ncloward@instructure.com>

db/migrate/<timestamp>_fix_audit_log_uuid_indexes.rb

@@ -0,0 +1,16 @@
class FixAuditLogUuidIndexes < ActiveRecord::Migration
  tag :postdeploy

  include Canvas::Cassandra::Migration

  def self.cassandra_cluster
    'auditors'
  end

  def self.up
    DataFixup::FixAuditLogUuidIndexes.send_later_if_production(:run)
  end

  def self.down
  end
end

lib/data_fixup/fix_audit_log_uuid_indexes.rb

@@ -0,0 +1,263 @@
module DataFixup
  class FixAuditLogUuidIndexes
    MAPPING_TABLE = 'corrupted_index_mapping'
    CORRUPTED_EVENT_TYPE = 'corrupted'

    INDEXES = [
      Auditors::Authentication::Stream.pseudonym_index,
      Auditors::Authentication::Stream.user_index,
      Auditors::Authentication::Stream.account_index,
      Auditors::Course::Stream.course_index,
      Auditors::GradeChange::Stream.assignment_index,
      Auditors::GradeChange::Stream.course_index,
      Auditors::GradeChange::Stream.root_account_grader_index,
      Auditors::GradeChange::Stream.root_account_student_index
    ]

    def self.run
      migration = new

      # Fix Indexes
      INDEXES.each do |index|
        migration.fix_index(index)
      end

      # Clean up
      migration.drop_mapping_table
    end

    def initialize
      @corrected_ids = {}
      return if mapping_table_exists?

      compression_params = database.db.use_cql3? ?
        "WITH compression = { 'sstable_compression' : 'DeflateCompressor' }" :
        "WITH compression_parameters:sstable_compression='DeflateCompressor'"

      cql = %{
        CREATE TABLE #{MAPPING_TABLE} (
          record_type text,
          id text,
          new_id text,
          created_at timestamp,
          PRIMARY KEY (record_type, id, created_at)
        ) #{compression_params}
      }

      database.execute(cql)
    end
    # Check if the mapping table exists
    def mapping_table_exists?
      cql = %{
        SELECT columnfamily_name
        FROM system.schema_columnfamilies
        WHERE columnfamily_name = ?
          AND keyspace_name = ?
        ALLOW FILTERING
      }
      database.execute(cql, MAPPING_TABLE, database.keyspace).count == 1
    end
    # Drops mapping table
    def drop_mapping_table
      database.execute("DROP TABLE #{MAPPING_TABLE}") if mapping_table_exists?
    end

    # The date the bug was released.
    def start_time
      @start_time ||= Time.new(2014, 6, 14)
    end

    # Cassandra database connection
    def database
      @database ||= Canvas::Cassandra::DatabaseBuilder.from_config(:auditors)
    end

    # Fixes a specified index
    def fix_index(index)
      iterate_invalid_keys(index) do |rows|
        update_index_batch(index, rows)
      end
    end
    # Iterates over the index rows written since the bug was released,
    # yielding them one bucket at a time.
    def iterate_invalid_keys(index)
      # Start at the current bucket and walk backwards to the bucket
      # containing the release date of the bug.
      bucket = index.bucket_for_time(Time.now)
      previous_bucket = bucket + index.bucket_size
      oldest_bucket = index.bucket_for_time(start_time)

      cql = %{
        SELECT #{index.id_column},
               #{index.key_column},
               ordered_id
        FROM #{index.table} %CONSISTENCY%
        WHERE ordered_id >= ?
          AND ordered_id < ?
        ALLOW FILTERING
      }

      # Pull results from each bucket until we go past the oldest bucket.
      until bucket < oldest_bucket
        array = []
        database.execute(cql, "#{bucket}/", "#{previous_bucket}/", consistency: index.event_stream.read_consistency_level).fetch do |row|
          row = row.to_hash
          # Strip the bucket off the key.
          row['index_key'] = row[index.key_column].split("/#{bucket}").first
          row['timestamp'] = row['ordered_id'].split('/').first.to_i
          array << row
        end
        yield array
        previous_bucket = bucket
        bucket -= index.bucket_size
      end
    end
    # Fixes a batch of corrupted index rows
    def update_index_batch(index, rows)
      need_inspection = []
      need_tombstone = []
      updates = []

      # Check each row to see if we need to update it or inspect it.
      rows.each do |row|
        current_id, timestamp, key = extract_row_keys(index, row)
        actual_id = query_corrected_id(index.event_stream, current_id, timestamp)
        if actual_id.nil?
          # We couldn't find a mapping, so we need to look into this one.
          need_inspection << row
        elsif actual_id != current_id
          # We have already created a base record for this index, so this
          # index entry just needs to be recreated.
          updates << [current_id, key, timestamp, actual_id]
        end
      end

      if need_inspection.present?
        ids = need_inspection.map{ |row| row[index.id_column] }
        events = index.event_stream.fetch(ids).index_by(&:id)
        need_inspection.each do |row|
          current_id, timestamp, key = extract_row_keys(index, row)
          event = events[current_id]

          # If the event is a corrupted event we have already fixed it, so
          # save the mapping and move along.
          if event.event_type == 'corrupted'
            store_corrected_id(index.event_stream, current_id, timestamp, current_id)
            next
          end

          # If the timestamp is different we know it's been changed.
          if event.created_at.to_i != timestamp
            need_tombstone << row
          elsif entry = index.entry_proc.call(event)
            # Generate the index entry and compare the index's key to the
            # record's key. If they are different it's corrupted; if they
            # are the same we know it's clean. We need to convert the actual
            # key to a string because a proc might return a non-string, but
            # the index key will always be a string.
            actual_key = (index.key_proc ? index.key_proc.call(*entry) : entry).to_s
            if key != actual_key
              need_tombstone << row
            else
              store_corrected_id(index.event_stream, current_id, timestamp, current_id)
            end
          else
            # The current event data indicates no index entry should exist,
            # but one did, so the index entry must have referred to prior
            # event data.
            need_tombstone << row
          end
        end
      end

      database.batch do
        if need_tombstone.present?
          # Loop through each record we need to create a tombstone for.
          need_tombstone.each do |row|
            current_id, timestamp, key = extract_row_keys(index, row)
            actual_id = CanvasUUID.generate
            create_tombstone(index.event_stream, actual_id, timestamp)
            store_corrected_id(index.event_stream, current_id, timestamp, actual_id)
            updates << [current_id, key, timestamp, actual_id]
          end
        end

        if updates.present?
          # Loop through each row that needs updating, delete the corrupted
          # index entry, then create a fixed index entry pointing at the
          # tombstone.
          updates.each do |current_id, key, timestamp, actual_id|
            delete_index_entry(index, current_id, key, timestamp)
            create_index_entry(index, actual_id, key, timestamp)
          end
        end
      end
    end
    # Extracts key information from a row
    def extract_row_keys(index, row)
      current_id = row[index.id_column]
      timestamp = row['timestamp']
      key = row['index_key']

      [current_id, timestamp, key]
    end

    # Returns the new_id if a mapping exists
    def query_corrected_id(stream, id, timestamp)
      key = corrected_id_key(stream, id, timestamp)

      if corrected_id = @corrected_ids[key]
        return corrected_id
      else
        database.execute("SELECT new_id FROM #{MAPPING_TABLE} WHERE record_type = ? AND id = ? AND created_at = ?", stream.record_type, id, timestamp).fetch do |row|
          return @corrected_ids[key] = row.to_hash['new_id']
        end
      end
    end

    # Stores a new_id in the mapping table
    def store_corrected_id(stream, current_id, timestamp, actual_id)
      database.execute("INSERT INTO #{MAPPING_TABLE} (record_type, id, new_id, created_at) VALUES (?, ?, ?, ?)", stream.record_type, current_id, actual_id, timestamp)
      @corrected_ids[corrected_id_key(stream, current_id, timestamp)] = actual_id
    end

    # Cache key for the in-memory corrected id map
    def corrected_id_key(stream, id, timestamp)
      [stream.record_type, id, timestamp].join('/')
    end

    # Creates a corrupted tombstone record in the stream table.
    def create_tombstone(stream, id, timestamp)
      ttl_seconds = stream.ttl_seconds(timestamp)
      return if ttl_seconds < 0

      database.insert_record(stream.table, {stream.id_column => id}, {
        'created_at' => Time.at(timestamp),
        'event_type' => CORRUPTED_EVENT_TYPE
      }, ttl_seconds)
    end

    # Creates an index entry for the record.
    def create_index_entry(index, id, key, timestamp)
      ttl_seconds = index.event_stream.ttl_seconds(timestamp)
      return if ttl_seconds < 0

      # Add the bucket back onto the key.
      key = index.create_key(index.bucket_for_time(timestamp), key)
      ordered_id = "#{timestamp}/#{id[0, 8]}"

      database.execute(index.insert_cql, key, ordered_id, id, ttl_seconds)
    end

    # Deletes a corrupted index entry.
    def delete_index_entry(index, id, key, timestamp)
      # Add the bucket back onto the key.
      key = index.create_key(index.bucket_for_time(timestamp), key)
      ordered_id = "#{timestamp}/#{id[0, 8]}"

      database.execute("DELETE FROM #{index.table} WHERE ordered_id = ? AND key = ?", ordered_id, key)
    end
  end
end
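
The postdeploy migration queues this through send_later_if_production; if
it ever needs to be re-run by hand, the fixup can be kicked off from a
console:

DataFixup::FixAuditLogUuidIndexes.run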

spec/lib/data_fixup/fix_audit_log_uuid_indexes_spec.rb

@@ -0,0 +1,132 @@
require 'spec_helper'
require File.expand_path(File.dirname(__FILE__) + '/../../../lib/data_fixup/fix_audit_log_uuid_indexes')

describe DataFixup::FixAuditLogUuidIndexes do
  subject do
    DataFixup::FixAuditLogUuidIndexes
  end

  before do
    @database ||= Canvas::Cassandra::DatabaseBuilder.from_config(:auditors)

    @stream_tables = {}
    DataFixup::FixAuditLogUuidIndexes::INDEXES.each do |index|
      @stream_tables[index.event_stream.table] ||= []
      @stream_tables[index.event_stream.table] << index.table
    end

    # We don't know what data previous tests may have left behind while
    # generating events, so we truncate the tables before testing the fixup.
    @stream_tables.each do |stream_table, index_tables|
      @database.execute("TRUNCATE #{stream_table}")

      index_tables.each do |table|
        @database.execute("TRUNCATE #{table}")
      end
    end
  end
  def check_event_stream(event_id, stream_table, expected_total)
    # Check the stream table and make sure the right record count exists,
    # along with the right count of corrupted events.
    corrupted_total = 0
    rows = @database.execute("SELECT id, event_type FROM #{stream_table}")
    rows.count.should == expected_total
    rows.fetch do |row|
      row = row.to_hash
      corrupted_total += 1 if row['event_type'] == 'corrupted'
    end
    corrupted_total.should == expected_total - 1

    # Check each index table and make sure only one row with the specified
    # event_id remains; the others should have been changed to a new id.
    # Also check that the count matches the total records.
    @stream_tables[stream_table].each do |index_table|
      count = 0
      rows = @database.execute("SELECT id FROM #{index_table}")
      rows.count.should == expected_total
      rows.fetch do |row|
        row = row.to_hash
        count += 1 if row['id'] == event_id
      end
      count.should == 1
    end
  end
  def corrupt_grade_changes
    event_id = CanvasSlug.generate
    CanvasUUID.stubs(:generate).returns(event_id)

    (1..3).each do |i|
      time = Time.now - i.days

      Timecop.freeze(time) do
        course_with_teacher
        student_in_course
        @assignment = @course.assignments.create!(:title => 'Assignment', :points_possible => 10)
      end

      Timecop.freeze(time + 1.hour) do
        @assignment.grade_student(@student, grade: i, grader: @teacher).first
      end
    end

    CanvasUUID.unstub(:generate)
    { event_id: event_id, count: 3 }
  end

  def corrupt_course_changes
    event_id = CanvasSlug.generate
    CanvasUUID.stubs(:generate).returns(event_id)

    (1..3).each do |i|
      time = Time.now - i.days

      Timecop.freeze(time) do
        course_with_teacher
        Auditors::Course.record_created(@course, @teacher, source: :manual)
      end
    end

    CanvasUUID.unstub(:generate)
    { event_id: event_id, count: 3 }
  end

  def corrupt_authentications
    event_id = CanvasSlug.generate
    CanvasUUID.stubs(:generate).returns(event_id)

    (1..3).each do |i|
      time = Time.now - i.days

      Timecop.freeze(time) do
        site_admin_user(user: user_with_pseudonym(account: Account.site_admin))
        Auditors::Authentication.record(@pseudonym, 'login')
      end
    end

    CanvasUUID.unstub(:generate)
    { event_id: event_id, count: 3 }
  end
it "fixes the corrupted data" do
# Create bad data
stream_checks = {}
stream_checks['grade_changes'] = corrupt_grade_changes
stream_checks['courses'] = corrupt_course_changes
stream_checks['authentications'] = corrupt_authentications
# Run Fix
DataFixup::FixAuditLogUuidIndexes.run
# Make sure the data is fixed
stream_checks.each do |stream_table, checks|
check_event_stream(checks[:event_id], stream_table, checks[:count])
end
end
end