rework error handling in event stream

fixes CNVS-15153

 * don't raise if the configured cassandra database can't be connected
   to; instead, return nil (uncached, so it can be retried next time)
   and treat it as connected but unavailable
 * allow configuring whether an error is re-raised (configured true for
   test only in canvas), rather than peeking into rails directly from
   event stream.
 * allow registering callbacks to run on insert/update error via
   on_error
 * on insert/update error, before (maybe) reraising the error, call any
   registered callbacks
 * configure canvas to log to Rails.logger and statsd (removed from
   EventStream::Failure) on insert/update error

EventStream::Failure is now unused, but not yet removed from the code.
it should be considered deprecated and remain unused; it will be removed
in the near future.

test-plan:
 - start canvas in non-test mode (e.g. development mode) with cassandra
   and statsd configured and running
 - turn off cassandra, so even stream inserts should fail
 - change a grade (to trigger a GradeChange event)
 - with cassandra off, event insert should fail, but should *not*
   impede the remainder of the grade change action
 - EventStream::Failure record should *not* be created in the database
 - an error line should appear in the Rails log for the failed insert
 - the appropriate 'event_stream_failure.*' statsd counters should be
   incremented

Change-Id: I5c6d29c2a08276ccc06ebc8c1a59e0d33ce2cc4e
Reviewed-on: https://gerrit.instructure.com/40088
QA-Review: August Thornton <august@instructure.com>
Tested-by: Jenkins <jenkins@instructure.com>
Reviewed-by: Brian Palmer <brianp@instructure.com>
Product-Review: Jacob Fugal <jacob@instructure.com>
This commit is contained in:
Jacob Fugal 2014-08-27 09:58:13 -06:00
parent 9e3739e553
commit 954e77d645
7 changed files with 69 additions and 61 deletions

View File

@ -19,8 +19,25 @@
module Auditors
def self.stream(&block)
::EventStream::Stream.new(&block).tap do |stream|
stream.raise_on_error = Rails.env.test?
stream.on_insert do |record|
Auditors.logger.info "AUDITOR #{identifier} #{record.to_json}"
Auditors.logger.info "[AUDITOR:INFO] #{identifier}:insert #{record.to_json}"
end
stream.on_error do |operation, record, exception|
message = exception.message.to_s
Auditors.logger.error "[AUDITOR:ERROR] #{identifier}:#{operation} #{record.to_json} [#{message}]"
CanvasStatsd::Statsd.increment("event_stream_failure.stream.#{CanvasStatsd::Statsd.escape(identifier)}")
if message.blank?
CanvasStatsd::Statsd.increment("event_stream_failure.exception.blank")
elsif message.include?("No live servers")
CanvasStatsd::Statsd.increment("event_stream_failure.exception.no_live_servers")
elsif message.include?("Unavailable")
CanvasStatsd::Statsd.increment("event_stream_failure.exception.unavailable")
else
CanvasStatsd::Statsd.increment("event_stream_failure.exception.other")
end
end
end
end

View File

@ -31,8 +31,7 @@ class EventStream::Failure < ActiveRecord::Base
serialize :backtrace, Array
def self.log!(operation, stream, record, exception)
raise exception if Rails.env.test?
log_to_statsd!(stream, exception)
return if stream.raise_on_error
create!(:operation => operation.to_s,
:event_stream => stream.identifier,
:record_id => record.id.to_s,
@ -40,16 +39,4 @@ class EventStream::Failure < ActiveRecord::Base
:exception => exception.message.to_s,
:backtrace => exception.backtrace)
end
def self.log_to_statsd!(stream, exception)
CanvasStatsd::Statsd.increment("event_stream_failure.stream.#{CanvasStatsd::Statsd.escape(stream.identifier)}")
message = exception.message.to_s
if message.blank?
CanvasStatsd::Statsd.increment("event_stream_failure.exception.blank")
elsif message.include?("No live servers")
CanvasStatsd::Statsd.increment("event_stream_failure.exception.no_live_servers")
else
CanvasStatsd::Statsd.increment("event_stream_failure.exception.other")
end
end
end

View File

