diff --git a/lib/message_bus/async_producer.rb b/lib/message_bus/async_producer.rb index b1a4a330001..e7bdf4cde5c 100644 --- a/lib/message_bus/async_producer.rb +++ b/lib/message_bus/async_producer.rb @@ -162,7 +162,7 @@ module MessageBus # context and reconnect. If we get a timeout again, that is NOT # the problem, and we should let the error raise. retries += 1 - raise ex if retries > 1 + raise e if retries > 1 Rails.logger.info "[AUA] Pulsar failure during message send, retrying..." CanvasErrors.capture_exception(:message_bus, e, :warn) diff --git a/spec/lib/message_bus/async_producer_spec.rb b/spec/lib/message_bus/async_producer_spec.rb index 657a4befaa0..7b5d85d1c41 100644 --- a/spec/lib/message_bus/async_producer_spec.rb +++ b/spec/lib/message_bus/async_producer_spec.rb @@ -21,33 +21,44 @@ require 'spec_helper' describe MessageBus::AsyncProducer do + before(:each) do skip("pulsar config required to test") unless MessageBus.enabled? end - around(:each) do |example| - old_interval = MessageBus.worker_process_interval_lambda - old_queue_size = MessageBus.max_mem_queue_size_lambda - old_logger = MessageBus.logger - # let's not waste time with queue throttling in tests - MessageBus.worker_process_interval = -> { 0.01 } - MessageBus.max_mem_queue_size = -> { 10 } - MessageBus.logger = Rails.logger - example.run - ensure - MessageBus.worker_process_interval = old_interval unless old_interval.nil? - MessageBus.max_mem_queue_size_lambda = old_queue_size unless old_queue_size.nil? - MessageBus.logger = old_logger + describe "#produce_message error handling" do + it "re-raises errors correctly if they're not rescuable" do + Bundler.require(:pulsar) + producer = ::MessageBus::AsyncProducer.new(start_thread: false) + allow(MessageBus).to receive(:producer_for).and_raise(::Pulsar::Error::AlreadyClosed) + expect{ producer.send(:produce_message, ['a', 'b', 'c']) }.to raise_error(::Pulsar::Error::AlreadyClosed) + end end - after(:each) do - MessageBus.reset! - end - - let(:producer){ MessageBus::AsyncProducer.new(start_thread: false) } - let(:namespace){ "test-only" } - describe "push" do + + around(:each) do |example| + old_interval = MessageBus.worker_process_interval_lambda + old_queue_size = MessageBus.max_mem_queue_size_lambda + old_logger = MessageBus.logger + # let's not waste time with queue throttling in tests + MessageBus.worker_process_interval = -> { 0.01 } + MessageBus.max_mem_queue_size = -> { 10 } + MessageBus.logger = Rails.logger + example.run + ensure + MessageBus.worker_process_interval = old_interval unless old_interval.nil? + MessageBus.max_mem_queue_size_lambda = old_queue_size unless old_queue_size.nil? + MessageBus.logger = old_logger + end + + after(:each) do + MessageBus.reset! + end + + let(:producer){ MessageBus::AsyncProducer.new(start_thread: false) } + let(:namespace){ "test-only" } + it "pushes onto the queue but does not execute" do topic_name = "lazily-created-topic-#{SecureRandom.hex(16)}" subscription_name = "subscription-#{SecureRandom.hex(4)}"