move AUA log compaction state to local table

closes FOO-1003

TEST PLAN:
  1) aua log compaction continues to work
  2) state on plugin setting and in metadata table is in sync

Change-Id: Ie161fcb079f80a4072ddd99f3a561c97d33026fd
Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/248501
Reviewed-by: Jacob Burroughs <jburroughs@instructure.com>
Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com>
QA-Review: Ethan Vizitei <evizitei@instructure.com>
Product-Review: Ethan Vizitei <evizitei@instructure.com>

parent c3c184d5bb
commit 68198567df

@@ -72,7 +72,8 @@ class AssetUserAccessLog
   MODEL_BY_DAY_OF_WEEK_INDEX = [
     AuaLog0, AuaLog1, AuaLog2, AuaLog3, AuaLog4, AuaLog5, AuaLog6
-  ]
+  ].freeze
+  METADATUM_KEY = "aua_logs_compaction_state".freeze

   def self.put_view(asset_user_access, timestamp: nil)
     # the "timestamp:" argument is useful for testing or backfill/replay

@@ -105,6 +106,14 @@ class AssetUserAccessLog
     PluginSetting.find_by_name(:asset_user_access_logs)
   end

+  def self.metadatum_payload
+    CanvasMetadatum.get(METADATUM_KEY, {max_log_ids: [0,0,0,0,0,0,0]})
+  end
+
+  def self.update_metadatum(compaction_state)
+    CanvasMetadatum.set(METADATUM_KEY, compaction_state)
+  end
+
   # This is the job component, taking the inserts that have
   # accumulated and writing them to the AUA records they actually
   # belong to with as few updates as possible. This should help control

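The two helpers added above are thin wrappers around the CanvasMetadatum model introduced later in this commit. A minimal usage sketch of how they are expected to behave (not part of the diff; the return values assume a fresh install and that the current UTC day is a Monday):

  state = AssetUserAccessLog.metadatum_payload
  # => { "max_log_ids" => [0, 0, 0, 0, 0, 0, 0] } when no metadatum row exists yet
  state[:max_log_ids][Time.now.utc.wday] = 1234   # record the highest compacted log id for today
  AssetUserAccessLog.update_metadatum(state)
  AssetUserAccessLog.metadatum_payload[:max_log_ids]
  # => [0, 1234, 0, 0, 0, 0, 0]  (wday == 1 for a Monday)
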
@@ -125,7 +134,11 @@ class AssetUserAccessLog
     if yesterday_model.take(1).size > 0
       yesterday_completed = compact_partition(yesterday_ts)
       ps.reload
-      if yesterday_completed && ps.settings[:max_log_ids][yesterday_ts.wday] >= yesterday_model.maximum(:id)
+      compaction_state = self.metadatum_payload
+      # TODO: once iterator state is fully transitioned to the metadatum, we can remove
+      # the PluginSetting "settings" state entirely.
+      max_yesterday_id = [compaction_state[:max_log_ids][yesterday_ts.wday], ps.settings[:max_log_ids][yesterday_ts.wday]].max
+      if yesterday_completed && max_yesterday_id >= yesterday_model.maximum(:id)
         # we have now compacted all the writes from the previous day.
         # since the timestamp (now) is into the NEXT utc day, no further
         # writes can happen to yesterdays partition, and we can truncate it,

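Taking the max over both stores is the transition-safe read: whichever bookmark is further along wins, so a job that ran before the metadatum row existed can never re-count rows that were already compacted. The same idea expressed as a standalone sketch, with a hypothetical helper name (transitional_max_id is not in the diff):

  # hypothetical helper, illustrating the dual-read during the migration window
  def transitional_max_id(ps, compaction_state, wday)
    [
      compaction_state[:max_log_ids][wday],  # new source of truth (canvas_metadata row)
      ps.settings[:max_log_ids][wday]        # legacy PluginSetting state, still maintained for now
    ].max
  end
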
@@ -134,14 +147,20 @@ class AssetUserAccessLog
         # "reset" by looking for the max id value in a table and making it bigger than that.
         # Tracking iterator state indefinitely could result in missing writes if a truncated
         # table gets it's iterator reset).
         yesterday_model.transaction do
           if truncation_enabled?
             Shackles.activate(:deploy) do
               yesterday_model.connection.truncate(yesterday_model.table_name)
             end
+            # TODO: once iterator state is fully transitioned to the metadatum, we can remove
+            # the PluginSetting "settings" state entirely.
+            PluginSetting.suspend_callbacks(:clear_cache) do
               ps.settings[:max_log_ids][yesterday_ts.wday] = 0
               ps.save
+            end
+            compaction_state[:max_log_ids][yesterday_ts.wday] = 0
+            self.update_metadatum(compaction_state)
           end
         end
       end
       return AssetUserAccessLog.reschedule! unless yesterday_completed

@@ -184,7 +203,7 @@ class AssetUserAccessLog
     # "just a few more"
     partition_upper_bound = partition_model.maximum(:id)
     partition_lower_bound = partition_model.minimum(:id)
-    # fetch from the plugin setting the last compacted log id. This lets us
+    # fetch from the canvas metadatum compaction state the last compacted log id. This lets us
     # resume log compaction past the records we've already processed, but without
     # having to delete records as we go (which would churn write IO), leaving the log cleanup
     # to the truncation operation that occurs after finally processing "yesterdays" partition.

@@ -193,7 +212,11 @@ class AssetUserAccessLog
     # deciding we already chomped these logs).
     ps = plugin_setting
     max_log_ids = ps.reload.settings.fetch(:max_log_ids, [0,0,0,0,0,0,0])
-    log_id_bookmark = [(partition_lower_bound-1), (max_log_ids[ts.wday] || 0)].max
+    # TODO: once iterator state is fully transitioned to the metadatum, we can remove
+    # the PluginSetting "settings" state entirely.
+    compaction_state = self.metadatum_payload
+    state_max_log_ids = compaction_state.fetch(:max_log_ids, [0,0,0,0,0,0,0])
+    log_id_bookmark = [(partition_lower_bound-1), (max_log_ids[ts.wday] || 0), (state_max_log_ids[ts.wday])].max
     while log_id_bookmark < partition_upper_bound
       Rails.logger.info("[AUA_LOG_COMPACTION:#{Shard.current.id}] - processing #{log_id_bookmark} from #{partition_upper_bound}")
       # maybe we won't need this, but if we need to slow down throughput and don't want to hold

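A worked example of the bookmark calculation above, with illustrative values (assuming the compaction timestamp falls on a Monday, so ts.wday == 1):

  partition_lower_bound = 501                   # smallest id still present in the partition
  max_log_ids = [0, 750, 0, 0, 0, 0, 0]         # legacy PluginSetting iterator state
  state_max_log_ids = [0, 900, 0, 0, 0, 0, 0]   # new canvas_metadata iterator state
  [(partition_lower_bound - 1), (max_log_ids[1] || 0), state_max_log_ids[1]].max
  # => 900, so compaction resumes after log id 900 instead of re-counting rows 501..900
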
@@ -217,17 +240,21 @@ class AssetUserAccessLog
         Rails.logger.info("[AUA_LOG_COMPACTION:#{Shard.current.id}] - batch updating (sometimes these queries don't get logged)...")
         partition_model.connection.execute(update_query)
         Rails.logger.info("[AUA_LOG_COMPACTION:#{Shard.current.id}] - ...batch update complete")
-        # Here we want to write the iteration state onto the plugin setting
+        # Here we want to write the iteration state into the database
         # so that we don't double count rows later. The next time the job
         # runs it can pick up at this point and only count rows that haven't yet been counted.
         ps.reload
         # also, no need to clear the cache, these are LOCAL
         # plugin settings, so let's not beat up consul
+        # TODO: once iterator state is fully transitioned to the metadatum, we can remove
+        # the PluginSetting "settings" state entirely.
         PluginSetting.suspend_callbacks(:clear_cache) do
           ps.settings[:max_log_ids] ||= [0,0,0,0,0,0,0] # (just in case...)
           ps.settings[:max_log_ids][ts.wday] = new_iterator_pos
           ps.save
         end
+        compaction_state[:max_log_ids][ts.wday] = new_iterator_pos
+        self.update_metadatum(compaction_state)
       end
     end
     log_id_bookmark = new_iterator_pos

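During the transition the same bookmark is written twice: once to the local, cache-callback-suppressed PluginSetting and once to the metadatum. A hypothetical consolidation of that repeated pattern, purely as a sketch (advance_bookmark! is not in the diff; it just names the dual-write):

  # hypothetical helper: advance both iterator-state stores to the same position
  def self.advance_bookmark!(ps, compaction_state, wday, new_pos)
    PluginSetting.suspend_callbacks(:clear_cache) do
      ps.settings[:max_log_ids] ||= [0, 0, 0, 0, 0, 0, 0]
      ps.settings[:max_log_ids][wday] = new_pos
      ps.save
    end
    compaction_state[:max_log_ids][wday] = new_pos
    update_metadatum(compaction_state)
  end
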
@@ -260,10 +287,16 @@ class AssetUserAccessLog
       # to just under it's ID
       new_bookmark_id = next_id - 1
       Shackles.activate(:master) do
         partition_model.transaction do
+          # TODO: once iterator state is fully transitioned to the metadatum, we can remove
+          # the PluginSetting "settings" state entirely.
+          PluginSetting.suspend_callbacks(:clear_cache) do
             ps.reload.settings[:max_log_ids][ts.wday] = new_bookmark_id
             ps.save
+          end
+          compaction_state[:max_log_ids][ts.wday] = new_bookmark_id
+          self.update_metadatum(compaction_state)
         end
       end
       log_id_bookmark = new_bookmark_id
     end

@@ -0,0 +1,54 @@
+#
+# Copyright (C) 2020 - present Instructure, Inc.
+#
+# This file is part of Canvas.
+#
+# Canvas is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, version 3 of the License.
+#
+# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <http://www.gnu.org/licenses/>.
+
+class CanvasMetadatum < ActiveRecord::Base
+  class MetadataArgumentError < ArgumentError; end
+  # The Metadatum class is intended to be a place for storing
+  # bits of state that are not really part of the canvas data itself.
+  # An example of a good use case would be processing state
+  # for internal delayed operations (see AssetUserAccessLog).
+  #
+  # Although "Setting" or config information could be
+  # stored in the table, it isn't very heavily cached
+  # (intentionally, because the current use case wants
+  # read-and-write operations to keep current state for a logical
+  # process).
+  #
+  # If you want to store something in here that is going to be
+  # read-heavy, consider adding a caching path like what's in the
+  # Setting class and allowing consumers to specify whether they want
+  # it or not.
+  self.table_name = "canvas_metadata"
+
+  def self.get(key, default={})
+    raise MetadataArgumentError, "default payload should be a hash: #{default}" unless default.is_a?(Hash)
+    object = CanvasMetadatum.where(key: key).take
+    (object&.payload || default).with_indifferent_access
+  end
+
+  # this payload will be stored as a jsonb document,
+  # so it expects you're passing it a hash. If we
+  # have other use cases later we can relax the requirement,
+  # but let's be strict as long as this is precisely what
+  # we expect.
+  def self.set(key, payload)
+    raise MetadataArgumentError, "payload should be a hash: #{payload}" unless payload.is_a?(Hash)
+    object = CanvasMetadatum.find_or_initialize_by(key: key)
+    object.payload = payload
+    object.save!
+  end
+end

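A usage sketch of the new model (the keys and payloads below are illustrative; the only key this commit actually uses is the AUA compaction key defined above):

  CanvasMetadatum.set("aua_logs_compaction_state", { max_log_ids: [0, 0, 0, 0, 0, 0, 0] })
  CanvasMetadatum.get("aua_logs_compaction_state")[:max_log_ids]
  # => [0, 0, 0, 0, 0, 0, 0]   (payload comes back with indifferent access)
  CanvasMetadatum.get("missing_key", { retries: 0 })
  # => { "retries" => 0 }      (falls back to the supplied default hash)
  CanvasMetadatum.get("missing_key", "not a hash")
  # raises CanvasMetadatum::MetadataArgumentError
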
@@ -0,0 +1,12 @@
+class CreateCanvasMetadata < ActiveRecord::Migration[5.2]
+  tag :predeploy
+
+  def change
+    create_table :canvas_metadata do |t|
+      t.string :key, null: false
+      t.jsonb :payload, null: false
+      t.timestamps
+    end
+    add_index :canvas_metadata, :key, unique: true
+  end
+end

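A small sketch of what the migration is expected to produce, checked through the model via standard Rails reflection calls (nothing here is part of the diff):

  CanvasMetadatum.columns_hash.keys
  # => ["id", "key", "payload", "created_at", "updated_at"]
  CanvasMetadatum.connection.index_exists?(:canvas_metadata, :key, unique: true)
  # => true; the unique index guarantees at most one row per key for get/set above
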
@@ -96,6 +96,17 @@ describe AssetUserAccessLog do
       end
     end

+    it "writes iterator state properly" do
+      expect(@asset_1.view_score).to be_nil
+      Timecop.freeze do
+        generate_log([@asset_1, @asset_2, @asset_3], 100)
+        AssetUserAccessLog.compact
+        partition_model = AssetUserAccessLog.log_model(Time.zone.now)
+        compaction_state = AssetUserAccessLog.metadatum_payload
+        expect(compaction_state[:max_log_ids].max).to eq(partition_model.maximum(:id))
+      end
+    end
+
    it "truncates yesterday after compacting changes" do
      Timecop.freeze do
        Timecop.travel(24.hours.ago) do

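TEST PLAN item 2 (plugin setting and metadata table stay in sync) could also be asserted directly inside the spec above; a sketch of such an expectation using only names that appear in the diff (this assertion is not part of the commit, and it assumes AssetUserAccessLog.plugin_setting is callable from the spec):

  # sketch: after compaction, both iterator-state stores should agree
  ps = AssetUserAccessLog.plugin_setting.reload
  compaction_state = AssetUserAccessLog.metadatum_payload
  expect(compaction_state[:max_log_ids]).to eq(ps.settings[:max_log_ids])
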
@@ -0,0 +1,43 @@
+#
+# Copyright (C) 2020 - present Instructure, Inc.
+#
+# This file is part of Canvas.
+#
+# Canvas is free software: you can redistribute it and/or modify it under
+# the terms of the GNU Affero General Public License as published by the Free
+# Software Foundation, version 3 of the License.
+#
+# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
+# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
+# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
+# details.
+#
+# You should have received a copy of the GNU Affero General Public License along
+# with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+require 'spec_helper'
+
+describe CanvasMetadatum do
+
+  describe "getting" do
+
+    it 'should get the default value as a hash' do
+      expect(CanvasMetadatum.get('some_key', {state: 23})[:state]).to eq 23
+    end
+
+    it 'will not accept other forms of argument' do
+      expect{ CanvasMetadatum.get('some_key', 'some value') }.to raise_error(CanvasMetadatum::MetadataArgumentError)
+    end
+
+    it 'should return set values' do
+      CanvasMetadatum.set('some_key', {int_val: 23, string_val: "asdf", array_val: [2,4,8,16], hash_val: {nested: "string_value"}})
+      payload = CanvasMetadatum.get('some_key')
+      expect(payload[:int_val]).to eq(23)
+      expect(payload[:string_val]).to eq("asdf")
+      expect(payload[:array_val]).to eq([2,4,8,16])
+      expect(payload[:hash_val][:nested]).to eq("string_value")
+    end
+  end
+
+end