Add notification failure processor

Using the new notification_service allows us to provide more specific
failure feedback to canvas.  When we enqueue a message to the
notification service, we pass along the canvas global message id.  If
the message fails to send, we enqueue a failure message to a
"notification_failure" sqs queue, and reference the global message id.
This allows us to write failure information off to the canvas message
object and put it into an error state.

Test Plan:
  * Start local fake_sqs environment

If using docker
`$ docker pull feathj/fake-sqs`
`$ docker run -it -p 9494:9494 -e VIRTUAL_HOST=sqs.docker feathj/fake-sqs`

If running native
`$ gem install fake_sqs`
`$ fake_sqs`

  * Create `<canvas>/config/notification_failures.yml` file and place
    the following in it:

If using docker
```
development:
  use_ssl: false
  sqs_endpoint: sqs.docker
  sqs_port: 9494
  access_key_id: access key id
  secret_access_key: secret access key
```

If running native
```
development:
  use_ssl: false
  sqs_endpoint: localhost
  sqs_port: 4568
  access_key_id: access key id
  secret_access_key: secret access key
```

  * Create a canvas message to put in error state
    * Login to canvas
    * Create new conversation message
    * Open rails console and confirm that message.state is not
      "transmission_error", also take note of message id
  * Start canvas jobs, from canvas-lms directory:
`$ bundle exec script/delayed_job run`

  * Manually enqueue failure message to fake_sqs
```
require 'yaml'
require 'aws-sdk'
require 'aws-sdk-core'
require 'aws-sdk-resources'
require 'aws-sdk-v1'

client = AWS::SQS::Client.new(
  use_ssl: false,
  sqs_endpoint: '<YOUR_SQS_HOST>',
  sqs_port: <YOUR_SQS_PORT>,
  access_key_id: 'access key id',
  secret_access_key: 'secret access key'
)
client.create_queue(queue_name: 'notification-service-failures') rescue
nil
queue_url = client
  .list_queues[:queue_urls]
  .reject { |queue| /dead/i.match(queue) }
  .detect { |queue| /notification-service-failures/.match(queue) }

puts queue_url
puts client.send_message(queue_url: queue_url, message_body: {
  'global_id' => <YOUR_MESSAGE_ID>,
  'error' => 'the message failed to send amigo'
}.to_json)
```
  * Verify that message is state is set to "transmission_error" and the
    transmission_errors field has your error message

closes CNVS-26442

Change-Id: Ic379142727d4e186ae3032241caca1b1e4c5e074
Reviewed-on: https://gerrit.instructure.com/70447
Reviewed-by: Christina Wuest <cwuest@instructure.com>
Reviewed-by: Steven Burnett <sburnett@instructure.com>
Tested-by: Jenkins
QA-Review: Heath Hales <hhales@instructure.com>
Product-Review: Jonathan Featherstone <jfeatherstone@instructure.com>
This commit is contained in:
Jonathan Featherstone 2016-01-15 17:03:06 -07:00
parent fe4a9d74ca
commit 0db9ae3260
6 changed files with 175 additions and 2 deletions

View File

