Enrich live_events put_records errors

Tag metrics in Datadog so they can be aggregated
across all event_types. Also send the request_id
to Datadog when a failure occurs so we can easily
track down the problem request in the logs.
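
For context, the metric shape changes roughly like this (the
event name below is illustrative):

  # Before: one metric name per event type, which is hard to
  # aggregate across event types in Datadog.
  LiveEvents.statsd.increment("live_events.events.course_created.sends")

  # After: one metric name, with the event type as a tag, so
  # Datadog can sum over all events or group by the event tag.
  LiveEvents.statsd.increment("live_events.events.sends", tags: { event: "course_created" })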

closes PLAT-5026

Test Plan:
 - force a live event to fail putting to Kinesis (you can
   do this by updating the stubbed client to return an
   error code; see the sketch after this list)
 - note that the enriched error details are now present
 - see that the dropped event is also pushed to Sentry and
   the logs
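
A minimal sketch of the stubbed-client tweak (the error code and
message below are invented; any non-nil error_code exercises the
failure path):

  def self.put_records(records:, stream_name:)
    # Pretend Kinesis rejected every record so the error path runs.
    records.map do |r|
      OpenStruct.new(error_code: 'InternalFailure', error_message: 'simulated failure')
    end
  end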

Change-Id: I6c82081c69ced40b32294bdd0e7f21849b19494e
Reviewed-on: https://gerrit.instructure.com/212192
Tested-by: Jenkins
QA-Review: Tucker Mcknight <tmcknight@instructure.com>
Reviewed-by: Weston Dransfield <wdransfield@instructure.com>
Product-Review: Marc Phillips <mphillips@instructure.com>


@@ -19,6 +19,7 @@ class StubbedClient
   def self.put_records(records:, stream_name:)
     events = records.map { |e| JSON.parse(e[:data]).dig('attributes', 'event_name') }.join(' | ')
     puts "Events #{events} put to stream #{stream_name}: #{records}"
+    records.map { |r| OpenStruct.new(error_code: nil, error_message: nil) }
   end

   def self.stream_name
@@ -30,6 +31,7 @@ Rails.configuration.to_prepare do
   LiveEvents.logger = Rails.logger
   LiveEvents.cache = Rails.cache
   LiveEvents.statsd = InstStatsd::Statsd
+  LiveEvents.error_reporter = Canvas::Errors
   LiveEvents.max_queue_size = -> { Setting.get('live_events_max_queue_size', 5000).to_i }
   LiveEvents.settings = -> {
     plugin_settings = Canvas::Plugin.find(:live_events)&.settings


@@ -20,7 +20,7 @@ require 'inst_statsd'
 module LiveEvents
   class << self
-    attr_accessor :logger, :cache, :statsd
+    attr_accessor :logger, :cache, :statsd, :error_reporter
     attr_reader :stream_client

     # rubocop:disable Style/TrivialAccessors


@@ -59,8 +59,10 @@ module LiveEvents
       @queue << {
         data: event_json,
         partition_key: partition_key,
-        statsd_prefix: "live_events.events.#{event[:attributes][:event_name]}",
-        total_bytes: total_bytes
+        statsd_prefix: "live_events.events",
+        tags: { event: event[:attributes][:event_name] },
+        total_bytes: total_bytes,
+        request_id: event[:attributes][:request_id]
       }
       true
     end
@@ -146,18 +148,28 @@ module LiveEvents
     def process_results(res, records)
       res.records.each_with_index do |r, i|
+        rec = records[i]
         if r.error_code.present?
-          log_unprocessed(records[i])
+          log_unprocessed(rec, r.error_code, r.error_message)
         else
-          LiveEvents&.statsd&.increment("#{records[i][:statsd_prefix]}.sends")
+          LiveEvents&.statsd&.increment("#{rec[:statsd_prefix]}.sends", tags: rec[:tags])
           nil
         end
       end.compact
     end

-    def log_unprocessed(record)
-      logger.error("Error posting event #{record[:statsd_prefix]} with partition key #{record[:partition_key]}: #{record[:data]}")
-      LiveEvents&.statsd&.increment("#{record[:statsd_prefix]}.send_errors")
+    def log_unprocessed(record, error_code, error_message)
+      logger.error(
+        "Error posting event #{record.dig(:tags, :event)} with partition key #{record[:partition_key]}. Error Code: #{error_code} | Error Message: #{error_message}"
+      )
+      logger.debug(
+        "Failed event data: #{record[:data]}"
+      )
+      LiveEvents&.error_reporter&.capture(:dropped_live_event, message: { request_id: record[:request_id], error_code: error_code, error_message: error_message }.to_json)
+      LiveEvents&.statsd&.increment(
+        "#{record[:statsd_prefix]}.send_errors",
+        tags: record[:tags].merge(error_code: error_code)
+      )
     end
   end
 end
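
With the initializer wiring error_reporter to Canvas::Errors, a
dropped record now produces a capture call shaped roughly like this
(field values are illustrative):

  Canvas::Errors.capture(
    :dropped_live_event,
    message: {
      request_id: 'f3d1a2b4-...',  # ties the failure back to request logs
      error_code: 'ProvisionedThroughputExceededException',
      error_message: 'Rate exceeded for shard ...'
    }.to_json
  )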


@@ -72,7 +72,8 @@ module LiveEvents
     end

     def post_event(event_name, payload, time = Time.now, ctx = {}, partition_key = nil)
-      statsd_prefix = "live_events.events.#{event_name}"
+      statsd_prefix = "live_events.events"
+      tags = { event: event_name }
       ctx ||= {}
       attributes = ctx.except(*ATTRIBUTE_BLACKLIST).merge({
@@ -92,8 +93,12 @@ module LiveEvents
       pusher = @worker || LiveEvents.worker
       unless pusher.push(event, partition_key)
-        LiveEvents.logger.error("Error queueing job for worker event: #{event_json}")
-        LiveEvents&.statsd&.increment("#{statsd_prefix}.queue_full_errors")
+        LiveEvents.logger.error("Error queueing job for live event: #{event.to_json}")
+        LiveEvents&.error_reporter&.capture(
+          :dropped_live_event,
+          message: "Error queueing job for live event with request id: #{attributes[:request_id]}"
+        )
+        LiveEvents&.statsd&.increment("#{statsd_prefix}.queue_full_errors", tags: tags)
       end
     end
   end