Fix Pact Tests for Live Events

refs QUIZ-6827

Test Plan:
 Running the pact tests that fail will not fail if all are run

Change-Id: I0779dfbb6b0547713eaa1a6d7595a7e7c9db07ff
Reviewed-on: https://gerrit.instructure.com/208095
Tested-by: Jenkins
Reviewed-by: Weston Dransfield <wdransfield@instructure.com>
QA-Review: Weston Dransfield <wdransfield@instructure.com>
Product-Review: Marc Phillips <mphillips@instructure.com>
This commit is contained in:
Marc Phillips 2019-09-04 10:46:34 -06:00
parent 0f92f9fd50
commit 4322a8e0b7
9 changed files with 23 additions and 9 deletions

View File

@ -77,7 +77,7 @@ module LiveEvents
logger.warn "Starting new LiveEvents worker thread due to fork." logger.warn "Starting new LiveEvents worker thread due to fork."
end end
@worker = LiveEvents::AsyncWorker.new(stream_client: client.stream_client, stream_name: client.stream_name) @worker = LiveEvents::AsyncWorker.new(stream_client: @stream_client, stream_name: @stream_client&.stream_name)
@launched_pid = Process.pid @launched_pid = Process.pid
end end
@ -95,7 +95,7 @@ module LiveEvents
if @client && !new_client? if @client && !new_client?
@client @client
else else
@client = LiveEvents::Client.new(LiveEvents::Client.config, @stream_client, @stream_client&.stream_name) @client = LiveEvents::Client.new(LiveEvents::Client.config, @stream_client, @stream_client&.stream_name, worker: @worker)
end end
end end

View File

@ -31,7 +31,7 @@ module LiveEvents
# queue for cases when the process is shutting down. # queue for cases when the process is shutting down.
class AsyncWorker class AsyncWorker
attr_reader :logger attr_accessor :logger, :stream_client, :stream_name
MAX_BYTE_THRESHOLD = 5_000_000 MAX_BYTE_THRESHOLD = 5_000_000
KINESIS_RECORD_SIZE_LIMIT = 1_000_000 KINESIS_RECORD_SIZE_LIMIT = 1_000_000
@ -79,6 +79,7 @@ module LiveEvents
end end
def start! def start!
return if @running
@thread = Thread.new { self.run_thread } @thread = Thread.new { self.run_thread }
@running = true @running = true
at_exit { stop! } at_exit { stop! }

View File

@ -36,10 +36,15 @@ module LiveEvents
res.dup res.dup
end end
def initialize(config = nil, aws_stream_client = nil, aws_stream_name = nil) def initialize(config = nil, aws_stream_client = nil, aws_stream_name = nil, worker: nil)
config ||= LiveEvents::Client.config config ||= LiveEvents::Client.config
@stream_client = aws_stream_client || Aws::Kinesis::Client.new(Client.aws_config(config)) @stream_client = aws_stream_client || Aws::Kinesis::Client.new(Client.aws_config(config))
@stream_name = aws_stream_name || config['kinesis_stream_name'] @stream_name = aws_stream_name || config['kinesis_stream_name']
if worker
@worker = worker
@worker.stream_client = @stream_client
@worker.stream_name = @stream_name
end
end end
def self.aws_config(plugin_config) def self.aws_config(plugin_config)
@ -84,7 +89,9 @@ module LiveEvents
# let it be the user_id when that's available. # let it be the user_id when that's available.
partition_key ||= (ctx["user_id"] && ctx["user_id"].try(:to_s)) || rand(1000).to_s partition_key ||= (ctx["user_id"] && ctx["user_id"].try(:to_s)) || rand(1000).to_s
unless LiveEvents.worker.push(event, partition_key) pusher = @worker || LiveEvents.worker
unless pusher.push(event, partition_key)
LiveEvents.logger.error("Error queueing job for worker event: #{event_json}") LiveEvents.logger.error("Error queueing job for worker event: #{event_json}")
LiveEvents&.statsd&.increment("#{statsd_prefix}.queue_full_errors") LiveEvents&.statsd&.increment("#{statsd_prefix}.queue_full_errors")
end end

View File

@ -61,6 +61,10 @@ describe LiveEvents::AsyncWorker do
allow(@worker).to receive(:at_exit) allow(@worker).to receive(:at_exit)
end end
after(:each) do
LiveEvents.statsd = nil
end
describe "push" do describe "push" do
it "should execute stuff pushed on the queue" do it "should execute stuff pushed on the queue" do
results_double = double results_double = double

View File

@ -63,6 +63,7 @@ describe LiveEvents::Client do
LiveEvents.clear_context! LiveEvents.clear_context!
@client = LiveEvents::Client.new nil, fclient, test_stream_name @client = LiveEvents::Client.new nil, fclient, test_stream_name
LiveEvents.worker.start!
end end
def expect_put_records(payload, stream_client = LiveEvents.stream_client) def expect_put_records(payload, stream_client = LiveEvents.stream_client)
@ -79,6 +80,7 @@ describe LiveEvents::Client do
}) })
expect(res[:endpoint]).to eq("http://example.com:6543/") expect(res[:endpoint]).to eq("http://example.com:6543/")
LiveEvents.worker.stop!
end end
end end

View File

@ -28,7 +28,7 @@ RSpec.describe 'Canvas LMS Live Events', :pact_live_events do
) )
end end
it 'keeps the contract', skip: 'QUIZ-6827' do it 'keeps the contract' do
live_event.emit_with do live_event.emit_with do
params = { params = {
:name => "Quizzes.Next", :name => "Quizzes.Next",

View File

@ -28,7 +28,7 @@ RSpec.describe 'Canvas LMS Live Events', :pact_live_events do
) )
end end
it 'keeps the contract', skip: 'QUIZ-6827' do it 'keeps the contract' do
live_event.emit_with do live_event.emit_with do
# arrange # arrange
params = { params = {

View File

@ -28,7 +28,7 @@ RSpec.describe 'Canvas LMS Live Events', :pact_live_events do
) )
end end
it 'keeps the contract', skip: 'QUIZ-6827' do it 'keeps the contract' do
live_event.emit_with do live_event.emit_with do
params = { params = {
:name => "Quizzes.Next", :name => "Quizzes.Next",

View File

@ -28,7 +28,7 @@ RSpec.describe 'Canvas LMS Live Events', :pact_live_events do
) )
end end
it 'keeps the contract', skip: 'QUIZ-6827' do it 'keeps the contract' do
live_event.emit_with do live_event.emit_with do
# arrange # arrange
params = { params = {