@ -84,12 +84,14 @@ class Message < ActiveRecord::Base
MessageDispatcher.dispatch(self) MessageDispatcher.dispatch(self)
end end
end end
event :set_transmission_error, :transitions_to => :transmission_error
event :cancel, :transitions_to => :cancelled event :cancel, :transitions_to => :cancelled
event :close, :transitions_to => :closed # needed for dashboard messages event :close, :transitions_to => :closed # needed for dashboard messages
end end
state :staged do state :staged do
event :dispatch, :transitions_to => :sending event :dispatch, :transitions_to => :sending
event :set_transmission_error, :transitions_to => :transmission_error
event :cancel, :transitions_to => :cancelled event :cancel, :transitions_to => :cancelled
event :close, :transitions_to => :closed # needed for dashboard messages event :close, :transitions_to => :closed # needed for dashboard messages
end end
@ -98,6 +100,7 @@ class Message < ActiveRecord::Base
event :complete_dispatch, :transitions_to => :sent do event :complete_dispatch, :transitions_to => :sent do
self.sent_at ||= Time.now self.sent_at ||= Time.now
end end
event :set_transmission_error, :transitions_to => :transmission_error
event :cancel, :transitions_to => :cancelled event :cancel, :transitions_to => :cancelled
event :close, :transitions_to => :closed event :close, :transitions_to => :closed
event :errored_dispatch, :transitions_to => :staged do event :errored_dispatch, :transitions_to => :staged do
@ -107,6 +110,7 @@ class Message < ActiveRecord::Base
end end
state :sent do state :sent do
event :set_transmission_error, :transitions_to => :transmission_error
event :close, :transitions_to => :closed event :close, :transitions_to => :closed
event :bounce, :transitions_to => :bounced do event :bounce, :transitions_to => :bounced do
# Permenant reminder that this bounced. # Permenant reminder that this bounced.
@ -122,12 +126,19 @@ class Message < ActiveRecord::Base
end end
state :dashboard do state :dashboard do
event :set_transmission_error, :transitions_to => :transmission_error
event :close, :transitions_to => :closed event :close, :transitions_to => :closed
event :cancel, :transitions_to => :closed event :cancel, :transitions_to => :closed
end end
state :cancelled state :cancelled
state :transmission_error do
event :close, :transitions_to => :closed
end
state :closed do state :closed do
event :set_transmission_error, :transitions_to => :transmission_error
event :send_message, :transitions_to => :closed do event :send_message, :transitions_to => :closed do
self.sent_at ||= Time.now self.sent_at ||= Time.now
end end
@ -557,7 +568,7 @@ class Message < ActiveRecord::Base
def enqueue_to_sqs def enqueue_to_sqs
message_body = path_type == "email" ? Mailer.create_message(self).to_s : body message_body = path_type == "email" ? Mailer.create_message(self).to_s : body
NotificationService.process(message_body, path_type, to, remote_configuration) NotificationService.process(global_id, message_body, path_type, to, remote_configuration)
complete_dispatch complete_dispatch
rescue AWS::SQS::Errors::ServiceError => e rescue AWS::SQS::Errors::ServiceError => e
Canvas::Errors.capture( Canvas::Errors.capture(

View File

@ -0,0 +1,84 @@
#
# Copyright (C) 2016 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require 'aws-sdk'
class ConfigurationMissingError < StandardError; end
class NotificationFailureProcessor
attr_reader :config
POLL_PARAMS = %i(initial_timeout idle_timeout wait_time_seconds visibility_timeout).freeze
DEFAULT_CONFIG = {
notification_failure_queue_name: 'notification-service-failures',
idle_timeout: 10
}.freeze
def self.config
return @config if instance_variable_defined?(:@config)
@config = ConfigFile.load('notification_failures').try(:symbolize_keys)
end
class << self
alias_method :enabled?, :config
end
def self.process(config = self.config)
new(config).process
end
def initialize(config = self.class.config)
raise ConfigurationMissingError unless self.class.enabled? || config
@config = DEFAULT_CONFIG.merge(config)
end
def process
notification_failure_queue.poll(config.slice(*POLL_PARAMS)) do |message|
failure_notification = parse_failure_notification(message)
process_failure_notification(failure_notification) if failure_notification
end
end
private
def parse_failure_notification(message)
JSON.parse(message.body)
rescue JSON::ParserError
nil
end
def process_failure_notification(notification)
global_id = notification['global_id']
error_message = notification['error']
message = global_message(global_id)
message.set_transmission_error if message
message.transmission_errors = error_message if message && error_message
message.save!
end
def notification_failure_queue
return @notification_failure_queue if defined?(@notification_failure_queue)
sqs = AWS::SQS.new(config)
@notification_failure_queue = sqs.queues.named(config[:notification_failure_queue_name])
end
def global_message(global_id)
Message.find(global_id)
end
end

View File

@ -20,8 +20,9 @@ require 'aws-sdk'
class NotificationService class NotificationService
def self.process(body, type, to, remote) def self.process(global_id, body, type, to, remote)
self.notification_queue.send_message({ self.notification_queue.send_message({
'global_id' => global_id,
'type' => type, 'type' => type,
'delivery' => { 'remote' => remote }, 'delivery' => { 'remote' => remote },
'message' => body, 'message' => body,

View File

@ -117,6 +117,12 @@ Rails.configuration.after_initialize do
end end
end end
if NotificationFailureProcessor.enabled?
Delayed::Periodic.cron 'NotificationFailureProcessor.process', '*/5 * * * *' do
NotificationFailureProcessor.process
end
end
Delayed::Periodic.cron 'Quizzes::QuizSubmissionEventPartitioner.process', '0 0 * * *' do Delayed::Periodic.cron 'Quizzes::QuizSubmissionEventPartitioner.process', '0 0 * * *' do
with_each_shard_by_database(Quizzes::QuizSubmissionEventPartitioner, :process) with_each_shard_by_database(Quizzes::QuizSubmissionEventPartitioner, :process)
end end

View File

@ -0,0 +1,7 @@
development:
access_key_id: access_key
secret_access_key: secret_key
# notification_failure_queue_name: notification_service_failures
# idle_timeout: 10
# You can also specify the following values to be passed into the sqs queue's
# poll command: initial_timeout, wait_time_seconds, visibility_timeout

View File

@ -0,0 +1,64 @@
#
# Copyright (C) 2016 Instructure, Inc.
#
# This file is part of Canvas.
#
# Canvas is free software: you can redistribute it and/or modify it under
# the terms of the GNU Affero General Public License as published by the Free
# Software Foundation, version 3 of the License.
#
# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY
# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more
# details.
#
# You should have received a copy of the GNU Affero General Public License along
# with this program. If not, see <http://www.gnu.org/licenses/>.
#
require File.expand_path(File.dirname(__FILE__) + '/../spec_helper.rb')
require File.expand_path(File.dirname(__FILE__) + '/../messages/messages_helper')
describe NotificationFailureProcessor do
before(:once) do
user_model
@au = tie_user_to_account(@user, account: account_model)
@message = generate_message(:account_user_notification, :email, @au, user: @user)
@failure_messages = [
{
global_id: 5000,
error: 'Error from mail system'
},
{
global_id: 5001,
error: 'Error from SNS system'
}
]
end
def mock_message(obj)
message = mock
message.stubs(:body).returns(obj.to_json)
message
end
def mock_queue
queue = mock
queue.expects(:poll).multiple_yields(*@failure_messages.map { |m| mock_message(m) })
queue
end
describe '.process' do
it 'puts message into error state' do
expect(@message.state).to_not eq(:error)
expect(@message.transmission_errors).to be_blank
nfp = NotificationFailureProcessor.new(access_key: 'key', secret_access_key: 'secret')
nfp.stubs(:global_message).returns(@message)
nfp.stubs(:notification_failure_queue).returns(mock_queue)
nfp.process
expect(@message.state).to eq(:transmission_error)
expect(@message.transmission_errors).to_not be_blank
end
end
end