AUA compaction: write plugin updates to master
refs FOO-937 Change-Id: I0bd1cc91af9e56c2db9e30a55de36d1640f1560f Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/247094 Reviewed-by: Rob Orton <rob@instructure.com> Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com> QA-Review: Ethan Vizitei <evizitei@instructure.com> Product-Review: Ethan Vizitei <evizitei@instructure.com>
This commit is contained in:
parent
042cb1f72c
commit
28ca07badd
|
@ -115,12 +115,16 @@ class AssetUserAccessLog
|
|||
# to defer the writes for longer by slowing the processing down, which allows us to take
|
||||
# fewer writes at peak and use spare I/O capacity later in the day if necessary to catch up.
|
||||
def self.compact
|
||||
ps = plugin_setting
|
||||
if ps.nil?
|
||||
return Rails.logger.warn("[AUA_LOG_COMPACTION:#{Shard.current.id}] - PluginSetting nil, aborting")
|
||||
end
|
||||
ts = Time.now.utc
|
||||
yesterday_ts = ts - 1.day
|
||||
yesterday_model = log_model(yesterday_ts)
|
||||
if yesterday_model.take(1).size > 0
|
||||
yesterday_completed = compact_partition(yesterday_ts)
|
||||
ps = plugin_setting.reload
|
||||
ps.reload
|
||||
if yesterday_completed && ps.settings[:max_log_ids][yesterday_ts.wday] >= yesterday_model.maximum(:id)
|
||||
# we have now compacted all the writes from the previous day.
|
||||
# since the timestamp (now) is into the NEXT utc day, no further
|
||||
|
@ -199,26 +203,28 @@ class AssetUserAccessLog
|
|||
new_iterator_pos = log_segment_aggregation.map{|r| r["max_id"]}.max
|
||||
Shackles.activate(:master) do
|
||||
partition_model.connection.execute(update_query)
|
||||
end
|
||||
# Here we want to write the iteration state onto the plugin setting
|
||||
# so that we don't double count rows later. The next time the job
|
||||
# runs it can pick up at this point and only count rows that haven't yet been counted.
|
||||
ps.reload
|
||||
# also, no need to clear the cache, these are LOCAL
|
||||
# plugin settings, so let's not beat up consul
|
||||
PluginSetting.suspend_callbacks(:clear_cache) do
|
||||
ps.settings[:max_log_ids] ||= [0,0,0,0,0,0,0] # (just in case...)
|
||||
ps.settings[:max_log_ids][ts.wday] = new_iterator_pos
|
||||
ps.save
|
||||
# Here we want to write the iteration state onto the plugin setting
|
||||
# so that we don't double count rows later. The next time the job
|
||||
# runs it can pick up at this point and only count rows that haven't yet been counted.
|
||||
ps.reload
|
||||
# also, no need to clear the cache, these are LOCAL
|
||||
# plugin settings, so let's not beat up consul
|
||||
PluginSetting.suspend_callbacks(:clear_cache) do
|
||||
ps.settings[:max_log_ids] ||= [0,0,0,0,0,0,0] # (just in case...)
|
||||
ps.settings[:max_log_ids][ts.wday] = new_iterator_pos
|
||||
ps.save
|
||||
end
|
||||
end
|
||||
log_id_bookmark = new_iterator_pos
|
||||
sleep(intra_batch_pause) if intra_batch_pause > 0.0
|
||||
else
|
||||
# no records found in this range, we must be paging through an open segment
|
||||
# just keep going until the iterator catches up to real records.
|
||||
PluginSetting.suspend_callbacks(:clear_cache) do
|
||||
ps.reload.settings[:max_log_ids][ts.wday] = batch_upper_boundary
|
||||
ps.save
|
||||
Shackles.activate(:master) do
|
||||
PluginSetting.suspend_callbacks(:clear_cache) do
|
||||
ps.reload.settings[:max_log_ids][ts.wday] = batch_upper_boundary
|
||||
ps.save
|
||||
end
|
||||
end
|
||||
# we don't ever want to set the bookmark PAST the partition we expected
|
||||
# to consume
|
||||
|
|
|
@ -148,6 +148,13 @@ describe AssetUserAccessLog do
|
|||
end
|
||||
end
|
||||
end
|
||||
|
||||
it "aborts job immediately if plugin setting is nil" do
|
||||
ps = PluginSetting.where(name: "asset_user_access_logs").delete_all
|
||||
expect(AssetUserAccess).to_not receive(:compact_partition)
|
||||
AssetUserAccessLog.compact
|
||||
expect(@asset_1.reload.view_score).to be_nil
|
||||
end
|
||||
end
|
||||
|
||||
describe ".reschedule!" do
|
||||
|
|
Loading…
Reference in New Issue