From 0db9ae3260beae5cf2f1a3874c17ca74ca19835f Mon Sep 17 00:00:00 2001 From: Jonathan Featherstone Date: Fri, 15 Jan 2016 17:03:06 -0700 Subject: [PATCH] Add notification failure processor Using the new notification_service allows us to provide more specific failure feedback to canvas. When we enqueue a message to the notification service, we pass along the canvas global message id. If the message fails to send, we enqueue a failure message to a "notification_failure" sqs queue, and reference the global message id. This allows us to write failure information off to the canvas message object and put it into an error state. Test Plan: * Start local fake_sqs environment If using docker `$ docker pull feathj/fake-sqs` `$ docker run -it -p 9494:9494 -e VIRTUAL_HOST=sqs.docker feathj/fake-sqs` If running native `$ gem install fake_sqs` `$ fake_sqs` * Create `/config/notification_failures.yml` file and place the following in it: If using docker ``` development: use_ssl: false sqs_endpoint: sqs.docker sqs_port: 9494 access_key_id: access key id secret_access_key: secret access key ``` If running native ``` development: use_ssl: false sqs_endpoint: localhost sqs_port: 4568 access_key_id: access key id secret_access_key: secret access key ``` * Create a canvas message to put in error state * Login to canvas * Create new conversation message * Open rails console and confirm that message.state is not "transmission_error", also take note of message id * Start canvas jobs, from canvas-lms directory: `$ bundle exec script/delayed_job run` * Manually enqueue failure message to fake_sqs ``` require 'yaml' require 'aws-sdk' require 'aws-sdk-core' require 'aws-sdk-resources' require 'aws-sdk-v1' client = AWS::SQS::Client.new( use_ssl: false, sqs_endpoint: '', sqs_port: , access_key_id: 'access key id', secret_access_key: 'secret access key' ) client.create_queue(queue_name: 'notification-service-failures') rescue nil queue_url = client .list_queues[:queue_urls] .reject { |queue| /dead/i.match(queue) } .detect { |queue| /notification-service-failures/.match(queue) } puts queue_url puts client.send_message(queue_url: queue_url, message_body: { 'global_id' => , 'error' => 'the message failed to send amigo' }.to_json) ``` * Verify that message is state is set to "transmission_error" and the transmission_errors field has your error message closes CNVS-26442 Change-Id: Ic379142727d4e186ae3032241caca1b1e4c5e074 Reviewed-on: https://gerrit.instructure.com/70447 Reviewed-by: Christina Wuest Reviewed-by: Steven Burnett Tested-by: Jenkins QA-Review: Heath Hales Product-Review: Jonathan Featherstone --- app/models/message.rb | 13 ++- app/models/notification_failure_processor.rb | 84 +++++++++++++++++++ app/models/notification_service.rb | 3 +- config/initializers/periodic_jobs.rb | 6 ++ config/notification_failures.yml.example | 7 ++ .../notification_failure_processor_spec.rb | 64 ++++++++++++++ 6 files changed, 175 insertions(+), 2 deletions(-) create mode 100644 app/models/notification_failure_processor.rb create mode 100644 config/notification_failures.yml.example create mode 100644 spec/models/notification_failure_processor_spec.rb diff --git a/app/models/message.rb b/app/models/message.rb index c9f170ade28..66ecd882a34 100644 --- a/app/models/message.rb +++ b/app/models/message.rb @@ -84,12 +84,14 @@ class Message < ActiveRecord::Base MessageDispatcher.dispatch(self) end end + event :set_transmission_error, :transitions_to => :transmission_error event :cancel, :transitions_to => :cancelled event :close, :transitions_to => :closed # needed for dashboard messages end state :staged do event :dispatch, :transitions_to => :sending + event :set_transmission_error, :transitions_to => :transmission_error event :cancel, :transitions_to => :cancelled event :close, :transitions_to => :closed # needed for dashboard messages end @@ -98,6 +100,7 @@ class Message < ActiveRecord::Base event :complete_dispatch, :transitions_to => :sent do self.sent_at ||= Time.now end + event :set_transmission_error, :transitions_to => :transmission_error event :cancel, :transitions_to => :cancelled event :close, :transitions_to => :closed event :errored_dispatch, :transitions_to => :staged do @@ -107,6 +110,7 @@ class Message < ActiveRecord::Base end state :sent do + event :set_transmission_error, :transitions_to => :transmission_error event :close, :transitions_to => :closed event :bounce, :transitions_to => :bounced do # Permenant reminder that this bounced. @@ -122,12 +126,19 @@ class Message < ActiveRecord::Base end state :dashboard do + event :set_transmission_error, :transitions_to => :transmission_error event :close, :transitions_to => :closed event :cancel, :transitions_to => :closed end + state :cancelled + state :transmission_error do + event :close, :transitions_to => :closed + end + state :closed do + event :set_transmission_error, :transitions_to => :transmission_error event :send_message, :transitions_to => :closed do self.sent_at ||= Time.now end @@ -557,7 +568,7 @@ class Message < ActiveRecord::Base def enqueue_to_sqs message_body = path_type == "email" ? Mailer.create_message(self).to_s : body - NotificationService.process(message_body, path_type, to, remote_configuration) + NotificationService.process(global_id, message_body, path_type, to, remote_configuration) complete_dispatch rescue AWS::SQS::Errors::ServiceError => e Canvas::Errors.capture( diff --git a/app/models/notification_failure_processor.rb b/app/models/notification_failure_processor.rb new file mode 100644 index 00000000000..1756787f337 --- /dev/null +++ b/app/models/notification_failure_processor.rb @@ -0,0 +1,84 @@ +# +# Copyright (C) 2016 Instructure, Inc. +# +# This file is part of Canvas. +# +# Canvas is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, version 3 of the License. +# +# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see . +# + +require 'aws-sdk' + +class ConfigurationMissingError < StandardError; end + +class NotificationFailureProcessor + attr_reader :config + + POLL_PARAMS = %i(initial_timeout idle_timeout wait_time_seconds visibility_timeout).freeze + DEFAULT_CONFIG = { + notification_failure_queue_name: 'notification-service-failures', + idle_timeout: 10 + }.freeze + + def self.config + return @config if instance_variable_defined?(:@config) + @config = ConfigFile.load('notification_failures').try(:symbolize_keys) + end + + class << self + alias_method :enabled?, :config + end + + def self.process(config = self.config) + new(config).process + end + + def initialize(config = self.class.config) + raise ConfigurationMissingError unless self.class.enabled? || config + @config = DEFAULT_CONFIG.merge(config) + end + + def process + notification_failure_queue.poll(config.slice(*POLL_PARAMS)) do |message| + failure_notification = parse_failure_notification(message) + process_failure_notification(failure_notification) if failure_notification + end + end + + private + + def parse_failure_notification(message) + JSON.parse(message.body) + rescue JSON::ParserError + nil + end + + def process_failure_notification(notification) + global_id = notification['global_id'] + error_message = notification['error'] + message = global_message(global_id) + + message.set_transmission_error if message + message.transmission_errors = error_message if message && error_message + message.save! + end + + def notification_failure_queue + return @notification_failure_queue if defined?(@notification_failure_queue) + sqs = AWS::SQS.new(config) + @notification_failure_queue = sqs.queues.named(config[:notification_failure_queue_name]) + end + + def global_message(global_id) + Message.find(global_id) + end +end \ No newline at end of file diff --git a/app/models/notification_service.rb b/app/models/notification_service.rb index 80a17a8ed5d..90bb5859674 100644 --- a/app/models/notification_service.rb +++ b/app/models/notification_service.rb @@ -20,8 +20,9 @@ require 'aws-sdk' class NotificationService - def self.process(body, type, to, remote) + def self.process(global_id, body, type, to, remote) self.notification_queue.send_message({ + 'global_id' => global_id, 'type' => type, 'delivery' => { 'remote' => remote }, 'message' => body, diff --git a/config/initializers/periodic_jobs.rb b/config/initializers/periodic_jobs.rb index 06046b50340..e7b0e2e6182 100644 --- a/config/initializers/periodic_jobs.rb +++ b/config/initializers/periodic_jobs.rb @@ -117,6 +117,12 @@ Rails.configuration.after_initialize do end end + if NotificationFailureProcessor.enabled? + Delayed::Periodic.cron 'NotificationFailureProcessor.process', '*/5 * * * *' do + NotificationFailureProcessor.process + end + end + Delayed::Periodic.cron 'Quizzes::QuizSubmissionEventPartitioner.process', '0 0 * * *' do with_each_shard_by_database(Quizzes::QuizSubmissionEventPartitioner, :process) end diff --git a/config/notification_failures.yml.example b/config/notification_failures.yml.example new file mode 100644 index 00000000000..6ecf28e8c0f --- /dev/null +++ b/config/notification_failures.yml.example @@ -0,0 +1,7 @@ +development: + access_key_id: access_key + secret_access_key: secret_key + # notification_failure_queue_name: notification_service_failures + # idle_timeout: 10 + # You can also specify the following values to be passed into the sqs queue's + # poll command: initial_timeout, wait_time_seconds, visibility_timeout \ No newline at end of file diff --git a/spec/models/notification_failure_processor_spec.rb b/spec/models/notification_failure_processor_spec.rb new file mode 100644 index 00000000000..144f75f683f --- /dev/null +++ b/spec/models/notification_failure_processor_spec.rb @@ -0,0 +1,64 @@ +# +# Copyright (C) 2016 Instructure, Inc. +# +# This file is part of Canvas. +# +# Canvas is free software: you can redistribute it and/or modify it under +# the terms of the GNU Affero General Public License as published by the Free +# Software Foundation, version 3 of the License. +# +# Canvas is distributed in the hope that it will be useful, but WITHOUT ANY +# WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR +# A PARTICULAR PURPOSE. See the GNU Affero General Public License for more +# details. +# +# You should have received a copy of the GNU Affero General Public License along +# with this program. If not, see . +# + +require File.expand_path(File.dirname(__FILE__) + '/../spec_helper.rb') +require File.expand_path(File.dirname(__FILE__) + '/../messages/messages_helper') + +describe NotificationFailureProcessor do + before(:once) do + user_model + @au = tie_user_to_account(@user, account: account_model) + @message = generate_message(:account_user_notification, :email, @au, user: @user) + + @failure_messages = [ + { + global_id: 5000, + error: 'Error from mail system' + }, + { + global_id: 5001, + error: 'Error from SNS system' + } + ] + end + + def mock_message(obj) + message = mock + message.stubs(:body).returns(obj.to_json) + message + end + + def mock_queue + queue = mock + queue.expects(:poll).multiple_yields(*@failure_messages.map { |m| mock_message(m) }) + queue + end + + describe '.process' do + it 'puts message into error state' do + expect(@message.state).to_not eq(:error) + expect(@message.transmission_errors).to be_blank + nfp = NotificationFailureProcessor.new(access_key: 'key', secret_access_key: 'secret') + nfp.stubs(:global_message).returns(@message) + nfp.stubs(:notification_failure_queue).returns(mock_queue) + nfp.process + expect(@message.state).to eq(:transmission_error) + expect(@message.transmission_errors).to_not be_blank + end + end +end \ No newline at end of file