retry kinesis publishing for `InternalFailure`s

fixes INTEROP-7027
flag=none

test plan:
* since we can't intentionally cause an "InternalFailure", we can only run the associated test, `async_worker_spec.rb:110` (see the command below)
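
To run that spec locally, something like the following should work (paths assume the live_events gem's usual layout inside canvas-lms):

```bash
cd gems/live_events
bundle exec rspec spec/live_events/async_worker_spec.rb:110
```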

Change-Id: I42105c0475ea8036bdc955472bce8953c659cfa8
Reviewed-on: https://gerrit.instructure.com/c/canvas-lms/+/296842
Reviewed-by: Evan Battaglia <ebattaglia@instructure.com>
QA-Review: Evan Battaglia <ebattaglia@instructure.com>
Product-Review: Alexis Nast <alexis.nast@instructure.com>
Tested-by: Service Cloud Jenkins <svc.cloudjenkins@instructure.com>
Commit 6f67186989 (parent ca66ff8082)
Author: Steve McGee, 2022-07-22 09:42:12 -06:00
3 changed files with 68 additions and 22 deletions

doc/live_events.md

@@ -3,7 +3,7 @@
Canvas includes the ability to push a subset of real-time events to a
Kinesis stream, which can then be consumed for various analytics
purposes. This is not a full-fidelity feed of all changes to the
-database, but a targetted set of interesting actions such as
+database, but a targeted set of interesting actions such as
`grade_change`, `login`, etc.
## Development and Testing
@@ -14,21 +14,22 @@ There are two components to local development:
### Kinesis Stream
-To enable Live Events, you need to configure the plugin in the /plugins
-interface. If using the docker-compose dev setup, there is a "fake
-kinesis" available in docker-compose/kinesis.override.yml available for
-use. Once it's up, make sure you have the `aws` cli installed, and run
-the following command to create a stream (with canvas running):
+If using the docker-compose dev setup, there is a "fake kinesis" provided in
+docker-compose/kinesis.override.yml. To start this kinesis container, run
+`docker-compose up -d kinesis`. Once it's up, make sure you have the `aws` cli
+installed, and run the following command to create a stream (with canvas
+running). Keep in mind that we are running this locally, so actual AWS
+credentials are not needed; run the following command exactly as you see it here:
```bash
AWS_ACCESS_KEY_ID=key AWS_SECRET_ACCESS_KEY=secret aws --endpoint-url http://kinesis.docker/ kinesis create-stream --stream-name=live-events --shard-count=1 --region=us-east-1
```
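
To confirm the stream was created, the same fake credentials work for read-side commands, e.g. listing streams:

```bash
AWS_ACCESS_KEY_ID=key AWS_SECRET_ACCESS_KEY=secret aws --endpoint-url http://kinesis.docker/ kinesis list-streams --region=us-east-1
```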
-Once the stream is created, configure your Canvas to use it in your `config/dynamic_settings.yml`.
-This file is a local shim for Consul. If you have copied the example file at
-`config/dynamic_settings.yml.example` recently, you should already see a live_events block
-and it should already be configured properly. If you don't see a live_events block, check
-the example file or copy this block:
+Once the stream is created, configure your Canvas to use it in your
+`config/dynamic_settings.yml`. This file is a local shim for Consul. If you have
+copied the example file at `config/dynamic_settings.yml.example` recently, you
+should already see a live-events block and it should already be configured properly.
+If you don't see a live-events block, check the example file or copy this block:
```yml
live-events:
@@ -39,7 +40,7 @@ the example file or copy this block:
```
Depending on your docker networking setup, you may need to substitute either
-`http://kinesis:4567`, 'http://kinesis.docker`, or `http://kinesis.canvaslms.docker`
+`http://kinesis:4567`, `http://kinesis.docker`, or `http://kinesis.canvaslms.docker`
for the aws_endpoint (the first two should be equivalent).
Restart Canvas, and events should start flowing to your kinesis stream.
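
To spot-check that events are actually arriving, you can read records back with the same CLI (a sketch; `shardId-000000000000` assumes the single-shard stream created above):

```bash
ITERATOR=$(AWS_ACCESS_KEY_ID=key AWS_SECRET_ACCESS_KEY=secret aws --endpoint-url http://kinesis.docker/ \
  kinesis get-shard-iterator --stream-name live-events --shard-id shardId-000000000000 \
  --shard-iterator-type TRIM_HORIZON --region us-east-1 --query ShardIterator --output text)
AWS_ACCESS_KEY_ID=key AWS_SECRET_ACCESS_KEY=secret aws --endpoint-url http://kinesis.docker/ \
  kinesis get-records --shard-iterator "$ITERATOR" --region us-east-1
```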
@@ -64,7 +65,7 @@ docker-compose logs -f --tail=100 <jobs|web> # whichever container you need
#### Connecting to local Publisher Lambda
-The `live-events-publish repo should be checked out and running locally.
+The `live-events-publish` repo should be checked out and running locally.
This contains the publisher lambda, and other infrastructure including a local
kinesis stream. Note the url of that kinesis stream, which may look like
`http://kinesis.live-events-publish.docker:4567`.
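
To point Canvas at that stream instead, the aws_endpoint in the live-events block can presumably be swapped for that url (a sketch; `kinesis_stream_name` is an assumed key name, since the example block above is truncated):

```yml
live-events:
  aws_endpoint: http://kinesis.live-events-publish.docker:4567  # publisher repo's local kinesis
  kinesis_stream_name: live-events  # assumed key/stream name; check dynamic_settings.yml.example
```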

gems/live_events/lib/live_events/async_worker.rb

@@ -28,6 +28,7 @@ module LiveEvents
    MAX_BYTE_THRESHOLD = 5_000_000
    KINESIS_RECORD_SIZE_LIMIT = 1_000_000
+   RETRY_LIMIT = 3

    def initialize(start_thread = true, stream_client:, stream_name:)
      @queue = Queue.new
@@ -122,6 +123,7 @@ module LiveEvents
      LiveEvents.statsd.time("live_events.put_records") do
        res = yield
      end
    end
  end
@@ -146,13 +148,25 @@ module LiveEvents
    def process_results(res, records)
      res.records.each_with_index do |r, i|
        record = records[i]
-       if r.error_code.present?
+       if r.error_code == "InternalFailure"
+         record[:retries_count] ||= 0
+         record[:retries_count] += 1
+         if record[:retries_count] <= RETRY_LIMIT
+           @queue.push(record)
+           LiveEvents&.statsd&.increment("#{record[:statsd_prefix]}.retry", tags: record[:tags])
+         else
+           internal_error_message = "This record has failed too many times and will no longer be retried. #{r.error_message}"
+           log_unprocessed(record, r.error_code, internal_error_message)
+           LiveEvents&.statsd&.increment("#{record[:statsd_prefix]}.final_retry", tags: record[:tags])
+         end
+       elsif r.error_code.present?
          log_unprocessed(record, r.error_code, r.error_message)
        else
          LiveEvents&.statsd&.increment("#{record[:statsd_prefix]}.sends", tags: record[:tags])
          nil
        end
      end.compact
    end

    def log_unprocessed(record, error_code, error_message)
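
For intuition on the retry math (names mirror the diff above): with `RETRY_LIMIT = 3`, a record that keeps coming back as "InternalFailure" is attempted once and then re-queued three times, so `put_records` sees it four times in total before `final_retry` is incremented, which is exactly what the spec below asserts.

```ruby
# Hypothetical trace of one always-failing record, mirroring process_results:
RETRY_LIMIT = 3
retries_count = 0 # stands in for record[:retries_count]
attempts = 0
loop do
  attempts += 1                         # one put_records call returns "InternalFailure"
  retries_count += 1                    # record[:retries_count] += 1
  break if retries_count > RETRY_LIMIT  # falls through to log_unprocessed + final_retry
end
attempts # => 4, matching `receive(:put_records).exactly(4).times` in the spec
```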

gems/live_events/spec/live_events/async_worker_spec.rb

@@ -22,9 +22,10 @@ require "spec_helper"
describe LiveEvents::AsyncWorker do
  let(:put_records_return) { [] }
- let(:stream_client) { double(stream_name: stream_name, put_records: OpenStruct.new(records: [], error_code: nil, error_message: nil)) }
+ let(:stream_client) { double(stream_name: stream_name) }
  let(:stream_name) { "stream_name_x" }
  let(:event_name) { "event_name" }
+ let(:statsd_double) { double(increment: nil) }
  let(:event) do
    {
      event_name: event_name,
@@ -51,6 +52,9 @@ describe LiveEvents::AsyncWorker do
    allow(LiveEvents).to receive(:logger).and_return(double(info: nil, error: nil, debug: nil))
    @worker = LiveEvents::AsyncWorker.new(false, stream_client: stream_client, stream_name: stream_name)
    allow(@worker).to receive(:at_exit)
+   expect(LiveEvents.logger).to_not receive(:error).with(/Exception making LiveEvents async call/)
+   LiveEvents.statsd = statsd_double
+   allow(statsd_double).to receive(:time).and_yield
  end

  after do
@@ -88,10 +92,8 @@ describe LiveEvents::AsyncWorker do
      allow(results_double).to receive(:each_with_index).and_return([])
      allow(stream_client).to receive(:put_records).once.and_return(results)
-     statsd_double = double
-     expect(statsd_double).to receive(:time).once
-     LiveEvents.statsd = statsd_double
+     expect(statsd_double).to receive(:time).once.and_yield

      @worker.start!
      4.times { @worker.push event, partition_key }
@@ -107,13 +109,42 @@ describe LiveEvents::AsyncWorker do
  end

  context "with error putting to kinesis" do
+   let(:expected_batch) { { records: [{ data: /1234/, partition_key: instance_of(String) }], stream_name: "stream_name_x" } }
+
+   it "puts 'InternalFailure' records back in the queue for 1 extra retry that passes" do
+     results1 = double(records: [double(error_code: "InternalFailure", error_message: "internal failure message")])
+     results2 = double(records: [double(error_code: nil)])
+     expect(stream_client).to receive(:put_records).with(expected_batch).and_return(results1, results2)
+     expect(statsd_double).to receive(:time).and_yield.twice
+     expect(statsd_double).not_to receive(:increment).with("live_events.events.send_errors", any_args)
+     expect(statsd_double).to receive(:increment).with("live_events.events.sends", any_args)
+     expect(statsd_double).to receive(:increment).with("live_events.events.retry", any_args)
+
+     @worker.start!
+     @worker.push event, partition_key
+     @worker.stop!
+   end
+
+   it "puts 'InternalFailure' records back in the queue for 3 retries that fail" do
+     results = double(records: [double(error_code: "InternalFailure", error_message: "internal failure message")])
+     expect(stream_client).to receive(:put_records).exactly(4).times.and_return(results)
+     expect(statsd_double).to receive(:time).and_yield.exactly(4).times
+     expect(statsd_double).to receive(:increment).exactly(3).times.with("live_events.events.retry", any_args)
+     expect(statsd_double).to receive(:increment).once.with("live_events.events.final_retry", any_args)
+     expect(statsd_double).to receive(:increment).with("live_events.events.send_errors", any_args)
+
+     @worker.start!
+     @worker.push event, partition_key
+     @worker.stop!
+   end

    it "writes errors to logger" do
      results = OpenStruct.new(records: [
                                 OpenStruct.new(error_code: "failure", error_message: "failure message")
                               ])
      allow(stream_client).to receive(:put_records).once.and_return(results)
-     statsd_double = double
-     LiveEvents.statsd = statsd_double
      expect(statsd_double).to receive(:time).and_yield
      expect(statsd_double).to receive(:increment).with("live_events.events.send_errors", any_args)

      @worker.start!