diff --git a/lib/message_dispatcher.rb b/lib/message_dispatcher.rb index 8f569075576..9621358bd42 100644 --- a/lib/message_dispatcher.rb +++ b/lib/message_dispatcher.rb @@ -47,9 +47,35 @@ class MessageDispatcher < Delayed::PerformableMethod def self.deliver_batch(messages) if messages.first.is_a?(Message::Queued) - times = messages.map(&:created_at).sort - range_for_partition = (times.first)..(times.last) - messages = Message.where(:id => messages.map(&:id), :created_at => range_for_partition).to_a + queued = messages.sort_by(&:created_at) + message_ids = [] + messages = [] + start_time = nil + previous_time = nil + current_partition = nil + queued.each_with_index do |m, i| + start_time ||= m.created_at + previous_time ||= m.created_at + partition = Message.infer_partition_table_name('created_at' => m.created_at) + current_partition ||= partition + + if partition != current_partition || i == queued.length - 1 + # catch the last item in the list, since there will be no lookback + if i == queued.length - 1 + message_ids << m.id + previous_time = m.created_at + end + range_for_partition = start_time..previous_time + messages.concat(Message.in_partition('created_at' => start_time).where(id: message_ids, created_at: range_for_partition).to_a) + message_ids = [] + start_time = m.created_at + current_partition = partition + end + + message_ids << m.id + previous_time = m.created_at + end + raise ActiveRecord::RecordNotFound unless messages.length == queued.length end messages.each do |message| begin diff --git a/lib/notification_message_creator.rb b/lib/notification_message_creator.rb index 335b669282e..43da0852ae4 100644 --- a/lib/notification_message_creator.rb +++ b/lib/notification_message_creator.rb @@ -335,13 +335,39 @@ class NotificationMessageCreator end def cancel_pending_duplicate_messages - Message.where(:notification_id => @notification). - for(@asset). - by_name(@notification.name). - for_user(@to_user_channels.keys). - cancellable. - where("created_at BETWEEN ? AND ?", Setting.get("pending_duplicate_message_window_hours", "6").to_i.hours.ago, Time.now.utc). - update_all(:workflow_state => 'cancelled') + first_start_time = start_time = Setting.get("pending_duplicate_message_window_hours", "6").to_i.hours.ago + final_end_time = Time.now.utc + first_partition = Message.infer_partition_table_name('created_at' => first_start_time) + + loop do + end_time = start_time + 7.days + end_time = final_end_time if end_time > final_end_time + scope = Message. + in_partition('created_at' => start_time). + where(:notification_id => @notification). + for(@asset). + by_name(@notification.name). + for_user(@to_user_channels.keys). + cancellable + start_partition = Message.infer_partition_table_name('created_at' => start_time) + end_partition = Message.infer_partition_table_name('created_at' => end_time) + if first_partition == start_partition && + start_partition == end_partition + Message.infer_partition_table_name('created_at' => end_time) + scope = scope.where(created_at: start_time..end_time) + break_this_loop = true + elsif start_time == first_start_time + scope = scope.where("created_at>=?", start_time) + elsif start_partition == end_partition + scope = scope.where("created_at<=?", end_time) + break_this_loop = true + # else + end + scope.update_all(:workflow_state => 'cancelled') + + break if break_this_loop + start_time = end_time + end end def too_many_messages_for?(user) diff --git a/spec/lib/notification_message_creator_spec.rb b/spec/lib/notification_message_creator_spec.rb index 517ac1248d5..99081a33f64 100644 --- a/spec/lib/notification_message_creator_spec.rb +++ b/spec/lib/notification_message_creator_spec.rb @@ -718,4 +718,61 @@ describe NotificationMessageCreator do expect(@cc.notification_policies.reload.count).to eq 1 end end + + describe "#cancel_pending_duplicate_messages" do + context "partitions" do + let(:subject) { NotificationMessageCreator.new(double("notification", name: nil), nil) } + + def set_up_stubs(start_time, *conditions) + scope = double("Message Scope") + expect(Message).to receive(:in_partition).ordered.with('created_at' => start_time).and_return(scope) + expect(scope).to receive(:where).ordered.and_return(scope) + expect(scope).to receive(:for).ordered.and_return(scope) + expect(scope).to receive(:by_name).ordered.and_return(scope) + expect(scope).to receive(:for_user).ordered.and_return(scope) + expect(scope).to receive(:cancellable).ordered.and_return(scope) + unless conditions.empty? + expect(scope).to receive(:where).with(*conditions).ordered.and_return(scope) + end + expect(scope).to receive(:update_all).ordered + end + + it "targets a single partition by default" do + now = Time.parse("2020-08-26 12:00:00UTC") + Timecop.freeze(now) do + set_up_stubs(now - 6.hours, created_at: (now - 6.hours)..now) + subject.send(:cancel_pending_duplicate_messages) + end + # now verify the in_partition calls will result in what we expect + expect(Message.infer_partition_table_name('created_at' => now - 6.hours)).to eq "messages_2020_35" + end + + it "targets both partitions if we cross the partition boundary" do + now = Time.parse("2020-08-24 03:00:00UTC") + Timecop.freeze(now) do + set_up_stubs(now - 6.hours, "created_at>=?", now - 6.hours) + set_up_stubs(now, "created_at<=?", now) + subject.send(:cancel_pending_duplicate_messages) + end + # now verify the in_partition calls will result in what we expect + expect(Message.infer_partition_table_name('created_at' => now - 6.hours)).to eq "messages_2020_34" + expect(Message.infer_partition_table_name('created_at' => now)).to eq "messages_2020_35" + end + + it "targets 3 partitions if it's really long" do + now = Time.parse("2020-08-24 03:00:00UTC") + Setting.set("pending_duplicate_message_window_hours", 7 * 24 + 6) + Timecop.freeze(now) do + set_up_stubs(now - (7 * 24 + 6).hours, "created_at>=?", now - (7 * 24 + 6).hours) + set_up_stubs(now - 6.hours) + set_up_stubs(now, "created_at<=?", now) + subject.send(:cancel_pending_duplicate_messages) + end + # now verify the in_partition calls will result in what we expect + expect(Message.infer_partition_table_name('created_at' => now - (24 * 7 + 6).hours)).to eq "messages_2020_33" + expect(Message.infer_partition_table_name('created_at' => now - 6.hours)).to eq "messages_2020_34" + expect(Message.infer_partition_table_name('created_at' => now)).to eq "messages_2020_35" + end + end + end end