target the correct partition(s) for messages queries with ranges
test plan: * current specs exercise both queries Change-Id: I61d9f3bb6e6758fcc895332ccd9687a13b40b894 Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/245975 Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com> Reviewed-by: Simon Williams <simon@instructure.com> QA-Review: Cody Cutrer <cody@instructure.com> Product-Review: Cody Cutrer <cody@instructure.com>
This commit is contained in:
parent
428c279bee
commit
d805bba4d3
|
@ -47,9 +47,35 @@ class MessageDispatcher < Delayed::PerformableMethod
|
|||
|
||||
def self.deliver_batch(messages)
|
||||
if messages.first.is_a?(Message::Queued)
|
||||
times = messages.map(&:created_at).sort
|
||||
range_for_partition = (times.first)..(times.last)
|
||||
messages = Message.where(:id => messages.map(&:id), :created_at => range_for_partition).to_a
|
||||
queued = messages.sort_by(&:created_at)
|
||||
message_ids = []
|
||||
messages = []
|
||||
start_time = nil
|
||||
previous_time = nil
|
||||
current_partition = nil
|
||||
queued.each_with_index do |m, i|
|
||||
start_time ||= m.created_at
|
||||
previous_time ||= m.created_at
|
||||
partition = Message.infer_partition_table_name('created_at' => m.created_at)
|
||||
current_partition ||= partition
|
||||
|
||||
if partition != current_partition || i == queued.length - 1
|
||||
# catch the last item in the list, since there will be no lookback
|
||||
if i == queued.length - 1
|
||||
message_ids << m.id
|
||||
previous_time = m.created_at
|
||||
end
|
||||
range_for_partition = start_time..previous_time
|
||||
messages.concat(Message.in_partition('created_at' => start_time).where(id: message_ids, created_at: range_for_partition).to_a)
|
||||
message_ids = []
|
||||
start_time = m.created_at
|
||||
current_partition = partition
|
||||
end
|
||||
|
||||
message_ids << m.id
|
||||
previous_time = m.created_at
|
||||
end
|
||||
raise ActiveRecord::RecordNotFound unless messages.length == queued.length
|
||||
end
|
||||
messages.each do |message|
|
||||
begin
|
||||
|
|
|
@ -335,13 +335,39 @@ class NotificationMessageCreator
|
|||
end
|
||||
|
||||
def cancel_pending_duplicate_messages
|
||||
Message.where(:notification_id => @notification).
|
||||
for(@asset).
|
||||
by_name(@notification.name).
|
||||
for_user(@to_user_channels.keys).
|
||||
cancellable.
|
||||
where("created_at BETWEEN ? AND ?", Setting.get("pending_duplicate_message_window_hours", "6").to_i.hours.ago, Time.now.utc).
|
||||
update_all(:workflow_state => 'cancelled')
|
||||
first_start_time = start_time = Setting.get("pending_duplicate_message_window_hours", "6").to_i.hours.ago
|
||||
final_end_time = Time.now.utc
|
||||
first_partition = Message.infer_partition_table_name('created_at' => first_start_time)
|
||||
|
||||
loop do
|
||||
end_time = start_time + 7.days
|
||||
end_time = final_end_time if end_time > final_end_time
|
||||
scope = Message.
|
||||
in_partition('created_at' => start_time).
|
||||
where(:notification_id => @notification).
|
||||
for(@asset).
|
||||
by_name(@notification.name).
|
||||
for_user(@to_user_channels.keys).
|
||||
cancellable
|
||||
start_partition = Message.infer_partition_table_name('created_at' => start_time)
|
||||
end_partition = Message.infer_partition_table_name('created_at' => end_time)
|
||||
if first_partition == start_partition &&
|
||||
start_partition == end_partition
|
||||
Message.infer_partition_table_name('created_at' => end_time)
|
||||
scope = scope.where(created_at: start_time..end_time)
|
||||
break_this_loop = true
|
||||
elsif start_time == first_start_time
|
||||
scope = scope.where("created_at>=?", start_time)
|
||||
elsif start_partition == end_partition
|
||||
scope = scope.where("created_at<=?", end_time)
|
||||
break_this_loop = true
|
||||
# else <no conditions; we're addressing the entire partition>
|
||||
end
|
||||
scope.update_all(:workflow_state => 'cancelled')
|
||||
|
||||
break if break_this_loop
|
||||
start_time = end_time
|
||||
end
|
||||
end
|
||||
|
||||
def too_many_messages_for?(user)
|
||||
|
|
|
@ -718,4 +718,61 @@ describe NotificationMessageCreator do
|
|||
expect(@cc.notification_policies.reload.count).to eq 1
|
||||
end
|
||||
end
|
||||
|
||||
describe "#cancel_pending_duplicate_messages" do
|
||||
context "partitions" do
|
||||
let(:subject) { NotificationMessageCreator.new(double("notification", name: nil), nil) }
|
||||
|
||||
def set_up_stubs(start_time, *conditions)
|
||||
scope = double("Message Scope")
|
||||
expect(Message).to receive(:in_partition).ordered.with('created_at' => start_time).and_return(scope)
|
||||
expect(scope).to receive(:where).ordered.and_return(scope)
|
||||
expect(scope).to receive(:for).ordered.and_return(scope)
|
||||
expect(scope).to receive(:by_name).ordered.and_return(scope)
|
||||
expect(scope).to receive(:for_user).ordered.and_return(scope)
|
||||
expect(scope).to receive(:cancellable).ordered.and_return(scope)
|
||||
unless conditions.empty?
|
||||
expect(scope).to receive(:where).with(*conditions).ordered.and_return(scope)
|
||||
end
|
||||
expect(scope).to receive(:update_all).ordered
|
||||
end
|
||||
|
||||
it "targets a single partition by default" do
|
||||
now = Time.parse("2020-08-26 12:00:00UTC")
|
||||
Timecop.freeze(now) do
|
||||
set_up_stubs(now - 6.hours, created_at: (now - 6.hours)..now)
|
||||
subject.send(:cancel_pending_duplicate_messages)
|
||||
end
|
||||
# now verify the in_partition calls will result in what we expect
|
||||
expect(Message.infer_partition_table_name('created_at' => now - 6.hours)).to eq "messages_2020_35"
|
||||
end
|
||||
|
||||
it "targets both partitions if we cross the partition boundary" do
|
||||
now = Time.parse("2020-08-24 03:00:00UTC")
|
||||
Timecop.freeze(now) do
|
||||
set_up_stubs(now - 6.hours, "created_at>=?", now - 6.hours)
|
||||
set_up_stubs(now, "created_at<=?", now)
|
||||
subject.send(:cancel_pending_duplicate_messages)
|
||||
end
|
||||
# now verify the in_partition calls will result in what we expect
|
||||
expect(Message.infer_partition_table_name('created_at' => now - 6.hours)).to eq "messages_2020_34"
|
||||
expect(Message.infer_partition_table_name('created_at' => now)).to eq "messages_2020_35"
|
||||
end
|
||||
|
||||
it "targets 3 partitions if it's really long" do
|
||||
now = Time.parse("2020-08-24 03:00:00UTC")
|
||||
Setting.set("pending_duplicate_message_window_hours", 7 * 24 + 6)
|
||||
Timecop.freeze(now) do
|
||||
set_up_stubs(now - (7 * 24 + 6).hours, "created_at>=?", now - (7 * 24 + 6).hours)
|
||||
set_up_stubs(now - 6.hours)
|
||||
set_up_stubs(now, "created_at<=?", now)
|
||||
subject.send(:cancel_pending_duplicate_messages)
|
||||
end
|
||||
# now verify the in_partition calls will result in what we expect
|
||||
expect(Message.infer_partition_table_name('created_at' => now - (24 * 7 + 6).hours)).to eq "messages_2020_33"
|
||||
expect(Message.infer_partition_table_name('created_at' => now - 6.hours)).to eq "messages_2020_34"
|
||||
expect(Message.infer_partition_table_name('created_at' => now)).to eq "messages_2020_35"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
Loading…
Reference in New Issue