deliver messages to all recipients in one job, closes #1
If any individual message delivery fails, we'll reschedule it as its own individual job. Change-Id: I51ae5941fd001c61e6c6b708185ff12585d0a49f Reviewed-on: https://gerrit.instructure.com/2390 Tested-by: Hudson <hudson@instructure.com> Reviewed-by: Bracken Mosbacker <bracken@instructure.com>
This commit is contained in:
parent
b86fb7fd83
commit
4115014102
|
@ -140,12 +140,12 @@ class Message < ActiveRecord::Base
|
|||
{ :conditions => {:workflow_state => state.to_s } }
|
||||
end
|
||||
}
|
||||
|
||||
|
||||
workflow do
|
||||
state :created do
|
||||
event :stage, :transitions_to => :staged do
|
||||
self.dispatch_at = Time.now.utc + self.delay_for
|
||||
if self.to != 'dashboard'
|
||||
if self.to != 'dashboard' && !@stage_without_dispatch
|
||||
MessageDispatcher.dispatch(self)
|
||||
end
|
||||
end
|
||||
|
@ -199,6 +199,12 @@ class Message < ActiveRecord::Base
|
|||
end
|
||||
|
||||
end
|
||||
|
||||
# skip dispatching the message during the stage transition, useful when batch
|
||||
# dispatching.
|
||||
def stage_without_dispatch!
|
||||
@stage_without_dispatch = true
|
||||
end
|
||||
|
||||
# Sets a few defaults and gets it on its way to be dispatched.
|
||||
# The path: created -> staged -> sending -> sent
|
||||
|
|
|
@ -213,18 +213,20 @@ class Notification < ActiveRecord::Base
|
|||
end
|
||||
end
|
||||
@delayed_messages_to_save.each{|m| m.save! }
|
||||
|
||||
|
||||
messages.each{|m|
|
||||
if m.to != 'dashboard'
|
||||
m.save!
|
||||
elsif Notification.types_to_show_in_feed.include?(self.name)
|
||||
dashboard_messages, dispatch_messages = messages.partition { |m| m.to == 'dashboard' }
|
||||
|
||||
dashboard_messages.each do |m|
|
||||
if Notification.types_to_show_in_feed.include?(self.name)
|
||||
m.set_asset_context_code
|
||||
m.infer_defaults
|
||||
m.create_stream_items
|
||||
end
|
||||
}
|
||||
|
||||
end
|
||||
|
||||
dispatch_messages.each { |m| m.stage_without_dispatch!; m.save! }
|
||||
MessageDispatcher.batch_dispatch(dispatch_messages)
|
||||
|
||||
# re-set cached values
|
||||
@user_counts.each{|user_id, cnt| recent_messages_for_user(user_id, cnt) }
|
||||
|
||||
|
|
|
@ -23,9 +23,36 @@ class MessageDispatcher < Delayed::PerformableMethod
|
|||
:run_at => message.dispatch_at)
|
||||
end
|
||||
|
||||
def self.batch_dispatch(messages)
|
||||
return if messages.empty?
|
||||
|
||||
if messages.size == 1
|
||||
self.dispatch(messages.first)
|
||||
return
|
||||
end
|
||||
|
||||
dispatch_at = messages.first.dispatch_at
|
||||
|
||||
Delayed::Job.enqueue(self.new(self, :deliver_batch, [messages]),
|
||||
:run_at => messages.first.dispatch_at)
|
||||
end
|
||||
|
||||
# Called by delayed_job when a job fails to reschedule it.
|
||||
def reschedule_at(now, num_attempts)
|
||||
live_object.dispatch_at
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
def self.deliver_batch(messages)
|
||||
messages.each do |message|
|
||||
begin
|
||||
message.deliver
|
||||
rescue
|
||||
# this delivery failed, we'll have to make an individual job to retry
|
||||
self.dispatch(message)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
end
|
||||
|
|
|
@ -45,7 +45,7 @@ describe WikiPage do
|
|||
p.messages_sent.should_not be_empty
|
||||
p.messages_sent["Updated Wiki Page"].should_not be_nil
|
||||
p.messages_sent["Updated Wiki Page"].should_not be_empty
|
||||
p.messages_sent["Updated Wiki Page"].map(&:user).should be_include(@user) #[0].user.should eql(@user)
|
||||
p.messages_sent["Updated Wiki Page"].map(&:user).should be_include(@user)
|
||||
end
|
||||
|
||||
context "atom" do
|
||||
|
|
|
@ -108,8 +108,7 @@ module Instructure #:nodoc:
|
|||
to_list = Array[to_list].flatten
|
||||
n = DelayedNotification.send_later_if_production(:process, record, notification, (to_list || []).compact.map(&:asset_string))
|
||||
n ||= DelayedNotification.new(:asset => record, :notification => notification, :recipient_keys => (to_list || []).compact.map(&:asset_string))
|
||||
record.messages_sent[self.dispatch] = n
|
||||
if ENV['RAILS_ENV'] == 'test'
|
||||
if Rails.env.test?
|
||||
record.messages_sent[self.dispatch] = n.is_a?(DelayedNotification) ? n.process : n
|
||||
end
|
||||
n
|
||||
|
|
Loading…
Reference in New Issue