From e1e0dde546d70db6224be4590cd3ae072997fcd6 Mon Sep 17 00:00:00 2001
From: Marc Phillips
Date: Fri, 4 Oct 2019 15:55:39 -0600
Subject: [PATCH] Enrich live_events put_records errors

Better tag metrics in datadog to allow aggregations across
all event_types. Also allow for request_id to be sent to
datadog when a failure is made so we can easily track
down the problem request in logs.

closes PLAT-5026

Test Plan:
- force a live event to fail putting to kinesis
  (you can do this by updating the stubbed client to add error code)
- note that the enrichment is now present
- see that the dropped event is also pushed to sentry and the logs

Change-Id: I6c82081c69ced40b32294bdd0e7f21849b19494e
Reviewed-on: https://gerrit.instructure.com/212192
Tested-by: Jenkins
QA-Review: Tucker Mcknight
Reviewed-by: Weston Dransfield
Product-Review: Marc Phillips
---
 config/initializers/live_events.rb            |  2 ++
 gems/live_events/lib/live_events.rb           |  2 +-
 .../lib/live_events/async_worker.rb           | 26 ++++++++++++++-----
 gems/live_events/lib/live_events/client.rb    | 11 +++++---
 4 files changed, 30 insertions(+), 11 deletions(-)

diff --git a/config/initializers/live_events.rb b/config/initializers/live_events.rb
index bcbb087b9fc..c5ec06b0015 100644
--- a/config/initializers/live_events.rb
+++ b/config/initializers/live_events.rb
@@ -19,6 +19,7 @@ class StubbedClient
   def self.put_records(records:, stream_name:)
     events = records.map { |e| JSON.parse(e[:data]).dig('attributes', 'event_name') }.join(' | ')
     puts "Events #{events} put to stream #{stream_name}: #{records}"
+    records.map { |r| OpenStruct.new(error_code: nil, error_message: nil ) }
   end

   def self.stream_name
@@ -30,6 +31,7 @@ Rails.configuration.to_prepare do
   LiveEvents.logger = Rails.logger
   LiveEvents.cache = Rails.cache
   LiveEvents.statsd = InstStatsd::Statsd
+  LiveEvents.error_reporter = Canvas::Errors
   LiveEvents.max_queue_size = -> { Setting.get('live_events_max_queue_size', 5000).to_i }
   LiveEvents.settings = -> {
     plugin_settings = Canvas::Plugin.find(:live_events)&.settings
diff --git a/gems/live_events/lib/live_events.rb b/gems/live_events/lib/live_events.rb
index 0287ccf5011..06597e07ee2 100644
--- a/gems/live_events/lib/live_events.rb
+++ b/gems/live_events/lib/live_events.rb
@@ -20,7 +20,7 @@ require 'inst_statsd'

 module LiveEvents
   class << self
-    attr_accessor :logger, :cache, :statsd
+    attr_accessor :logger, :cache, :statsd, :error_reporter
     attr_reader :stream_client

     # rubocop:disable Style/TrivialAccessors
diff --git a/gems/live_events/lib/live_events/async_worker.rb b/gems/live_events/lib/live_events/async_worker.rb
index 62fb91d353a..7111890ca62 100644
--- a/gems/live_events/lib/live_events/async_worker.rb
+++ b/gems/live_events/lib/live_events/async_worker.rb
@@ -59,8 +59,10 @@ module LiveEvents
       @queue << {
         data: event_json,
         partition_key: partition_key,
-        statsd_prefix: "live_events.events.#{event[:attributes][:event_name]}",
-        total_bytes: total_bytes
+        statsd_prefix: "live_events.events",
+        tags: { event: event[:attributes][:event_name] },
+        total_bytes: total_bytes,
+        request_id: event[:attributes][:request_id]
       }
       true
     end
@@ -146,18 +148,28 @@ module LiveEvents

     def process_results(res, records)
       res.records.each_with_index do |r, i|
+        rec = records[i]
         if r.error_code.present?
-          log_unprocessed(records[i])
+          log_unprocessed(rec, r.error_code, r.error_message)
         else
-          LiveEvents&.statsd&.increment("#{records[i][:statsd_prefix]}.sends")
+          LiveEvents&.statsd&.increment("#{rec[:statsd_prefix]}.sends", tags: rec[:tags])
           nil
         end
       end.compact
     end

-    def log_unprocessed(record)
-      logger.error("Error posting event #{record[:statsd_prefix]} with partition key #{record[:partition_key]}: #{record[:data]}")
-      LiveEvents&.statsd&.increment("#{record[:statsd_prefix]}.send_errors")
+    def log_unprocessed(record, error_code, error_message)
+      logger.error(
+        "Error posting event #{record.dig(:tags, :event)} with partition key #{record[:partition_key]}. Error Code: #{error_code} | Error Message: #{error_message}"
+      )
+      logger.debug(
+        "Failed event data: #{record[:data]}"
+      )
+      LiveEvents&.error_reporter&.capture(:dropped_live_event, message: { request_id: record[:request_id], error_code: error_code, error_message: error_message }.to_json)
+      LiveEvents&.statsd&.increment(
+        "#{record[:statsd_prefix]}.send_errors",
+        tags: record[:tags].merge(error_code: error_code)
+      )
     end
   end
 end
diff --git a/gems/live_events/lib/live_events/client.rb b/gems/live_events/lib/live_events/client.rb
index 9d454c3e231..ecd5384ce9a 100644
--- a/gems/live_events/lib/live_events/client.rb
+++ b/gems/live_events/lib/live_events/client.rb
@@ -72,7 +72,8 @@ module LiveEvents
     end

     def post_event(event_name, payload, time = Time.now, ctx = {}, partition_key = nil)
-      statsd_prefix = "live_events.events.#{event_name}"
+      statsd_prefix = "live_events.events"
+      tags = { event: event_name }

       ctx ||= {}
       attributes = ctx.except(*ATTRIBUTE_BLACKLIST).merge({
@@ -92,8 +93,12 @@ module LiveEvents
       pusher = @worker || LiveEvents.worker

       unless pusher.push(event, partition_key)
-        LiveEvents.logger.error("Error queueing job for worker event: #{event_json}")
-        LiveEvents&.statsd&.increment("#{statsd_prefix}.queue_full_errors")
+        LiveEvents.logger.error("Error queueing job for live event: #{event.to_json}")
+        LiveEvents&.error_reporter&.capture(
+          :dropped_live_event,
+          message: "Error queueing job for live event with request id: #{attributes[:request_id]}"
+        )
+        LiveEvents&.statsd&.increment("#{statsd_prefix}.queue_full_errors", tags: tags)
       end
     end
   end