From 6f67186989d7c4e443eedfbc65403407df94b478 Mon Sep 17 00:00:00 2001
From: Steve McGee
Date: Fri, 22 Jul 2022 09:42:12 -0600
Subject: [PATCH] kinesis publishing for `internal failure`s

fixes INTEROP-7027
flag-none

test plan:
* since we can't intentionally cause an "InternalFailure" we can only run the
  associated test `async_worker_spec.rb:110`

Change-Id: I42105c0475ea8036bdc955472bce8953c659cfa8
Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/296842
Reviewed-by: Evan Battaglia
QA-Review: Evan Battaglia
Product-Review: Alexis Nast
Tested-by: Service Cloud Jenkins
---
 doc/live_events.md                          | 36 +++++++++++++-------
 .../lib/live_events/async_worker.rb         | 23 ++++++++----
 .../spec/live_events/async_worker_spec.rb   | 46 ++++++++++++++++-----
 3 files changed, 83 insertions(+), 22 deletions(-)

diff --git a/doc/live_events.md b/doc/live_events.md
index 2b23c225407..c7419291e8d 100644
--- a/doc/live_events.md
+++ b/doc/live_events.md
@@ -3,7 +3,7 @@
 Canvas includes the ability to push a subset of real-time events to a
 Kinesis stream, which can then be consumed for various analytics
 purposes. This is not a full-fidelity feed of all changes to the
-database, but a targetted set of interesting actions such as
+database, but a targeted set of interesting actions such as
 `grade_change`, `login`, etc.
 
 ## Development and Testing
@@ -14,21 +14,22 @@ There are two components to local development:
 
 ### Kinesis Stream
 
-To enable Live Events, you need to configure the plugin in the /plugins
-interface. If using the docker-compose dev setup, there is a "fake
-kinesis" available in docker-compose/kinesis.override.yml available for
-use. Once it's up, make sure you have the `aws` cli installed, and run
-the following command to create a stream (with canvas running):
+If using the docker-compose dev setup, there is a "fake kinesis" available in
+docker-compose/kinesis.override.yml. To start this kinesis container, run
+`docker-compose up -d kinesis`. Once it's up, make sure you have the `aws` cli
+installed, and run the following command to create a stream (with canvas
+running). Since this runs against the local fake kinesis, real AWS credentials
+are not needed; run the command exactly as shown:
 
 ```bash
 AWS_ACCESS_KEY_ID=key AWS_SECRET_ACCESS_KEY=secret aws --endpoint-url http://kinesis.docker/ kinesis create-stream --stream-name=live-events --shard-count=1 --region=us-east-1
 ```
 
-Once the stream is created, configure your Canvas to use it in your `config/dynamic_settings.yml`.
-This file is a local shim for Consul. If you have copied the example file at
-`config/dynamic_settings.yml.example` recently, you should already see a live_events block
-and it should already be configured properly. If you don't see a live_events block, check
-the example file or copy this block:
+Once the stream is created, configure your Canvas to use it in your
+`config/dynamic_settings.yml`. This file is a local shim for Consul. If you have
+copied the example file at `config/dynamic_settings.yml.example` recently, you
+should already see a correctly configured live_events block. If you don't see
+a live_events block, check the example file or copy this block:
 
 ```yml
 live-events:
@@ -39,7 +40,16 @@ the example file or copy this block:
 ```
 
 Depending on your docker networking setup, you may need to substitute either
-`http://kinesis:4567`, 'http://kinesis.docker`, or `http://kinesis.canvaslms.docker`
+`http://kinesis:4567`, `http://kinesis.docker`, or `http://kinesis.canvaslms.docker`
 for the aws_endpoint (the first two should be equivalent).
 
 Restart Canvas, and events should start flowing to your kinesis stream.
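+
+To confirm events are flowing, you can read the stream back with the `aws` cli.
+This is a rough sketch rather than part of the official setup; the shard id
+below is the default for a single-shard stream and may differ in your setup.
+
+```bash
+SHARD_ITERATOR=$(AWS_ACCESS_KEY_ID=key AWS_SECRET_ACCESS_KEY=secret aws --endpoint-url http://kinesis.docker/ kinesis get-shard-iterator --stream-name=live-events --shard-id=shardId-000000000000 --shard-iterator-type=TRIM_HORIZON --region=us-east-1 --query=ShardIterator --output=text)
+AWS_ACCESS_KEY_ID=key AWS_SECRET_ACCESS_KEY=secret aws --endpoint-url http://kinesis.docker/ kinesis get-records --shard-iterator="$SHARD_ITERATOR" --region=us-east-1
+```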
@@ -64,7 +65,7 @@ docker-compose logs -f --tail=100 # whichever container you need
 
 #### Connecting to local Publisher Lambda
 
-The `live-events-publish repo should be checked out and running locally.
+The `live-events-publish` repo should be checked out and running locally.
 This contains the publisher lambda, and other infrastructure including a
 local kinesis stream. Note the url of that kinesis stream, which may look
 like `http://kinesis.live-events-publish.docker:4567`.
diff --git a/gems/live_events/lib/live_events/async_worker.rb b/gems/live_events/lib/live_events/async_worker.rb
index ec713468031..3164a8bbbcc 100644
--- a/gems/live_events/lib/live_events/async_worker.rb
+++ b/gems/live_events/lib/live_events/async_worker.rb
@@ -28,6 +28,7 @@ module LiveEvents
 
   MAX_BYTE_THRESHOLD = 5_000_000
   KINESIS_RECORD_SIZE_LIMIT = 1_000_000
+  RETRY_LIMIT = 3
 
   def initialize(start_thread = true, stream_client:, stream_name:)
     @queue = Queue.new
@@ -122,6 +123,7 @@ module LiveEvents
         LiveEvents.statsd.time("live_events.put_records") do
           res = yield
         end
+
       end
     end
 
@@ -146,13 +148,28 @@ module LiveEvents
     def process_results(res, records)
       res.records.each_with_index do |r, i|
         record = records[i]
-        if r.error_code.present?
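+        # put_records reports per-record failures in its response rather than
+        # raising, so requeue records that failed with a retryable
+        # "InternalFailure", giving up after RETRY_LIMIT extra attempts.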
+        if r.error_code == "InternalFailure"
+          record[:retries_count] ||= 0
+          record[:retries_count] += 1
+
+          if record[:retries_count] <= RETRY_LIMIT
+            @queue.push(record)
+            LiveEvents&.statsd&.increment("#{record[:statsd_prefix]}.retry", tags: record[:tags])
+          else
+            internal_error_message = "This record has failed too many times and will no longer be retried. #{r.error_message}"
+            log_unprocessed(record, r.error_code, internal_error_message)
+            LiveEvents&.statsd&.increment("#{record[:statsd_prefix]}.final_retry", tags: record[:tags])
+          end
+
+        elsif r.error_code.present?
           log_unprocessed(record, r.error_code, r.error_message)
         else
           LiveEvents&.statsd&.increment("#{record[:statsd_prefix]}.sends", tags: record[:tags])
-          nil
         end
-      end.compact
+      end
     end
 
     def log_unprocessed(record, error_code, error_message)
diff --git a/gems/live_events/spec/live_events/async_worker_spec.rb b/gems/live_events/spec/live_events/async_worker_spec.rb
index 08a2481a7ac..1516b8d5086 100644
--- a/gems/live_events/spec/live_events/async_worker_spec.rb
+++ b/gems/live_events/spec/live_events/async_worker_spec.rb
@@ -22,9 +22,10 @@ require "spec_helper"
 
 describe LiveEvents::AsyncWorker do
   let(:put_records_return) { [] }
-  let(:stream_client) { double(stream_name: stream_name, put_records: OpenStruct.new(records: [], error_code: nil, error_message: nil)) }
+  let(:stream_client) { double(stream_name: stream_name) }
   let(:stream_name) { "stream_name_x" }
   let(:event_name) { "event_name" }
+  let(:statsd_double) { double(increment: nil) }
   let(:event) do
     {
       event_name: event_name,
@@ -51,6 +52,9 @@ describe LiveEvents::AsyncWorker do
     allow(LiveEvents).to receive(:logger).and_return(double(info: nil, error: nil, debug: nil))
     @worker = LiveEvents::AsyncWorker.new(false, stream_client: stream_client, stream_name: stream_name)
     allow(@worker).to receive(:at_exit)
+    expect(LiveEvents.logger).to_not receive(:error).with(/Exception making LiveEvents async call/)
+    LiveEvents.statsd = statsd_double
+    allow(statsd_double).to receive(:time).and_yield
   end
 
   after do
@@ -88,10 +92,8 @@ describe LiveEvents::AsyncWorker do
       allow(results_double).to receive(:each_with_index).and_return([])
       allow(stream_client).to receive(:put_records).once.and_return(results)
 
-      statsd_double = double
-      expect(statsd_double).to receive(:time).once
+      expect(statsd_double).to receive(:time).once.and_yield
 
-      LiveEvents.statsd = statsd_double
       @worker.start!
 
       4.times { @worker.push event, partition_key }
@@ -107,13 +109,45 @@ describe LiveEvents::AsyncWorker do
   end
 
   context "with error putting to kinesis" do
+    let(:expected_batch) { { records: [{ data: /1234/, partition_key: instance_of(String) }], stream_name: "stream_name_x" } }
+
+    it "puts 'InternalFailure' records back in the queue for 1 extra retry that passes" do
+      results1 = double(records: [double(error_code: "InternalFailure", error_message: "internal failure message")])
+      results2 = double(records: [double(error_code: nil)])
+
+      expect(stream_client).to receive(:put_records).with(expected_batch).and_return(results1, results2)
+      expect(statsd_double).to receive(:time).and_yield.twice
+      expect(statsd_double).not_to receive(:increment).with("live_events.events.send_errors", any_args)
+      expect(statsd_double).to receive(:increment).with("live_events.events.sends", any_args)
+      expect(statsd_double).to receive(:increment).with("live_events.events.retry", any_args)
+
+      @worker.start!
+      @worker.push event, partition_key
+      @worker.stop!
+    end
+
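+    # With RETRY_LIMIT = 3, a record that keeps returning "InternalFailure" is
+    # requeued three times and then dropped, so put_records is called four
+    # times in total (the initial attempt plus three retries).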
+    it "puts 'InternalFailure' records back in the queue for 3 retries that fail" do
+      results = double(records: [double(error_code: "InternalFailure", error_message: "internal failure message")])
+
+      expect(stream_client).to receive(:put_records).exactly(4).times.and_return(results)
+      expect(statsd_double).to receive(:time).and_yield.exactly(4).times
+      expect(statsd_double).to receive(:increment).exactly(3).times.with("live_events.events.retry", any_args)
+      expect(statsd_double).to receive(:increment).once.with("live_events.events.final_retry", any_args)
+      expect(statsd_double).to receive(:increment).with("live_events.events.send_errors", any_args)
+
+      @worker.start!
+      @worker.push event, partition_key
+      @worker.stop!
+    end
+
     it "writes errors to logger" do
       results = OpenStruct.new(records: [
                                  OpenStruct.new(error_code: "failure", error_message: "failure message")
                                ])
       allow(stream_client).to receive(:put_records).once.and_return(results)
-      statsd_double = double
-      LiveEvents.statsd = statsd_double
       expect(statsd_double).to receive(:time).and_yield
       expect(statsd_double).to receive(:increment).with("live_events.events.send_errors", any_args)
       @worker.start!