@ -26,6 +26,8 @@ class EventStream::Stream
attr_config :time_to_live, :type => Fixnum, :default => 1.year
attr_config :read_consistency_level, :default => nil
attr_accessor :raise_on_error
def initialize(&blk)
instance_exec(&blk) if blk
attr_config_validate
@ -44,12 +46,8 @@ class EventStream::Stream
end
def insert(record)
if available?
execute(:insert, record)
record
else
nil
end
execute(:insert, record)
record
end
def on_update(&callback)
@ -57,12 +55,12 @@ class EventStream::Stream
end
def update(record)
if available?
execute(:update, record)
record
else
nil
end
execute(:update, record)
record
end
def on_error(&callback)
add_callback(:error, callback)
end
def fetch(ids)
@ -125,7 +123,14 @@ class EventStream::Stream
@callbacks[operation] ||= []
end
class Unavailable < Exception; end
def execute(operation, record)
unless available?
run_callbacks(:error, operation, record, Unavailable.new)
return
end
ttl_seconds = self.ttl_seconds(record.created_at)
return if ttl_seconds < 0
@ -134,16 +139,17 @@ class EventStream::Stream
run_callbacks(operation, record)
end
rescue Exception => exception
EventStream::Failure.log!(operation, self, record, exception)
run_callbacks(:error, operation, record, exception)
raise if raise_on_error
end
def add_callback(operation, callback)
callbacks_for(operation) << callback
end
def run_callbacks(operation, record)
def run_callbacks(operation, *args)
callbacks_for(operation).each do |callback|
instance_exec(record, &callback)
instance_exec(*args, &callback)
end
end
end

View File

@ -26,17 +26,16 @@ describe EventStream::Failure do
:attributes => { 'attribute' => 'attribute_value' },
:changes => { 'changed_attribute' => 'changed_value' })
@stream = double('stream', :identifier => 'stream_identifier')
@stream = double('stream',
:identifier => 'stream_identifier',
:raise_on_error => false)
allow(@stream).to receive(:operation_payload).with(:insert, @record).and_return(@record.attributes)
allow(@stream).to receive(:operation_payload).with(:update, @record).and_return(@record.changes)
@exception = Exception.new
allow(@exception).to receive(:message).and_return(double('exception_message', :to_s => 'exception_message_string'))
allow(@exception).to receive(:backtrace).and_return([42])
# By default the log! method raises exceptions in test env. Override this
# to log the CanvasEvent and not raise it for these tests.
allow(Rails.env).to receive(:test?).and_return(false)
end
it "creates a new db record" do

View File

@ -443,22 +443,28 @@ describe EventStream::Stream do
@exception = Exception.new
end
shared_examples_for "recording failures" do
before do
# By default the log! method raises exceptions in test env. Override this
# to log the event and not raise it for these tests.
Rails.env.stub(:test?).and_return(false)
end
it "records failed inserts" do
expect(EventStream::Failure).to receive(:log!).once.with(:insert, @stream, @record, @exception)
shared_examples_for "error callbacks" do
it "triggers callbacks on failed inserts" do
spy = double('spy')
@stream.on_error{ |*args| spy.capture(*args) }
expect(spy).to receive(:capture).once.with(:insert, @record, @exception)
@stream.insert(@record)
end
it "records failed updates" do
expect(EventStream::Failure).to receive(:log!).once.with(:update, @stream, @record, @exception)
it "triggers callbacks on failed updates" do
spy = double('spy')
@stream.on_error{ |*args| spy.capture(*args) }
expect(spy).to receive(:capture).once.with(:update, @record, @exception)
@stream.update(@record)
end
it "raises error if raises_on_error is true, but still calls callbacks" do
spy = double('spy')
@stream.raise_on_error = true
@stream.on_error{ spy.trigger }
expect(spy).to receive(:trigger).once
expect{ @stream.insert(@record) }.to raise_exception
end
end
context "failing database" do
@ -466,7 +472,7 @@ describe EventStream::Stream do
@database.stub(:batch).and_raise(@exception)
end
include_examples "recording failures"
include_examples "error callbacks"
end
context "failing callbacks" do
@ -491,7 +497,7 @@ describe EventStream::Stream do
@stream.update(@record)
end
include_examples "recording failures"
include_examples "error callbacks"
end
end
end

View File

@ -23,15 +23,3 @@ RSpec.configure do |config|
end
Time.zone = "UTC"
module Rails
def self.env
@env ||= Env.new
end
class Env
def test?
false
end
end
end

View File

@ -29,7 +29,12 @@ module Canvas
opts[:timeout] = config['timeout'] if config['timeout']
fingerprint = "#{config_name}:#{environment}"
Bundler.require 'cassandra'
@connections[key] = CanvasCassandra::Database.new(fingerprint, servers, opts, Rails.logger)
begin
@connections[key] = CanvasCassandra::Database.new(fingerprint, servers, opts, Rails.logger)
rescue Exception => exception
Rails.logger.error "Failed to create cassandra connection for #{key}: #{exception}"
nil # don't save this nil into @connections[key], so we can retry later
end
end
end